1// Package pubsub provides a lightweight in-process broker for fan-out
2// event delivery between services and the UI.
3//
4// Delivery semantics:
5//
6// - [Broker.Publish] is best-effort and lossy under contention. If a
7// subscriber's channel is full, the event is dropped for that
8// subscriber and a counter is incremented. This is the right choice
9// for high-frequency intermediate updates (e.g. streaming token
10// deltas) where only the latest state matters.
11//
12// - [Broker.PublishMustDeliver] is bounded-blocking. For each
13// subscriber it first tries a non-blocking send, then falls back to
14// a per-subscriber blocking send with a hard timeout. On timeout the
15// event is dropped for that subscriber, an error is logged, and the
16// must-deliver drop counter is incremented. The publisher never
17// blocks indefinitely. This is the right choice for terminal events
18// (finish, tool result, error, cancel) that must not be silently
19// coalesced away.
20//
21// Drop counters ([Broker.DropCount], [Broker.MustDeliverDropCount]) are
22// exposed so callers can surface saturation in telemetry.
23package pubsub
24
25import (
26 "context"
27 "log/slog"
28 "sync"
29 "sync/atomic"
30 "time"
31)
32
const (
	// defaultMustDeliverTimeout caps, per subscriber, how long
	// [Broker.PublishMustDeliver] blocks waiting for buffer space before
	// it drops the event for that subscriber.
	defaultMustDeliverTimeout time.Duration = 50 * time.Millisecond

	// bufferSize is the subscriber channel capacity used by NewBroker.
	bufferSize = 64
)
41
42type Broker[T any] struct {
43 subs map[chan Event[T]]struct{}
44 mu sync.RWMutex
45 done chan struct{}
46 subCount int
47 maxEvents int
48 channelBufferSize int
49 mustDeliverTimeout time.Duration
50 dropCount atomic.Uint64
51 mustDeliverDropCount atomic.Uint64
52}
53
54func NewBroker[T any]() *Broker[T] {
55 return NewBrokerWithOptions[T](bufferSize, 1000)
56}
57
58func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
59 return &Broker[T]{
60 subs: make(map[chan Event[T]]struct{}),
61 done: make(chan struct{}),
62 maxEvents: maxEvents,
63 channelBufferSize: channelBufferSize,
64 mustDeliverTimeout: defaultMustDeliverTimeout,
65 }
66}
67
68// SetMustDeliverTimeout overrides the per-subscriber timeout used by
69// [Broker.PublishMustDeliver]. A zero or negative value resets to the
70// default. Intended primarily for tests.
71func (b *Broker[T]) SetMustDeliverTimeout(d time.Duration) {
72 b.mu.Lock()
73 defer b.mu.Unlock()
74 if d <= 0 {
75 b.mustDeliverTimeout = defaultMustDeliverTimeout
76 return
77 }
78 b.mustDeliverTimeout = d
79}
80
81func (b *Broker[T]) Shutdown() {
82 select {
83 case <-b.done: // Already closed
84 return
85 default:
86 close(b.done)
87 }
88
89 b.mu.Lock()
90 defer b.mu.Unlock()
91
92 for ch := range b.subs {
93 delete(b.subs, ch)
94 close(ch)
95 }
96
97 b.subCount = 0
98}
99
100func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
101 b.mu.Lock()
102 defer b.mu.Unlock()
103
104 select {
105 case <-b.done:
106 ch := make(chan Event[T])
107 close(ch)
108 return ch
109 default:
110 }
111
112 sub := make(chan Event[T], b.channelBufferSize)
113 b.subs[sub] = struct{}{}
114 b.subCount++
115
116 go func() {
117 <-ctx.Done()
118
119 b.mu.Lock()
120 defer b.mu.Unlock()
121
122 select {
123 case <-b.done:
124 return
125 default:
126 }
127
128 delete(b.subs, sub)
129 close(sub)
130 b.subCount--
131 }()
132
133 return sub
134}
135
136func (b *Broker[T]) GetSubscriberCount() int {
137 b.mu.RLock()
138 defer b.mu.RUnlock()
139 return b.subCount
140}
141
142// DropCount returns the cumulative number of events dropped by
143// [Broker.Publish] because a subscriber's channel was full.
144func (b *Broker[T]) DropCount() uint64 {
145 return b.dropCount.Load()
146}
147
148// MustDeliverDropCount returns the cumulative number of events dropped
149// by [Broker.PublishMustDeliver] after the per-subscriber timeout
150// expired.
151func (b *Broker[T]) MustDeliverDropCount() uint64 {
152 return b.mustDeliverDropCount.Load()
153}
154
155// Publish delivers an event to every active subscriber.
156//
157// Delivery is non-blocking and lossy: if a subscriber's channel is full
158// the event is dropped for that subscriber and [Broker.DropCount] is
159// incremented. Use [Broker.PublishMustDeliver] for events that must not
160// be silently dropped.
161func (b *Broker[T]) Publish(t EventType, payload T) {
162 b.mu.RLock()
163 defer b.mu.RUnlock()
164
165 select {
166 case <-b.done:
167 return
168 default:
169 }
170
171 event := Event[T]{Type: t, Payload: payload}
172
173 for sub := range b.subs {
174 select {
175 case sub <- event:
176 default:
177 // Channel is full, subscriber is slow - skip this event.
178 // Lossy by design; counted so saturation is observable.
179 b.dropCount.Add(1)
180 }
181 }
182}
183
184// PublishMustDeliver delivers an event with bounded-blocking semantics.
185// For each subscriber it first attempts a non-blocking send, then falls
186// back to a blocking send bounded by a per-subscriber timeout (default
187// [defaultMustDeliverTimeout]). On timeout the event is dropped for
188// that subscriber, [Broker.MustDeliverDropCount] is incremented, and an
189// error is logged. The publisher never blocks indefinitely.
190//
191// Use this for terminal events that must reach subscribers (finish,
192// tool result, error, cancel). Callers must still tolerate rare drops
193// after timeout — recovery is the subscriber's responsibility (e.g. a
194// re-fetch on the next session-visible event).
195func (b *Broker[T]) PublishMustDeliver(ctx context.Context, t EventType, payload T) {
196 b.mu.RLock()
197 defer b.mu.RUnlock()
198
199 select {
200 case <-b.done:
201 return
202 default:
203 }
204
205 event := Event[T]{Type: t, Payload: payload}
206 timeout := b.mustDeliverTimeout
207
208 for sub := range b.subs {
209 // Fast path: non-blocking send.
210 select {
211 case sub <- event:
212 continue
213 default:
214 }
215
216 // Slow path: bounded blocking send.
217 timer := time.NewTimer(timeout)
218 select {
219 case sub <- event:
220 timer.Stop()
221 case <-timer.C:
222 b.mustDeliverDropCount.Add(1)
223 slog.Error("PublishMustDeliver timed out delivering event",
224 "type", t, "timeout", timeout)
225 case <-ctx.Done():
226 timer.Stop()
227 return
228 }
229 }
230}