perf: timer leak in setupSubscriber (#2147)

Authored by Carlos Alexandro Becker and AnyCPU

* fix: fix timer leak in setupSubscriber causing resource exhaustion under high load

* fix: synctest

Signed-off-by: Carlos Alexandro Becker <caarlos0@users.noreply.github.com>

* test: improvements

Signed-off-by: Carlos Alexandro Becker <caarlos0@users.noreply.github.com>

* refactor: tests

Signed-off-by: Carlos Alexandro Becker <caarlos0@users.noreply.github.com>

---------

Signed-off-by: Carlos Alexandro Becker <caarlos0@users.noreply.github.com>
Co-authored-by: AnyCPU <AnyCPU@users.noreply.github.com>
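
Why this leaks: the old code called time.After(2 * time.Second) for every event it forwarded. Each call allocates a fresh timer, and before Go 1.23 a timer created this way stayed reachable until it fired, so a busy subscriber could pile up two seconds' worth of pending timers; even on newer runtimes it is an avoidable per-event allocation. The fix reuses a single timer across iterations. A minimal sketch of the two patterns (simplified from the diff below; names are illustrative):

    // Leaky: a new timer per event, alive until it fires.
    for event := range subCh {
    	select {
    	case outputCh <- event:
    	case <-time.After(2 * time.Second):
    		// event dropped
    	}
    }

    // Fixed: one timer, re-armed for each send.
    t := time.NewTimer(0)
    <-t.C // drain the initial firing so the timer starts idle
    defer t.Stop()
    for event := range subCh {
    	if !t.Stop() {
    		select {
    		case <-t.C: // consume a stale firing, if any
    		default: // already consumed by an earlier send
    		}
    	}
    	t.Reset(2 * time.Second)
    	select {
    	case outputCh <- event:
    	case <-t.C:
    		// event dropped: consumer too slow
    	}
    }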

Change summary

go.mod                   |   1 +
go.sum                   |   2 ++
internal/app/app.go      |  16 ++++-
internal/app/app_test.go | 157 ++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 175 insertions(+), 1 deletion(-)

Detailed changes

go.mod

@@ -60,6 +60,7 @@ require (
 	github.com/tidwall/gjson v1.18.0
 	github.com/tidwall/sjson v1.2.5
 	github.com/zeebo/xxh3 v1.1.0
+	go.uber.org/goleak v1.3.0
 	golang.org/x/net v0.49.0
 	golang.org/x/sync v0.19.0
 	golang.org/x/text v0.33.0

go.sum

@@ -375,6 +375,8 @@ go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFh
 go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
 go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=

internal/app/app.go

@@ -421,6 +421,8 @@ func (app *App) setupEvents() {
 	app.cleanupFuncs = append(app.cleanupFuncs, cleanupFunc)
 }
 
+const subscriberSendTimeout = 2 * time.Second
+
 func setupSubscriber[T any](
 	ctx context.Context,
 	wg *sync.WaitGroup,
@@ -430,6 +432,10 @@ func setupSubscriber[T any](
 ) {
 	wg.Go(func() {
 		subCh := subscriber(ctx)
+		sendTimer := time.NewTimer(0)
+		<-sendTimer.C
+		defer sendTimer.Stop()
+
 		for {
 			select {
 			case event, ok := <-subCh:
@@ -438,9 +444,17 @@ func setupSubscriber[T any](
 					return
 				}
 				var msg tea.Msg = event
+				if !sendTimer.Stop() {
+					select {
+					case <-sendTimer.C:
+					default:
+					}
+				}
+				sendTimer.Reset(subscriberSendTimeout)
+
 				select {
 				case outputCh <- msg:
-				case <-time.After(2 * time.Second):
+				case <-sendTimer.C:
 					slog.Debug("Message dropped due to slow consumer", "name", name)
 				case <-ctx.Done():
 					slog.Debug("Subscription cancelled", "name", name)

internal/app/app_test.go

@@ -0,0 +1,157 @@
+package app
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"testing/synctest"
+	"time"
+
+	tea "charm.land/bubbletea/v2"
+	"github.com/charmbracelet/crush/internal/pubsub"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/goleak"
+)
+
+func TestSetupSubscriber_NormalFlow(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 10)
+
+		time.Sleep(10 * time.Millisecond)
+		synctest.Wait()
+
+		f.broker.Publish(pubsub.CreatedEvent, "event1")
+		f.broker.Publish(pubsub.CreatedEvent, "event2")
+
+		for range 2 {
+			select {
+			case <-f.outputCh:
+			case <-time.After(5 * time.Second):
+				t.Fatal("Timed out waiting for messages")
+			}
+		}
+
+		f.cancel()
+		f.wg.Wait()
+	})
+}
+
+func TestSetupSubscriber_SlowConsumer(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 0)
+
+		const numEvents = 5
+
+		var pubWg sync.WaitGroup
+		pubWg.Go(func() {
+			for range numEvents {
+				f.broker.Publish(pubsub.CreatedEvent, "event")
+				time.Sleep(10 * time.Millisecond)
+				synctest.Wait()
+			}
+		})
+
+		time.Sleep(time.Duration(numEvents) * (subscriberSendTimeout + 20*time.Millisecond))
+		synctest.Wait()
+
+		received := 0
+		for {
+			select {
+			case <-f.outputCh:
+				received++
+			default:
+				pubWg.Wait()
+				f.cancel()
+				f.wg.Wait()
+				require.Less(t, received, numEvents, "Slow consumer should have dropped some messages")
+				return
+			}
+		}
+	})
+}
+
+func TestSetupSubscriber_ContextCancellation(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 10)
+
+		f.broker.Publish(pubsub.CreatedEvent, "event1")
+		time.Sleep(100 * time.Millisecond)
+		synctest.Wait()
+
+		f.cancel()
+		f.wg.Wait()
+	})
+}
+
+func TestSetupSubscriber_DrainAfterDrop(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 0)
+
+		time.Sleep(10 * time.Millisecond)
+		synctest.Wait()
+
+		// First event: nobody reads outputCh so the timer fires (message dropped).
+		f.broker.Publish(pubsub.CreatedEvent, "event1")
+		time.Sleep(subscriberSendTimeout + 25*time.Millisecond)
+		synctest.Wait()
+
+		// Second event: triggers Stop()==false path; without the fix this deadlocks.
+		f.broker.Publish(pubsub.CreatedEvent, "event2")
+
+		// If the timer drain deadlocks, wg.Wait never returns.
+		done := make(chan struct{})
+		go func() {
+			f.cancel()
+			f.wg.Wait()
+			close(done)
+		}()
+
+		select {
+		case <-done:
+		case <-time.After(5 * time.Second):
+			t.Fatal("setupSubscriber goroutine hung — likely timer drain deadlock")
+		}
+	})
+}
+
+func TestSetupSubscriber_NoTimerLeak(t *testing.T) {
+	defer goleak.VerifyNone(t)
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 100)
+
+		for range 100 {
+			f.broker.Publish(pubsub.CreatedEvent, "event")
+			time.Sleep(5 * time.Millisecond)
+			synctest.Wait()
+		}
+
+		f.cancel()
+		f.wg.Wait()
+	})
+}
+
+type subscriberFixture struct {
+	broker   *pubsub.Broker[string]
+	wg       sync.WaitGroup
+	outputCh chan tea.Msg
+	cancel   context.CancelFunc
+}
+
+func newSubscriberFixture(t *testing.T, bufSize int) *subscriberFixture {
+	t.Helper()
+	ctx, cancel := context.WithCancel(t.Context())
+	t.Cleanup(cancel)
+
+	f := &subscriberFixture{
+		broker:   pubsub.NewBroker[string](),
+		outputCh: make(chan tea.Msg, bufSize),
+		cancel:   cancel,
+	}
+	t.Cleanup(f.broker.Shutdown)
+
+	setupSubscriber(ctx, &f.wg, "test", func(ctx context.Context) <-chan pubsub.Event[string] {
+		return f.broker.Subscribe(ctx)
+	}, f.outputCh)
+
+	return f
+}
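
The new tests rely on testing/synctest, which is stable as of Go 1.25 (on Go 1.24 it sits behind GOEXPERIMENT=synctest), plus goleak for leak detection. The PR verifies leaks per test with defer goleak.VerifyNone(t); goleak can also guard an entire package from TestMain. A hypothetical alternative, not what this change does:

    // Package-wide variant: fail the test binary if any goroutines
    // are still running after all tests finish.
    func TestMain(m *testing.M) {
    	goleak.VerifyTestMain(m)
    }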