app_test.go

  1package app
  2
  3import (
  4	"context"
  5	"sync"
  6	"testing"
  7	"testing/synctest"
  8	"time"
  9
 10	tea "charm.land/bubbletea/v2"
 11	"github.com/charmbracelet/crush/internal/pubsub"
 12	"github.com/stretchr/testify/require"
 13	"go.uber.org/goleak"
 14)
 15
 16func TestSetupSubscriber_NormalFlow(t *testing.T) {
 17	synctest.Test(t, func(t *testing.T) {
 18		f := newSubscriberFixture(t, 10)
 19
 20		time.Sleep(10 * time.Millisecond)
 21		synctest.Wait()
 22
 23		f.broker.Publish(pubsub.CreatedEvent, "event1")
 24		f.broker.Publish(pubsub.CreatedEvent, "event2")
 25
 26		for range 2 {
 27			select {
 28			case <-f.outputCh:
 29			case <-time.After(5 * time.Second):
 30				t.Fatal("Timed out waiting for messages")
 31			}
 32		}
 33
 34		f.cancel()
 35		f.wg.Wait()
 36	})
 37}
 38
 39func TestSetupSubscriber_SlowConsumer(t *testing.T) {
 40	synctest.Test(t, func(t *testing.T) {
 41		f := newSubscriberFixture(t, 0)
 42
 43		const numEvents = 5
 44
 45		var pubWg sync.WaitGroup
 46		pubWg.Go(func() {
 47			for range numEvents {
 48				f.broker.Publish(pubsub.CreatedEvent, "event")
 49				time.Sleep(10 * time.Millisecond)
 50				synctest.Wait()
 51			}
 52		})
 53
 54		time.Sleep(time.Duration(numEvents) * (subscriberSendTimeout + 20*time.Millisecond))
 55		synctest.Wait()
 56
 57		received := 0
 58		for {
 59			select {
 60			case <-f.outputCh:
 61				received++
 62			default:
 63				pubWg.Wait()
 64				f.cancel()
 65				f.wg.Wait()
 66				require.Less(t, received, numEvents, "Slow consumer should have dropped some messages")
 67				return
 68			}
 69		}
 70	})
 71}
 72
 73func TestSetupSubscriber_ContextCancellation(t *testing.T) {
 74	synctest.Test(t, func(t *testing.T) {
 75		f := newSubscriberFixture(t, 10)
 76
 77		f.broker.Publish(pubsub.CreatedEvent, "event1")
 78		time.Sleep(100 * time.Millisecond)
 79		synctest.Wait()
 80
 81		f.cancel()
 82		f.wg.Wait()
 83	})
 84}
 85
 86func TestSetupSubscriber_DrainAfterDrop(t *testing.T) {
 87	synctest.Test(t, func(t *testing.T) {
 88		f := newSubscriberFixture(t, 0)
 89
 90		time.Sleep(10 * time.Millisecond)
 91		synctest.Wait()
 92
 93		// First event: nobody reads outputCh so the timer fires (message dropped).
 94		f.broker.Publish(pubsub.CreatedEvent, "event1")
 95		time.Sleep(subscriberSendTimeout + 25*time.Millisecond)
 96		synctest.Wait()
 97
 98		// Second event: triggers Stop()==false path; without the fix this deadlocks.
 99		f.broker.Publish(pubsub.CreatedEvent, "event2")
100
101		// If the timer drain deadlocks, wg.Wait never returns.
102		done := make(chan struct{})
103		go func() {
104			f.cancel()
105			f.wg.Wait()
106			close(done)
107		}()
108
109		select {
110		case <-done:
111		case <-time.After(5 * time.Second):
112			t.Fatal("setupSubscriber goroutine hung — likely timer drain deadlock")
113		}
114	})
115}
116
117func TestSetupSubscriber_NoTimerLeak(t *testing.T) {
118	defer goleak.VerifyNone(t)
119	synctest.Test(t, func(t *testing.T) {
120		f := newSubscriberFixture(t, 100)
121
122		for range 100 {
123			f.broker.Publish(pubsub.CreatedEvent, "event")
124			time.Sleep(5 * time.Millisecond)
125			synctest.Wait()
126		}
127
128		f.cancel()
129		f.wg.Wait()
130	})
131}
132
133type subscriberFixture struct {
134	broker   *pubsub.Broker[string]
135	wg       sync.WaitGroup
136	outputCh chan tea.Msg
137	cancel   context.CancelFunc
138}
139
140func newSubscriberFixture(t *testing.T, bufSize int) *subscriberFixture {
141	t.Helper()
142	ctx, cancel := context.WithCancel(t.Context())
143	t.Cleanup(cancel)
144
145	f := &subscriberFixture{
146		broker:   pubsub.NewBroker[string](),
147		outputCh: make(chan tea.Msg, bufSize),
148		cancel:   cancel,
149	}
150	t.Cleanup(f.broker.Shutdown)
151
152	setupSubscriber(ctx, &f.wg, "test", func(ctx context.Context) <-chan pubsub.Event[string] {
153		return f.broker.Subscribe(ctx)
154	}, f.outputCh)
155
156	return f
157}