broker.go

  1// Package pubsub provides a lightweight in-process broker for fan-out
  2// event delivery between services and the UI.
  3//
  4// Delivery semantics:
  5//
  6//   - [Broker.Publish] is best-effort and lossy under contention. If a
  7//     subscriber's channel is full, the event is dropped for that
  8//     subscriber, a warning is logged, and a counter is incremented.
  9//     This is the right choice for high-frequency intermediate updates
 10//     (e.g. streaming token deltas) where only the latest state
 11//     matters.
 12//
//   - [Broker.PublishMustDeliver] is bounded-blocking. For each
//     subscriber it first tries a non-blocking send, then falls back to
//     a per-subscriber blocking send with a hard timeout. On timeout the
//     event is dropped for that subscriber, an error is logged, and the
//     must-deliver drop counter is incremented. Cancelling the context
//     passed to it also cuts the wait short. The publisher never blocks
//     indefinitely. This is the right choice for terminal events
//     (finish, tool result, error, cancel) that must not be silently
//     lost.
 21//
 22// Drop counters ([Broker.DropCount], [Broker.MustDeliverDropCount]) are
 23// exposed so callers can surface saturation in telemetry.
 24package pubsub
 25
 26import (
 27	"context"
 28	"log/slog"
 29	"sync"
 30	"sync/atomic"
 31	"time"
 32)
 33
 34const (
 35	// bufferSize is the per-subscriber channel capacity for any broker
 36	// created via NewBroker. Publish is non-blocking, so a full buffer
 37	// drops events (with a warning log); sized to cover a long
 38	// streaming assistant turn (~one UpdatedEvent per token) even under
 39	// TUI render stalls.
 40	bufferSize = 4096
 41
 42	// defaultMustDeliverTimeout is the per-subscriber upper bound on how
 43	// long [Broker.PublishMustDeliver] will block waiting for buffer
 44	// space before giving up on that subscriber.
 45	defaultMustDeliverTimeout = 50 * time.Millisecond
 46)
 47
 48type Broker[T any] struct {
 49	subs                 map[chan Event[T]]struct{}
 50	mu                   sync.RWMutex
 51	done                 chan struct{}
 52	subCount             int
 53	channelBufferSize    int
 54	mustDeliverTimeout   time.Duration
 55	dropCount            atomic.Uint64
 56	mustDeliverDropCount atomic.Uint64
 57}
 58
 59func NewBroker[T any]() *Broker[T] {
 60	return NewBrokerWithOptions[T](bufferSize)
 61}
 62
 63func NewBrokerWithOptions[T any](channelBufferSize int) *Broker[T] {
 64	return &Broker[T]{
 65		subs:               make(map[chan Event[T]]struct{}),
 66		done:               make(chan struct{}),
 67		channelBufferSize:  channelBufferSize,
 68		mustDeliverTimeout: defaultMustDeliverTimeout,
 69	}
 70}
 71
 72// SetMustDeliverTimeout overrides the per-subscriber timeout used by
 73// [Broker.PublishMustDeliver]. A zero or negative value resets to the
 74// default. Intended primarily for tests.
 75func (b *Broker[T]) SetMustDeliverTimeout(d time.Duration) {
 76	b.mu.Lock()
 77	defer b.mu.Unlock()
 78	if d <= 0 {
 79		b.mustDeliverTimeout = defaultMustDeliverTimeout
 80		return
 81	}
 82	b.mustDeliverTimeout = d
 83}
 84
 85func (b *Broker[T]) Shutdown() {
 86	select {
 87	case <-b.done: // Already closed
 88		return
 89	default:
 90		close(b.done)
 91	}
 92
 93	b.mu.Lock()
 94	defer b.mu.Unlock()
 95
 96	for ch := range b.subs {
 97		delete(b.subs, ch)
 98		close(ch)
 99	}
100
101	b.subCount = 0
102}
103
104func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
105	b.mu.Lock()
106	defer b.mu.Unlock()
107
108	select {
109	case <-b.done:
110		ch := make(chan Event[T])
111		close(ch)
112		return ch
113	default:
114	}
115
116	sub := make(chan Event[T], b.channelBufferSize)
117	b.subs[sub] = struct{}{}
118	b.subCount++
119
120	go func() {
121		<-ctx.Done()
122
123		b.mu.Lock()
124		defer b.mu.Unlock()
125
126		select {
127		case <-b.done:
128			return
129		default:
130		}
131
132		delete(b.subs, sub)
133		close(sub)
134		b.subCount--
135	}()
136
137	return sub
138}
139
140func (b *Broker[T]) GetSubscriberCount() int {
141	b.mu.RLock()
142	defer b.mu.RUnlock()
143	return b.subCount
144}
145
146// DropCount returns the cumulative number of events dropped by
147// [Broker.Publish] because a subscriber's channel was full.
148func (b *Broker[T]) DropCount() uint64 {
149	return b.dropCount.Load()
150}
151
152// MustDeliverDropCount returns the cumulative number of events dropped
153// by [Broker.PublishMustDeliver] after the per-subscriber timeout
154// expired.
155func (b *Broker[T]) MustDeliverDropCount() uint64 {
156	return b.mustDeliverDropCount.Load()
157}
158
159// Publish delivers an event to every active subscriber.
160//
161// Delivery is non-blocking and lossy: if a subscriber's channel is full
162// the event is dropped for that subscriber, a warning is logged, and
163// [Broker.DropCount] is incremented. Use [Broker.PublishMustDeliver]
164// for events that must not be silently dropped.
165func (b *Broker[T]) Publish(t EventType, payload T) {
166	b.mu.RLock()
167	defer b.mu.RUnlock()
168
169	select {
170	case <-b.done:
171		return
172	default:
173	}
174
175	event := Event[T]{Type: t, Payload: payload}
176
177	for sub := range b.subs {
178		select {
179		case sub <- event:
180		default:
181			// Channel is full, subscriber is slow — skip this event.
182			// Lossy by design; counted and logged so saturation is
183			// observable.
184			b.dropCount.Add(1)
185			slog.Warn("Pubsub buffer full; dropping event", "type", t)
186		}
187	}
188}
189
190// PublishMustDeliver delivers an event with bounded-blocking semantics.
191// For each subscriber it first attempts a non-blocking send, then falls
192// back to a blocking send bounded by a per-subscriber timeout (default
193// [defaultMustDeliverTimeout]). On timeout the event is dropped for
194// that subscriber, [Broker.MustDeliverDropCount] is incremented, and an
195// error is logged. The publisher never blocks indefinitely.
196//
197// Use this for terminal events that must reach subscribers (finish,
198// tool result, error, cancel). Callers must still tolerate rare drops
199// after timeout — recovery is the subscriber's responsibility (e.g. a
200// re-fetch on the next session-visible event).
201func (b *Broker[T]) PublishMustDeliver(ctx context.Context, t EventType, payload T) {
202	b.mu.RLock()
203	defer b.mu.RUnlock()
204
205	select {
206	case <-b.done:
207		return
208	default:
209	}
210
211	event := Event[T]{Type: t, Payload: payload}
212	timeout := b.mustDeliverTimeout
213
214	for sub := range b.subs {
215		// Fast path: non-blocking send.
216		select {
217		case sub <- event:
218			continue
219		default:
220		}
221
222		// Slow path: bounded blocking send.
223		timer := time.NewTimer(timeout)
224		select {
225		case sub <- event:
226			timer.Stop()
227		case <-timer.C:
228			b.mustDeliverDropCount.Add(1)
229			slog.Error("PublishMustDeliver timed out delivering event",
230				"type", t, "timeout", timeout)
231		case <-ctx.Done():
232			timer.Stop()
233			return
234		}
235	}
236}