broker.go

 1package pubsub
 2
 3import (
 4	"context"
 5	"sync"
 6)
 7
 8// Broker is a generic pub/sub broker using callbacks.
 9type Broker[T any] struct {
10	listeners []func(Event[T])
11	mu        sync.RWMutex
12}
13
14// NewBroker creates a new broker.
15func NewBroker[T any]() *Broker[T] {
16	return &Broker[T]{
17		listeners: make([]func(Event[T]), 0),
18	}
19}
20
21// AddListener registers a callback for events.
22func (b *Broker[T]) AddListener(fn func(Event[T])) {
23	b.mu.Lock()
24	defer b.mu.Unlock()
25	b.listeners = append(b.listeners, fn)
26}
27
28// Publish emits an event to all listeners without blocking.
29func (b *Broker[T]) Publish(ctx context.Context, t EventType, payload T) {
30	b.mu.RLock()
31	listeners := make([]func(Event[T]), len(b.listeners))
32	copy(listeners, b.listeners)
33	b.mu.RUnlock()
34
35	event := Event[T]{Type: t, Payload: payload}
36	for _, fn := range listeners {
37		go fn(event)
38	}
39}