subpub_test.go

  1package subpub
  2
  3import (
  4	"context"
  5	"fmt"
  6	"testing"
  7	"testing/synctest"
  8	"time"
  9)
 10
 11func TestSubPubBasic(t *testing.T) {
 12	synctest.Test(t, func(t *testing.T) {
 13		sp := New[string]()
 14		ctx := context.Background()
 15
 16		// Subscribe waiting for messages after index 0
 17		next := sp.Subscribe(ctx, 0)
 18
 19		// Publish a message at index 1
 20		go func() {
 21			sp.Publish(1, "hello")
 22		}()
 23
 24		// Should receive the message
 25		msg, ok := next()
 26		if !ok {
 27			t.Fatal("Expected to receive message, got closed channel")
 28		}
 29		if msg != "hello" {
 30			t.Errorf("Expected 'hello', got %q", msg)
 31		}
 32	})
 33}
 34
 35func TestSubPubMultipleSubscribers(t *testing.T) {
 36	synctest.Test(t, func(t *testing.T) {
 37		sp := New[string]()
 38		ctx := context.Background()
 39
 40		// Create multiple subscribers
 41		next1 := sp.Subscribe(ctx, 0)
 42		next2 := sp.Subscribe(ctx, 0)
 43		next3 := sp.Subscribe(ctx, 0)
 44
 45		// Publish a message
 46		go func() {
 47			sp.Publish(1, "broadcast")
 48		}()
 49
 50		// All subscribers should receive it
 51		for i, next := range []func() (string, bool){next1, next2, next3} {
 52			msg, ok := next()
 53			if !ok {
 54				t.Fatalf("Subscriber %d: expected to receive message, got closed channel", i+1)
 55			}
 56			if msg != "broadcast" {
 57				t.Errorf("Subscriber %d: expected 'broadcast', got %q", i+1, msg)
 58			}
 59		}
 60	})
 61}
 62
 63func TestSubPubSubscriberAlreadyHasMessage(t *testing.T) {
 64	synctest.Test(t, func(t *testing.T) {
 65		sp := New[int]()
 66		ctx := context.Background()
 67
 68		// Subscriber already has index 5, waiting for index > 5
 69		next := sp.Subscribe(ctx, 5)
 70
 71		// Publish at index 3 (subscriber already has this)
 72		sp.Publish(3, 100)
 73
 74		// Publish at index 6 (subscriber should get this)
 75		go func() {
 76			sp.Publish(6, 200)
 77		}()
 78
 79		msg, ok := next()
 80		if !ok {
 81			t.Fatal("Expected to receive message, got closed channel")
 82		}
 83		if msg != 200 {
 84			t.Errorf("Expected 200, got %d", msg)
 85		}
 86	})
 87}
 88
 89func TestSubPubContextCancellation(t *testing.T) {
 90	synctest.Test(t, func(t *testing.T) {
 91		sp := New[string]()
 92		ctx, cancel := context.WithCancel(context.Background())
 93
 94		next := sp.Subscribe(ctx, 0)
 95
 96		// Cancel the context
 97		cancel()
 98
 99		// Should return false when context is cancelled
100		_, ok := next()
101		if ok {
102			t.Error("Expected closed channel after context cancellation")
103		}
104	})
105}
106
107func TestSubPubSubscriberBehind(t *testing.T) {
108	// Don't use synctest for this test as it involves checking buffer overflow behavior
109	sp := New[string]()
110	ctx := context.Background()
111
112	// Subscriber waiting for messages after index 0
113	next := sp.Subscribe(ctx, 0)
114
115	// Fill up the channel buffer (10 messages) quickly before subscriber reads
116	for i := 1; i <= 10; i++ {
117		sp.Publish(int64(i), fmt.Sprintf("message%d", i))
118	}
119
120	// Try to send one more - subscriber should be disconnected because buffer is full
121	sp.Publish(11, "overflow")
122
123	// Try to receive - should work for buffered messages
124	received := 0
125	var messages []string
126	for {
127		msg, ok := next()
128		if !ok {
129			break
130		}
131		messages = append(messages, msg)
132		received++
133		if received > 11 {
134			t.Fatal("Received more messages than expected")
135		}
136	}
137
138	// Should have received exactly 10 messages before being disconnected
139	if received != 10 {
140		t.Errorf("Expected to receive 10 buffered messages, got %d: %v", received, messages)
141	}
142}
143
144func TestSubPubSequentialMessages(t *testing.T) {
145	// Don't use synctest for this test as mutex blocking doesn't work well with it
146	sp := New[int]()
147	ctx := context.Background()
148
149	next := sp.Subscribe(ctx, 0)
150
151	// Publish multiple messages in order
152	for i := 1; i <= 5; i++ {
153		sp.Publish(int64(i), i*10)
154	}
155
156	// Receive all messages
157	received := []int{}
158	for i := 1; i <= 5; i++ {
159		msg, ok := next()
160		if !ok {
161			t.Fatalf("Expected to receive 5 messages, got closed channel after %d messages", i-1)
162		}
163		received = append(received, msg)
164	}
165
166	// Check we got all expected values in order
167	expected := []int{10, 20, 30, 40, 50}
168	for i, val := range received {
169		if val != expected[i] {
170			t.Errorf("Message %d: expected %d, got %d", i, expected[i], val)
171		}
172	}
173}
174
175func TestSubPubLateSubscriber(t *testing.T) {
176	synctest.Test(t, func(t *testing.T) {
177		sp := New[string]()
178		ctx := context.Background()
179
180		// Publish some messages before anyone subscribes
181		sp.Publish(1, "early1")
182		sp.Publish(2, "early2")
183
184		// Late subscriber joins, interested in messages after index 2
185		next := sp.Subscribe(ctx, 2)
186
187		// Publish a new message
188		go func() {
189			sp.Publish(3, "late")
190		}()
191
192		// Should only receive the new message
193		msg, ok := next()
194		if !ok {
195			t.Fatal("Expected to receive message, got closed channel")
196		}
197		if msg != "late" {
198			t.Errorf("Expected 'late', got %q", msg)
199		}
200	})
201}
202
203func TestSubPubWithTimeout(t *testing.T) {
204	sp := New[string]()
205	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
206	defer cancel()
207
208	next := sp.Subscribe(ctx, 0)
209
210	// Don't publish anything, just wait for timeout
211	_, ok := next()
212	if ok {
213		t.Error("Expected timeout to close the subscription")
214	}
215}
216
217func TestSubPubMultiplePublishes(t *testing.T) {
218	synctest.Test(t, func(t *testing.T) {
219		sp := New[string]()
220		ctx := context.Background()
221
222		// Start two subscribers at different positions
223		next1 := sp.Subscribe(ctx, 0)
224		next2 := sp.Subscribe(ctx, 1)
225
226		// Publish at index 2 - only next1 should receive (next2 already has idx 1)
227		go func() {
228			sp.Publish(2, "msg2")
229		}()
230
231		msg, ok := next1()
232		if !ok {
233			t.Fatal("Subscriber 1: expected to receive message, got closed channel")
234		}
235		if msg != "msg2" {
236			t.Errorf("Subscriber 1: expected 'msg2', got %q", msg)
237		}
238
239		msg, ok = next2()
240		if !ok {
241			t.Fatal("Subscriber 2: expected to receive message, got closed channel")
242		}
243		if msg != "msg2" {
244			t.Errorf("Subscriber 2: expected 'msg2', got %q", msg)
245		}
246
247		// Now both are at index 2, publish at index 3
248		go func() {
249			sp.Publish(3, "msg3")
250		}()
251
252		for i, next := range []func() (string, bool){next1, next2} {
253			msg, ok := next()
254			if !ok {
255				t.Fatalf("Subscriber %d: expected to receive msg3, got closed channel", i+1)
256			}
257			if msg != "msg3" {
258				t.Errorf("Subscriber %d: expected 'msg3', got %q", i+1, msg)
259			}
260		}
261	})
262}
263
264// TestSubPubSubscriberContextCancelled tests that subscribers properly handle context cancellation
265func TestSubPubSubscriberContextCancelled(t *testing.T) {
266	synctest.Test(t, func(t *testing.T) {
267		sp := New[string]()
268		ctx, cancel := context.WithCancel(context.Background())
269
270		next := sp.Subscribe(ctx, 0)
271
272		// Cancel context before publishing
273		cancel()
274
275		// Publish a message
276		sp.Publish(1, "test")
277
278		// Should return false when context is cancelled
279		_, ok := next()
280		if ok {
281			t.Error("Expected closed channel after context cancellation")
282		}
283	})
284}
285
286// TestSubPubSubscriberDisconnected tests that subscribers get disconnected when channel is full
287func TestSubPubSubscriberDisconnected(t *testing.T) {
288	sp := New[string]()
289	ctx := context.Background()
290
291	// Create subscriber
292	next := sp.Subscribe(ctx, 0)
293
294	// Fill up the channel buffer (10 messages) + 1 more to trigger disconnection
295	for i := 1; i <= 11; i++ {
296		sp.Publish(int64(i), fmt.Sprintf("message%d", i))
297	}
298
299	// Try to receive all messages - should get exactly 10, then be disconnected
300	received := 0
301	for {
302		_, ok := next()
303		if !ok {
304			break
305		}
306		received++
307		if received > 11 {
308			t.Fatal("Received more messages than expected")
309		}
310	}
311
312	// Should have received exactly 10 messages before being disconnected
313	if received != 10 {
314		t.Errorf("Expected to receive 10 buffered messages, got %d", received)
315	}
316}
317
318// TestSubPubSubscriberNotInterested tests that subscribers don't receive messages they're not interested in
319func TestSubPubSubscriberNotInterested(t *testing.T) {
320	synctest.Test(t, func(t *testing.T) {
321		sp := New[int]()
322		ctx := context.Background()
323
324		// Subscriber already has index 5, waiting for messages after index 5
325		next := sp.Subscribe(ctx, 5)
326
327		// Publish at index 5 (subscriber already has this)
328		sp.Publish(5, 100)
329
330		// Publish at index 4 (subscriber is ahead of this)
331		sp.Publish(4, 200)
332
333		// Publish at index 6 (subscriber should get this)
334		go func() {
335			sp.Publish(6, 300)
336		}()
337
338		msg, ok := next()
339		if !ok {
340			t.Fatal("Expected to receive message, got closed channel")
341		}
342		if msg != 300 {
343			t.Errorf("Expected 300, got %d", msg)
344		}
345	})
346}
347
348// TestSubPubSubscriberContextDoneDuringPublish tests subscriber context cancellation during publish
349func TestSubPubSubscriberContextDoneDuringPublish(t *testing.T) {
350	sp := New[string]()
351	ctx, cancel := context.WithCancel(context.Background())
352
353	// Create subscriber
354	next := sp.Subscribe(ctx, 0)
355
356	// Cancel context
357	cancel()
358
359	// Publish a message - subscriber should be removed
360	sp.Publish(1, "test")
361
362	// Try to receive - should be closed
363	_, ok := next()
364	if ok {
365		t.Error("Expected closed channel after context cancellation")
366	}
367}