From 4d16e96bd30016f674f162fc47128d3c108353eb Mon Sep 17 00:00:00 2001 From: Tai Groot Date: Tue, 28 Apr 2026 17:03:50 -0400 Subject: [PATCH] fix(app): replace single events channel with pubsub.Broker for fan-out (#2663) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes the SSE multi-consumer bug where multiple clients sharing the same workspace received only a fraction of events (competing-consumer semantics on a plain channel). Each caller to Events()/SubscribeEvents() now gets its own dedicated channel via Broker.Subscribe(ctx). 💘 Generated with Crush Assisted-by: AWS Claude Sonnet 4.6 via Crush --- internal/app/app.go | 55 +++------ internal/app/app_test.go | 240 ++++++++++++++++++++----------------- internal/backend/events.go | 8 +- internal/server/proto.go | 6 +- 4 files changed, 157 insertions(+), 152 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index d16ee32e9b24a141e834f750b046e1332b6d050c..a167ca8638c8497a6d6f4260782ba334c6dbe0c3 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -66,7 +66,7 @@ type App struct { serviceEventsWG *sync.WaitGroup eventsCtx context.Context - events chan tea.Msg + events *pubsub.Broker[tea.Msg] tuiWG *sync.WaitGroup // global context and cleanup functions @@ -100,7 +100,7 @@ func New(ctx context.Context, conn *sql.DB, store *config.ConfigStore) (*App, er config: store, - events: make(chan tea.Msg, 100), + events: pubsub.NewBroker[tea.Msg](), serviceEventsWG: &sync.WaitGroup{}, tuiWG: &sync.WaitGroup{}, agentNotifications: pubsub.NewBroker[notify.Notification](), @@ -153,18 +153,15 @@ func (app *App) Store() *config.ConfigStore { return app.config } -// Events returns the events channel for the application. -func (app *App) Events() <-chan tea.Msg { - return app.events +// Events returns a per-caller subscription channel for application events. +// Each caller receives its own channel; all callers receive every event. +func (app *App) Events(ctx context.Context) <-chan pubsub.Event[tea.Msg] { + return app.events.Subscribe(ctx) } -// SendEvent pushes a message into the application's events channel. -// It is non-blocking; the message is dropped if the channel is full. +// SendEvent publishes a message to all event subscribers. func (app *App) SendEvent(msg tea.Msg) { - select { - case app.events <- msg: - default: - } + app.events.Publish(pubsub.UpdatedEvent, msg) } // AgentNotifications returns the broker for agent notification events. @@ -486,26 +483,21 @@ func (app *App) setupEvents() { cleanupFunc := func(context.Context) error { cancel() app.serviceEventsWG.Wait() + app.events.Shutdown() return nil } app.cleanupFuncs = append(app.cleanupFuncs, cleanupFunc) } -const subscriberSendTimeout = 2 * time.Second - func setupSubscriber[T any]( ctx context.Context, wg *sync.WaitGroup, name string, subscriber func(context.Context) <-chan pubsub.Event[T], - outputCh chan<- tea.Msg, + broker *pubsub.Broker[tea.Msg], ) { wg.Go(func() { subCh := subscriber(ctx) - sendTimer := time.NewTimer(0) - <-sendTimer.C - defer sendTimer.Stop() - for { select { case event, ok := <-subCh: @@ -513,23 +505,7 @@ func setupSubscriber[T any]( slog.Debug("Subscription channel closed", "name", name) return } - var msg tea.Msg = event - if !sendTimer.Stop() { - select { - case <-sendTimer.C: - default: - } - } - sendTimer.Reset(subscriberSendTimeout) - - select { - case outputCh <- msg: - case <-sendTimer.C: - slog.Debug("Message dropped due to slow consumer", "name", name) - case <-ctx.Done(): - slog.Debug("Subscription cancelled", "name", name) - return - } + broker.Publish(pubsub.UpdatedEvent, tea.Msg(event)) case <-ctx.Done(): slog.Debug("Subscription cancelled", "name", name) return @@ -579,17 +555,18 @@ func (app *App) Subscribe(program *tea.Program) { }) defer app.tuiWG.Done() + events := app.events.Subscribe(tuiCtx) for { select { case <-tuiCtx.Done(): slog.Debug("TUI message handler shutting down") return - case msg, ok := <-app.events: + case ev, ok := <-events: if !ok { slog.Debug("TUI message channel closed") return } - program.Send(msg) + program.Send(ev.Payload) } } } @@ -649,9 +626,9 @@ func (app *App) checkForUpdates(ctx context.Context) { if err != nil || !info.Available() { return } - app.events <- UpdateAvailableMsg{ + app.events.Publish(pubsub.UpdatedEvent, UpdateAvailableMsg{ CurrentVersion: info.Current, LatestVersion: info.Latest, IsDevelopment: info.IsDevelopment(), - } + }) } diff --git a/internal/app/app_test.go b/internal/app/app_test.go index 61b99158f9979d7e21a3c9fe7ad19c74a8111242..e87846ed9744fbfa950867a5e7baae2160518771 100644 --- a/internal/app/app_test.go +++ b/internal/app/app_test.go @@ -2,156 +2,182 @@ package app import ( "context" + "fmt" "sync" "testing" - "testing/synctest" "time" tea "charm.land/bubbletea/v2" "github.com/charmbracelet/crush/internal/pubsub" "github.com/stretchr/testify/require" - "go.uber.org/goleak" ) +// TestSetupSubscriber_NormalFlow verifies that events published to the source +// broker are forwarded to the output broker. func TestSetupSubscriber_NormalFlow(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - f := newSubscriberFixture(t, 10) + t.Parallel() - time.Sleep(10 * time.Millisecond) - synctest.Wait() + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() - f.broker.Publish(pubsub.CreatedEvent, "event1") - f.broker.Publish(pubsub.CreatedEvent, "event2") + src := pubsub.NewBroker[string]() + defer src.Shutdown() + out := pubsub.NewBroker[tea.Msg]() + defer out.Shutdown() - for range 2 { - select { - case <-f.outputCh: - case <-time.After(5 * time.Second): - t.Fatal("Timed out waiting for messages") - } + ch := out.Subscribe(ctx) + + var wg sync.WaitGroup + setupSubscriber(ctx, &wg, "test", src.Subscribe, out) + + // Yield so the subscriber goroutine can call src.Subscribe before we publish. + time.Sleep(10 * time.Millisecond) + + src.Publish(pubsub.CreatedEvent, "hello") + src.Publish(pubsub.CreatedEvent, "world") + + for range 2 { + select { + case <-ch: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for forwarded event") } + } - f.cancel() - f.wg.Wait() - }) + cancel() + wg.Wait() } -func TestSetupSubscriber_SlowConsumer(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - f := newSubscriberFixture(t, 0) +// TestSetupSubscriber_ContextCancellation verifies the goroutine exits cleanly +// when the context is cancelled. +func TestSetupSubscriber_ContextCancellation(t *testing.T) { + t.Parallel() - const numEvents = 5 + ctx, cancel := context.WithCancel(t.Context()) - var pubWg sync.WaitGroup - pubWg.Go(func() { - for range numEvents { - f.broker.Publish(pubsub.CreatedEvent, "event") - time.Sleep(10 * time.Millisecond) - synctest.Wait() - } - }) + src := pubsub.NewBroker[string]() + defer src.Shutdown() + out := pubsub.NewBroker[tea.Msg]() + defer out.Shutdown() - time.Sleep(time.Duration(numEvents) * (subscriberSendTimeout + 20*time.Millisecond)) - synctest.Wait() - - received := 0 - for { - select { - case <-f.outputCh: - received++ - default: - pubWg.Wait() - f.cancel() - f.wg.Wait() - require.Less(t, received, numEvents, "Slow consumer should have dropped some messages") - return - } - } - }) + var wg sync.WaitGroup + setupSubscriber(ctx, &wg, "test", src.Subscribe, out) + + src.Publish(pubsub.CreatedEvent, "event") + cancel() + + done := make(chan struct{}) + go func() { wg.Wait(); close(done) }() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("setupSubscriber goroutine did not exit after context cancellation") + } } -func TestSetupSubscriber_ContextCancellation(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - f := newSubscriberFixture(t, 10) +// TestEvents_ZeroConsumers verifies that publishing with no subscribers does +// not block or panic. +func TestEvents_ZeroConsumers(t *testing.T) { + t.Parallel() + + broker := pubsub.NewBroker[tea.Msg]() + defer broker.Shutdown() + + require.Equal(t, 0, broker.GetSubscriberCount()) - f.broker.Publish(pubsub.CreatedEvent, "event1") - time.Sleep(100 * time.Millisecond) - synctest.Wait() + // Must not block. + done := make(chan struct{}) + go func() { + broker.Publish(pubsub.UpdatedEvent, tea.Msg("msg1")) + broker.Publish(pubsub.UpdatedEvent, tea.Msg("msg2")) + close(done) + }() - f.cancel() - f.wg.Wait() - }) + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("Publish with zero consumers blocked") + } } -func TestSetupSubscriber_DrainAfterDrop(t *testing.T) { - synctest.Test(t, func(t *testing.T) { - f := newSubscriberFixture(t, 0) +// TestEvents_OneConsumer verifies that a single subscriber receives every event +// exactly once. +func TestEvents_OneConsumer(t *testing.T) { + t.Parallel() - time.Sleep(10 * time.Millisecond) - synctest.Wait() + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() - // First event: nobody reads outputCh so the timer fires (message dropped). - f.broker.Publish(pubsub.CreatedEvent, "event1") - time.Sleep(subscriberSendTimeout + 25*time.Millisecond) - synctest.Wait() + broker := pubsub.NewBroker[tea.Msg]() + defer broker.Shutdown() - // Second event: triggers Stop()==false path; without the fix this deadlocks. - f.broker.Publish(pubsub.CreatedEvent, "event2") + ch := broker.Subscribe(ctx) - // If the timer drain deadlocks, wg.Wait never returns. - done := make(chan struct{}) - go func() { - f.cancel() - f.wg.Wait() - close(done) - }() + const n = 10 + for i := range n { + broker.Publish(pubsub.UpdatedEvent, tea.Msg(i)) + } + for i := range n { select { - case <-done: + case ev := <-ch: + require.Equal(t, tea.Msg(i), ev.Payload) case <-time.After(5 * time.Second): - t.Fatal("setupSubscriber goroutine hung — likely timer drain deadlock") + t.Fatalf("timed out waiting for event %d", i) } - }) + } } -func TestSetupSubscriber_NoTimerLeak(t *testing.T) { - defer goleak.VerifyNone(t) - synctest.Test(t, func(t *testing.T) { - f := newSubscriberFixture(t, 100) +// TestEvents_NConsumers verifies that every subscriber receives every event +// exactly once, regardless of how many concurrent consumers are attached. +func TestEvents_NConsumers(t *testing.T) { + t.Parallel() - for range 100 { - f.broker.Publish(pubsub.CreatedEvent, "event") - time.Sleep(5 * time.Millisecond) - synctest.Wait() - } - - f.cancel() - f.wg.Wait() - }) -} - -type subscriberFixture struct { - broker *pubsub.Broker[string] - wg sync.WaitGroup - outputCh chan tea.Msg - cancel context.CancelFunc + for _, n := range []int{2, 5, 10} { + t.Run(fmt.Sprintf("consumers=%d", n), func(t *testing.T) { + t.Parallel() + testNConsumers(t, n) + }) + } } -func newSubscriberFixture(t *testing.T, bufSize int) *subscriberFixture { +func testNConsumers(t *testing.T, n int) { t.Helper() + ctx, cancel := context.WithCancel(t.Context()) - t.Cleanup(cancel) + defer cancel() + + broker := pubsub.NewBroker[tea.Msg]() + defer broker.Shutdown() - f := &subscriberFixture{ - broker: pubsub.NewBroker[string](), - outputCh: make(chan tea.Msg, bufSize), - cancel: cancel, + // Subscribe all N consumers before publishing. + channels := make([]<-chan pubsub.Event[tea.Msg], n) + for i := range n { + channels[i] = broker.Subscribe(ctx) } - t.Cleanup(f.broker.Shutdown) + require.Equal(t, n, broker.GetSubscriberCount()) - setupSubscriber(ctx, &f.wg, "test", func(ctx context.Context) <-chan pubsub.Event[string] { - return f.broker.Subscribe(ctx) - }, f.outputCh) + const numEvents = 20 + for i := range numEvents { + broker.Publish(pubsub.UpdatedEvent, tea.Msg(i)) + } - return f + // Each consumer must receive all numEvents messages. + var wg sync.WaitGroup + for i, ch := range channels { + wg.Go(func() { + for j := range numEvents { + select { + case ev := <-ch: + require.Equal(t, tea.Msg(j), ev.Payload, + "consumer %d: wrong payload for event %d", i, j) + case <-time.After(5 * time.Second): + t.Errorf("consumer %d: timed out waiting for event %d", i, j) + return + } + } + }) + } + wg.Wait() } diff --git a/internal/backend/events.go b/internal/backend/events.go index a91bad1d5322d1c0ed909b3239e9e97c0eb0c366..6df61f8a742f6672582a3e0db0ba6afbbc2d53af 100644 --- a/internal/backend/events.go +++ b/internal/backend/events.go @@ -8,16 +8,18 @@ import ( mcptools "github.com/charmbracelet/crush/internal/agent/tools/mcp" "github.com/charmbracelet/crush/internal/app" "github.com/charmbracelet/crush/internal/config" + "github.com/charmbracelet/crush/internal/pubsub" ) -// SubscribeEvents returns the event channel for a workspace's app. -func (b *Backend) SubscribeEvents(workspaceID string) (<-chan tea.Msg, error) { +// SubscribeEvents returns a per-caller event channel for a workspace. +// Each caller receives all events; multiple callers do not compete. +func (b *Backend) SubscribeEvents(ctx context.Context, workspaceID string) (<-chan pubsub.Event[tea.Msg], error) { ws, err := b.GetWorkspace(workspaceID) if err != nil { return nil, err } - return ws.Events(), nil + return ws.Events(ctx), nil } // GetLSPStates returns the state of all LSP clients. diff --git a/internal/server/proto.go b/internal/server/proto.go index af34131810a7af8c3672fe460198d25afe9ba064..af591173d64e4c56ae51964ca56fcfbbca9658a6 100644 --- a/internal/server/proto.go +++ b/internal/server/proto.go @@ -199,7 +199,7 @@ func (c *controllerV1) handleGetWorkspaceProviders(w http.ResponseWriter, r *htt func (c *controllerV1) handleGetWorkspaceEvents(w http.ResponseWriter, r *http.Request) { flusher := http.NewResponseController(w) id := r.PathValue("id") - events, err := c.backend.SubscribeEvents(id) + events, err := c.backend.SubscribeEvents(r.Context(), id) if err != nil { c.handleError(w, r, err) return @@ -218,8 +218,8 @@ func (c *controllerV1) handleGetWorkspaceEvents(w http.ResponseWriter, r *http.R if !ok { return } - c.server.logDebug(r, "Sending event", "event", fmt.Sprintf("%T %+v", ev, ev)) - wrapped := wrapEvent(ev) + c.server.logDebug(r, "Sending event", "event", fmt.Sprintf("%T %+v", ev.Payload, ev.Payload)) + wrapped := wrapEvent(ev.Payload) if wrapped == nil { continue }