From 7181b80a33b4d77ae8b102635face25f9c5eee36 Mon Sep 17 00:00:00 2001 From: Amolith Date: Sat, 3 Jan 2026 20:01:01 -0700 Subject: [PATCH] refactor(acp): split sink.go into focused files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Separates the 450-line sink.go into five focused files: - sink.go: Core struct, NewSink, Start, Stop - sink_messages.go: HandleMessage, translatePart, translateText, translateReasoning - sink_permissions.go: HandlePermission, extractEditParams - sink_tools.go: translateToolCall, translateToolResult, parseToolInput, toolKind - sink_plan.go: HandleSession (todo→plan conversion) Assisted-by: Claude Sonnet 4 via Crush --- internal/acp/sink.go | 364 ------------------------------- internal/acp/sink_messages.go | 99 +++++++++ internal/acp/sink_permissions.go | 95 ++++++++ internal/acp/sink_plan.go | 45 ++++ internal/acp/sink_tools.go | 157 +++++++++++++ 5 files changed, 396 insertions(+), 364 deletions(-) create mode 100644 internal/acp/sink_messages.go create mode 100644 internal/acp/sink_permissions.go create mode 100644 internal/acp/sink_plan.go create mode 100644 internal/acp/sink_tools.go diff --git a/internal/acp/sink.go b/internal/acp/sink.go index 85cdc407f5aacd0e6147f5c53c39bd9e600d9340..bf886c770071ae3068dbb8e546f954f604a3db88 100644 --- a/internal/acp/sink.go +++ b/internal/acp/sink.go @@ -2,12 +2,9 @@ package acp import ( "context" - "encoding/json" - "log/slog" "github.com/charmbracelet/crush/internal/message" "github.com/charmbracelet/crush/internal/permission" - "github.com/charmbracelet/crush/internal/pubsub" "github.com/charmbracelet/crush/internal/session" "github.com/coder/acp-go-sdk" ) @@ -93,364 +90,3 @@ func (s *Sink) Start(messages message.Service, permissions permission.Service, s func (s *Sink) Stop() { s.cancel() } - -// HandleSession translates session updates to ACP plan updates. -func (s *Sink) HandleSession(event pubsub.Event[session.Session]) { - sess := event.Payload - - // Only handle updates for our session. - if sess.ID != s.sessionID { - return - } - - // Only handle update events (not created/deleted). - if event.Type != pubsub.UpdatedEvent { - return - } - - // Convert todos to plan entries. - entries := make([]acp.PlanEntry, len(sess.Todos)) - for i, todo := range sess.Todos { - entries[i] = acp.PlanEntry{ - Content: todo.Content, - Status: acp.PlanEntryStatus(todo.Status), - Priority: acp.PlanEntryPriorityMedium, - } - if todo.ActiveForm != "" { - entries[i].Meta = map[string]string{"active_form": todo.ActiveForm} - } - } - - update := acp.UpdatePlan(entries...) - if err := s.conn.SessionUpdate(s.ctx, acp.SessionNotification{ - SessionId: acp.SessionId(s.sessionID), - Update: update, - }); err != nil { - slog.Error("Failed to send plan update", "error", err) - } -} - -// HandleMessage translates a Crush message event to ACP session updates. -func (s *Sink) HandleMessage(event pubsub.Event[message.Message]) { - msg := event.Payload - - // Only handle messages for our session. - if msg.SessionID != s.sessionID { - return - } - - for _, part := range msg.Parts { - update := s.translatePart(msg.ID, msg.Role, part) - if update == nil { - continue - } - - if err := s.conn.SessionUpdate(s.ctx, acp.SessionNotification{ - SessionId: acp.SessionId(s.sessionID), - Update: *update, - }); err != nil { - slog.Error("Failed to send session update", "error", err) - } - } -} - -// HandlePermission translates a permission request to an ACP permission request. -func (s *Sink) HandlePermission(req permission.PermissionRequest, permissions permission.Service) { - // Only handle permissions for our session. - if req.SessionID != s.sessionID { - return - } - - slog.Debug("ACP permission request", "tool", req.ToolName, "action", req.Action) - - // Build the tool call for the permission request. - toolCall := acp.RequestPermissionToolCall{ - ToolCallId: acp.ToolCallId(req.ToolCallID), - Title: acp.Ptr(req.Description), - Kind: acp.Ptr(acp.ToolKindEdit), - Status: acp.Ptr(acp.ToolCallStatusPending), - Locations: []acp.ToolCallLocation{{Path: req.Path}}, - RawInput: req.Params, - } - - // For edit tools, include diff content so the client can show the proposed - // changes. - if meta := extractEditParams(req.Params); meta != nil && meta.FilePath != "" { - toolCall.Content = []acp.ToolCallContent{ - acp.ToolDiffContent(meta.FilePath, meta.NewContent, meta.OldContent), - } - } - - resp, err := s.conn.RequestPermission(s.ctx, acp.RequestPermissionRequest{ - SessionId: acp.SessionId(s.sessionID), - ToolCall: toolCall, - Options: []acp.PermissionOption{ - {Kind: acp.PermissionOptionKindAllowOnce, Name: "Allow", OptionId: "allow"}, - {Kind: acp.PermissionOptionKindAllowAlways, Name: "Allow always", OptionId: "allow_always"}, - {Kind: acp.PermissionOptionKindRejectOnce, Name: "Deny", OptionId: "deny"}, - }, - }) - if err != nil { - slog.Error("Failed to request permission", "error", err) - permissions.Deny(req) - return - } - - if resp.Outcome.Cancelled != nil { - permissions.Deny(req) - return - } - - if resp.Outcome.Selected != nil { - switch string(resp.Outcome.Selected.OptionId) { - case "allow": - permissions.Grant(req) - case "allow_always": - permissions.GrantPersistent(req) - default: - permissions.Deny(req) - } - } -} - -// editParams holds fields needed for diff content in permission requests. -type editParams struct { - FilePath string `json:"file_path"` - OldContent string `json:"old_content"` - NewContent string `json:"new_content"` -} - -// extractEditParams attempts to extract edit parameters from permission params. -func extractEditParams(params any) *editParams { - if params == nil { - return nil - } - - // Try JSON round-trip to extract fields. - data, err := json.Marshal(params) - if err != nil { - return nil - } - - var ep editParams - if err := json.Unmarshal(data, &ep); err != nil { - return nil - } - - return &ep -} - -// translatePart converts a message part to an ACP session update. -func (s *Sink) translatePart(msgID string, role message.MessageRole, part message.ContentPart) *acp.SessionUpdate { - switch p := part.(type) { - case message.TextContent: - return s.translateText(msgID, role, p) - - case message.ReasoningContent: - return s.translateReasoning(msgID, p) - - case message.ToolCall: - return s.translateToolCall(p) - - case message.ToolResult: - return s.translateToolResult(p) - - case message.Finish: - // Reset offsets on message finish. - delete(s.textOffsets, msgID) - delete(s.reasoningOffsets, msgID) - return nil - - default: - return nil - } -} - -func (s *Sink) translateText(msgID string, role message.MessageRole, text message.TextContent) *acp.SessionUpdate { - // Skip user messages - the client already knows what it sent via the - // prompt request. - if role != message.Assistant { - return nil - } - - offset := s.textOffsets[msgID] - if len(text.Text) <= offset { - return nil - } - - delta := text.Text[offset:] - s.textOffsets[msgID] = len(text.Text) - - if delta == "" { - return nil - } - - update := acp.UpdateAgentMessageText(delta) - return &update -} - -func (s *Sink) translateReasoning(msgID string, reasoning message.ReasoningContent) *acp.SessionUpdate { - offset := s.reasoningOffsets[msgID] - if len(reasoning.Thinking) <= offset { - return nil - } - - delta := reasoning.Thinking[offset:] - s.reasoningOffsets[msgID] = len(reasoning.Thinking) - - if delta == "" { - return nil - } - - update := acp.UpdateAgentThoughtText(delta) - return &update -} - -func (s *Sink) translateToolCall(tc message.ToolCall) *acp.SessionUpdate { - if !tc.Finished { - opts := []acp.ToolCallStartOpt{ - acp.WithStartStatus(acp.ToolCallStatusPending), - acp.WithStartKind(toolKind(tc.Name)), - } - - // Parse input to extract path, title, and raw input. - title := tc.Name - if input := parseToolInput(tc.Input); input != nil { - if input.Path != "" { - opts = append(opts, acp.WithStartLocations([]acp.ToolCallLocation{{Path: input.Path}})) - } - if input.Title != "" { - title = input.Title - } - opts = append(opts, acp.WithStartRawInput(input.Raw)) - } - - update := acp.StartToolCall(acp.ToolCallId(tc.ID), title, opts...) - return &update - } - - // Tool finished streaming - update with title and input now available. - opts := []acp.ToolCallUpdateOpt{ - acp.WithUpdateStatus(acp.ToolCallStatusInProgress), - } - if input := parseToolInput(tc.Input); input != nil { - if input.Title != "" { - opts = append(opts, acp.WithUpdateTitle(input.Title)) - } - if input.Path != "" { - opts = append(opts, acp.WithUpdateLocations([]acp.ToolCallLocation{{Path: input.Path}})) - } - opts = append(opts, acp.WithUpdateRawInput(input.Raw)) - } - - update := acp.UpdateToolCall(acp.ToolCallId(tc.ID), opts...) - return &update -} - -// toolInput holds parsed tool call input. -type toolInput struct { - Path string - Title string - Raw map[string]any -} - -// parseToolInput extracts path and raw input from JSON tool input. -func parseToolInput(input string) *toolInput { - if input == "" { - return nil - } - - var raw map[string]any - if err := json.Unmarshal([]byte(input), &raw); err != nil { - return nil - } - - ti := &toolInput{Raw: raw} - - // Extract path from common field names. - if path, ok := raw["file_path"].(string); ok { - ti.Path = path - } else if path, ok := raw["path"].(string); ok { - ti.Path = path - } - - // Extract title/description for display. - if desc, ok := raw["description"].(string); ok { - ti.Title = desc - } - - return ti -} - -// toolKind maps Crush tool names to ACP tool kinds. -func toolKind(name string) acp.ToolKind { - switch name { - case "view", "ls", "job_output", "lsp_diagnostics": - return acp.ToolKindRead - case "edit", "multiedit", "write": - return acp.ToolKindEdit - case "bash", "job_kill": - return acp.ToolKindExecute - case "grep", "glob", "lsp_references", "sourcegraph", "web_search": - return acp.ToolKindSearch - case "fetch", "agentic_fetch", "web_fetch", "download": - return acp.ToolKindFetch - default: - return acp.ToolKindOther - } -} - -// diffMetadata holds fields common to edit tool response metadata. -type diffMetadata struct { - FilePath string `json:"file_path"` - OldContent string `json:"old_content"` - NewContent string `json:"new_content"` -} - -func (s *Sink) translateToolResult(tr message.ToolResult) *acp.SessionUpdate { - status := acp.ToolCallStatusCompleted - if tr.IsError { - status = acp.ToolCallStatusFailed - } - - // For edit tools with metadata, emit diff content. - content := []acp.ToolCallContent{acp.ToolContent(acp.TextBlock(tr.Content))} - var locations []acp.ToolCallLocation - - if !tr.IsError && tr.Metadata != "" { - switch tr.Name { - case "edit", "multiedit", "write": - var meta diffMetadata - if err := json.Unmarshal([]byte(tr.Metadata), &meta); err == nil && meta.FilePath != "" { - content = []acp.ToolCallContent{ - acp.ToolDiffContent(meta.FilePath, meta.NewContent, meta.OldContent), - } - } - case "view": - var meta struct { - FilePath string `json:"file_path"` - } - if err := json.Unmarshal([]byte(tr.Metadata), &meta); err == nil && meta.FilePath != "" { - locations = []acp.ToolCallLocation{{Path: meta.FilePath}} - } - case "ls": - var meta struct { - Path string `json:"path"` - } - if err := json.Unmarshal([]byte(tr.Metadata), &meta); err == nil && meta.Path != "" { - locations = []acp.ToolCallLocation{{Path: meta.Path}} - } - } - } - - opts := []acp.ToolCallUpdateOpt{ - acp.WithUpdateStatus(status), - acp.WithUpdateContent(content), - } - if len(locations) > 0 { - opts = append(opts, acp.WithUpdateLocations(locations)) - } - - update := acp.UpdateToolCall(acp.ToolCallId(tr.ToolCallID), opts...) - return &update -} diff --git a/internal/acp/sink_messages.go b/internal/acp/sink_messages.go new file mode 100644 index 0000000000000000000000000000000000000000..6c9ad146e2dae22772741fc0253dfe4bb9a5ccc1 --- /dev/null +++ b/internal/acp/sink_messages.go @@ -0,0 +1,99 @@ +package acp + +import ( + "log/slog" + + "github.com/charmbracelet/crush/internal/message" + "github.com/charmbracelet/crush/internal/pubsub" + "github.com/coder/acp-go-sdk" +) + +// HandleMessage translates a Crush message event to ACP session updates. +func (s *Sink) HandleMessage(event pubsub.Event[message.Message]) { + msg := event.Payload + + // Only handle messages for our session. + if msg.SessionID != s.sessionID { + return + } + + for _, part := range msg.Parts { + update := s.translatePart(msg.ID, msg.Role, part) + if update == nil { + continue + } + + if err := s.conn.SessionUpdate(s.ctx, acp.SessionNotification{ + SessionId: acp.SessionId(s.sessionID), + Update: *update, + }); err != nil { + slog.Error("Failed to send session update", "error", err) + } + } +} + +// translatePart converts a message part to an ACP session update. +func (s *Sink) translatePart(msgID string, role message.MessageRole, part message.ContentPart) *acp.SessionUpdate { + switch p := part.(type) { + case message.TextContent: + return s.translateText(msgID, role, p) + + case message.ReasoningContent: + return s.translateReasoning(msgID, p) + + case message.ToolCall: + return s.translateToolCall(p) + + case message.ToolResult: + return s.translateToolResult(p) + + case message.Finish: + // Reset offsets on message finish. + delete(s.textOffsets, msgID) + delete(s.reasoningOffsets, msgID) + return nil + + default: + return nil + } +} + +func (s *Sink) translateText(msgID string, role message.MessageRole, text message.TextContent) *acp.SessionUpdate { + // Skip user messages - the client already knows what it sent via the + // prompt request. + if role != message.Assistant { + return nil + } + + offset := s.textOffsets[msgID] + if len(text.Text) <= offset { + return nil + } + + delta := text.Text[offset:] + s.textOffsets[msgID] = len(text.Text) + + if delta == "" { + return nil + } + + update := acp.UpdateAgentMessageText(delta) + return &update +} + +func (s *Sink) translateReasoning(msgID string, reasoning message.ReasoningContent) *acp.SessionUpdate { + offset := s.reasoningOffsets[msgID] + if len(reasoning.Thinking) <= offset { + return nil + } + + delta := reasoning.Thinking[offset:] + s.reasoningOffsets[msgID] = len(reasoning.Thinking) + + if delta == "" { + return nil + } + + update := acp.UpdateAgentThoughtText(delta) + return &update +} diff --git a/internal/acp/sink_permissions.go b/internal/acp/sink_permissions.go new file mode 100644 index 0000000000000000000000000000000000000000..978ca16d71dfe1fbfb01f4157f83209179b75ea4 --- /dev/null +++ b/internal/acp/sink_permissions.go @@ -0,0 +1,95 @@ +package acp + +import ( + "encoding/json" + "log/slog" + + "github.com/charmbracelet/crush/internal/permission" + "github.com/coder/acp-go-sdk" +) + +// HandlePermission translates a permission request to an ACP permission request. +func (s *Sink) HandlePermission(req permission.PermissionRequest, permissions permission.Service) { + // Only handle permissions for our session. + if req.SessionID != s.sessionID { + return + } + + slog.Debug("ACP permission request", "tool", req.ToolName, "action", req.Action) + + // Build the tool call for the permission request. + toolCall := acp.RequestPermissionToolCall{ + ToolCallId: acp.ToolCallId(req.ToolCallID), + Title: acp.Ptr(req.Description), + Kind: acp.Ptr(acp.ToolKindEdit), + Status: acp.Ptr(acp.ToolCallStatusPending), + Locations: []acp.ToolCallLocation{{Path: req.Path}}, + RawInput: req.Params, + } + + // For edit tools, include diff content so the client can show the proposed + // changes. + if meta := extractEditParams(req.Params); meta != nil && meta.FilePath != "" { + toolCall.Content = []acp.ToolCallContent{ + acp.ToolDiffContent(meta.FilePath, meta.NewContent, meta.OldContent), + } + } + + resp, err := s.conn.RequestPermission(s.ctx, acp.RequestPermissionRequest{ + SessionId: acp.SessionId(s.sessionID), + ToolCall: toolCall, + Options: []acp.PermissionOption{ + {Kind: acp.PermissionOptionKindAllowOnce, Name: "Allow", OptionId: "allow"}, + {Kind: acp.PermissionOptionKindAllowAlways, Name: "Allow always", OptionId: "allow_always"}, + {Kind: acp.PermissionOptionKindRejectOnce, Name: "Deny", OptionId: "deny"}, + }, + }) + if err != nil { + slog.Error("Failed to request permission", "error", err) + permissions.Deny(req) + return + } + + if resp.Outcome.Cancelled != nil { + permissions.Deny(req) + return + } + + if resp.Outcome.Selected != nil { + switch string(resp.Outcome.Selected.OptionId) { + case "allow": + permissions.Grant(req) + case "allow_always": + permissions.GrantPersistent(req) + default: + permissions.Deny(req) + } + } +} + +// editParams holds fields needed for diff content in permission requests. +type editParams struct { + FilePath string `json:"file_path"` + OldContent string `json:"old_content"` + NewContent string `json:"new_content"` +} + +// extractEditParams attempts to extract edit parameters from permission params. +func extractEditParams(params any) *editParams { + if params == nil { + return nil + } + + // Try JSON round-trip to extract fields. + data, err := json.Marshal(params) + if err != nil { + return nil + } + + var ep editParams + if err := json.Unmarshal(data, &ep); err != nil { + return nil + } + + return &ep +} diff --git a/internal/acp/sink_plan.go b/internal/acp/sink_plan.go new file mode 100644 index 0000000000000000000000000000000000000000..477e506d3ad3649189e70240d32152f2940496e2 --- /dev/null +++ b/internal/acp/sink_plan.go @@ -0,0 +1,45 @@ +package acp + +import ( + "log/slog" + + "github.com/charmbracelet/crush/internal/pubsub" + "github.com/charmbracelet/crush/internal/session" + "github.com/coder/acp-go-sdk" +) + +// HandleSession translates session updates to ACP plan updates. +func (s *Sink) HandleSession(event pubsub.Event[session.Session]) { + sess := event.Payload + + // Only handle updates for our session. + if sess.ID != s.sessionID { + return + } + + // Only handle update events (not created/deleted). + if event.Type != pubsub.UpdatedEvent { + return + } + + // Convert todos to plan entries. + entries := make([]acp.PlanEntry, len(sess.Todos)) + for i, todo := range sess.Todos { + entries[i] = acp.PlanEntry{ + Content: todo.Content, + Status: acp.PlanEntryStatus(todo.Status), + Priority: acp.PlanEntryPriorityMedium, + } + if todo.ActiveForm != "" { + entries[i].Meta = map[string]string{"active_form": todo.ActiveForm} + } + } + + update := acp.UpdatePlan(entries...) + if err := s.conn.SessionUpdate(s.ctx, acp.SessionNotification{ + SessionId: acp.SessionId(s.sessionID), + Update: update, + }); err != nil { + slog.Error("Failed to send plan update", "error", err) + } +} diff --git a/internal/acp/sink_tools.go b/internal/acp/sink_tools.go new file mode 100644 index 0000000000000000000000000000000000000000..57f79f6b96affec706b949bcb448d8be1e1455ab --- /dev/null +++ b/internal/acp/sink_tools.go @@ -0,0 +1,157 @@ +package acp + +import ( + "encoding/json" + + "github.com/charmbracelet/crush/internal/message" + "github.com/coder/acp-go-sdk" +) + +func (s *Sink) translateToolCall(tc message.ToolCall) *acp.SessionUpdate { + if !tc.Finished { + opts := []acp.ToolCallStartOpt{ + acp.WithStartStatus(acp.ToolCallStatusPending), + acp.WithStartKind(toolKind(tc.Name)), + } + + // Parse input to extract path, title, and raw input. + title := tc.Name + if input := parseToolInput(tc.Input); input != nil { + if input.Path != "" { + opts = append(opts, acp.WithStartLocations([]acp.ToolCallLocation{{Path: input.Path}})) + } + if input.Title != "" { + title = input.Title + } + opts = append(opts, acp.WithStartRawInput(input.Raw)) + } + + update := acp.StartToolCall(acp.ToolCallId(tc.ID), title, opts...) + return &update + } + + // Tool finished streaming - update with title and input now available. + opts := []acp.ToolCallUpdateOpt{ + acp.WithUpdateStatus(acp.ToolCallStatusInProgress), + } + if input := parseToolInput(tc.Input); input != nil { + if input.Title != "" { + opts = append(opts, acp.WithUpdateTitle(input.Title)) + } + if input.Path != "" { + opts = append(opts, acp.WithUpdateLocations([]acp.ToolCallLocation{{Path: input.Path}})) + } + opts = append(opts, acp.WithUpdateRawInput(input.Raw)) + } + + update := acp.UpdateToolCall(acp.ToolCallId(tc.ID), opts...) + return &update +} + +// toolInput holds parsed tool call input. +type toolInput struct { + Path string + Title string + Raw map[string]any +} + +// parseToolInput extracts path and raw input from JSON tool input. +func parseToolInput(input string) *toolInput { + if input == "" { + return nil + } + + var raw map[string]any + if err := json.Unmarshal([]byte(input), &raw); err != nil { + return nil + } + + ti := &toolInput{Raw: raw} + + // Extract path from common field names. + if path, ok := raw["file_path"].(string); ok { + ti.Path = path + } else if path, ok := raw["path"].(string); ok { + ti.Path = path + } + + // Extract title/description for display. + if desc, ok := raw["description"].(string); ok { + ti.Title = desc + } + + return ti +} + +// toolKind maps Crush tool names to ACP tool kinds. +func toolKind(name string) acp.ToolKind { + switch name { + case "view", "ls", "job_output", "lsp_diagnostics": + return acp.ToolKindRead + case "edit", "multiedit", "write": + return acp.ToolKindEdit + case "bash", "job_kill": + return acp.ToolKindExecute + case "grep", "glob", "lsp_references", "sourcegraph", "web_search": + return acp.ToolKindSearch + case "fetch", "agentic_fetch", "web_fetch", "download": + return acp.ToolKindFetch + default: + return acp.ToolKindOther + } +} + +// diffMetadata holds fields common to edit tool response metadata. +type diffMetadata struct { + FilePath string `json:"file_path"` + OldContent string `json:"old_content"` + NewContent string `json:"new_content"` +} + +func (s *Sink) translateToolResult(tr message.ToolResult) *acp.SessionUpdate { + status := acp.ToolCallStatusCompleted + if tr.IsError { + status = acp.ToolCallStatusFailed + } + + // For edit tools with metadata, emit diff content. + content := []acp.ToolCallContent{acp.ToolContent(acp.TextBlock(tr.Content))} + var locations []acp.ToolCallLocation + + if !tr.IsError && tr.Metadata != "" { + switch tr.Name { + case "edit", "multiedit", "write": + var meta diffMetadata + if err := json.Unmarshal([]byte(tr.Metadata), &meta); err == nil && meta.FilePath != "" { + content = []acp.ToolCallContent{ + acp.ToolDiffContent(meta.FilePath, meta.NewContent, meta.OldContent), + } + } + case "view": + var meta struct { + FilePath string `json:"file_path"` + } + if err := json.Unmarshal([]byte(tr.Metadata), &meta); err == nil && meta.FilePath != "" { + locations = []acp.ToolCallLocation{{Path: meta.FilePath}} + } + case "ls": + var meta struct { + Path string `json:"path"` + } + if err := json.Unmarshal([]byte(tr.Metadata), &meta); err == nil && meta.Path != "" { + locations = []acp.ToolCallLocation{{Path: meta.Path}} + } + } + } + + opts := []acp.ToolCallUpdateOpt{ + acp.WithUpdateStatus(status), + acp.WithUpdateContent(content), + } + if len(locations) > 0 { + opts = append(opts, acp.WithUpdateLocations(locations)) + } + + update := acp.UpdateToolCall(acp.ToolCallId(tr.ToolCallID), opts...) + return &update +}