1package pubsub
2
3import (
4 "context"
5 "sync"
6)
7
8const bufferSize = 64
9
10type Broker[T any] struct {
11 subs map[chan Event[T]]struct{}
12 mu sync.RWMutex
13 done chan struct{}
14 subCount int
15 maxEvents int
16}
17
18func NewBroker[T any]() *Broker[T] {
19 return NewBrokerWithOptions[T](bufferSize, 1000)
20}
21
22func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
23 return &Broker[T]{
24 subs: make(map[chan Event[T]]struct{}),
25 done: make(chan struct{}),
26 maxEvents: maxEvents,
27 }
28}
29
30func (b *Broker[T]) Shutdown() {
31 select {
32 case <-b.done: // Already closed
33 return
34 default:
35 close(b.done)
36 }
37
38 b.mu.Lock()
39 defer b.mu.Unlock()
40
41 for ch := range b.subs {
42 delete(b.subs, ch)
43 close(ch)
44 }
45
46 b.subCount = 0
47}
48
49func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
50 b.mu.Lock()
51 defer b.mu.Unlock()
52
53 select {
54 case <-b.done:
55 ch := make(chan Event[T])
56 close(ch)
57 return ch
58 default:
59 }
60
61 sub := make(chan Event[T], bufferSize)
62 b.subs[sub] = struct{}{}
63 b.subCount++
64
65 go func() {
66 <-ctx.Done()
67
68 b.mu.Lock()
69 defer b.mu.Unlock()
70
71 select {
72 case <-b.done:
73 return
74 default:
75 }
76
77 delete(b.subs, sub)
78 close(sub)
79 b.subCount--
80 }()
81
82 return sub
83}
84
85func (b *Broker[T]) GetSubscriberCount() int {
86 b.mu.RLock()
87 defer b.mu.RUnlock()
88 return b.subCount
89}
90
91func (b *Broker[T]) Publish(t EventType, payload T) {
92 b.mu.RLock()
93 defer b.mu.RUnlock()
94
95 select {
96 case <-b.done:
97 return
98 default:
99 }
100
101 event := Event[T]{Type: t, Payload: payload}
102
103 for sub := range b.subs {
104 select {
105 case sub <- event:
106 default:
107 // Channel is full, subscriber is slow - skip this event
108 // This prevents blocking the publisher
109 }
110 }
111}