From e93ba32d2c165783dd69dab7e56afe849bfb81b6 Mon Sep 17 00:00:00 2001 From: Amolith Date: Sat, 3 Jan 2026 17:16:47 -0700 Subject: [PATCH] feat(acp): subscribe sink to pubsub for streaming Assisted-by: Claude Opus 4.5 via Crush --- internal/acp/agent.go | 16 ++++++++++++--- internal/acp/sink.go | 45 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/internal/acp/agent.go b/internal/acp/agent.go index 7478e507fcef790ae33e206bc1148f52610c842e..f47d5c3776e786b0ceb915f0197f34a756d5fc50 100644 --- a/internal/acp/agent.go +++ b/internal/acp/agent.go @@ -5,13 +5,15 @@ import ( "log/slog" "github.com/charmbracelet/crush/internal/app" + "github.com/charmbracelet/crush/internal/csync" "github.com/coder/acp-go-sdk" ) // Agent implements the acp.Agent interface to handle ACP protocol methods. type Agent struct { - app *app.App - conn *acp.AgentSideConnection + app *app.App + conn *acp.AgentSideConnection + sinks *csync.Map[string, *Sink] } // Compile-time interface checks. @@ -22,7 +24,10 @@ var ( // NewAgent creates a new ACP agent backed by a Crush app instance. func NewAgent(app *app.App) *Agent { - return &Agent{app: app} + return &Agent{ + app: app, + sinks: csync.NewMap[string, *Sink](), + } } // SetAgentConnection stores the connection for sending notifications. @@ -65,6 +70,11 @@ func (a *Agent) NewSession(ctx context.Context, params acp.NewSessionRequest) (a return acp.NewSessionResponse{}, err } + // Create and start the event sink to stream updates to this session. + sink := NewSink(ctx, a.conn, sess.ID) + sink.Start(a.app.Messages, a.app.Permissions) + a.sinks.Set(sess.ID, sink) + return acp.NewSessionResponse{ SessionId: acp.SessionId(sess.ID), }, nil diff --git a/internal/acp/sink.go b/internal/acp/sink.go index 0fe2653df2f09de80510811538c6cd3ac7ce6269..e6d7af94edcc873fc222f27b288a7fa1323f8fff 100644 --- a/internal/acp/sink.go +++ b/internal/acp/sink.go @@ -14,6 +14,7 @@ import ( // session updates. type Sink struct { ctx context.Context + cancel context.CancelFunc conn *acp.AgentSideConnection sessionID string @@ -24,8 +25,10 @@ type Sink struct { // NewSink creates a new event sink for the given session. func NewSink(ctx context.Context, conn *acp.AgentSideConnection, sessionID string) *Sink { + sinkCtx, cancel := context.WithCancel(ctx) return &Sink{ - ctx: ctx, + ctx: sinkCtx, + cancel: cancel, conn: conn, sessionID: sessionID, textOffsets: make(map[string]int), @@ -33,6 +36,46 @@ func NewSink(ctx context.Context, conn *acp.AgentSideConnection, sessionID strin } } +// Start subscribes to messages and permissions, forwarding events to ACP. +func (s *Sink) Start(messages message.Service, permissions permission.Service) { + // Subscribe to message events. + go func() { + msgCh := messages.Subscribe(s.ctx) + for { + select { + case event, ok := <-msgCh: + if !ok { + return + } + s.HandleMessage(event) + case <-s.ctx.Done(): + return + } + } + }() + + // Subscribe to permission events. + go func() { + permCh := permissions.Subscribe(s.ctx) + for { + select { + case event, ok := <-permCh: + if !ok { + return + } + s.HandlePermission(event.Payload, permissions) + case <-s.ctx.Done(): + return + } + } + }() +} + +// Stop cancels the sink's subscriptions. +func (s *Sink) Stop() { + s.cancel() +} + // HandleMessage translates a Crush message event to ACP session updates. func (s *Sink) HandleMessage(event pubsub.Event[message.Message]) { msg := event.Payload