app_test.go

  1package app
  2
  3import (
  4	"context"
  5	"fmt"
  6	"sync"
  7	"testing"
  8	"time"
  9
 10	tea "charm.land/bubbletea/v2"
 11	"github.com/charmbracelet/crush/internal/pubsub"
 12	"github.com/stretchr/testify/require"
 13)
 14
 15// TestSetupSubscriber_NormalFlow verifies that events published to the source
 16// broker are forwarded to the output broker.
 17func TestSetupSubscriber_NormalFlow(t *testing.T) {
 18	t.Parallel()
 19
 20	ctx, cancel := context.WithCancel(t.Context())
 21	defer cancel()
 22
 23	src := pubsub.NewBroker[string]()
 24	defer src.Shutdown()
 25	out := pubsub.NewBroker[tea.Msg]()
 26	defer out.Shutdown()
 27
 28	ch := out.Subscribe(ctx)
 29
 30	var wg sync.WaitGroup
 31	setupSubscriber(ctx, &wg, "test", src.Subscribe, out)
 32
 33	// Yield so the subscriber goroutine can call src.Subscribe before we publish.
 34	time.Sleep(10 * time.Millisecond)
 35
 36	src.Publish(pubsub.CreatedEvent, "hello")
 37	src.Publish(pubsub.CreatedEvent, "world")
 38
 39	for range 2 {
 40		select {
 41		case <-ch:
 42		case <-time.After(5 * time.Second):
 43			t.Fatal("timed out waiting for forwarded event")
 44		}
 45	}
 46
 47	cancel()
 48	wg.Wait()
 49}
 50
 51// TestSetupSubscriber_ContextCancellation verifies the goroutine exits cleanly
 52// when the context is cancelled.
 53func TestSetupSubscriber_ContextCancellation(t *testing.T) {
 54	t.Parallel()
 55
 56	ctx, cancel := context.WithCancel(t.Context())
 57
 58	src := pubsub.NewBroker[string]()
 59	defer src.Shutdown()
 60	out := pubsub.NewBroker[tea.Msg]()
 61	defer out.Shutdown()
 62
 63	var wg sync.WaitGroup
 64	setupSubscriber(ctx, &wg, "test", src.Subscribe, out)
 65
 66	src.Publish(pubsub.CreatedEvent, "event")
 67	cancel()
 68
 69	done := make(chan struct{})
 70	go func() { wg.Wait(); close(done) }()
 71
 72	select {
 73	case <-done:
 74	case <-time.After(5 * time.Second):
 75		t.Fatal("setupSubscriber goroutine did not exit after context cancellation")
 76	}
 77}
 78
 79// TestEvents_ZeroConsumers verifies that publishing with no subscribers does
 80// not block or panic.
 81func TestEvents_ZeroConsumers(t *testing.T) {
 82	t.Parallel()
 83
 84	broker := pubsub.NewBroker[tea.Msg]()
 85	defer broker.Shutdown()
 86
 87	require.Equal(t, 0, broker.GetSubscriberCount())
 88
 89	// Must not block.
 90	done := make(chan struct{})
 91	go func() {
 92		broker.Publish(pubsub.UpdatedEvent, tea.Msg("msg1"))
 93		broker.Publish(pubsub.UpdatedEvent, tea.Msg("msg2"))
 94		close(done)
 95	}()
 96
 97	select {
 98	case <-done:
 99	case <-time.After(time.Second):
100		t.Fatal("Publish with zero consumers blocked")
101	}
102}
103
104// TestEvents_OneConsumer verifies that a single subscriber receives every event
105// exactly once.
106func TestEvents_OneConsumer(t *testing.T) {
107	t.Parallel()
108
109	ctx, cancel := context.WithCancel(t.Context())
110	defer cancel()
111
112	broker := pubsub.NewBroker[tea.Msg]()
113	defer broker.Shutdown()
114
115	ch := broker.Subscribe(ctx)
116
117	const n = 10
118	for i := range n {
119		broker.Publish(pubsub.UpdatedEvent, tea.Msg(i))
120	}
121
122	for i := range n {
123		select {
124		case ev := <-ch:
125			require.Equal(t, tea.Msg(i), ev.Payload)
126		case <-time.After(5 * time.Second):
127			t.Fatalf("timed out waiting for event %d", i)
128		}
129	}
130}
131
132// TestEvents_NConsumers verifies that every subscriber receives every event
133// exactly once, regardless of how many concurrent consumers are attached.
134func TestEvents_NConsumers(t *testing.T) {
135	t.Parallel()
136
137	for _, n := range []int{2, 5, 10} {
138		t.Run(fmt.Sprintf("consumers=%d", n), func(t *testing.T) {
139			t.Parallel()
140			testNConsumers(t, n)
141		})
142	}
143}
144
145func testNConsumers(t *testing.T, n int) {
146	t.Helper()
147
148	ctx, cancel := context.WithCancel(t.Context())
149	defer cancel()
150
151	broker := pubsub.NewBroker[tea.Msg]()
152	defer broker.Shutdown()
153
154	// Subscribe all N consumers before publishing.
155	channels := make([]<-chan pubsub.Event[tea.Msg], n)
156	for i := range n {
157		channels[i] = broker.Subscribe(ctx)
158	}
159	require.Equal(t, n, broker.GetSubscriberCount())
160
161	const numEvents = 20
162	for i := range numEvents {
163		broker.Publish(pubsub.UpdatedEvent, tea.Msg(i))
164	}
165
166	// Each consumer must receive all numEvents messages.
167	var wg sync.WaitGroup
168	for i, ch := range channels {
169		wg.Go(func() {
170			for j := range numEvents {
171				select {
172				case ev := <-ch:
173					require.Equal(t, tea.Msg(j), ev.Payload,
174						"consumer %d: wrong payload for event %d", i, j)
175				case <-time.After(5 * time.Second):
176					t.Errorf("consumer %d: timed out waiting for event %d", i, j)
177					return
178				}
179			}
180		})
181	}
182	wg.Wait()
183}