diff --git a/internal/acp/agent.go b/internal/acp/agent.go index 809a574e45788ca5fd38fdcd48570379f0195c69..d9b9df10e1235ec1b63d21c8e000fd002ec75e61 100644 --- a/internal/acp/agent.go +++ b/internal/acp/agent.go @@ -74,7 +74,7 @@ func (a *Agent) NewSession(ctx context.Context, params acp.NewSessionRequest) (a // Use a background context since the sink needs to outlive the NewSession // request. sink := NewSink(context.Background(), a.conn, sess.ID) - sink.Start(a.app.Messages, a.app.Permissions) + sink.Start(a.app.Messages, a.app.Permissions, a.app.Sessions) a.sinks.Set(sess.ID, sink) return acp.NewSessionResponse{ diff --git a/internal/acp/sink.go b/internal/acp/sink.go index c3cc3c185059830098ecce4b46819294597c469b..85cdc407f5aacd0e6147f5c53c39bd9e600d9340 100644 --- a/internal/acp/sink.go +++ b/internal/acp/sink.go @@ -8,6 +8,7 @@ import ( "github.com/charmbracelet/crush/internal/message" "github.com/charmbracelet/crush/internal/permission" "github.com/charmbracelet/crush/internal/pubsub" + "github.com/charmbracelet/crush/internal/session" "github.com/coder/acp-go-sdk" ) @@ -37,8 +38,8 @@ func NewSink(ctx context.Context, conn *acp.AgentSideConnection, sessionID strin } } -// Start subscribes to messages and permissions, forwarding events to ACP. -func (s *Sink) Start(messages message.Service, permissions permission.Service) { +// Start subscribes to messages, permissions, and sessions, forwarding events to ACP. +func (s *Sink) Start(messages message.Service, permissions permission.Service, sessions session.Service) { // Subscribe to message events. go func() { msgCh := messages.Subscribe(s.ctx) @@ -70,6 +71,22 @@ func (s *Sink) Start(messages message.Service, permissions permission.Service) { } } }() + + // Subscribe to session events for todo/plan updates. + go func() { + sessCh := sessions.Subscribe(s.ctx) + for { + select { + case event, ok := <-sessCh: + if !ok { + return + } + s.HandleSession(event) + case <-s.ctx.Done(): + return + } + } + }() } // Stop cancels the sink's subscriptions. @@ -77,6 +94,42 @@ func (s *Sink) Stop() { s.cancel() } +// HandleSession translates session updates to ACP plan updates. +func (s *Sink) HandleSession(event pubsub.Event[session.Session]) { + sess := event.Payload + + // Only handle updates for our session. + if sess.ID != s.sessionID { + return + } + + // Only handle update events (not created/deleted). + if event.Type != pubsub.UpdatedEvent { + return + } + + // Convert todos to plan entries. + entries := make([]acp.PlanEntry, len(sess.Todos)) + for i, todo := range sess.Todos { + entries[i] = acp.PlanEntry{ + Content: todo.Content, + Status: acp.PlanEntryStatus(todo.Status), + Priority: acp.PlanEntryPriorityMedium, + } + if todo.ActiveForm != "" { + entries[i].Meta = map[string]string{"active_form": todo.ActiveForm} + } + } + + update := acp.UpdatePlan(entries...) + if err := s.conn.SessionUpdate(s.ctx, acp.SessionNotification{ + SessionId: acp.SessionId(s.sessionID), + Update: update, + }); err != nil { + slog.Error("Failed to send plan update", "error", err) + } +} + // HandleMessage translates a Crush message event to ACP session updates. func (s *Sink) HandleMessage(event pubsub.Event[message.Message]) { msg := event.Payload