fix(app): replace single events channel with pubsub.Broker for fan-out (#2663)

Tai Groot created

Fixes the SSE multi-consumer bug where multiple clients sharing the same
workspace received only a fraction of events (competing-consumer semantics
on a plain channel). Each caller to Events()/SubscribeEvents() now gets
its own dedicated channel via Broker.Subscribe(ctx).

💘 Generated with Crush

Assisted-by: AWS Claude Sonnet 4.6 via Crush <crush@charm.land>
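
For context, the failure mode: a plain Go channel has competing-consumer semantics, so when several receivers read the same channel, each message is delivered to exactly one of them. Multiple SSE clients on one workspace therefore each saw only a slice of the stream. A standalone sketch (not project code) that demonstrates the split:

package main

import (
	"fmt"
	"sync"
)

func main() {
	// One shared channel, pre-filled with 100 events.
	events := make(chan int, 100)
	for i := 0; i < 100; i++ {
		events <- i
	}
	close(events)

	// Two "clients" reading the same channel compete for messages.
	var (
		mu     sync.Mutex
		counts = map[string]int{}
		wg     sync.WaitGroup
	)
	for _, name := range []string{"client-a", "client-b"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			for range events { // each value goes to exactly one receiver
				mu.Lock()
				counts[name]++
				mu.Unlock()
			}
		}(name)
	}
	wg.Wait()
	// Prints something like map[client-a:41 client-b:59]:
	// neither client observed all 100 events.
	fmt.Println(counts)
}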

Change summary

internal/app/app.go        |  55 ++------
internal/app/app_test.go   | 240 ++++++++++++++++++++++-----------------
internal/backend/events.go |   8 
internal/server/proto.go   |   6 
4 files changed, 157 insertions(+), 152 deletions(-)

Detailed changes

internal/app/app.go

@@ -66,7 +66,7 @@ type App struct {
 
 	serviceEventsWG *sync.WaitGroup
 	eventsCtx       context.Context
-	events          chan tea.Msg
+	events          *pubsub.Broker[tea.Msg]
 	tuiWG           *sync.WaitGroup
 
 	// global context and cleanup functions
@@ -100,7 +100,7 @@ func New(ctx context.Context, conn *sql.DB, store *config.ConfigStore) (*App, er
 
 		config: store,
 
-		events:             make(chan tea.Msg, 100),
+		events:             pubsub.NewBroker[tea.Msg](),
 		serviceEventsWG:    &sync.WaitGroup{},
 		tuiWG:              &sync.WaitGroup{},
 		agentNotifications: pubsub.NewBroker[notify.Notification](),
@@ -153,18 +153,15 @@ func (app *App) Store() *config.ConfigStore {
 	return app.config
 }
 
-// Events returns the events channel for the application.
-func (app *App) Events() <-chan tea.Msg {
-	return app.events
+// Events returns a per-caller subscription channel for application events.
+// Each caller receives its own channel; all callers receive every event.
+func (app *App) Events(ctx context.Context) <-chan pubsub.Event[tea.Msg] {
+	return app.events.Subscribe(ctx)
 }
 
-// SendEvent pushes a message into the application's events channel.
-// It is non-blocking; the message is dropped if the channel is full.
+// SendEvent publishes a message to all event subscribers.
 func (app *App) SendEvent(msg tea.Msg) {
-	select {
-	case app.events <- msg:
-	default:
-	}
+	app.events.Publish(pubsub.UpdatedEvent, msg)
 }
 
 // AgentNotifications returns the broker for agent notification events.
@@ -486,26 +483,21 @@ func (app *App) setupEvents() {
 	cleanupFunc := func(context.Context) error {
 		cancel()
 		app.serviceEventsWG.Wait()
+		app.events.Shutdown()
 		return nil
 	}
 	app.cleanupFuncs = append(app.cleanupFuncs, cleanupFunc)
 }
 
-const subscriberSendTimeout = 2 * time.Second
-
 func setupSubscriber[T any](
 	ctx context.Context,
 	wg *sync.WaitGroup,
 	name string,
 	subscriber func(context.Context) <-chan pubsub.Event[T],
-	outputCh chan<- tea.Msg,
+	broker *pubsub.Broker[tea.Msg],
 ) {
 	wg.Go(func() {
 		subCh := subscriber(ctx)
-		sendTimer := time.NewTimer(0)
-		<-sendTimer.C
-		defer sendTimer.Stop()
-
 		for {
 			select {
 			case event, ok := <-subCh:
@@ -513,23 +505,7 @@ func setupSubscriber[T any](
 					slog.Debug("Subscription channel closed", "name", name)
 					return
 				}
-				var msg tea.Msg = event
-				if !sendTimer.Stop() {
-					select {
-					case <-sendTimer.C:
-					default:
-					}
-				}
-				sendTimer.Reset(subscriberSendTimeout)
-
-				select {
-				case outputCh <- msg:
-				case <-sendTimer.C:
-					slog.Debug("Message dropped due to slow consumer", "name", name)
-				case <-ctx.Done():
-					slog.Debug("Subscription cancelled", "name", name)
-					return
-				}
+				broker.Publish(pubsub.UpdatedEvent, tea.Msg(event))
 			case <-ctx.Done():
 				slog.Debug("Subscription cancelled", "name", name)
 				return
@@ -579,17 +555,18 @@ func (app *App) Subscribe(program *tea.Program) {
 	})
 	defer app.tuiWG.Done()
 
+	events := app.events.Subscribe(tuiCtx)
 	for {
 		select {
 		case <-tuiCtx.Done():
 			slog.Debug("TUI message handler shutting down")
 			return
-		case msg, ok := <-app.events:
+		case ev, ok := <-events:
 			if !ok {
 				slog.Debug("TUI message channel closed")
 				return
 			}
-			program.Send(msg)
+			program.Send(ev.Payload)
 		}
 	}
 }
@@ -649,9 +626,9 @@ func (app *App) checkForUpdates(ctx context.Context) {
 	if err != nil || !info.Available() {
 		return
 	}
-	app.events <- UpdateAvailableMsg{
+	app.events.Publish(pubsub.UpdatedEvent, UpdateAvailableMsg{
 		CurrentVersion: info.Current,
 		LatestVersion:  info.Latest,
 		IsDevelopment:  info.IsDevelopment(),
-	}
+	})
 }
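
The hunks above lean on the internal/pubsub Broker API (NewBroker, Subscribe, Publish, Shutdown, GetSubscriberCount, and an Event carrying Type and Payload) without showing it. Below is a minimal fan-out broker consistent with those call sites, as an illustrative sketch only: the real internal/pubsub implementation is not part of this diff, and the buffer depth and drop-on-full policy here are assumptions.

package pubsub

import (
	"context"
	"sync"
)

type EventType string

const (
	CreatedEvent EventType = "created"
	UpdatedEvent EventType = "updated"
)

// Event pairs a payload with the kind of change it represents.
type Event[T any] struct {
	Type    EventType
	Payload T
}

// Broker fans each published event out to every current subscriber.
type Broker[T any] struct {
	mu   sync.RWMutex
	subs map[chan Event[T]]struct{}
	done bool
}

func NewBroker[T any]() *Broker[T] {
	return &Broker[T]{subs: make(map[chan Event[T]]struct{})}
}

// Subscribe returns a dedicated channel that receives every event published
// after this call. Cancelling ctx removes the subscription and closes the
// channel.
func (b *Broker[T]) Subscribe(ctx context.Context) <-chan Event[T] {
	ch := make(chan Event[T], 64) // buffer depth is an assumption
	b.mu.Lock()
	if b.done {
		b.mu.Unlock()
		close(ch)
		return ch
	}
	b.subs[ch] = struct{}{}
	b.mu.Unlock()

	go func() {
		<-ctx.Done()
		b.mu.Lock()
		defer b.mu.Unlock()
		if _, ok := b.subs[ch]; ok {
			delete(b.subs, ch)
			close(ch)
		}
	}()
	return ch
}

// Publish delivers the event to all subscribers. This sketch skips a full
// subscriber rather than blocking the publisher; the real broker may differ.
func (b *Broker[T]) Publish(t EventType, payload T) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for ch := range b.subs {
		select {
		case ch <- Event[T]{Type: t, Payload: payload}:
		default:
		}
	}
}

// Shutdown closes every subscriber channel and rejects new subscriptions.
func (b *Broker[T]) Shutdown() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.done = true
	for ch := range b.subs {
		delete(b.subs, ch)
		close(ch)
	}
}

func (b *Broker[T]) GetSubscriberCount() int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return len(b.subs)
}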

internal/app/app_test.go

@@ -2,156 +2,182 @@ package app
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"testing"
-	"testing/synctest"
 	"time"
 
 	tea "charm.land/bubbletea/v2"
 	"github.com/charmbracelet/crush/internal/pubsub"
 	"github.com/stretchr/testify/require"
-	"go.uber.org/goleak"
 )
 
+// TestSetupSubscriber_NormalFlow verifies that events published to the source
+// broker are forwarded to the output broker.
 func TestSetupSubscriber_NormalFlow(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		f := newSubscriberFixture(t, 10)
+	t.Parallel()
 
-		time.Sleep(10 * time.Millisecond)
-		synctest.Wait()
+	ctx, cancel := context.WithCancel(t.Context())
+	defer cancel()
 
-		f.broker.Publish(pubsub.CreatedEvent, "event1")
-		f.broker.Publish(pubsub.CreatedEvent, "event2")
+	src := pubsub.NewBroker[string]()
+	defer src.Shutdown()
+	out := pubsub.NewBroker[tea.Msg]()
+	defer out.Shutdown()
 
-		for range 2 {
-			select {
-			case <-f.outputCh:
-			case <-time.After(5 * time.Second):
-				t.Fatal("Timed out waiting for messages")
-			}
+	ch := out.Subscribe(ctx)
+
+	var wg sync.WaitGroup
+	setupSubscriber(ctx, &wg, "test", src.Subscribe, out)
+
+	// Yield so the subscriber goroutine can call src.Subscribe before we publish.
+	time.Sleep(10 * time.Millisecond)
+
+	src.Publish(pubsub.CreatedEvent, "hello")
+	src.Publish(pubsub.CreatedEvent, "world")
+
+	for range 2 {
+		select {
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			t.Fatal("timed out waiting for forwarded event")
 		}
+	}
 
-		f.cancel()
-		f.wg.Wait()
-	})
+	cancel()
+	wg.Wait()
 }
 
-func TestSetupSubscriber_SlowConsumer(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		f := newSubscriberFixture(t, 0)
+// TestSetupSubscriber_ContextCancellation verifies the goroutine exits cleanly
+// when the context is cancelled.
+func TestSetupSubscriber_ContextCancellation(t *testing.T) {
+	t.Parallel()
 
-		const numEvents = 5
+	ctx, cancel := context.WithCancel(t.Context())
 
-		var pubWg sync.WaitGroup
-		pubWg.Go(func() {
-			for range numEvents {
-				f.broker.Publish(pubsub.CreatedEvent, "event")
-				time.Sleep(10 * time.Millisecond)
-				synctest.Wait()
-			}
-		})
+	src := pubsub.NewBroker[string]()
+	defer src.Shutdown()
+	out := pubsub.NewBroker[tea.Msg]()
+	defer out.Shutdown()
 
-		time.Sleep(time.Duration(numEvents) * (subscriberSendTimeout + 20*time.Millisecond))
-		synctest.Wait()
-
-		received := 0
-		for {
-			select {
-			case <-f.outputCh:
-				received++
-			default:
-				pubWg.Wait()
-				f.cancel()
-				f.wg.Wait()
-				require.Less(t, received, numEvents, "Slow consumer should have dropped some messages")
-				return
-			}
-		}
-	})
+	var wg sync.WaitGroup
+	setupSubscriber(ctx, &wg, "test", src.Subscribe, out)
+
+	src.Publish(pubsub.CreatedEvent, "event")
+	cancel()
+
+	done := make(chan struct{})
+	go func() { wg.Wait(); close(done) }()
+
+	select {
+	case <-done:
+	case <-time.After(5 * time.Second):
+		t.Fatal("setupSubscriber goroutine did not exit after context cancellation")
+	}
 }
 
-func TestSetupSubscriber_ContextCancellation(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		f := newSubscriberFixture(t, 10)
+// TestEvents_ZeroConsumers verifies that publishing with no subscribers does
+// not block or panic.
+func TestEvents_ZeroConsumers(t *testing.T) {
+	t.Parallel()
+
+	broker := pubsub.NewBroker[tea.Msg]()
+	defer broker.Shutdown()
+
+	require.Equal(t, 0, broker.GetSubscriberCount())
 
-		f.broker.Publish(pubsub.CreatedEvent, "event1")
-		time.Sleep(100 * time.Millisecond)
-		synctest.Wait()
+	// Must not block.
+	done := make(chan struct{})
+	go func() {
+		broker.Publish(pubsub.UpdatedEvent, tea.Msg("msg1"))
+		broker.Publish(pubsub.UpdatedEvent, tea.Msg("msg2"))
+		close(done)
+	}()
 
-		f.cancel()
-		f.wg.Wait()
-	})
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		t.Fatal("Publish with zero consumers blocked")
+	}
 }
 
-func TestSetupSubscriber_DrainAfterDrop(t *testing.T) {
-	synctest.Test(t, func(t *testing.T) {
-		f := newSubscriberFixture(t, 0)
+// TestEvents_OneConsumer verifies that a single subscriber receives every event
+// exactly once.
+func TestEvents_OneConsumer(t *testing.T) {
+	t.Parallel()
 
-		time.Sleep(10 * time.Millisecond)
-		synctest.Wait()
+	ctx, cancel := context.WithCancel(t.Context())
+	defer cancel()
 
-		// First event: nobody reads outputCh so the timer fires (message dropped).
-		f.broker.Publish(pubsub.CreatedEvent, "event1")
-		time.Sleep(subscriberSendTimeout + 25*time.Millisecond)
-		synctest.Wait()
+	broker := pubsub.NewBroker[tea.Msg]()
+	defer broker.Shutdown()
 
-		// Second event: triggers Stop()==false path; without the fix this deadlocks.
-		f.broker.Publish(pubsub.CreatedEvent, "event2")
+	ch := broker.Subscribe(ctx)
 
-		// If the timer drain deadlocks, wg.Wait never returns.
-		done := make(chan struct{})
-		go func() {
-			f.cancel()
-			f.wg.Wait()
-			close(done)
-		}()
+	const n = 10
+	for i := range n {
+		broker.Publish(pubsub.UpdatedEvent, tea.Msg(i))
+	}
 
+	for i := range n {
 		select {
-		case <-done:
+		case ev := <-ch:
+			require.Equal(t, tea.Msg(i), ev.Payload)
 		case <-time.After(5 * time.Second):
-			t.Fatal("setupSubscriber goroutine hung — likely timer drain deadlock")
+			t.Fatalf("timed out waiting for event %d", i)
 		}
-	})
+	}
 }
 
-func TestSetupSubscriber_NoTimerLeak(t *testing.T) {
-	defer goleak.VerifyNone(t)
-	synctest.Test(t, func(t *testing.T) {
-		f := newSubscriberFixture(t, 100)
+// TestEvents_NConsumers verifies that every subscriber receives every event
+// exactly once, regardless of how many concurrent consumers are attached.
+func TestEvents_NConsumers(t *testing.T) {
+	t.Parallel()
 
-		for range 100 {
-			f.broker.Publish(pubsub.CreatedEvent, "event")
-			time.Sleep(5 * time.Millisecond)
-			synctest.Wait()
-		}
-
-		f.cancel()
-		f.wg.Wait()
-	})
-}
-
-type subscriberFixture struct {
-	broker   *pubsub.Broker[string]
-	wg       sync.WaitGroup
-	outputCh chan tea.Msg
-	cancel   context.CancelFunc
+	for _, n := range []int{2, 5, 10} {
+		t.Run(fmt.Sprintf("consumers=%d", n), func(t *testing.T) {
+			t.Parallel()
+			testNConsumers(t, n)
+		})
+	}
 }
 
-func newSubscriberFixture(t *testing.T, bufSize int) *subscriberFixture {
+func testNConsumers(t *testing.T, n int) {
 	t.Helper()
+
 	ctx, cancel := context.WithCancel(t.Context())
-	t.Cleanup(cancel)
+	defer cancel()
+
+	broker := pubsub.NewBroker[tea.Msg]()
+	defer broker.Shutdown()
 
-	f := &subscriberFixture{
-		broker:   pubsub.NewBroker[string](),
-		outputCh: make(chan tea.Msg, bufSize),
-		cancel:   cancel,
+	// Subscribe all N consumers before publishing.
+	channels := make([]<-chan pubsub.Event[tea.Msg], n)
+	for i := range n {
+		channels[i] = broker.Subscribe(ctx)
 	}
-	t.Cleanup(f.broker.Shutdown)
+	require.Equal(t, n, broker.GetSubscriberCount())
 
-	setupSubscriber(ctx, &f.wg, "test", func(ctx context.Context) <-chan pubsub.Event[string] {
-		return f.broker.Subscribe(ctx)
-	}, f.outputCh)
+	const numEvents = 20
+	for i := range numEvents {
+		broker.Publish(pubsub.UpdatedEvent, tea.Msg(i))
+	}
 
-	return f
+	// Each consumer must receive all numEvents messages.
+	var wg sync.WaitGroup
+	for i, ch := range channels {
+		wg.Go(func() {
+			for j := range numEvents {
+				select {
+				case ev := <-ch:
+					require.Equal(t, tea.Msg(j), ev.Payload,
+						"consumer %d: wrong payload for event %d", i, j)
+				case <-time.After(5 * time.Second):
+					t.Errorf("consumer %d: timed out waiting for event %d", i, j)
+					return
+				}
+			}
+		})
+	}
+	wg.Wait()
 }
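
One assumption in TestEvents_OneConsumer and testNConsumers is worth flagging: all events are published before any consumer drains its channel, which only passes if each subscription channel buffers at least numEvents values (or Publish blocks instead of dropping). Per-consumer ordering, on the other hand, is safe: a Go channel is FIFO, so a single publisher's events arrive in publish order. A hypothetical guard that would make the buffering assumption explicit, placed in testNConsumers after numEvents is declared (cap is valid on a receive-only channel):

// Hypothetical: skip rather than fail if the broker's subscriber buffer
// cannot hold every event published before draining begins.
if cap(channels[0]) < numEvents {
	t.Skipf("subscriber buffer (%d) cannot hold %d queued events", cap(channels[0]), numEvents)
}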

internal/backend/events.go

@@ -8,16 +8,18 @@ import (
 	mcptools "github.com/charmbracelet/crush/internal/agent/tools/mcp"
 	"github.com/charmbracelet/crush/internal/app"
 	"github.com/charmbracelet/crush/internal/config"
+	"github.com/charmbracelet/crush/internal/pubsub"
 )
 
-// SubscribeEvents returns the event channel for a workspace's app.
-func (b *Backend) SubscribeEvents(workspaceID string) (<-chan tea.Msg, error) {
+// SubscribeEvents returns a per-caller event channel for a workspace.
+// Each caller receives all events; multiple callers do not compete.
+func (b *Backend) SubscribeEvents(ctx context.Context, workspaceID string) (<-chan pubsub.Event[tea.Msg], error) {
 	ws, err := b.GetWorkspace(workspaceID)
 	if err != nil {
 		return nil, err
 	}
 
-	return ws.Events(), nil
+	return ws.Events(ctx), nil
 }
 
 // GetLSPStates returns the state of all LSP clients.
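
The context now scopes the subscription's lifetime: cancelling it unsubscribes automatically, so there is no separate Unsubscribe call to forget. A hypothetical consumer, with the function name and error handling assumed for illustration:

func consumeWorkspaceEvents(ctx context.Context, b *Backend, workspaceID string) error {
	events, err := b.SubscribeEvents(ctx, workspaceID)
	if err != nil {
		return err
	}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // cancellation also tears down the subscription
		case ev, ok := <-events:
			if !ok {
				return nil // the workspace's broker shut down
			}
			_ = ev.Payload // handle the tea.Msg
		}
	}
}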

internal/server/proto.go

@@ -199,7 +199,7 @@ func (c *controllerV1) handleGetWorkspaceProviders(w http.ResponseWriter, r *htt
 func (c *controllerV1) handleGetWorkspaceEvents(w http.ResponseWriter, r *http.Request) {
 	flusher := http.NewResponseController(w)
 	id := r.PathValue("id")
-	events, err := c.backend.SubscribeEvents(id)
+	events, err := c.backend.SubscribeEvents(r.Context(), id)
 	if err != nil {
 		c.handleError(w, r, err)
 		return
@@ -218,8 +218,8 @@ func (c *controllerV1) handleGetWorkspaceEvents(w http.ResponseWriter, r *http.R
 			if !ok {
 				return
 			}
-			c.server.logDebug(r, "Sending event", "event", fmt.Sprintf("%T %+v", ev, ev))
-			wrapped := wrapEvent(ev)
+			c.server.logDebug(r, "Sending event", "event", fmt.Sprintf("%T %+v", ev.Payload, ev.Payload))
+			wrapped := wrapEvent(ev.Payload)
 			if wrapped == nil {
 				continue
 			}
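
For reference, the surrounding SSE handler follows the usual subscribe-write-flush pattern; passing r.Context() above means a disconnecting client tears down its own subscription. A generic sketch only: the headers, JSON encoding, and function name are assumptions, and the real handler uses wrapEvent rather than marshalling the payload directly.

package server

import (
	"encoding/json"
	"fmt"
	"net/http"

	tea "charm.land/bubbletea/v2"
	"github.com/charmbracelet/crush/internal/pubsub"
)

func streamEvents(w http.ResponseWriter, r *http.Request, events <-chan pubsub.Event[tea.Msg]) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	rc := http.NewResponseController(w)
	for {
		select {
		case <-r.Context().Done():
			return // client disconnected; this also cancels the subscription
		case ev, ok := <-events:
			if !ok {
				return // broker shut down
			}
			data, err := json.Marshal(ev.Payload)
			if err != nil {
				continue // skip payloads that do not encode
			}
			fmt.Fprintf(w, "data: %s\n\n", data)
			_ = rc.Flush() // push the event to the client immediately
		}
	}
}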