1package pubsub
2
3import (
4 "context"
5 "sync"
6)
7
// bufferSize is the default per-subscriber channel capacity, used by every
// broker created via NewBroker (NewBrokerWithOptions takes an explicit
// capacity instead). Publish is non-blocking, so a full buffer silently
// drops events; the value is sized to cover a long streaming assistant
// turn (~one UpdatedEvent per token) even under TUI render stalls.
const bufferSize = 4096
13
14type Broker[T any] struct {
15 subs map[chan Event[T]]struct{}
16 mu sync.RWMutex
17 done chan struct{}
18 subCount int
19 channelBufferSize int
20}
21
22func NewBroker[T any]() *Broker[T] {
23 return NewBrokerWithOptions[T](bufferSize)
24}
25
26func NewBrokerWithOptions[T any](channelBufferSize int) *Broker[T] {
27 return &Broker[T]{
28 subs: make(map[chan Event[T]]struct{}),
29 done: make(chan struct{}),
30 channelBufferSize: channelBufferSize,
31 }
32}
33
34func (b *Broker[T]) Shutdown() {
35 select {
36 case <-b.done: // Already closed
37 return
38 default:
39 close(b.done)
40 }
41
42 b.mu.Lock()
43 defer b.mu.Unlock()
44
45 for ch := range b.subs {
46 delete(b.subs, ch)
47 close(ch)
48 }
49
50 b.subCount = 0
51}
52
53func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
54 b.mu.Lock()
55 defer b.mu.Unlock()
56
57 select {
58 case <-b.done:
59 ch := make(chan Event[T])
60 close(ch)
61 return ch
62 default:
63 }
64
65 sub := make(chan Event[T], b.channelBufferSize)
66 b.subs[sub] = struct{}{}
67 b.subCount++
68
69 go func() {
70 <-ctx.Done()
71
72 b.mu.Lock()
73 defer b.mu.Unlock()
74
75 select {
76 case <-b.done:
77 return
78 default:
79 }
80
81 delete(b.subs, sub)
82 close(sub)
83 b.subCount--
84 }()
85
86 return sub
87}
88
89func (b *Broker[T]) GetSubscriberCount() int {
90 b.mu.RLock()
91 defer b.mu.RUnlock()
92 return b.subCount
93}
94
95func (b *Broker[T]) Publish(t EventType, payload T) {
96 b.mu.RLock()
97 defer b.mu.RUnlock()
98
99 select {
100 case <-b.done:
101 return
102 default:
103 }
104
105 event := Event[T]{Type: t, Payload: payload}
106
107 for sub := range b.subs {
108 select {
109 case sub <- event:
110 default:
111 // Channel is full, subscriber is slow - skip this event
112 // This prevents blocking the publisher
113 }
114 }
115}