broker.go

  1package pubsub
  2
  3import (
  4	"context"
  5	"sync"
  6)
  7
  8const bufferSize = 64
  9
 10type Broker[T any] struct {
 11	subs      map[chan Event[T]]struct{}
 12	mu        sync.RWMutex
 13	done      chan struct{}
 14	subCount  int
 15	maxEvents int
 16}
 17
 18func NewBroker[T any]() *Broker[T] {
 19	return NewBrokerWithOptions[T](bufferSize, 1000)
 20}
 21
 22func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
 23	return &Broker[T]{
 24		subs:      make(map[chan Event[T]]struct{}),
 25		done:      make(chan struct{}),
 26		maxEvents: maxEvents,
 27	}
 28}
 29
 30func (b *Broker[T]) Shutdown() {
 31	select {
 32	case <-b.done: // Already closed
 33		return
 34	default:
 35		close(b.done)
 36	}
 37
 38	b.mu.Lock()
 39	defer b.mu.Unlock()
 40
 41	for ch := range b.subs {
 42		delete(b.subs, ch)
 43		close(ch)
 44	}
 45
 46	b.subCount = 0
 47}
 48
 49func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
 50	b.mu.Lock()
 51	defer b.mu.Unlock()
 52
 53	select {
 54	case <-b.done:
 55		ch := make(chan Event[T])
 56		close(ch)
 57		return ch
 58	default:
 59	}
 60
 61	sub := make(chan Event[T], bufferSize)
 62	b.subs[sub] = struct{}{}
 63	b.subCount++
 64
 65	go func() {
 66		<-ctx.Done()
 67
 68		b.mu.Lock()
 69		defer b.mu.Unlock()
 70
 71		select {
 72		case <-b.done:
 73			return
 74		default:
 75		}
 76
 77		delete(b.subs, sub)
 78		close(sub)
 79		b.subCount--
 80	}()
 81
 82	return sub
 83}
 84
 85func (b *Broker[T]) GetSubscriberCount() int {
 86	b.mu.RLock()
 87	defer b.mu.RUnlock()
 88	return b.subCount
 89}
 90
 91func (b *Broker[T]) Publish(t EventType, payload T) {
 92	b.mu.RLock()
 93	defer b.mu.RUnlock()
 94
 95	select {
 96	case <-b.done:
 97		return
 98	default:
 99	}
100
101	event := Event[T]{Type: t, Payload: payload}
102
103	for sub := range b.subs {
104		select {
105		case sub <- event:
106		default:
107			// Channel is full, subscriber is slow - skip this event
108			// This prevents blocking the publisher
109		}
110	}
111}