broker.go

 1package pubsub
 2
 3import (
 4	"context"
 5	"sync"
 6)
 7
// bufferSize is the capacity of the buffered channel that Subscribe allocates
// for each subscriber.
const bufferSize = 1024
 9
10// Broker allows clients to publish events and subscribe to events
11type Broker[T any] struct {
12	subs map[chan Event[T]]struct{} // subscriptions
13	mu   sync.Mutex                 // sync access to map
14	done chan struct{}              // close when broker is shutting down
15}
16
17// NewBroker constructs a pub/sub broker.
18func NewBroker[T any]() *Broker[T] {
19	b := &Broker[T]{
20		subs: make(map[chan Event[T]]struct{}),
21		done: make(chan struct{}),
22	}
23	return b
24}
25
26// Shutdown the broker, terminating any subscriptions.
27func (b *Broker[T]) Shutdown() {
28	close(b.done)
29
30	b.mu.Lock()
31	defer b.mu.Unlock()
32
33	// Remove each subscriber entry, so Publish() cannot send any further
34	// messages, and close each subscriber's channel, so the subscriber cannot
35	// consume any more messages.
36	for ch := range b.subs {
37		delete(b.subs, ch)
38		close(ch)
39	}
40}
41
42// Subscribe subscribes the caller to a stream of events. The returned channel
43// is closed when the broker is shutdown.
44func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
45	b.mu.Lock()
46	defer b.mu.Unlock()
47
48	// Check if broker has shutdown and if so return closed channel
49	select {
50	case <-b.done:
51		ch := make(chan Event[T])
52		close(ch)
53		return ch
54	default:
55	}
56
57	// Subscribe
58	sub := make(chan Event[T], bufferSize)
59	b.subs[sub] = struct{}{}
60
61	// Unsubscribe when context is done.
62	go func() {
63		<-ctx.Done()
64
65		b.mu.Lock()
66		defer b.mu.Unlock()
67
68		// Check if broker has shutdown and if so do nothing
69		select {
70		case <-b.done:
71			return
72		default:
73		}
74
75		delete(b.subs, sub)
76		close(sub)
77	}()
78
79	return sub
80}
81
82// Publish an event to subscribers.
83func (b *Broker[T]) Publish(t EventType, payload T) {
84	b.mu.Lock()
85	defer b.mu.Unlock()
86
87	for sub := range b.subs {
88		select {
89		case sub <- Event[T]{Type: t, Payload: payload}:
90		case <-b.done:
91			return
92		}
93	}
94}