subpub.go

  1package subpub
  2
  3import (
  4	"context"
  5	"sync"
  6)
  7
// SubPub delivers messages of type K to the subscribers registered through
// Subscribe. Publish and Broadcast prune subscribers whose context is done
// or whose channel buffer is full.
type SubPub[K any] struct {
	mu          sync.Mutex       // guards subscribers
	subscribers []*subscriber[K] // currently registered subscriptions
}
 12
// subscriber is the per-subscription state that Subscribe creates and that
// Publish and Broadcast consult when delivering a message.
type subscriber[K any] struct {
	idx    int64              // starts as the index given to Subscribe; Publish advances it on delivery
	ch     chan K             // buffered delivery channel; closed when the subscriber is dropped
	ctx    context.Context    // child of the context passed to Subscribe
	cancel context.CancelFunc // cancels ctx
}
 19
 20func New[K any]() *SubPub[K] {
 21	return &SubPub[K]{
 22		subscribers: make([]*subscriber[K], 0),
 23	}
 24}
 25
 26// Subscribe registers an interest in messages after the given index, subject to the
 27// expiration/cancellation of the provided context. The returned function blocks
 28// until a new message, and can return false as the second arguent if the subscription
 29// is done for.
 30func (sp *SubPub[K]) Subscribe(ctx context.Context, idx int64) func() (K, bool) {
 31	// Create a child context so we can cancel the subscription independently
 32	subCtx, cancel := context.WithCancel(ctx)
 33
 34	// Buffered channel to avoid blocking publishers
 35	ch := make(chan K, 10)
 36	sub := &subscriber[K]{
 37		idx:    idx,
 38		ch:     ch,
 39		ctx:    subCtx,
 40		cancel: cancel,
 41	}
 42
 43	sp.mu.Lock()
 44	sp.subscribers = append(sp.subscribers, sub)
 45	sp.mu.Unlock()
 46
 47	// Return a function that blocks until the next message
 48	return func() (K, bool) {
 49		select {
 50		case msg, ok := <-ch:
 51			if !ok {
 52				var zero K
 53				return zero, false
 54			}
 55			return msg, true
 56		case <-subCtx.Done():
 57			// Context cancelled, but drain any buffered messages first
 58			select {
 59			case msg, ok := <-ch:
 60				if ok {
 61					return msg, true
 62				}
 63			default:
 64			}
 65			var zero K
 66			return zero, false
 67		}
 68	}
 69}
 70
 71// Publish sends a message to all subscribers waiting for messages after the given index.
 72// Subscribers that are "behind" should get a disconnection message.
 73func (sp *SubPub[K]) Publish(idx int64, message K) {
 74	sp.mu.Lock()
 75	defer sp.mu.Unlock()
 76
 77	// Notify subscribers and filter out disconnected ones
 78	remaining := sp.subscribers[:0]
 79	for _, sub := range sp.subscribers {
 80		// Check if context is still valid
 81		select {
 82		case <-sub.ctx.Done():
 83			// Context cancelled, close channel and don't keep subscriber
 84			close(sub.ch)
 85			continue
 86		default:
 87		}
 88
 89		// Only send to subscribers waiting for messages after an index < idx
 90		if sub.idx < idx {
 91			// Try to send the message
 92			select {
 93			case sub.ch <- message:
 94				// Success, update subscriber's index and keep them
 95				sub.idx = idx
 96				remaining = append(remaining, sub)
 97			default:
 98				// Channel full, subscriber is behind - disconnect them
 99				close(sub.ch)
100				sub.cancel()
101			}
102		} else {
103			// This subscriber is not interested yet (already has this index or beyond)
104			remaining = append(remaining, sub)
105		}
106	}
107	sp.subscribers = remaining
108}
109
110// Broadcast sends a message to ALL subscribers regardless of their current index.
111// This is used for out-of-band notifications like conversation list updates.
112func (sp *SubPub[K]) Broadcast(message K) {
113	sp.mu.Lock()
114	defer sp.mu.Unlock()
115
116	remaining := sp.subscribers[:0]
117	for _, sub := range sp.subscribers {
118		select {
119		case <-sub.ctx.Done():
120			close(sub.ch)
121			continue
122		default:
123		}
124
125		select {
126		case sub.ch <- message:
127			remaining = append(remaining, sub)
128		default:
129			// Channel full, disconnect
130			close(sub.ch)
131			sub.cancel()
132		}
133	}
134	sp.subscribers = remaining
135}