1package pubsub
2
3import (
4 "context"
5 "log/slog"
6 "sync"
7)
8
9// bufferSize is the per-subscriber channel capacity for any broker
10// created via NewBroker. Publish is non-blocking, so a full buffer
11// drops events (with a warning log); sized to cover a long streaming
12// assistant turn (~one UpdatedEvent per token) even under TUI render
13// stalls.
14const bufferSize = 4096
15
16type Broker[T any] struct {
17 subs map[chan Event[T]]struct{}
18 mu sync.RWMutex
19 done chan struct{}
20 subCount int
21 channelBufferSize int
22}
23
24func NewBroker[T any]() *Broker[T] {
25 return NewBrokerWithOptions[T](bufferSize)
26}
27
28func NewBrokerWithOptions[T any](channelBufferSize int) *Broker[T] {
29 return &Broker[T]{
30 subs: make(map[chan Event[T]]struct{}),
31 done: make(chan struct{}),
32 channelBufferSize: channelBufferSize,
33 }
34}
35
36func (b *Broker[T]) Shutdown() {
37 select {
38 case <-b.done: // Already closed
39 return
40 default:
41 close(b.done)
42 }
43
44 b.mu.Lock()
45 defer b.mu.Unlock()
46
47 for ch := range b.subs {
48 delete(b.subs, ch)
49 close(ch)
50 }
51
52 b.subCount = 0
53}
54
55func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
56 b.mu.Lock()
57 defer b.mu.Unlock()
58
59 select {
60 case <-b.done:
61 ch := make(chan Event[T])
62 close(ch)
63 return ch
64 default:
65 }
66
67 sub := make(chan Event[T], b.channelBufferSize)
68 b.subs[sub] = struct{}{}
69 b.subCount++
70
71 go func() {
72 <-ctx.Done()
73
74 b.mu.Lock()
75 defer b.mu.Unlock()
76
77 select {
78 case <-b.done:
79 return
80 default:
81 }
82
83 delete(b.subs, sub)
84 close(sub)
85 b.subCount--
86 }()
87
88 return sub
89}
90
91func (b *Broker[T]) GetSubscriberCount() int {
92 b.mu.RLock()
93 defer b.mu.RUnlock()
94 return b.subCount
95}
96
97func (b *Broker[T]) Publish(t EventType, payload T) {
98 b.mu.RLock()
99 defer b.mu.RUnlock()
100
101 select {
102 case <-b.done:
103 return
104 default:
105 }
106
107 event := Event[T]{Type: t, Payload: payload}
108
109 for sub := range b.subs {
110 select {
111 case sub <- event:
112 default:
113 // Channel is full, subscriber is slow — skip this event.
114 // This prevents blocking the publisher.
115 slog.Warn("Pubsub buffer full; dropping event", "type", t)
116 }
117 }
118}