1package pubsub
2
3import (
4 "context"
5 "log/slog"
6 "sync"
7)
8
const (
	// bufferSize is the capacity given to each subscriber channel created by
	// Subscribe. NewBroker also passes it to NewBrokerWithOptions as the
	// channel buffer size.
	bufferSize = 256
)
10
11type Broker[T any] struct {
12 subs map[chan Event[T]]struct{}
13 mu sync.RWMutex
14 done chan struct{}
15 subCount int
16 maxEvents int
17}
18
19func NewBroker[T any]() *Broker[T] {
20 return NewBrokerWithOptions[T](bufferSize, 1000)
21}
22
23func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
24 b := &Broker[T]{
25 subs: make(map[chan Event[T]]struct{}),
26 done: make(chan struct{}),
27 subCount: 0,
28 maxEvents: maxEvents,
29 }
30 return b
31}
32
33func (b *Broker[T]) Shutdown() {
34 select {
35 case <-b.done: // Already closed
36 return
37 default:
38 close(b.done)
39 }
40
41 b.mu.Lock()
42 defer b.mu.Unlock()
43
44 for ch := range b.subs {
45 delete(b.subs, ch)
46 close(ch)
47 }
48
49 b.subCount = 0
50}
51
52func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
53 b.mu.Lock()
54 defer b.mu.Unlock()
55
56 select {
57 case <-b.done:
58 ch := make(chan Event[T])
59 close(ch)
60 return ch
61 default:
62 }
63
64 sub := make(chan Event[T], bufferSize)
65 b.subs[sub] = struct{}{}
66 b.subCount++
67
68 go func() {
69 <-ctx.Done()
70
71 b.mu.Lock()
72 defer b.mu.Unlock()
73
74 select {
75 case <-b.done:
76 return
77 default:
78 }
79
80 delete(b.subs, sub)
81 close(sub)
82 b.subCount--
83 }()
84
85 return sub
86}
87
88func (b *Broker[T]) GetSubscriberCount() int {
89 b.mu.RLock()
90 defer b.mu.RUnlock()
91 return b.subCount
92}
93
94func (b *Broker[T]) Publish(t EventType, payload T) {
95 b.mu.RLock()
96 select {
97 case <-b.done:
98 b.mu.RUnlock()
99 return
100 default:
101 }
102
103 subscribers := make([]chan Event[T], 0, len(b.subs))
104 for sub := range b.subs {
105 subscribers = append(subscribers, sub)
106 }
107 b.mu.RUnlock()
108
109 event := Event[T]{Type: t, Payload: payload}
110
111 for _, sub := range subscribers {
112 select {
113 case sub <- event:
114 default:
115 // Channel is full, subscriber is slow
116 // Log this for debugging but don't block
117 if b.GetSubscriberCount() > 0 {
118 slog.Debug("Dropping event for slow subscriber", "eventType", t)
119 }
120 }
121 }
122}