From 0491b670e4eefde474876dbb7b699ed2d8b9e318 Mon Sep 17 00:00:00 2001
From: Carlos Alexandro Becker
Date: Fri, 6 Feb 2026 15:55:04 -0300
Subject: [PATCH] perf: timer leak in setupSubscriber (#2147)

* fix: fix timer leak in setupSubscriber causing resource exhaustion under
  high load

* fix: synctest

Signed-off-by: Carlos Alexandro Becker

* test: improvements

Signed-off-by: Carlos Alexandro Becker

* refactor: tests

Signed-off-by: Carlos Alexandro Becker

---------

Signed-off-by: Carlos Alexandro Becker
Co-authored-by: AnyCPU
---
 go.mod                   |   1 +
 go.sum                   |   2 +
 internal/app/app.go      |  16 +++-
 internal/app/app_test.go | 157 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 175 insertions(+), 1 deletion(-)
 create mode 100644 internal/app/app_test.go

diff --git a/go.mod b/go.mod
index daca23284eb1b3e6280210d13f6fb852371b9caf..bda063569bac9702747085c90f9e57ec93593a06 100644
--- a/go.mod
+++ b/go.mod
@@ -60,6 +60,7 @@ require (
 	github.com/tidwall/gjson v1.18.0
 	github.com/tidwall/sjson v1.2.5
 	github.com/zeebo/xxh3 v1.1.0
+	go.uber.org/goleak v1.3.0
 	golang.org/x/net v0.49.0
 	golang.org/x/sync v0.19.0
 	golang.org/x/text v0.33.0
diff --git a/go.sum b/go.sum
index f7248662cf8cb5300e0977553c86b88420d3e5e2..5d5d296dbc94806e3d06a24190949548668671bd 100644
--- a/go.sum
+++ b/go.sum
@@ -375,6 +375,8 @@ go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFh
 go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
 go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI=
 go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA=
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
 go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
diff --git a/internal/app/app.go b/internal/app/app.go
index f0cabfa534a58401280fb5e9b973aa6f5a9d91c9..35534629f64e29dc39beb95c55e5873b551218a4 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -421,6 +421,8 @@ func (app *App) setupEvents() {
 	app.cleanupFuncs = append(app.cleanupFuncs, cleanupFunc)
 }
 
+const subscriberSendTimeout = 2 * time.Second
+
 func setupSubscriber[T any](
 	ctx context.Context,
 	wg *sync.WaitGroup,
@@ -430,6 +432,10 @@
 ) {
 	wg.Go(func() {
 		subCh := subscriber(ctx)
+		sendTimer := time.NewTimer(0)
+		<-sendTimer.C
+		defer sendTimer.Stop()
+
 		for {
 			select {
 			case event, ok := <-subCh:
@@ -438,9 +444,17 @@
 					return
 				}
 				var msg tea.Msg = event
+				if !sendTimer.Stop() {
+					select {
+					case <-sendTimer.C:
+					default:
+					}
+				}
+				sendTimer.Reset(subscriberSendTimeout)
+
 				select {
 				case outputCh <- msg:
-				case <-time.After(2 * time.Second):
+				case <-sendTimer.C:
 					slog.Debug("Message dropped due to slow consumer", "name", name)
 				case <-ctx.Done():
 					slog.Debug("Subscription cancelled", "name", name)
diff --git a/internal/app/app_test.go b/internal/app/app_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..61b99158f9979d7e21a3c9fe7ad19c74a8111242
--- /dev/null
+++ b/internal/app/app_test.go
@@ -0,0 +1,157 @@
+package app
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"testing/synctest"
+	"time"
+
+	tea "charm.land/bubbletea/v2"
+	"github.com/charmbracelet/crush/internal/pubsub"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/goleak"
+)
+
+func TestSetupSubscriber_NormalFlow(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 10)
+
+		time.Sleep(10 * time.Millisecond)
+		synctest.Wait()
+
+		f.broker.Publish(pubsub.CreatedEvent, "event1")
+		f.broker.Publish(pubsub.CreatedEvent, "event2")
+
+		for range 2 {
+			select {
+			case <-f.outputCh:
+			case <-time.After(5 * time.Second):
+				t.Fatal("Timed out waiting for messages")
+			}
+		}
+
+		f.cancel()
+		f.wg.Wait()
+	})
+}
+
+func TestSetupSubscriber_SlowConsumer(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 0)
+
+		const numEvents = 5
+
+		var pubWg sync.WaitGroup
+		pubWg.Go(func() {
+			for range numEvents {
+				f.broker.Publish(pubsub.CreatedEvent, "event")
+				time.Sleep(10 * time.Millisecond)
+				synctest.Wait()
+			}
+		})
+
+		time.Sleep(time.Duration(numEvents) * (subscriberSendTimeout + 20*time.Millisecond))
+		synctest.Wait()
+
+		received := 0
+		for {
+			select {
+			case <-f.outputCh:
+				received++
+			default:
+				pubWg.Wait()
+				f.cancel()
+				f.wg.Wait()
+				require.Less(t, received, numEvents, "Slow consumer should have dropped some messages")
+				return
+			}
+		}
+	})
+}
+
+func TestSetupSubscriber_ContextCancellation(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 10)
+
+		f.broker.Publish(pubsub.CreatedEvent, "event1")
+		time.Sleep(100 * time.Millisecond)
+		synctest.Wait()
+
+		f.cancel()
+		f.wg.Wait()
+	})
+}
+
+func TestSetupSubscriber_DrainAfterDrop(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 0)
+
+		time.Sleep(10 * time.Millisecond)
+		synctest.Wait()
+
+		// First event: nobody reads outputCh so the timer fires (message dropped).
+		f.broker.Publish(pubsub.CreatedEvent, "event1")
+		time.Sleep(subscriberSendTimeout + 25*time.Millisecond)
+		synctest.Wait()
+
+		// Second event: triggers Stop()==false path; without the fix this deadlocks.
+		f.broker.Publish(pubsub.CreatedEvent, "event2")
+
+		// If the timer drain deadlocks, wg.Wait never returns.
+		done := make(chan struct{})
+		go func() {
+			f.cancel()
+			f.wg.Wait()
+			close(done)
+		}()
+
+		select {
+		case <-done:
+		case <-time.After(5 * time.Second):
+			t.Fatal("setupSubscriber goroutine hung - likely timer drain deadlock")
+		}
+	})
+}
+
+func TestSetupSubscriber_NoTimerLeak(t *testing.T) {
+	defer goleak.VerifyNone(t)
+	synctest.Test(t, func(t *testing.T) {
+		f := newSubscriberFixture(t, 100)
+
+		for range 100 {
+			f.broker.Publish(pubsub.CreatedEvent, "event")
+			time.Sleep(5 * time.Millisecond)
+			synctest.Wait()
+		}
+
+		f.cancel()
+		f.wg.Wait()
+	})
+}
+
+type subscriberFixture struct {
+	broker   *pubsub.Broker[string]
+	wg       sync.WaitGroup
+	outputCh chan tea.Msg
+	cancel   context.CancelFunc
+}
+
+func newSubscriberFixture(t *testing.T, bufSize int) *subscriberFixture {
+	t.Helper()
+	ctx, cancel := context.WithCancel(t.Context())
+	t.Cleanup(cancel)
+
+	f := &subscriberFixture{
+		broker:   pubsub.NewBroker[string](),
+		outputCh: make(chan tea.Msg, bufSize),
+		cancel:   cancel,
+	}
+	t.Cleanup(f.broker.Shutdown)
+
+	setupSubscriber(ctx, &f.wg, "test", func(ctx context.Context) <-chan pubsub.Event[string] {
+		return f.broker.Subscribe(ctx)
+	}, f.outputCh)
+
+	return f
+}