1package pubsub
2
3import (
4 "context"
5 "sync"
6)
7
// bufferSize is the capacity of the buffered channel allocated for each
// subscriber in Subscribe.
const bufferSize = 1024
9
10// Broker allows clients to publish events and subscribe to events
11type Broker[T any] struct {
12 subs map[chan Event[T]]struct{} // subscriptions
13 mu sync.Mutex // sync access to map
14 done chan struct{} // close when broker is shutting down
15}
16
17// NewBroker constructs a pub/sub broker.
18func NewBroker[T any]() *Broker[T] {
19 b := &Broker[T]{
20 subs: make(map[chan Event[T]]struct{}),
21 done: make(chan struct{}),
22 }
23 return b
24}
25
26// Shutdown the broker, terminating any subscriptions.
27func (b *Broker[T]) Shutdown() {
28 close(b.done)
29
30 b.mu.Lock()
31 defer b.mu.Unlock()
32
33 // Remove each subscriber entry, so Publish() cannot send any further
34 // messages, and close each subscriber's channel, so the subscriber cannot
35 // consume any more messages.
36 for ch := range b.subs {
37 delete(b.subs, ch)
38 close(ch)
39 }
40}
41
42// Subscribe subscribes the caller to a stream of events. The returned channel
43// is closed when the broker is shutdown.
44func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
45 b.mu.Lock()
46 defer b.mu.Unlock()
47
48 // Check if broker has shutdown and if so return closed channel
49 select {
50 case <-b.done:
51 ch := make(chan Event[T])
52 close(ch)
53 return ch
54 default:
55 }
56
57 // Subscribe
58 sub := make(chan Event[T], bufferSize)
59 b.subs[sub] = struct{}{}
60
61 // Unsubscribe when context is done.
62 go func() {
63 <-ctx.Done()
64
65 b.mu.Lock()
66 defer b.mu.Unlock()
67
68 // Check if broker has shutdown and if so do nothing
69 select {
70 case <-b.done:
71 return
72 default:
73 }
74
75 delete(b.subs, sub)
76 close(sub)
77 }()
78
79 return sub
80}
81
82// Publish an event to subscribers.
83func (b *Broker[T]) Publish(t EventType, payload T) {
84 b.mu.Lock()
85 defer b.mu.Unlock()
86
87 for sub := range b.subs {
88 select {
89 case sub <- Event[T]{Type: t, Payload: payload}:
90 case <-b.done:
91 return
92 }
93 }
94}