@@ -74,7 +74,7 @@ func (a *Agent) NewSession(ctx context.Context, params acp.NewSessionRequest) (a
// Use a background context since the sink needs to outlive the NewSession
// request.
sink := NewSink(context.Background(), a.conn, sess.ID)
- sink.Start(a.app.Messages, a.app.Permissions)
+ sink.Start(a.app.Messages, a.app.Permissions, a.app.Sessions)
a.sinks.Set(sess.ID, sink)
return acp.NewSessionResponse{
@@ -8,6 +8,7 @@ import (
"github.com/charmbracelet/crush/internal/message"
"github.com/charmbracelet/crush/internal/permission"
"github.com/charmbracelet/crush/internal/pubsub"
+ "github.com/charmbracelet/crush/internal/session"
"github.com/coder/acp-go-sdk"
)
@@ -37,8 +38,8 @@ func NewSink(ctx context.Context, conn *acp.AgentSideConnection, sessionID strin
}
}
-// Start subscribes to messages and permissions, forwarding events to ACP.
-func (s *Sink) Start(messages message.Service, permissions permission.Service) {
+// Start subscribes to messages, permissions, and sessions, forwarding events to ACP.
+func (s *Sink) Start(messages message.Service, permissions permission.Service, sessions session.Service) {
// Subscribe to message events.
go func() {
msgCh := messages.Subscribe(s.ctx)
@@ -70,6 +71,22 @@ func (s *Sink) Start(messages message.Service, permissions permission.Service) {
}
}
}()
+
+ // Subscribe to session events for todo/plan updates.
+ go func() {
+ sessCh := sessions.Subscribe(s.ctx)
+ for {
+ select {
+ case event, ok := <-sessCh:
+ if !ok {
+ return
+ }
+ s.HandleSession(event)
+ case <-s.ctx.Done():
+ return
+ }
+ }
+ }()
}
// Stop cancels the sink's subscriptions.
@@ -77,6 +94,42 @@ func (s *Sink) Stop() {
s.cancel()
}
+// HandleSession translates session updates to ACP plan updates.
+func (s *Sink) HandleSession(event pubsub.Event[session.Session]) {
+ sess := event.Payload
+
+ // Only handle updates for our session.
+ if sess.ID != s.sessionID {
+ return
+ }
+
+ // Only handle update events (not created/deleted).
+ if event.Type != pubsub.UpdatedEvent {
+ return
+ }
+
+ // Convert todos to plan entries.
+ entries := make([]acp.PlanEntry, len(sess.Todos))
+ for i, todo := range sess.Todos {
+ entries[i] = acp.PlanEntry{
+ Content: todo.Content,
+ Status: acp.PlanEntryStatus(todo.Status),
+ Priority: acp.PlanEntryPriorityMedium,
+ }
+ if todo.ActiveForm != "" {
+ entries[i].Meta = map[string]string{"active_form": todo.ActiveForm}
+ }
+ }
+
+ update := acp.UpdatePlan(entries...)
+ if err := s.conn.SessionUpdate(s.ctx, acp.SessionNotification{
+ SessionId: acp.SessionId(s.sessionID),
+ Update: update,
+ }); err != nil {
+ slog.Error("Failed to send plan update", "error", err)
+ }
+}
+
// HandleMessage translates a Crush message event to ACP session updates.
func (s *Sink) HandleMessage(event pubsub.Event[message.Message]) {
msg := event.Payload