broker.go

  1package pubsub
  2
  3import (
  4	"context"
  5	"log/slog"
  6	"sync"
  7)
  8
  9// bufferSize is the per-subscriber channel capacity for any broker
 10// created via NewBroker.  Publish is non-blocking, so a full buffer
 11// drops events (with a warning log); sized to cover a long streaming
 12// assistant turn (~one UpdatedEvent per token) even under TUI render
 13// stalls.
 14const bufferSize = 4096
 15
 16type Broker[T any] struct {
 17	subs              map[chan Event[T]]struct{}
 18	mu                sync.RWMutex
 19	done              chan struct{}
 20	subCount          int
 21	channelBufferSize int
 22}
 23
 24func NewBroker[T any]() *Broker[T] {
 25	return NewBrokerWithOptions[T](bufferSize)
 26}
 27
 28func NewBrokerWithOptions[T any](channelBufferSize int) *Broker[T] {
 29	return &Broker[T]{
 30		subs:              make(map[chan Event[T]]struct{}),
 31		done:              make(chan struct{}),
 32		channelBufferSize: channelBufferSize,
 33	}
 34}
 35
 36func (b *Broker[T]) Shutdown() {
 37	select {
 38	case <-b.done: // Already closed
 39		return
 40	default:
 41		close(b.done)
 42	}
 43
 44	b.mu.Lock()
 45	defer b.mu.Unlock()
 46
 47	for ch := range b.subs {
 48		delete(b.subs, ch)
 49		close(ch)
 50	}
 51
 52	b.subCount = 0
 53}
 54
 55func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
 56	b.mu.Lock()
 57	defer b.mu.Unlock()
 58
 59	select {
 60	case <-b.done:
 61		ch := make(chan Event[T])
 62		close(ch)
 63		return ch
 64	default:
 65	}
 66
 67	sub := make(chan Event[T], b.channelBufferSize)
 68	b.subs[sub] = struct{}{}
 69	b.subCount++
 70
 71	go func() {
 72		<-ctx.Done()
 73
 74		b.mu.Lock()
 75		defer b.mu.Unlock()
 76
 77		select {
 78		case <-b.done:
 79			return
 80		default:
 81		}
 82
 83		delete(b.subs, sub)
 84		close(sub)
 85		b.subCount--
 86	}()
 87
 88	return sub
 89}
 90
 91func (b *Broker[T]) GetSubscriberCount() int {
 92	b.mu.RLock()
 93	defer b.mu.RUnlock()
 94	return b.subCount
 95}
 96
 97func (b *Broker[T]) Publish(t EventType, payload T) {
 98	b.mu.RLock()
 99	defer b.mu.RUnlock()
100
101	select {
102	case <-b.done:
103		return
104	default:
105	}
106
107	event := Event[T]{Type: t, Payload: payload}
108
109	for sub := range b.subs {
110		select {
111		case sub <- event:
112		default:
113			// Channel is full, subscriber is slow — skip this event.
114			// This prevents blocking the publisher.
115			slog.Warn("Pubsub buffer full; dropping event", "type", t)
116		}
117	}
118}