broker.go

  1package pubsub
  2
  3import (
  4	"context"
  5	"sync"
  6)
  7
  8// bufferSize is the per-subscriber channel capacity for any broker
  9// created via NewBroker.  Publish is non-blocking, so a full buffer
 10// silently drops events; sized to cover a long streaming assistant
 11// turn (~one UpdatedEvent per token) even under TUI render stalls.
 12const bufferSize = 4096
 13
 14type Broker[T any] struct {
 15	subs              map[chan Event[T]]struct{}
 16	mu                sync.RWMutex
 17	done              chan struct{}
 18	subCount          int
 19	channelBufferSize int
 20}
 21
 22func NewBroker[T any]() *Broker[T] {
 23	return NewBrokerWithOptions[T](bufferSize)
 24}
 25
 26func NewBrokerWithOptions[T any](channelBufferSize int) *Broker[T] {
 27	return &Broker[T]{
 28		subs:              make(map[chan Event[T]]struct{}),
 29		done:              make(chan struct{}),
 30		channelBufferSize: channelBufferSize,
 31	}
 32}
 33
 34func (b *Broker[T]) Shutdown() {
 35	select {
 36	case <-b.done: // Already closed
 37		return
 38	default:
 39		close(b.done)
 40	}
 41
 42	b.mu.Lock()
 43	defer b.mu.Unlock()
 44
 45	for ch := range b.subs {
 46		delete(b.subs, ch)
 47		close(ch)
 48	}
 49
 50	b.subCount = 0
 51}
 52
 53func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
 54	b.mu.Lock()
 55	defer b.mu.Unlock()
 56
 57	select {
 58	case <-b.done:
 59		ch := make(chan Event[T])
 60		close(ch)
 61		return ch
 62	default:
 63	}
 64
 65	sub := make(chan Event[T], b.channelBufferSize)
 66	b.subs[sub] = struct{}{}
 67	b.subCount++
 68
 69	go func() {
 70		<-ctx.Done()
 71
 72		b.mu.Lock()
 73		defer b.mu.Unlock()
 74
 75		select {
 76		case <-b.done:
 77			return
 78		default:
 79		}
 80
 81		delete(b.subs, sub)
 82		close(sub)
 83		b.subCount--
 84	}()
 85
 86	return sub
 87}
 88
 89func (b *Broker[T]) GetSubscriberCount() int {
 90	b.mu.RLock()
 91	defer b.mu.RUnlock()
 92	return b.subCount
 93}
 94
 95func (b *Broker[T]) Publish(t EventType, payload T) {
 96	b.mu.RLock()
 97	defer b.mu.RUnlock()
 98
 99	select {
100	case <-b.done:
101		return
102	default:
103	}
104
105	event := Event[T]{Type: t, Payload: payload}
106
107	for sub := range b.subs {
108		select {
109		case sub <- event:
110		default:
111			// Channel is full, subscriber is slow - skip this event
112			// This prevents blocking the publisher
113		}
114	}
115}