1package pubsub
2
3import (
4 "context"
5 "sync"
6)
7
8const bufferSize = 1024
9
10type Logger interface {
11 Debug(msg string, args ...any)
12 Info(msg string, args ...any)
13 Warn(msg string, args ...any)
14 Error(msg string, args ...any)
15}
16
17// Broker allows clients to publish events and subscribe to events
18type Broker[T any] struct {
19 subs map[chan Event[T]]struct{} // subscriptions
20 mu sync.Mutex // sync access to map
21 done chan struct{} // close when broker is shutting down
22}
23
24// NewBroker constructs a pub/sub broker.
25func NewBroker[T any]() *Broker[T] {
26 b := &Broker[T]{
27 subs: make(map[chan Event[T]]struct{}),
28 done: make(chan struct{}),
29 }
30 return b
31}
32
33// Shutdown the broker, terminating any subscriptions.
34func (b *Broker[T]) Shutdown() {
35 close(b.done)
36
37 b.mu.Lock()
38 defer b.mu.Unlock()
39
40 // Remove each subscriber entry, so Publish() cannot send any further
41 // messages, and close each subscriber's channel, so the subscriber cannot
42 // consume any more messages.
43 for ch := range b.subs {
44 delete(b.subs, ch)
45 close(ch)
46 }
47}
48
49// Subscribe subscribes the caller to a stream of events. The returned channel
50// is closed when the broker is shutdown.
51func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
52 b.mu.Lock()
53 defer b.mu.Unlock()
54
55 // Check if broker has shutdown and if so return closed channel
56 select {
57 case <-b.done:
58 ch := make(chan Event[T])
59 close(ch)
60 return ch
61 default:
62 }
63
64 // Subscribe
65 sub := make(chan Event[T], bufferSize)
66 b.subs[sub] = struct{}{}
67
68 // Unsubscribe when context is done.
69 go func() {
70 <-ctx.Done()
71
72 b.mu.Lock()
73 defer b.mu.Unlock()
74
75 // Check if broker has shutdown and if so do nothing
76 select {
77 case <-b.done:
78 return
79 default:
80 }
81
82 delete(b.subs, sub)
83 close(sub)
84 }()
85
86 return sub
87}
88
89// Publish an event to subscribers.
90func (b *Broker[T]) Publish(t EventType, payload T) {
91 b.mu.Lock()
92 defer b.mu.Unlock()
93
94 for sub := range b.subs {
95 select {
96 case sub <- Event[T]{Type: t, Payload: payload}:
97 case <-b.done:
98 return
99 }
100 }
101}