1// Package pubsub provides a lightweight in-process broker for fan-out
2// event delivery between services and the UI.
3//
4// Delivery semantics:
5//
6// - [Broker.Publish] is best-effort and lossy under contention. If a
7// subscriber's channel is full, the event is dropped for that
8// subscriber, a warning is logged, and a counter is incremented.
9// This is the right choice for high-frequency intermediate updates
10// (e.g. streaming token deltas) where only the latest state
11// matters.
12//
// - [Broker.PublishMustDeliver] is bounded-blocking. For each
// subscriber it first tries a non-blocking send, then falls back to
// a per-subscriber blocking send with a hard timeout. On timeout the
// event is dropped for that subscriber, an error is logged, and the
// must-deliver drop counter is incremented. The publisher never
// blocks indefinitely: in the worst case it waits one timeout per
// saturated subscriber. This is the right choice for terminal events
// (finish, tool result, error, cancel) that must not be silently
// coalesced away.
21//
22// Drop counters ([Broker.DropCount], [Broker.MustDeliverDropCount]) are
23// exposed so callers can surface saturation in telemetry.
24package pubsub
25
26import (
27 "context"
28 "log/slog"
29 "sync"
30 "sync/atomic"
31 "time"
32)
33
const (
	// defaultMustDeliverTimeout caps, per subscriber, how long
	// [Broker.PublishMustDeliver] waits for buffer space before it gives
	// up and drops the event for that subscriber.
	defaultMustDeliverTimeout = time.Millisecond * 50

	// bufferSize (4096) is the capacity of every subscriber channel of a
	// broker built with NewBroker. Because Publish never blocks, a
	// subscriber whose buffer fills up loses events (each loss is logged
	// as a warning). The size is chosen to absorb a long streaming
	// assistant turn (roughly one UpdatedEvent per token) even while the
	// TUI is slow to render.
	bufferSize = 1 << 12
)
47
// Broker fans published events out to every subscribed channel.
//
// Construct one with NewBroker or NewBrokerWithOptions. The zero value is
// not usable: Subscribe writes into subs, which would be a nil map.
type Broker[T any] struct {
	// subs is the set of live subscriber channels. Guarded by mu.
	subs map[chan Event[T]]struct{}
	// mu guards subs, subCount and mustDeliverTimeout. Publishers hold the
	// read lock while sending; Subscribe, the unsubscribe goroutine,
	// Shutdown and SetMustDeliverTimeout take the write lock.
	mu sync.RWMutex
	// done is closed by Shutdown to mark the broker as stopped.
	done chan struct{}
	// subCount tracks the number of entries in subs. Guarded by mu.
	subCount int
	// channelBufferSize is the capacity given to each new subscriber channel.
	channelBufferSize int
	// mustDeliverTimeout bounds each blocking send in PublishMustDeliver.
	mustDeliverTimeout time.Duration
	// dropCount counts events dropped by Publish (exposed via DropCount).
	dropCount atomic.Uint64
	// mustDeliverDropCount counts PublishMustDeliver timeouts (exposed via
	// MustDeliverDropCount).
	mustDeliverDropCount atomic.Uint64
}
58
59func NewBroker[T any]() *Broker[T] {
60 return NewBrokerWithOptions[T](bufferSize)
61}
62
63func NewBrokerWithOptions[T any](channelBufferSize int) *Broker[T] {
64 return &Broker[T]{
65 subs: make(map[chan Event[T]]struct{}),
66 done: make(chan struct{}),
67 channelBufferSize: channelBufferSize,
68 mustDeliverTimeout: defaultMustDeliverTimeout,
69 }
70}
71
72// SetMustDeliverTimeout overrides the per-subscriber timeout used by
73// [Broker.PublishMustDeliver]. A zero or negative value resets to the
74// default. Intended primarily for tests.
75func (b *Broker[T]) SetMustDeliverTimeout(d time.Duration) {
76 b.mu.Lock()
77 defer b.mu.Unlock()
78 if d <= 0 {
79 b.mustDeliverTimeout = defaultMustDeliverTimeout
80 return
81 }
82 b.mustDeliverTimeout = d
83}
84
85func (b *Broker[T]) Shutdown() {
86 select {
87 case <-b.done: // Already closed
88 return
89 default:
90 close(b.done)
91 }
92
93 b.mu.Lock()
94 defer b.mu.Unlock()
95
96 for ch := range b.subs {
97 delete(b.subs, ch)
98 close(ch)
99 }
100
101 b.subCount = 0
102}
103
104func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
105 b.mu.Lock()
106 defer b.mu.Unlock()
107
108 select {
109 case <-b.done:
110 ch := make(chan Event[T])
111 close(ch)
112 return ch
113 default:
114 }
115
116 sub := make(chan Event[T], b.channelBufferSize)
117 b.subs[sub] = struct{}{}
118 b.subCount++
119
120 go func() {
121 <-ctx.Done()
122
123 b.mu.Lock()
124 defer b.mu.Unlock()
125
126 select {
127 case <-b.done:
128 return
129 default:
130 }
131
132 delete(b.subs, sub)
133 close(sub)
134 b.subCount--
135 }()
136
137 return sub
138}
139
140func (b *Broker[T]) GetSubscriberCount() int {
141 b.mu.RLock()
142 defer b.mu.RUnlock()
143 return b.subCount
144}
145
146// DropCount returns the cumulative number of events dropped by
147// [Broker.Publish] because a subscriber's channel was full.
148func (b *Broker[T]) DropCount() uint64 {
149 return b.dropCount.Load()
150}
151
152// MustDeliverDropCount returns the cumulative number of events dropped
153// by [Broker.PublishMustDeliver] after the per-subscriber timeout
154// expired.
155func (b *Broker[T]) MustDeliverDropCount() uint64 {
156 return b.mustDeliverDropCount.Load()
157}
158
159// Publish delivers an event to every active subscriber.
160//
161// Delivery is non-blocking and lossy: if a subscriber's channel is full
162// the event is dropped for that subscriber, a warning is logged, and
163// [Broker.DropCount] is incremented. Use [Broker.PublishMustDeliver]
164// for events that must not be silently dropped.
165func (b *Broker[T]) Publish(t EventType, payload T) {
166 b.mu.RLock()
167 defer b.mu.RUnlock()
168
169 select {
170 case <-b.done:
171 return
172 default:
173 }
174
175 event := Event[T]{Type: t, Payload: payload}
176
177 for sub := range b.subs {
178 select {
179 case sub <- event:
180 default:
181 // Channel is full, subscriber is slow — skip this event.
182 // Lossy by design; counted and logged so saturation is
183 // observable.
184 b.dropCount.Add(1)
185 slog.Warn("Pubsub buffer full; dropping event", "type", t)
186 }
187 }
188}
189
190// PublishMustDeliver delivers an event with bounded-blocking semantics.
191// For each subscriber it first attempts a non-blocking send, then falls
192// back to a blocking send bounded by a per-subscriber timeout (default
193// [defaultMustDeliverTimeout]). On timeout the event is dropped for
194// that subscriber, [Broker.MustDeliverDropCount] is incremented, and an
195// error is logged. The publisher never blocks indefinitely.
196//
197// Use this for terminal events that must reach subscribers (finish,
198// tool result, error, cancel). Callers must still tolerate rare drops
199// after timeout — recovery is the subscriber's responsibility (e.g. a
200// re-fetch on the next session-visible event).
201func (b *Broker[T]) PublishMustDeliver(ctx context.Context, t EventType, payload T) {
202 b.mu.RLock()
203 defer b.mu.RUnlock()
204
205 select {
206 case <-b.done:
207 return
208 default:
209 }
210
211 event := Event[T]{Type: t, Payload: payload}
212 timeout := b.mustDeliverTimeout
213
214 for sub := range b.subs {
215 // Fast path: non-blocking send.
216 select {
217 case sub <- event:
218 continue
219 default:
220 }
221
222 // Slow path: bounded blocking send.
223 timer := time.NewTimer(timeout)
224 select {
225 case sub <- event:
226 timer.Stop()
227 case <-timer.C:
228 b.mustDeliverDropCount.Add(1)
229 slog.Error("PublishMustDeliver timed out delivering event",
230 "type", t, "timeout", timeout)
231 case <-ctx.Done():
232 timer.Stop()
233 return
234 }
235 }
236}