1package pubsub
  2
  3import (
  4	"context"
  5	"sync"
  6)
  7
  8const bufferSize = 64
  9
 10type Broker[T any] struct {
 11	subs      map[chan Event[T]]struct{}
 12	mu        sync.RWMutex
 13	done      chan struct{}
 14	subCount  int
 15	maxEvents int
 16}
 17
 18func NewBroker[T any]() *Broker[T] {
 19	return NewBrokerWithOptions[T](bufferSize, 1000)
 20}
 21
 22func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
 23	b := &Broker[T]{
 24		subs:      make(map[chan Event[T]]struct{}),
 25		done:      make(chan struct{}),
 26		subCount:  0,
 27		maxEvents: maxEvents,
 28	}
 29	return b
 30}
 31
 32func (b *Broker[T]) Shutdown() {
 33	select {
 34	case <-b.done: // Already closed
 35		return
 36	default:
 37		close(b.done)
 38	}
 39
 40	b.mu.Lock()
 41	defer b.mu.Unlock()
 42
 43	for ch := range b.subs {
 44		delete(b.subs, ch)
 45		close(ch)
 46	}
 47
 48	b.subCount = 0
 49}
 50
 51func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
 52	b.mu.Lock()
 53	defer b.mu.Unlock()
 54
 55	select {
 56	case <-b.done:
 57		ch := make(chan Event[T])
 58		close(ch)
 59		return ch
 60	default:
 61	}
 62
 63	sub := make(chan Event[T], bufferSize)
 64	b.subs[sub] = struct{}{}
 65	b.subCount++
 66
 67	go func() {
 68		<-ctx.Done()
 69
 70		b.mu.Lock()
 71		defer b.mu.Unlock()
 72
 73		select {
 74		case <-b.done:
 75			return
 76		default:
 77		}
 78
 79		delete(b.subs, sub)
 80		close(sub)
 81		b.subCount--
 82	}()
 83
 84	return sub
 85}
 86
 87func (b *Broker[T]) GetSubscriberCount() int {
 88	b.mu.RLock()
 89	defer b.mu.RUnlock()
 90	return b.subCount
 91}
 92
 93func (b *Broker[T]) Publish(t EventType, payload T) {
 94	b.mu.RLock()
 95	select {
 96	case <-b.done:
 97		b.mu.RUnlock()
 98		return
 99	default:
100	}
101
102	subscribers := make([]chan Event[T], 0, len(b.subs))
103	for sub := range b.subs {
104		subscribers = append(subscribers, sub)
105	}
106	b.mu.RUnlock()
107
108	event := Event[T]{Type: t, Payload: payload}
109
110	for _, sub := range subscribers {
111		select {
112		case sub <- event:
113		default:
114			// Channel is full, subscriber is slow - skip this event
115			// This prevents blocking the publisher
116		}
117	}
118}