broker.go

  1package pubsub
  2
  3import (
  4	"context"
  5	"sync"
  6)
  7
  8const bufferSize = 1024
  9
 10type Logger interface {
 11	Debug(msg string, args ...any)
 12	Info(msg string, args ...any)
 13	Warn(msg string, args ...any)
 14	Error(msg string, args ...any)
 15}
 16
 17// Broker allows clients to publish events and subscribe to events
 18type Broker[T any] struct {
 19	subs map[chan Event[T]]struct{} // subscriptions
 20	mu   sync.Mutex                 // sync access to map
 21	done chan struct{}              // close when broker is shutting down
 22}
 23
 24// NewBroker constructs a pub/sub broker.
 25func NewBroker[T any]() *Broker[T] {
 26	b := &Broker[T]{
 27		subs: make(map[chan Event[T]]struct{}),
 28		done: make(chan struct{}),
 29	}
 30	return b
 31}
 32
 33// Shutdown the broker, terminating any subscriptions.
 34func (b *Broker[T]) Shutdown() {
 35	close(b.done)
 36
 37	b.mu.Lock()
 38	defer b.mu.Unlock()
 39
 40	// Remove each subscriber entry, so Publish() cannot send any further
 41	// messages, and close each subscriber's channel, so the subscriber cannot
 42	// consume any more messages.
 43	for ch := range b.subs {
 44		delete(b.subs, ch)
 45		close(ch)
 46	}
 47}
 48
 49// Subscribe subscribes the caller to a stream of events. The returned channel
 50// is closed when the broker is shutdown.
 51func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
 52	b.mu.Lock()
 53	defer b.mu.Unlock()
 54
 55	// Check if broker has shutdown and if so return closed channel
 56	select {
 57	case <-b.done:
 58		ch := make(chan Event[T])
 59		close(ch)
 60		return ch
 61	default:
 62	}
 63
 64	// Subscribe
 65	sub := make(chan Event[T], bufferSize)
 66	b.subs[sub] = struct{}{}
 67
 68	// Unsubscribe when context is done.
 69	go func() {
 70		<-ctx.Done()
 71
 72		b.mu.Lock()
 73		defer b.mu.Unlock()
 74
 75		// Check if broker has shutdown and if so do nothing
 76		select {
 77		case <-b.done:
 78			return
 79		default:
 80		}
 81
 82		delete(b.subs, sub)
 83		close(sub)
 84	}()
 85
 86	return sub
 87}
 88
 89// Publish an event to subscribers.
 90func (b *Broker[T]) Publish(t EventType, payload T) {
 91	b.mu.Lock()
 92	defer b.mu.Unlock()
 93
 94	for sub := range b.subs {
 95		select {
 96		case sub <- Event[T]{Type: t, Payload: payload}:
 97		case <-b.done:
 98			return
 99		}
100	}
101}