feat(acp): subscribe sink to pubsub for streaming

Amolith created

Assisted-by: Claude Opus 4.5 via Crush

Change summary

internal/acp/agent.go | 16 +++++++++++++---
internal/acp/sink.go  | 45 ++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 57 insertions(+), 4 deletions(-)

Detailed changes

internal/acp/agent.go 🔗

@@ -5,13 +5,15 @@ import (
 	"log/slog"
 
 	"github.com/charmbracelet/crush/internal/app"
+	"github.com/charmbracelet/crush/internal/csync"
 	"github.com/coder/acp-go-sdk"
 )
 
 // Agent implements the acp.Agent interface to handle ACP protocol methods.
 type Agent struct {
-	app  *app.App
-	conn *acp.AgentSideConnection
+	app   *app.App
+	conn  *acp.AgentSideConnection
+	sinks *csync.Map[string, *Sink]
 }
 
 // Compile-time interface checks.
@@ -22,7 +24,10 @@ var (
 
 // NewAgent creates a new ACP agent backed by a Crush app instance.
 func NewAgent(app *app.App) *Agent {
-	return &Agent{app: app}
+	return &Agent{
+		app:   app,
+		sinks: csync.NewMap[string, *Sink](),
+	}
 }
 
 // SetAgentConnection stores the connection for sending notifications.
@@ -65,6 +70,11 @@ func (a *Agent) NewSession(ctx context.Context, params acp.NewSessionRequest) (a
 		return acp.NewSessionResponse{}, err
 	}
 
+	// Create and start the event sink to stream updates to this session.
+	sink := NewSink(ctx, a.conn, sess.ID)
+	sink.Start(a.app.Messages, a.app.Permissions)
+	a.sinks.Set(sess.ID, sink)
+
 	return acp.NewSessionResponse{
 		SessionId: acp.SessionId(sess.ID),
 	}, nil

internal/acp/sink.go 🔗

@@ -14,6 +14,7 @@ import (
 // session updates.
 type Sink struct {
 	ctx       context.Context
+	cancel    context.CancelFunc
 	conn      *acp.AgentSideConnection
 	sessionID string
 
@@ -24,8 +25,10 @@ type Sink struct {
 
 // NewSink creates a new event sink for the given session.
 func NewSink(ctx context.Context, conn *acp.AgentSideConnection, sessionID string) *Sink {
+	sinkCtx, cancel := context.WithCancel(ctx)
 	return &Sink{
-		ctx:              ctx,
+		ctx:              sinkCtx,
+		cancel:           cancel,
 		conn:             conn,
 		sessionID:        sessionID,
 		textOffsets:      make(map[string]int),
@@ -33,6 +36,46 @@ func NewSink(ctx context.Context, conn *acp.AgentSideConnection, sessionID strin
 	}
 }
 
+// Start subscribes to messages and permissions, forwarding events to ACP.
+func (s *Sink) Start(messages message.Service, permissions permission.Service) {
+	// Subscribe to message events.
+	go func() {
+		msgCh := messages.Subscribe(s.ctx)
+		for {
+			select {
+			case event, ok := <-msgCh:
+				if !ok {
+					return
+				}
+				s.HandleMessage(event)
+			case <-s.ctx.Done():
+				return
+			}
+		}
+	}()
+
+	// Subscribe to permission events.
+	go func() {
+		permCh := permissions.Subscribe(s.ctx)
+		for {
+			select {
+			case event, ok := <-permCh:
+				if !ok {
+					return
+				}
+				s.HandlePermission(event.Payload, permissions)
+			case <-s.ctx.Done():
+				return
+			}
+		}
+	}()
+}
+
+// Stop cancels the sink's subscriptions.
+func (s *Sink) Stop() {
+	s.cancel()
+}
+
 // HandleMessage translates a Crush message event to ACP session updates.
 func (s *Sink) HandleMessage(event pubsub.Event[message.Message]) {
 	msg := event.Payload