broker.go

  1// Package pubsub provides a lightweight in-process broker for fan-out
  2// event delivery between services and the UI.
  3//
  4// Delivery semantics:
  5//
  6//   - [Broker.Publish] is best-effort and lossy under contention. If a
  7//     subscriber's channel is full, the event is dropped for that
  8//     subscriber and a counter is incremented. This is the right choice
  9//     for high-frequency intermediate updates (e.g. streaming token
 10//     deltas) where only the latest state matters.
 11//
 12//   - [Broker.PublishMustDeliver] is bounded-blocking. For each
 13//     subscriber it first tries a non-blocking send, then falls back to
 14//     a per-subscriber blocking send with a hard timeout. On timeout the
 15//     event is dropped for that subscriber, an error is logged, and the
 16//     must-deliver drop counter is incremented. The publisher never
 17//     blocks indefinitely. This is the right choice for terminal events
 18//     (finish, tool result, error, cancel) that must not be silently
 19//     coalesced away.
 20//
 21// Drop counters ([Broker.DropCount], [Broker.MustDeliverDropCount]) are
 22// exposed so callers can surface saturation in telemetry.
 23package pubsub
 24
 25import (
 26	"context"
 27	"log/slog"
 28	"sync"
 29	"sync/atomic"
 30	"time"
 31)
 32
 33const (
 34	bufferSize = 64
 35
 36	// defaultMustDeliverTimeout is the per-subscriber upper bound on how
 37	// long [Broker.PublishMustDeliver] will block waiting for buffer
 38	// space before giving up on that subscriber.
 39	defaultMustDeliverTimeout = 50 * time.Millisecond
 40)
 41
 42type Broker[T any] struct {
 43	subs                 map[chan Event[T]]struct{}
 44	mu                   sync.RWMutex
 45	done                 chan struct{}
 46	subCount             int
 47	maxEvents            int
 48	channelBufferSize    int
 49	mustDeliverTimeout   time.Duration
 50	dropCount            atomic.Uint64
 51	mustDeliverDropCount atomic.Uint64
 52}
 53
 54func NewBroker[T any]() *Broker[T] {
 55	return NewBrokerWithOptions[T](bufferSize, 1000)
 56}
 57
 58func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
 59	return &Broker[T]{
 60		subs:               make(map[chan Event[T]]struct{}),
 61		done:               make(chan struct{}),
 62		maxEvents:          maxEvents,
 63		channelBufferSize:  channelBufferSize,
 64		mustDeliverTimeout: defaultMustDeliverTimeout,
 65	}
 66}
 67
 68// SetMustDeliverTimeout overrides the per-subscriber timeout used by
 69// [Broker.PublishMustDeliver]. A zero or negative value resets to the
 70// default. Intended primarily for tests.
 71func (b *Broker[T]) SetMustDeliverTimeout(d time.Duration) {
 72	b.mu.Lock()
 73	defer b.mu.Unlock()
 74	if d <= 0 {
 75		b.mustDeliverTimeout = defaultMustDeliverTimeout
 76		return
 77	}
 78	b.mustDeliverTimeout = d
 79}
 80
 81func (b *Broker[T]) Shutdown() {
 82	select {
 83	case <-b.done: // Already closed
 84		return
 85	default:
 86		close(b.done)
 87	}
 88
 89	b.mu.Lock()
 90	defer b.mu.Unlock()
 91
 92	for ch := range b.subs {
 93		delete(b.subs, ch)
 94		close(ch)
 95	}
 96
 97	b.subCount = 0
 98}
 99
100func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
101	b.mu.Lock()
102	defer b.mu.Unlock()
103
104	select {
105	case <-b.done:
106		ch := make(chan Event[T])
107		close(ch)
108		return ch
109	default:
110	}
111
112	sub := make(chan Event[T], b.channelBufferSize)
113	b.subs[sub] = struct{}{}
114	b.subCount++
115
116	go func() {
117		<-ctx.Done()
118
119		b.mu.Lock()
120		defer b.mu.Unlock()
121
122		select {
123		case <-b.done:
124			return
125		default:
126		}
127
128		delete(b.subs, sub)
129		close(sub)
130		b.subCount--
131	}()
132
133	return sub
134}
135
136func (b *Broker[T]) GetSubscriberCount() int {
137	b.mu.RLock()
138	defer b.mu.RUnlock()
139	return b.subCount
140}
141
142// DropCount returns the cumulative number of events dropped by
143// [Broker.Publish] because a subscriber's channel was full.
144func (b *Broker[T]) DropCount() uint64 {
145	return b.dropCount.Load()
146}
147
148// MustDeliverDropCount returns the cumulative number of events dropped
149// by [Broker.PublishMustDeliver] after the per-subscriber timeout
150// expired.
151func (b *Broker[T]) MustDeliverDropCount() uint64 {
152	return b.mustDeliverDropCount.Load()
153}
154
155// Publish delivers an event to every active subscriber.
156//
157// Delivery is non-blocking and lossy: if a subscriber's channel is full
158// the event is dropped for that subscriber and [Broker.DropCount] is
159// incremented. Use [Broker.PublishMustDeliver] for events that must not
160// be silently dropped.
161func (b *Broker[T]) Publish(t EventType, payload T) {
162	b.mu.RLock()
163	defer b.mu.RUnlock()
164
165	select {
166	case <-b.done:
167		return
168	default:
169	}
170
171	event := Event[T]{Type: t, Payload: payload}
172
173	for sub := range b.subs {
174		select {
175		case sub <- event:
176		default:
177			// Channel is full, subscriber is slow - skip this event.
178			// Lossy by design; counted so saturation is observable.
179			b.dropCount.Add(1)
180		}
181	}
182}
183
184// PublishMustDeliver delivers an event with bounded-blocking semantics.
185// For each subscriber it first attempts a non-blocking send, then falls
186// back to a blocking send bounded by a per-subscriber timeout (default
187// [defaultMustDeliverTimeout]). On timeout the event is dropped for
188// that subscriber, [Broker.MustDeliverDropCount] is incremented, and an
189// error is logged. The publisher never blocks indefinitely.
190//
191// Use this for terminal events that must reach subscribers (finish,
192// tool result, error, cancel). Callers must still tolerate rare drops
193// after timeout — recovery is the subscriber's responsibility (e.g. a
194// re-fetch on the next session-visible event).
195func (b *Broker[T]) PublishMustDeliver(ctx context.Context, t EventType, payload T) {
196	b.mu.RLock()
197	defer b.mu.RUnlock()
198
199	select {
200	case <-b.done:
201		return
202	default:
203	}
204
205	event := Event[T]{Type: t, Payload: payload}
206	timeout := b.mustDeliverTimeout
207
208	for sub := range b.subs {
209		// Fast path: non-blocking send.
210		select {
211		case sub <- event:
212			continue
213		default:
214		}
215
216		// Slow path: bounded blocking send.
217		timer := time.NewTimer(timeout)
218		select {
219		case sub <- event:
220			timer.Stop()
221		case <-timer.C:
222			b.mustDeliverDropCount.Add(1)
223			slog.Error("PublishMustDeliver timed out delivering event",
224				"type", t, "timeout", timeout)
225		case <-ctx.Done():
226			timer.Stop()
227			return
228		}
229	}
230}