1package subpub
2
3import (
4 "context"
5 "sync"
6)
7
8type SubPub[K any] struct {
9 mu sync.Mutex
10 subscribers []*subscriber[K]
11}
12
// subscriber is the hub-side record of a single subscription.
type subscriber[K any] struct {
	// idx is the index this subscriber has already reached. Publish only
	// delivers messages with a greater index and then advances idx.
	idx int64

	// ch carries pending messages to the subscriber. The hub closes it when
	// the subscription is dropped.
	ch chan K

	// ctx is the subscription's own context, derived from the context given
	// to Subscribe. Once it is done the subscription is considered dead.
	ctx context.Context

	// cancel cancels ctx. The hub calls it when it disconnects a subscriber
	// whose channel is full.
	cancel context.CancelFunc
}
19
20func New[K any]() *SubPub[K] {
21 return &SubPub[K]{
22 subscribers: make([]*subscriber[K], 0),
23 }
24}
25
26// Subscribe registers an interest in messages after the given index, subject to the
27// expiration/cancellation of the provided context. The returned function blocks
28// until a new message, and can return false as the second arguent if the subscription
29// is done for.
30func (sp *SubPub[K]) Subscribe(ctx context.Context, idx int64) func() (K, bool) {
31 // Create a child context so we can cancel the subscription independently
32 subCtx, cancel := context.WithCancel(ctx)
33
34 // Buffered channel to avoid blocking publishers
35 ch := make(chan K, 10)
36 sub := &subscriber[K]{
37 idx: idx,
38 ch: ch,
39 ctx: subCtx,
40 cancel: cancel,
41 }
42
43 sp.mu.Lock()
44 sp.subscribers = append(sp.subscribers, sub)
45 sp.mu.Unlock()
46
47 // Return a function that blocks until the next message
48 return func() (K, bool) {
49 select {
50 case msg, ok := <-ch:
51 if !ok {
52 var zero K
53 return zero, false
54 }
55 return msg, true
56 case <-subCtx.Done():
57 // Context cancelled, but drain any buffered messages first
58 select {
59 case msg, ok := <-ch:
60 if ok {
61 return msg, true
62 }
63 default:
64 }
65 var zero K
66 return zero, false
67 }
68 }
69}
70
71// Publish sends a message to all subscribers waiting for messages after the given index.
72// Subscribers that are "behind" should get a disconnection message.
73func (sp *SubPub[K]) Publish(idx int64, message K) {
74 sp.mu.Lock()
75 defer sp.mu.Unlock()
76
77 // Notify subscribers and filter out disconnected ones
78 remaining := sp.subscribers[:0]
79 for _, sub := range sp.subscribers {
80 // Check if context is still valid
81 select {
82 case <-sub.ctx.Done():
83 // Context cancelled, close channel and don't keep subscriber
84 close(sub.ch)
85 continue
86 default:
87 }
88
89 // Only send to subscribers waiting for messages after an index < idx
90 if sub.idx < idx {
91 // Try to send the message
92 select {
93 case sub.ch <- message:
94 // Success, update subscriber's index and keep them
95 sub.idx = idx
96 remaining = append(remaining, sub)
97 default:
98 // Channel full, subscriber is behind - disconnect them
99 close(sub.ch)
100 sub.cancel()
101 }
102 } else {
103 // This subscriber is not interested yet (already has this index or beyond)
104 remaining = append(remaining, sub)
105 }
106 }
107 sp.subscribers = remaining
108}
109
110// Broadcast sends a message to ALL subscribers regardless of their current index.
111// This is used for out-of-band notifications like conversation list updates.
112func (sp *SubPub[K]) Broadcast(message K) {
113 sp.mu.Lock()
114 defer sp.mu.Unlock()
115
116 remaining := sp.subscribers[:0]
117 for _, sub := range sp.subscribers {
118 select {
119 case <-sub.ctx.Done():
120 close(sub.ch)
121 continue
122 default:
123 }
124
125 select {
126 case sub.ch <- message:
127 remaining = append(remaining, sub)
128 default:
129 // Channel full, disconnect
130 close(sub.ch)
131 sub.cancel()
132 }
133 }
134 sp.subscribers = remaining
135}