1package pubsub
2
3import (
4 "context"
5 "sync"
6)
7
// bufferSize is the capacity of every subscriber channel created by Subscribe,
// and the channel buffer size NewBroker passes to NewBrokerWithOptions.
const bufferSize = 64
9
// Broker fans events of payload type T out to a set of subscriber channels.
// Subscribers are added with Subscribe, events are delivered with Publish,
// and Shutdown closes every subscriber channel.
type Broker[T any] struct {
	// subs is the set of currently registered subscriber channels.
	subs map[chan Event[T]]struct{}
	// mu guards subs and subCount.
	mu sync.RWMutex
	// done is closed by Shutdown; a closed done means the broker is stopped.
	done chan struct{}
	// subCount tracks the number of registered subscribers; it is updated
	// alongside every insert into and delete from subs.
	subCount int
	// maxEvents is stored by the constructor.
	// NOTE(review): nothing in the visible code reads this field — confirm
	// whether it is used elsewhere or is vestigial.
	maxEvents int
}
17
18func NewBroker[T any]() *Broker[T] {
19 return NewBrokerWithOptions[T](bufferSize, 1000)
20}
21
22func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] {
23 b := &Broker[T]{
24 subs: make(map[chan Event[T]]struct{}),
25 done: make(chan struct{}),
26 subCount: 0,
27 maxEvents: maxEvents,
28 }
29 return b
30}
31
32func (b *Broker[T]) Shutdown() {
33 select {
34 case <-b.done: // Already closed
35 return
36 default:
37 close(b.done)
38 }
39
40 b.mu.Lock()
41 defer b.mu.Unlock()
42
43 for ch := range b.subs {
44 delete(b.subs, ch)
45 close(ch)
46 }
47
48 b.subCount = 0
49}
50
51func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
52 b.mu.Lock()
53 defer b.mu.Unlock()
54
55 select {
56 case <-b.done:
57 ch := make(chan Event[T])
58 close(ch)
59 return ch
60 default:
61 }
62
63 sub := make(chan Event[T], bufferSize)
64 b.subs[sub] = struct{}{}
65 b.subCount++
66
67 go func() {
68 <-ctx.Done()
69
70 b.mu.Lock()
71 defer b.mu.Unlock()
72
73 select {
74 case <-b.done:
75 return
76 default:
77 }
78
79 delete(b.subs, sub)
80 close(sub)
81 b.subCount--
82 }()
83
84 return sub
85}
86
87func (b *Broker[T]) GetSubscriberCount() int {
88 b.mu.RLock()
89 defer b.mu.RUnlock()
90 return b.subCount
91}
92
93func (b *Broker[T]) Publish(t EventType, payload T) {
94 b.mu.RLock()
95 defer b.mu.RUnlock()
96
97 select {
98 case <-b.done:
99 return
100 default:
101 }
102
103 event := Event[T]{Type: t, Payload: payload}
104
105 for sub := range b.subs {
106 select {
107 case sub <- event:
108 default:
109 // Channel is full, subscriber is slow - skip this event
110 // This prevents blocking the publisher
111 }
112 }
113}