1package pubsub
2
3import (
4 "context"
5 "sync"
6)
7
const (
	// bufferSize is the capacity of each subscriber channel created by
	// Subscribe. NewBroker also passes it to NewBrokerWithOptions as the
	// channel buffer size argument.
	bufferSize = 64
)
9
10type Broker[T any] struct {
11 subs map[chan Event[T]]struct{}
12 mu sync.RWMutex
13 done chan struct{}
14 subCount int
15 maxEvents int
16}
17
18func NewBroker[T any]() *Broker[T] {
19 return NewBrokerWithOptions[T](bufferSize, 1000)
20}
21
22func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
23 b := &Broker[T]{
24 subs: make(map[chan Event[T]]struct{}),
25 done: make(chan struct{}),
26 subCount: 0,
27 maxEvents: maxEvents,
28 }
29 return b
30}
31
32func (b *Broker[T]) Shutdown() {
33 select {
34 case <-b.done: // Already closed
35 return
36 default:
37 close(b.done)
38 }
39
40 b.mu.Lock()
41 defer b.mu.Unlock()
42
43 for ch := range b.subs {
44 delete(b.subs, ch)
45 close(ch)
46 }
47
48 b.subCount = 0
49}
50
51func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
52 b.mu.Lock()
53 defer b.mu.Unlock()
54
55 select {
56 case <-b.done:
57 ch := make(chan Event[T])
58 close(ch)
59 return ch
60 default:
61 }
62
63 sub := make(chan Event[T], bufferSize)
64 b.subs[sub] = struct{}{}
65 b.subCount++
66
67 go func() {
68 <-ctx.Done()
69
70 b.mu.Lock()
71 defer b.mu.Unlock()
72
73 select {
74 case <-b.done:
75 return
76 default:
77 }
78
79 delete(b.subs, sub)
80 close(sub)
81 b.subCount--
82 }()
83
84 return sub
85}
86
87func (b *Broker[T]) GetSubscriberCount() int {
88 b.mu.RLock()
89 defer b.mu.RUnlock()
90 return b.subCount
91}
92
93func (b *Broker[T]) Publish(t EventType, payload T) {
94 b.mu.RLock()
95 select {
96 case <-b.done:
97 b.mu.RUnlock()
98 return
99 default:
100 }
101
102 subscribers := make([]chan Event[T], 0, len(b.subs))
103 for sub := range b.subs {
104 subscribers = append(subscribers, sub)
105 }
106 b.mu.RUnlock()
107
108 event := Event[T]{Type: t, Payload: payload}
109
110 for _, sub := range subscribers {
111 select {
112 case sub <- event:
113 default:
114 // Channel is full, subscriber is slow - skip this event
115 // This prevents blocking the publisher
116 }
117 }
118}