1package subpub
2
3import (
4 "context"
5 "fmt"
6 "testing"
7 "testing/synctest"
8 "time"
9)
10
11func TestSubPubBasic(t *testing.T) {
12 synctest.Test(t, func(t *testing.T) {
13 sp := New[string]()
14 ctx := context.Background()
15
16 // Subscribe waiting for messages after index 0
17 next := sp.Subscribe(ctx, 0)
18
19 // Publish a message at index 1
20 go func() {
21 sp.Publish(1, "hello")
22 }()
23
24 // Should receive the message
25 msg, ok := next()
26 if !ok {
27 t.Fatal("Expected to receive message, got closed channel")
28 }
29 if msg != "hello" {
30 t.Errorf("Expected 'hello', got %q", msg)
31 }
32 })
33}
34
35func TestSubPubMultipleSubscribers(t *testing.T) {
36 synctest.Test(t, func(t *testing.T) {
37 sp := New[string]()
38 ctx := context.Background()
39
40 // Create multiple subscribers
41 next1 := sp.Subscribe(ctx, 0)
42 next2 := sp.Subscribe(ctx, 0)
43 next3 := sp.Subscribe(ctx, 0)
44
45 // Publish a message
46 go func() {
47 sp.Publish(1, "broadcast")
48 }()
49
50 // All subscribers should receive it
51 for i, next := range []func() (string, bool){next1, next2, next3} {
52 msg, ok := next()
53 if !ok {
54 t.Fatalf("Subscriber %d: expected to receive message, got closed channel", i+1)
55 }
56 if msg != "broadcast" {
57 t.Errorf("Subscriber %d: expected 'broadcast', got %q", i+1, msg)
58 }
59 }
60 })
61}
62
63func TestSubPubSubscriberAlreadyHasMessage(t *testing.T) {
64 synctest.Test(t, func(t *testing.T) {
65 sp := New[int]()
66 ctx := context.Background()
67
68 // Subscriber already has index 5, waiting for index > 5
69 next := sp.Subscribe(ctx, 5)
70
71 // Publish at index 3 (subscriber already has this)
72 sp.Publish(3, 100)
73
74 // Publish at index 6 (subscriber should get this)
75 go func() {
76 sp.Publish(6, 200)
77 }()
78
79 msg, ok := next()
80 if !ok {
81 t.Fatal("Expected to receive message, got closed channel")
82 }
83 if msg != 200 {
84 t.Errorf("Expected 200, got %d", msg)
85 }
86 })
87}
88
89func TestSubPubContextCancellation(t *testing.T) {
90 synctest.Test(t, func(t *testing.T) {
91 sp := New[string]()
92 ctx, cancel := context.WithCancel(context.Background())
93
94 next := sp.Subscribe(ctx, 0)
95
96 // Cancel the context
97 cancel()
98
99 // Should return false when context is cancelled
100 _, ok := next()
101 if ok {
102 t.Error("Expected closed channel after context cancellation")
103 }
104 })
105}
106
107func TestSubPubSubscriberBehind(t *testing.T) {
108 // Don't use synctest for this test as it involves checking buffer overflow behavior
109 sp := New[string]()
110 ctx := context.Background()
111
112 // Subscriber waiting for messages after index 0
113 next := sp.Subscribe(ctx, 0)
114
115 // Fill up the channel buffer (10 messages) quickly before subscriber reads
116 for i := 1; i <= 10; i++ {
117 sp.Publish(int64(i), fmt.Sprintf("message%d", i))
118 }
119
120 // Try to send one more - subscriber should be disconnected because buffer is full
121 sp.Publish(11, "overflow")
122
123 // Try to receive - should work for buffered messages
124 received := 0
125 var messages []string
126 for {
127 msg, ok := next()
128 if !ok {
129 break
130 }
131 messages = append(messages, msg)
132 received++
133 if received > 11 {
134 t.Fatal("Received more messages than expected")
135 }
136 }
137
138 // Should have received exactly 10 messages before being disconnected
139 if received != 10 {
140 t.Errorf("Expected to receive 10 buffered messages, got %d: %v", received, messages)
141 }
142}
143
144func TestSubPubSequentialMessages(t *testing.T) {
145 // Don't use synctest for this test as mutex blocking doesn't work well with it
146 sp := New[int]()
147 ctx := context.Background()
148
149 next := sp.Subscribe(ctx, 0)
150
151 // Publish multiple messages in order
152 for i := 1; i <= 5; i++ {
153 sp.Publish(int64(i), i*10)
154 }
155
156 // Receive all messages
157 received := []int{}
158 for i := 1; i <= 5; i++ {
159 msg, ok := next()
160 if !ok {
161 t.Fatalf("Expected to receive 5 messages, got closed channel after %d messages", i-1)
162 }
163 received = append(received, msg)
164 }
165
166 // Check we got all expected values in order
167 expected := []int{10, 20, 30, 40, 50}
168 for i, val := range received {
169 if val != expected[i] {
170 t.Errorf("Message %d: expected %d, got %d", i, expected[i], val)
171 }
172 }
173}
174
175func TestSubPubLateSubscriber(t *testing.T) {
176 synctest.Test(t, func(t *testing.T) {
177 sp := New[string]()
178 ctx := context.Background()
179
180 // Publish some messages before anyone subscribes
181 sp.Publish(1, "early1")
182 sp.Publish(2, "early2")
183
184 // Late subscriber joins, interested in messages after index 2
185 next := sp.Subscribe(ctx, 2)
186
187 // Publish a new message
188 go func() {
189 sp.Publish(3, "late")
190 }()
191
192 // Should only receive the new message
193 msg, ok := next()
194 if !ok {
195 t.Fatal("Expected to receive message, got closed channel")
196 }
197 if msg != "late" {
198 t.Errorf("Expected 'late', got %q", msg)
199 }
200 })
201}
202
203func TestSubPubWithTimeout(t *testing.T) {
204 sp := New[string]()
205 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
206 defer cancel()
207
208 next := sp.Subscribe(ctx, 0)
209
210 // Don't publish anything, just wait for timeout
211 _, ok := next()
212 if ok {
213 t.Error("Expected timeout to close the subscription")
214 }
215}
216
217func TestSubPubMultiplePublishes(t *testing.T) {
218 synctest.Test(t, func(t *testing.T) {
219 sp := New[string]()
220 ctx := context.Background()
221
222 // Start two subscribers at different positions
223 next1 := sp.Subscribe(ctx, 0)
224 next2 := sp.Subscribe(ctx, 1)
225
226 // Publish at index 2 - only next1 should receive (next2 already has idx 1)
227 go func() {
228 sp.Publish(2, "msg2")
229 }()
230
231 msg, ok := next1()
232 if !ok {
233 t.Fatal("Subscriber 1: expected to receive message, got closed channel")
234 }
235 if msg != "msg2" {
236 t.Errorf("Subscriber 1: expected 'msg2', got %q", msg)
237 }
238
239 msg, ok = next2()
240 if !ok {
241 t.Fatal("Subscriber 2: expected to receive message, got closed channel")
242 }
243 if msg != "msg2" {
244 t.Errorf("Subscriber 2: expected 'msg2', got %q", msg)
245 }
246
247 // Now both are at index 2, publish at index 3
248 go func() {
249 sp.Publish(3, "msg3")
250 }()
251
252 for i, next := range []func() (string, bool){next1, next2} {
253 msg, ok := next()
254 if !ok {
255 t.Fatalf("Subscriber %d: expected to receive msg3, got closed channel", i+1)
256 }
257 if msg != "msg3" {
258 t.Errorf("Subscriber %d: expected 'msg3', got %q", i+1, msg)
259 }
260 }
261 })
262}
263
264// TestSubPubSubscriberContextCancelled tests that subscribers properly handle context cancellation
265func TestSubPubSubscriberContextCancelled(t *testing.T) {
266 synctest.Test(t, func(t *testing.T) {
267 sp := New[string]()
268 ctx, cancel := context.WithCancel(context.Background())
269
270 next := sp.Subscribe(ctx, 0)
271
272 // Cancel context before publishing
273 cancel()
274
275 // Publish a message
276 sp.Publish(1, "test")
277
278 // Should return false when context is cancelled
279 _, ok := next()
280 if ok {
281 t.Error("Expected closed channel after context cancellation")
282 }
283 })
284}
285
286// TestSubPubSubscriberDisconnected tests that subscribers get disconnected when channel is full
287func TestSubPubSubscriberDisconnected(t *testing.T) {
288 sp := New[string]()
289 ctx := context.Background()
290
291 // Create subscriber
292 next := sp.Subscribe(ctx, 0)
293
294 // Fill up the channel buffer (10 messages) + 1 more to trigger disconnection
295 for i := 1; i <= 11; i++ {
296 sp.Publish(int64(i), fmt.Sprintf("message%d", i))
297 }
298
299 // Try to receive all messages - should get exactly 10, then be disconnected
300 received := 0
301 for {
302 _, ok := next()
303 if !ok {
304 break
305 }
306 received++
307 if received > 11 {
308 t.Fatal("Received more messages than expected")
309 }
310 }
311
312 // Should have received exactly 10 messages before being disconnected
313 if received != 10 {
314 t.Errorf("Expected to receive 10 buffered messages, got %d", received)
315 }
316}
317
318// TestSubPubSubscriberNotInterested tests that subscribers don't receive messages they're not interested in
319func TestSubPubSubscriberNotInterested(t *testing.T) {
320 synctest.Test(t, func(t *testing.T) {
321 sp := New[int]()
322 ctx := context.Background()
323
324 // Subscriber already has index 5, waiting for messages after index 5
325 next := sp.Subscribe(ctx, 5)
326
327 // Publish at index 5 (subscriber already has this)
328 sp.Publish(5, 100)
329
330 // Publish at index 4 (subscriber is ahead of this)
331 sp.Publish(4, 200)
332
333 // Publish at index 6 (subscriber should get this)
334 go func() {
335 sp.Publish(6, 300)
336 }()
337
338 msg, ok := next()
339 if !ok {
340 t.Fatal("Expected to receive message, got closed channel")
341 }
342 if msg != 300 {
343 t.Errorf("Expected 300, got %d", msg)
344 }
345 })
346}
347
348// TestSubPubSubscriberContextDoneDuringPublish tests subscriber context cancellation during publish
349func TestSubPubSubscriberContextDoneDuringPublish(t *testing.T) {
350 sp := New[string]()
351 ctx, cancel := context.WithCancel(context.Background())
352
353 // Create subscriber
354 next := sp.Subscribe(ctx, 0)
355
356 // Cancel context
357 cancel()
358
359 // Publish a message - subscriber should be removed
360 sp.Publish(1, "test")
361
362 // Try to receive - should be closed
363 _, ok := next()
364 if ok {
365 t.Error("Expected closed channel after context cancellation")
366 }
367}