1package app
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "testing"
8 "time"
9
10 tea "charm.land/bubbletea/v2"
11 "github.com/charmbracelet/crush/internal/pubsub"
12 "github.com/stretchr/testify/require"
13)
14
15// TestSetupSubscriber_NormalFlow verifies that events published to the source
16// broker are forwarded to the output broker.
17func TestSetupSubscriber_NormalFlow(t *testing.T) {
18 t.Parallel()
19
20 ctx, cancel := context.WithCancel(t.Context())
21 defer cancel()
22
23 src := pubsub.NewBroker[string]()
24 defer src.Shutdown()
25 out := pubsub.NewBroker[tea.Msg]()
26 defer out.Shutdown()
27
28 ch := out.Subscribe(ctx)
29
30 var wg sync.WaitGroup
31 setupSubscriber(ctx, &wg, "test", src.Subscribe, out)
32
33 // Yield so the subscriber goroutine can call src.Subscribe before we publish.
34 time.Sleep(10 * time.Millisecond)
35
36 src.Publish(pubsub.CreatedEvent, "hello")
37 src.Publish(pubsub.CreatedEvent, "world")
38
39 for range 2 {
40 select {
41 case <-ch:
42 case <-time.After(5 * time.Second):
43 t.Fatal("timed out waiting for forwarded event")
44 }
45 }
46
47 cancel()
48 wg.Wait()
49}
50
51// TestSetupSubscriber_ContextCancellation verifies the goroutine exits cleanly
52// when the context is cancelled.
53func TestSetupSubscriber_ContextCancellation(t *testing.T) {
54 t.Parallel()
55
56 ctx, cancel := context.WithCancel(t.Context())
57
58 src := pubsub.NewBroker[string]()
59 defer src.Shutdown()
60 out := pubsub.NewBroker[tea.Msg]()
61 defer out.Shutdown()
62
63 var wg sync.WaitGroup
64 setupSubscriber(ctx, &wg, "test", src.Subscribe, out)
65
66 src.Publish(pubsub.CreatedEvent, "event")
67 cancel()
68
69 done := make(chan struct{})
70 go func() { wg.Wait(); close(done) }()
71
72 select {
73 case <-done:
74 case <-time.After(5 * time.Second):
75 t.Fatal("setupSubscriber goroutine did not exit after context cancellation")
76 }
77}
78
79// TestEvents_ZeroConsumers verifies that publishing with no subscribers does
80// not block or panic.
81func TestEvents_ZeroConsumers(t *testing.T) {
82 t.Parallel()
83
84 broker := pubsub.NewBroker[tea.Msg]()
85 defer broker.Shutdown()
86
87 require.Equal(t, 0, broker.GetSubscriberCount())
88
89 // Must not block.
90 done := make(chan struct{})
91 go func() {
92 broker.Publish(pubsub.UpdatedEvent, tea.Msg("msg1"))
93 broker.Publish(pubsub.UpdatedEvent, tea.Msg("msg2"))
94 close(done)
95 }()
96
97 select {
98 case <-done:
99 case <-time.After(time.Second):
100 t.Fatal("Publish with zero consumers blocked")
101 }
102}
103
104// TestEvents_OneConsumer verifies that a single subscriber receives every event
105// exactly once.
106func TestEvents_OneConsumer(t *testing.T) {
107 t.Parallel()
108
109 ctx, cancel := context.WithCancel(t.Context())
110 defer cancel()
111
112 broker := pubsub.NewBroker[tea.Msg]()
113 defer broker.Shutdown()
114
115 ch := broker.Subscribe(ctx)
116
117 const n = 10
118 for i := range n {
119 broker.Publish(pubsub.UpdatedEvent, tea.Msg(i))
120 }
121
122 for i := range n {
123 select {
124 case ev := <-ch:
125 require.Equal(t, tea.Msg(i), ev.Payload)
126 case <-time.After(5 * time.Second):
127 t.Fatalf("timed out waiting for event %d", i)
128 }
129 }
130}
131
132// TestEvents_NConsumers verifies that every subscriber receives every event
133// exactly once, regardless of how many concurrent consumers are attached.
134func TestEvents_NConsumers(t *testing.T) {
135 t.Parallel()
136
137 for _, n := range []int{2, 5, 10} {
138 t.Run(fmt.Sprintf("consumers=%d", n), func(t *testing.T) {
139 t.Parallel()
140 testNConsumers(t, n)
141 })
142 }
143}
144
145func testNConsumers(t *testing.T, n int) {
146 t.Helper()
147
148 ctx, cancel := context.WithCancel(t.Context())
149 defer cancel()
150
151 broker := pubsub.NewBroker[tea.Msg]()
152 defer broker.Shutdown()
153
154 // Subscribe all N consumers before publishing.
155 channels := make([]<-chan pubsub.Event[tea.Msg], n)
156 for i := range n {
157 channels[i] = broker.Subscribe(ctx)
158 }
159 require.Equal(t, n, broker.GetSubscriberCount())
160
161 const numEvents = 20
162 for i := range numEvents {
163 broker.Publish(pubsub.UpdatedEvent, tea.Msg(i))
164 }
165
166 // Each consumer must receive all numEvents messages.
167 var wg sync.WaitGroup
168 for i, ch := range channels {
169 wg.Go(func() {
170 for j := range numEvents {
171 select {
172 case ev := <-ch:
173 require.Equal(t, tea.Msg(j), ev.Payload,
174 "consumer %d: wrong payload for event %d", i, j)
175 case <-time.After(5 * time.Second):
176 t.Errorf("consumer %d: timed out waiting for event %d", i, j)
177 return
178 }
179 }
180 })
181 }
182 wg.Wait()
183}