diff --git a/go.mod b/go.mod index fe49e19462c4444590ae01b61ea2f9bb7ec2b688..b3b8670e1266a8f07a899cf7122b7497bbdfe949 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/charmbracelet/x/exp/slice v0.0.0-20251201173703-9f73bfd934ff github.com/charmbracelet/x/powernap v0.0.0-20251015113943-25f979b54ad4 github.com/charmbracelet/x/term v0.2.2 + github.com/coder/acp-go-sdk v0.6.3 github.com/denisbrodbeck/machineid v1.0.1 github.com/disintegration/imageorient v0.0.0-20180920195336-8147d86e83ec github.com/google/uuid v1.6.0 diff --git a/go.sum b/go.sum index 3102660d56f34e91542d1d5c696be4d0276f1ea8..80c4e508391b4f51cbce2199ea81752ff2eef70e 100644 --- a/go.sum +++ b/go.sum @@ -128,6 +128,8 @@ github.com/clipperhouse/stringish v0.1.1 h1:+NSqMOr3GR6k1FdRhhnXrLfztGzuG+VuFDfa github.com/clipperhouse/stringish v0.1.1/go.mod h1:v/WhFtE1q0ovMta2+m+UbpZ+2/HEXNWYXQgCt4hdOzA= github.com/clipperhouse/uax29/v2 v2.3.0 h1:SNdx9DVUqMoBuBoW3iLOj4FQv3dN5mDtuqwuhIGpJy4= github.com/clipperhouse/uax29/v2 v2.3.0/go.mod h1:Wn1g7MK6OoeDT0vL+Q0SQLDz/KpfsVRgg6W7ihQeh4g= +github.com/coder/acp-go-sdk v0.6.3 h1:LsXQytehdjKIYJnoVWON/nf7mqbiarnyuyE3rrjBsXQ= +github.com/coder/acp-go-sdk v0.6.3/go.mod h1:yKzM/3R9uELp4+nBAwwtkS0aN1FOFjo11CNPy37yFko= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/creack/pty v1.1.24 h1:bJrF4RRfyJnbTJqzRLHzcGaZK1NeM5kTC9jGgovnR1s= github.com/creack/pty v1.1.24/go.mod h1:08sCNb52WyoAwi2QDyzUCTgcvVFhUzewun7wtTfvcwE= diff --git a/internal/acp/agent.go b/internal/acp/agent.go new file mode 100644 index 0000000000000000000000000000000000000000..60160f9d1da165a91e019a0a6270257f0498a020 --- /dev/null +++ b/internal/acp/agent.go @@ -0,0 +1,111 @@ +package acp + +import ( + "context" + "log/slog" + + "github.com/charmbracelet/crush/internal/app" + "github.com/coder/acp-go-sdk" +) + +// Agent implements the acp.Agent interface to handle ACP protocol methods. +type Agent struct { + app *app.App + conn *acp.AgentSideConnection +} + +// Compile-time interface checks. +var ( + _ acp.Agent = (*Agent)(nil) +) + +// NewAgent creates a new ACP agent backed by a Crush app instance. +func NewAgent(app *app.App) *Agent { + return &Agent{app: app} +} + +// SetAgentConnection stores the connection for sending notifications. +func (a *Agent) SetAgentConnection(conn *acp.AgentSideConnection) { + a.conn = conn +} + +// Initialize handles the ACP initialize request. +func (a *Agent) Initialize(ctx context.Context, params acp.InitializeRequest) (acp.InitializeResponse, error) { + slog.Debug("ACP Initialize", "protocol_version", params.ProtocolVersion) + return acp.InitializeResponse{ + ProtocolVersion: acp.ProtocolVersionNumber, + AgentCapabilities: acp.AgentCapabilities{ + LoadSession: false, + McpCapabilities: acp.McpCapabilities{ + Http: false, + Sse: false, + }, + PromptCapabilities: acp.PromptCapabilities{ + EmbeddedContext: true, + Audio: false, + Image: false, + }, + }, + }, nil +} + +// Authenticate handles authentication requests (stub for local stdio). +func (a *Agent) Authenticate(ctx context.Context, params acp.AuthenticateRequest) (acp.AuthenticateResponse, error) { + slog.Debug("ACP Authenticate") + return acp.AuthenticateResponse{}, nil +} + +// NewSession creates a new Crush session. +func (a *Agent) NewSession(ctx context.Context, params acp.NewSessionRequest) (acp.NewSessionResponse, error) { + slog.Info("ACP NewSession", "cwd", params.Cwd) + + sess, err := a.app.Sessions.Create(ctx, "ACP Session") + if err != nil { + return acp.NewSessionResponse{}, err + } + + return acp.NewSessionResponse{ + SessionId: acp.SessionId(sess.ID), + }, nil +} + +// SetSessionMode handles mode switching (stub - Crush doesn't have modes yet). +func (a *Agent) SetSessionMode(ctx context.Context, params acp.SetSessionModeRequest) (acp.SetSessionModeResponse, error) { + slog.Debug("ACP SetSessionMode", "mode_id", params.ModeId) + return acp.SetSessionModeResponse{}, nil +} + +// Prompt handles a prompt request by running the agent. +func (a *Agent) Prompt(ctx context.Context, params acp.PromptRequest) (acp.PromptResponse, error) { + slog.Info("ACP Prompt", "session_id", params.SessionId) + + // Extract text from content blocks. + var prompt string + for _, block := range params.Prompt { + if block.Text != nil { + prompt += block.Text.Text + } + } + + if prompt == "" { + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil + } + + // Run the agent. + _, err := a.app.AgentCoordinator.Run(ctx, string(params.SessionId), prompt) + if err != nil { + if ctx.Err() != nil { + return acp.PromptResponse{StopReason: acp.StopReasonCancelled}, nil + } + return acp.PromptResponse{}, err + } + + return acp.PromptResponse{StopReason: acp.StopReasonEndTurn}, nil +} + +// Cancel handles cancellation of an in-flight prompt. +func (a *Agent) Cancel(ctx context.Context, params acp.CancelNotification) error { + slog.Info("ACP Cancel", "session_id", params.SessionId) + a.app.AgentCoordinator.Cancel(string(params.SessionId)) + return nil +} diff --git a/internal/acp/server.go b/internal/acp/server.go new file mode 100644 index 0000000000000000000000000000000000000000..18e66a3e6c05c47ba4c158fd9f807bf6deb19cb4 --- /dev/null +++ b/internal/acp/server.go @@ -0,0 +1,55 @@ +// Package acp implements the Agent-Client Protocol server for Crush. +// +// ACP allows external clients (web, desktop, mobile) to drive Crush as an +// agent server over stdio using JSON-RPC. +package acp + +import ( + "context" + "log/slog" + "os" + "os/signal" + "syscall" + + "github.com/coder/acp-go-sdk" +) + +// Server manages the ACP connection lifecycle. +type Server struct { + ctx context.Context + cancel context.CancelFunc + agent *Agent +} + +// NewServer creates a new ACP server. +func NewServer(ctx context.Context) *Server { + ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, os.Kill, syscall.SIGTERM) + return &Server{ + ctx: ctx, + cancel: cancel, + } +} + +// Run starts the ACP server and blocks until the connection closes. +func (s *Server) Run(agent *Agent) error { + s.agent = agent + slog.Info("Starting ACP server") + + conn := acp.NewAgentSideConnection(agent, os.Stdout, os.Stdin) + conn.SetLogger(slog.Default()) + agent.SetAgentConnection(conn) + + select { + case <-conn.Done(): + slog.Debug("ACP client disconnected") + case <-s.ctx.Done(): + slog.Debug("ACP server received shutdown signal") + } + + return nil +} + +// Shutdown performs graceful shutdown. +func (s *Server) Shutdown() { + s.cancel() +} diff --git a/internal/acp/sink.go b/internal/acp/sink.go new file mode 100644 index 0000000000000000000000000000000000000000..0fe2653df2f09de80510811538c6cd3ac7ce6269 --- /dev/null +++ b/internal/acp/sink.go @@ -0,0 +1,207 @@ +package acp + +import ( + "context" + "log/slog" + + "github.com/charmbracelet/crush/internal/message" + "github.com/charmbracelet/crush/internal/permission" + "github.com/charmbracelet/crush/internal/pubsub" + "github.com/coder/acp-go-sdk" +) + +// Sink receives events from Crush's pubsub system and translates them to ACP +// session updates. +type Sink struct { + ctx context.Context + conn *acp.AgentSideConnection + sessionID string + + // Track text deltas per message to avoid re-sending content. + textOffsets map[string]int + reasoningOffsets map[string]int +} + +// NewSink creates a new event sink for the given session. +func NewSink(ctx context.Context, conn *acp.AgentSideConnection, sessionID string) *Sink { + return &Sink{ + ctx: ctx, + conn: conn, + sessionID: sessionID, + textOffsets: make(map[string]int), + reasoningOffsets: make(map[string]int), + } +} + +// HandleMessage translates a Crush message event to ACP session updates. +func (s *Sink) HandleMessage(event pubsub.Event[message.Message]) { + msg := event.Payload + + // Only handle messages for our session. + if msg.SessionID != s.sessionID { + return + } + + for _, part := range msg.Parts { + update := s.translatePart(msg.ID, msg.Role, part) + if update == nil { + continue + } + + if err := s.conn.SessionUpdate(s.ctx, acp.SessionNotification{ + SessionId: acp.SessionId(s.sessionID), + Update: *update, + }); err != nil { + slog.Error("Failed to send session update", "error", err) + } + } +} + +// HandlePermission translates a permission request to an ACP permission request. +func (s *Sink) HandlePermission(req permission.PermissionRequest, permissions permission.Service) { + // Only handle permissions for our session. + if req.SessionID != s.sessionID { + return + } + + slog.Debug("ACP permission request", "tool", req.ToolName, "action", req.Action) + + resp, err := s.conn.RequestPermission(s.ctx, acp.RequestPermissionRequest{ + SessionId: acp.SessionId(s.sessionID), + ToolCall: acp.RequestPermissionToolCall{ + ToolCallId: acp.ToolCallId(req.ToolCallID), + Title: acp.Ptr(req.Description), + Kind: acp.Ptr(acp.ToolKindEdit), + Status: acp.Ptr(acp.ToolCallStatusPending), + Locations: []acp.ToolCallLocation{{Path: req.Path}}, + RawInput: req.Params, + }, + Options: []acp.PermissionOption{ + {Kind: acp.PermissionOptionKindAllowOnce, Name: "Allow", OptionId: "allow"}, + {Kind: acp.PermissionOptionKindAllowAlways, Name: "Allow always", OptionId: "allow_always"}, + {Kind: acp.PermissionOptionKindRejectOnce, Name: "Deny", OptionId: "deny"}, + }, + }) + if err != nil { + slog.Error("Failed to request permission", "error", err) + permissions.Deny(req) + return + } + + if resp.Outcome.Cancelled != nil { + permissions.Deny(req) + return + } + + if resp.Outcome.Selected != nil { + switch string(resp.Outcome.Selected.OptionId) { + case "allow": + permissions.Grant(req) + case "allow_always": + permissions.GrantPersistent(req) + default: + permissions.Deny(req) + } + } +} + +// translatePart converts a message part to an ACP session update. +func (s *Sink) translatePart(msgID string, role message.MessageRole, part message.ContentPart) *acp.SessionUpdate { + switch p := part.(type) { + case message.TextContent: + return s.translateText(msgID, role, p) + + case message.ReasoningContent: + return s.translateReasoning(msgID, p) + + case message.ToolCall: + return s.translateToolCall(p) + + case message.ToolResult: + return s.translateToolResult(p) + + case message.Finish: + // Reset offsets on message finish. + delete(s.textOffsets, msgID) + delete(s.reasoningOffsets, msgID) + return nil + + default: + return nil + } +} + +func (s *Sink) translateText(msgID string, role message.MessageRole, text message.TextContent) *acp.SessionUpdate { + offset := s.textOffsets[msgID] + if len(text.Text) <= offset { + return nil + } + + delta := text.Text[offset:] + s.textOffsets[msgID] = len(text.Text) + + if delta == "" { + return nil + } + + switch role { + case message.Assistant: + update := acp.UpdateAgentMessageText(delta) + return &update + case message.User: + update := acp.UpdateUserMessageText(delta) + return &update + default: + return nil + } +} + +func (s *Sink) translateReasoning(msgID string, reasoning message.ReasoningContent) *acp.SessionUpdate { + offset := s.reasoningOffsets[msgID] + if len(reasoning.Thinking) <= offset { + return nil + } + + delta := reasoning.Thinking[offset:] + s.reasoningOffsets[msgID] = len(reasoning.Thinking) + + if delta == "" { + return nil + } + + update := acp.UpdateAgentThoughtText(delta) + return &update +} + +func (s *Sink) translateToolCall(tc message.ToolCall) *acp.SessionUpdate { + if !tc.Finished { + update := acp.StartToolCall( + acp.ToolCallId(tc.ID), + tc.Name, + acp.WithStartStatus(acp.ToolCallStatusPending), + ) + return &update + } + + update := acp.UpdateToolCall( + acp.ToolCallId(tc.ID), + acp.WithUpdateStatus(acp.ToolCallStatusInProgress), + ) + return &update +} + +func (s *Sink) translateToolResult(tr message.ToolResult) *acp.SessionUpdate { + status := acp.ToolCallStatusCompleted + if tr.IsError { + status = acp.ToolCallStatusFailed + } + + update := acp.UpdateToolCall( + acp.ToolCallId(tr.ToolCallID), + acp.WithUpdateStatus(status), + acp.WithUpdateContent([]acp.ToolCallContent{ + acp.ToolContent(acp.TextBlock(tr.Content)), + }), + ) + return &update +} diff --git a/internal/cmd/acp.go b/internal/cmd/acp.go new file mode 100644 index 0000000000000000000000000000000000000000..89bfc11fba87e72062d36955f566c82bd881090b --- /dev/null +++ b/internal/cmd/acp.go @@ -0,0 +1,37 @@ +package cmd + +import ( + "github.com/charmbracelet/crush/internal/acp" + "github.com/charmbracelet/crush/internal/event" + "github.com/spf13/cobra" +) + +var acpCmd = &cobra.Command{ + Use: "acp", + Short: "Start Crush as an ACP server", + Long: `Start Crush in Agent-Client Protocol mode. + +This allows external ACP clients (web, desktop, mobile) to drive Crush +over stdio using JSON-RPC. The client sends prompts and receives +streaming updates about agent activity.`, + RunE: func(cmd *cobra.Command, args []string) error { + app, err := setupApp(cmd) + if err != nil { + return err + } + defer app.Shutdown() + + if shouldEnableMetrics() { + event.Init() + } + + event.AppInitialized() + defer event.AppExited() + + server := acp.NewServer(cmd.Context()) + defer server.Shutdown() + + agent := acp.NewAgent(app) + return server.Run(agent) + }, +} diff --git a/internal/cmd/root.go b/internal/cmd/root.go index edb2512171348b0c9a1156683ecb398d73657ccf..a3bc4967ad197193038bd20222bfc2b826deb777 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -46,6 +46,7 @@ func init() { logsCmd, schemaCmd, loginCmd, + acpCmd, ) }