From 0585f498ba16b43902708aec87914fbd63c68e8d Mon Sep 17 00:00:00 2001 From: Sven Olsen Date: Mon, 11 May 2026 14:48:07 -0700 Subject: [PATCH] fix(pubsub): raise default per-subscriber buffer (64 -> 4096) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There still seems to be a TUI-sync issue on upstream/main where the assistant cuts off mid-sentence on screen even though the DB has the full response. Two recent PRs probably get most of the way to fixing it: #2840 reordered the agent's TypeAgentFinished notification past the activeRequests cleanup, and #2836 made channelBufferSize actually honored. In power-user cases the default (64) is still too small, though. A streaming turn calls messages.Update on every OnTextDelta / OnReasoningDelta callback, and the TUI's glamour re-render periodically stalls the Update loop for tens to hundreds of milliseconds at a time. 64 slots fill before drain catches up and the non-blocking publish in broker.go silently drops the rest. This diff adopts a pretty aggressive fix -- 64 -> 4096 covers a long turn at typical SSE rates even under pathological View() stalls. Cost is a few MB of ring-buffer allocation across all brokers, comfortably inside any modern dev machine's budget; users on tighter memory targets can still tune via NewBrokerWithOptions. Drive-by cleanup: maxEvents (struct field + NewBrokerWithOptions param) is set to 1000 by default but never read anywhere in tree -- it's bufferSize, not maxEvents, that actually bounds the queue. Drop maxEvents on the theory that misleading dead code is worse than no code. 💘 Generated with Crush Assisted-by: Claude Opus 4.7 via Crush --- internal/pubsub/broker.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/pubsub/broker.go b/internal/pubsub/broker.go index 52e827c2d6306fa2365372b9867810d9b99d0227..5b2ae19d7639701b044ec38dfd2f63544c0fde01 100644 --- a/internal/pubsub/broker.go +++ b/internal/pubsub/broker.go @@ -5,26 +5,28 @@ import ( "sync" ) -const bufferSize = 64 +// bufferSize is the per-subscriber channel capacity for any broker +// created via NewBroker. Publish is non-blocking, so a full buffer +// silently drops events; sized to cover a long streaming assistant +// turn (~one UpdatedEvent per token) even under TUI render stalls. +const bufferSize = 4096 type Broker[T any] struct { subs map[chan Event[T]]struct{} mu sync.RWMutex done chan struct{} subCount int - maxEvents int channelBufferSize int } func NewBroker[T any]() *Broker[T] { - return NewBrokerWithOptions[T](bufferSize, 1000) + return NewBrokerWithOptions[T](bufferSize) } -func NewBrokerWithOptions[T any](channelBufferSize, maxEvents int) *Broker[T] { +func NewBrokerWithOptions[T any](channelBufferSize int) *Broker[T] { return &Broker[T]{ subs: make(map[chan Event[T]]struct{}), done: make(chan struct{}), - maxEvents: maxEvents, channelBufferSize: channelBufferSize, } }