1package app
2
3import (
4 "context"
5 "sync"
6 "testing"
7 "testing/synctest"
8 "time"
9
10 tea "charm.land/bubbletea/v2"
11 "github.com/charmbracelet/crush/internal/pubsub"
12 "github.com/stretchr/testify/require"
13 "go.uber.org/goleak"
14)
15
16func TestSetupSubscriber_NormalFlow(t *testing.T) {
17 synctest.Test(t, func(t *testing.T) {
18 f := newSubscriberFixture(t, 10)
19
20 time.Sleep(10 * time.Millisecond)
21 synctest.Wait()
22
23 f.broker.Publish(pubsub.CreatedEvent, "event1")
24 f.broker.Publish(pubsub.CreatedEvent, "event2")
25
26 for range 2 {
27 select {
28 case <-f.outputCh:
29 case <-time.After(5 * time.Second):
30 t.Fatal("Timed out waiting for messages")
31 }
32 }
33
34 f.cancel()
35 f.wg.Wait()
36 })
37}
38
39func TestSetupSubscriber_SlowConsumer(t *testing.T) {
40 synctest.Test(t, func(t *testing.T) {
41 f := newSubscriberFixture(t, 0)
42
43 const numEvents = 5
44
45 var pubWg sync.WaitGroup
46 pubWg.Go(func() {
47 for range numEvents {
48 f.broker.Publish(pubsub.CreatedEvent, "event")
49 time.Sleep(10 * time.Millisecond)
50 synctest.Wait()
51 }
52 })
53
54 time.Sleep(time.Duration(numEvents) * (subscriberSendTimeout + 20*time.Millisecond))
55 synctest.Wait()
56
57 received := 0
58 for {
59 select {
60 case <-f.outputCh:
61 received++
62 default:
63 pubWg.Wait()
64 f.cancel()
65 f.wg.Wait()
66 require.Less(t, received, numEvents, "Slow consumer should have dropped some messages")
67 return
68 }
69 }
70 })
71}
72
73func TestSetupSubscriber_ContextCancellation(t *testing.T) {
74 synctest.Test(t, func(t *testing.T) {
75 f := newSubscriberFixture(t, 10)
76
77 f.broker.Publish(pubsub.CreatedEvent, "event1")
78 time.Sleep(100 * time.Millisecond)
79 synctest.Wait()
80
81 f.cancel()
82 f.wg.Wait()
83 })
84}
85
86func TestSetupSubscriber_DrainAfterDrop(t *testing.T) {
87 synctest.Test(t, func(t *testing.T) {
88 f := newSubscriberFixture(t, 0)
89
90 time.Sleep(10 * time.Millisecond)
91 synctest.Wait()
92
93 // First event: nobody reads outputCh so the timer fires (message dropped).
94 f.broker.Publish(pubsub.CreatedEvent, "event1")
95 time.Sleep(subscriberSendTimeout + 25*time.Millisecond)
96 synctest.Wait()
97
98 // Second event: triggers Stop()==false path; without the fix this deadlocks.
99 f.broker.Publish(pubsub.CreatedEvent, "event2")
100
101 // If the timer drain deadlocks, wg.Wait never returns.
102 done := make(chan struct{})
103 go func() {
104 f.cancel()
105 f.wg.Wait()
106 close(done)
107 }()
108
109 select {
110 case <-done:
111 case <-time.After(5 * time.Second):
112 t.Fatal("setupSubscriber goroutine hung — likely timer drain deadlock")
113 }
114 })
115}
116
117func TestSetupSubscriber_NoTimerLeak(t *testing.T) {
118 defer goleak.VerifyNone(t)
119 synctest.Test(t, func(t *testing.T) {
120 f := newSubscriberFixture(t, 100)
121
122 for range 100 {
123 f.broker.Publish(pubsub.CreatedEvent, "event")
124 time.Sleep(5 * time.Millisecond)
125 synctest.Wait()
126 }
127
128 f.cancel()
129 f.wg.Wait()
130 })
131}
132
133type subscriberFixture struct {
134 broker *pubsub.Broker[string]
135 wg sync.WaitGroup
136 outputCh chan tea.Msg
137 cancel context.CancelFunc
138}
139
140func newSubscriberFixture(t *testing.T, bufSize int) *subscriberFixture {
141 t.Helper()
142 ctx, cancel := context.WithCancel(t.Context())
143 t.Cleanup(cancel)
144
145 f := &subscriberFixture{
146 broker: pubsub.NewBroker[string](),
147 outputCh: make(chan tea.Msg, bufSize),
148 cancel: cancel,
149 }
150 t.Cleanup(f.broker.Shutdown)
151
152 setupSubscriber(ctx, &f.wg, "test", func(ctx context.Context) <-chan pubsub.Event[string] {
153 return f.broker.Subscribe(ctx)
154 }, f.outputCh)
155
156 return f
157}