1package pubsub
  2
  3import (
  4	"context"
  5	"log/slog"
  6	"sync"
  7)
  8
  9const bufferSize = 64
 10
 11type Broker[T any] struct {
 12	subs      map[chan Event[T]]struct{}
 13	mu        sync.RWMutex
 14	done      chan struct{}
 15	subCount  int
 16	maxEvents int
 17}
 18
 19func NewBroker[T any]() *Broker[T] {
 20	return NewBrokerWithOptions[T](bufferSize, 1000)
 21}
 22
 23func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
 24	b := &Broker[T]{
 25		subs:      make(map[chan Event[T]]struct{}),
 26		done:      make(chan struct{}),
 27		subCount:  0,
 28		maxEvents: maxEvents,
 29	}
 30	return b
 31}
 32
 33func (b *Broker[T]) Shutdown() {
 34	select {
 35	case <-b.done: // Already closed
 36		return
 37	default:
 38		close(b.done)
 39	}
 40
 41	b.mu.Lock()
 42	defer b.mu.Unlock()
 43
 44	for ch := range b.subs {
 45		delete(b.subs, ch)
 46		close(ch)
 47	}
 48
 49	b.subCount = 0
 50}
 51
 52func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
 53	b.mu.Lock()
 54	defer b.mu.Unlock()
 55
 56	select {
 57	case <-b.done:
 58		ch := make(chan Event[T])
 59		close(ch)
 60		return ch
 61	default:
 62	}
 63
 64	sub := make(chan Event[T], bufferSize)
 65	b.subs[sub] = struct{}{}
 66	b.subCount++
 67
 68	go func() {
 69		<-ctx.Done()
 70
 71		b.mu.Lock()
 72		defer b.mu.Unlock()
 73
 74		select {
 75		case <-b.done:
 76			return
 77		default:
 78		}
 79
 80		delete(b.subs, sub)
 81		close(sub)
 82		b.subCount--
 83	}()
 84
 85	return sub
 86}
 87
 88func (b *Broker[T]) GetSubscriberCount() int {
 89	b.mu.RLock()
 90	defer b.mu.RUnlock()
 91	return b.subCount
 92}
 93
 94func (b *Broker[T]) Publish(t EventType, payload T) {
 95	b.mu.RLock()
 96	select {
 97	case <-b.done:
 98		b.mu.RUnlock()
 99		return
100	default:
101	}
102
103	subscribers := make([]chan Event[T], 0, len(b.subs))
104	for sub := range b.subs {
105		subscribers = append(subscribers, sub)
106	}
107	b.mu.RUnlock()
108
109	event := Event[T]{Type: t, Payload: payload}
110
111	for _, sub := range subscribers {
112		select {
113		case sub <- event:
114		default:
115			// Channel is full, subscriber is slow - skip this event
116			// This prevents blocking the publisher
117			slog.Warn("Skipping event for slow subscriber", "event_type", t)
118		}
119	}
120}