diff --git a/crates/agent_servers/src/claude.rs b/crates/agent_servers/src/claude.rs index 79b02ddf74b02d957f7e2c706c46c6fa8e4656ed..cd7a6699aef4ee95b1eb33bae301fd1ae21a67cb 100644 --- a/crates/agent_servers/src/claude.rs +++ b/crates/agent_servers/src/claude.rs @@ -64,7 +64,7 @@ impl AgentServer for ClaudeCode { let project = project.clone(); let root_dir = root_dir.to_path_buf(); cx.spawn(async move |cx| { - let mut threads_map = Rc::new(RefCell::new(HashMap::default())); + let threads_map = Rc::new(RefCell::new(HashMap::default())); let tool_id_map = Rc::new(RefCell::new(HashMap::default())); let permission_mcp_server = diff --git a/crates/agent_servers/src/codex.rs b/crates/agent_servers/src/codex.rs index ba7e3c667ce40b67bfae67ce850dbd3216fc4826..4a9372610029218ec06b79074d7e6301e27063de 100644 --- a/crates/agent_servers/src/codex.rs +++ b/crates/agent_servers/src/codex.rs @@ -1,117 +1,16 @@ -use collections::HashMap; -use context_server::types::requests::CallTool; -use context_server::types::{CallToolParams, ToolResponseContent}; -use context_server::{ContextServer, ContextServerCommand, ContextServerId}; -use futures::channel::{mpsc, oneshot}; -use itertools::Itertools; use project::Project; -use serde::de::DeserializeOwned; -use settings::SettingsStore; -use smol::stream::StreamExt; -use std::cell::RefCell; -use std::path::{Path, PathBuf}; -use std::rc::Rc; +use std::path::Path; use std::sync::Arc; -use agentic_coding_protocol::{self as acp_old, Client as _}; -use anyhow::{Context, Result, anyhow}; -use futures::future::LocalBoxFuture; -use futures::{AsyncWriteExt, FutureExt, SinkExt as _}; -use gpui::{App, AppContext, Entity, Task}; -use serde::{Deserialize, Serialize}; -use util::ResultExt; +use anyhow::Result; +use gpui::{App, Entity, Task}; -use crate::mcp_server::{McpConfig, ZedMcpServer}; -use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings}; -use acp_thread::{AcpThread, AgentConnection, OldAcpClientDelegate}; +use crate::AgentServer; +use acp_thread::AgentConnection; #[derive(Clone)] pub struct Codex; -pub struct CodexApproval; -impl context_server::types::Request for CodexApproval { - type Params = CodexElicitation; - type Response = CodexApprovalResponse; - const METHOD: &'static str = "elicitation/create"; -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct ExecApprovalRequest { - // These fields are required so that `params` - // conforms to ElicitRequestParams. - pub message: String, - // #[serde(rename = "requestedSchema")] - // pub requested_schema: ElicitRequestParamsRequestedSchema, - - // // These are additional fields the client can use to - // // correlate the request with the codex tool call. - pub codex_mcp_tool_call_id: String, - // pub codex_event_id: String, - pub codex_command: Vec, - pub codex_cwd: PathBuf, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct PatchApprovalRequest { - pub message: String, - // #[serde(rename = "requestedSchema")] - // pub requested_schema: ElicitRequestParamsRequestedSchema, - pub codex_mcp_tool_call_id: String, - pub codex_event_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub codex_reason: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub codex_grant_root: Option, - pub codex_changes: HashMap, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "codex_elicitation", rename_all = "snake_case")] -enum CodexElicitation { - ExecApproval(ExecApprovalRequest), - PatchApproval(PatchApprovalRequest), -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -#[serde(rename_all = "snake_case")] -pub enum FileChange { - Add { - content: String, - }, - Delete, - Update { - unified_diff: String, - move_path: Option, - }, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct CodexApprovalResponse { - pub decision: ReviewDecision, -} - -/// User's decision in response to an ExecApprovalRequest. -#[derive(Debug, Default, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)] -#[serde(rename_all = "snake_case")] -pub enum ReviewDecision { - /// User has approved this command and the agent should execute it. - Approved, - - /// User has approved this command and wants to automatically approve any - /// future identical instances (`command` and `cwd` match exactly) for the - /// remainder of the session. - ApprovedForSession, - - /// User has denied this command and the agent should not execute it, but - /// it should continue the session and try something else. - #[default] - Denied, - - /// User has denied this command and the agent should not do anything until - /// the user's next command. - Abort, -} - impl AgentServer for Codex { fn name(&self) -> &'static str { "Codex" @@ -133,597 +32,21 @@ impl AgentServer for Codex { false } - fn new_thread( + fn connect( &self, - root_dir: &Path, - project: &Entity, - cx: &mut App, - ) -> Task>> { - let project = project.clone(); - let root_dir = root_dir.to_path_buf(); - let title = self.name().into(); - cx.spawn(async move |cx| { - let (mut delegate_tx, delegate_rx) = watch::channel(None); - let tool_id_map = Rc::new(RefCell::new(HashMap::default())); - - let zed_mcp_server = ZedMcpServer::new(delegate_rx, tool_id_map.clone(), cx).await?; - let mcp_server_config = zed_mcp_server.server_config()?; - // https://github.com/openai/codex/blob/main/codex-rs/config.md - let cli_server_config = format!( - "mcp_servers.{}={{command = \"{}\", args = [{}]}}", - crate::mcp_server::SERVER_NAME, - mcp_server_config.command.display(), - mcp_server_config - .args - .iter() - .map(|arg| format!("\"{}\"", arg)) - .join(", ") - ); - - let settings = cx.read_global(|settings: &SettingsStore, _| { - settings.get::(None).codex.clone() - })?; - - let Some(mut command) = - AgentServerCommand::resolve("codex", &["mcp"], settings, &project, cx).await - else { - anyhow::bail!("Failed to find codex binary"); - }; - - command - .args - .extend(["--config".to_string(), cli_server_config]); - - let codex_mcp_client: Arc = ContextServer::stdio( - ContextServerId("codex-mcp-server".into()), - ContextServerCommand { - path: command.path, - args: command.args, - env: command.env, - }, - ) - .into(); - - ContextServer::start(codex_mcp_client.clone(), cx).await?; - // todo! stop - - let (notification_tx, mut notification_rx) = mpsc::unbounded(); - let (request_tx, mut request_rx) = mpsc::unbounded(); - - let client = codex_mcp_client - .client() - .context("Failed to subscribe to server")?; - client.on_notification("codex/event", { - move |event, cx| { - let mut notification_tx = notification_tx.clone(); - cx.background_spawn(async move { - log::trace!("Notification: {:?}", event); - if let Some(event) = serde_json::from_value::(event).log_err() { - notification_tx.send(event.msg).await.log_err(); - } - }) - .detach(); - } - }); - - client.on_request::({ - let delegate = delegate.clone(); - { - move |elicitation, cx| { - let (tx, rx) = oneshot::channel::>(); - request_tx.send((elicitation, tx)); - cx.foreground_executor().spawn(rx) - } - } - }); - - cx.new(|cx| { - let delegate = OldAcpClientDelegate::new(cx.entity().downgrade(), cx.to_async()); - delegate_tx.send(Some(delegate.clone())).log_err(); - - let handler_task = cx.spawn({ - let delegate = delegate.clone(); - let tool_id_map = tool_id_map.clone(); - async move |_, _cx| { - while let Some(notification) = notification_rx.next().await { - CodexAgentConnection::handle_acp_notification( - &delegate, - notification, - &tool_id_map, - ) - .await - .log_err(); - } - } - }); - - let request_task = cx.spawn({ - let delegate = delegate.clone(); - let tool_id_map = tool_id_map.clone(); - async move |_, _cx| { - while let Some((elicitation, respond)) = request_tx.next().await { - let confirmation = match elicitation { - CodexElicitation::ExecApproval(exec) => { - let inner_command = - strip_bash_lc_and_escape(&exec.codex_command); - - acp_old::RequestToolCallConfirmationParams { - tool_call: acp_old::PushToolCallParams { - label: todo!(), - icon: acp_old::Icon::Terminal, - content: None, - locations: vec![], - }, - confirmation: acp_old::ToolCallConfirmation::Execute { - root_command: inner_command - .split(" ") - .next() - .unwrap_or_default() - .to_string(), - command: inner_command, - description: Some(exec.message), - }, - } - } - CodexElicitation::PatchApproval(patch) => { - acp_old::RequestToolCallConfirmationParams { - tool_call: acp_old::PushToolCallParams { - label: "Edit".to_string(), - icon: acp_old::Icon::Pencil, - content: None, // todo!() - locations: patch - .codex_changes - .keys() - .map(|path| acp_old::ToolCallLocation { - path: path.clone(), - line: None, - }) - .collect(), - }, - confirmation: acp_old::ToolCallConfirmation::Edit { - description: Some(patch.message), - }, - } - } - }; - - let task = cx.spawn(async move |cx| { - let response = delegate - .request_tool_call_confirmation(confirmation) - .await?; - - let decision = match response.outcome { - acp_old::ToolCallConfirmationOutcome::Allow => { - ReviewDecision::Approved - } - acp_old::ToolCallConfirmationOutcome::AlwaysAllow - | acp_old::ToolCallConfirmationOutcome::AlwaysAllowMcpServer - | acp_old::ToolCallConfirmationOutcome::AlwaysAllowTool => { - ReviewDecision::ApprovedForSession - } - acp_old::ToolCallConfirmationOutcome::Reject => { - ReviewDecision::Denied - } - acp_old::ToolCallConfirmationOutcome::Cancel => { - ReviewDecision::Abort - } - }; - - Ok(CodexApprovalResponse { decision }) - }); - } - - cx.spawn(async move |cx| { - tx.send(task.await).ok(); - }) - } - }); - - let connection = CodexAgentConnection { - root_dir, - codex_mcp: codex_mcp_client, - cancel_request_tx: Default::default(), - tool_id_map: tool_id_map.clone(), - _handler_task: handler_task, - _request_task: request_task, - _zed_mcp: zed_mcp_server, - }; - - acp_thread::AcpThread::new(connection, title, None, project.clone(), cx) - }) - }) - } -} - -impl AgentConnection for CodexAgentConnection { - /// Send a request to the agent and wait for a response. - fn request_any( - &self, - params: acp_old::AnyAgentRequest, - ) -> LocalBoxFuture<'static, Result> { - let client = self.codex_mcp.client(); - let root_dir = self.root_dir.clone(); - let cancel_request_tx = self.cancel_request_tx.clone(); - async move { - let client = client.context("Codex MCP server is not initialized")?; - - match params { - // todo: consider sending an empty request so we get the init response? - acp_old::AnyAgentRequest::InitializeParams(_) => Ok( - acp_old::AnyAgentResult::InitializeResponse(acp_old::InitializeResponse { - is_authenticated: true, - protocol_version: acp_old::ProtocolVersion::latest(), - }), - ), - acp_old::AnyAgentRequest::AuthenticateParams(_) => { - Err(anyhow!("Authentication not supported")) - } - acp_old::AnyAgentRequest::SendUserMessageParams(message) => { - let (new_cancel_tx, cancel_rx) = oneshot::channel(); - cancel_request_tx.borrow_mut().replace(new_cancel_tx); - - client - .cancellable_request::( - CallToolParams { - name: "codex".into(), - arguments: Some(serde_json::to_value(CodexToolCallParam { - prompt: message - .chunks - .into_iter() - .filter_map(|chunk| match chunk { - acp_old::UserMessageChunk::Text { text } => Some(text), - acp_old::UserMessageChunk::Path { .. } => { - // todo! - None - } - }) - .collect(), - cwd: root_dir, - })?), - meta: None, - }, - cancel_rx, - ) - .await?; - - Ok(acp_old::AnyAgentResult::SendUserMessageResponse( - acp_old::SendUserMessageResponse, - )) - } - acp_old::AnyAgentRequest::CancelSendMessageParams(_) => { - if let Ok(mut borrow) = cancel_request_tx.try_borrow_mut() { - if let Some(cancel_tx) = borrow.take() { - cancel_tx.send(()).ok(); - } - } - - Ok(acp_old::AnyAgentResult::CancelSendMessageResponse( - acp_old::CancelSendMessageResponse, - )) - } - } - } - .boxed_local() - } -} - -struct CodexAgentConnection { - codex_mcp: Arc, - root_dir: PathBuf, - cancel_request_tx: Rc>>>, - tool_id_map: Rc>>, - _handler_task: Task<()>, - _request_task: Task<()>, - _zed_mcp: ZedMcpServer, -} - -impl CodexAgentConnection { - async fn handle_acp_notification( - delegate: &OldAcpClientDelegate, - event: AcpNotification, - tool_id_map: &Rc>>, - ) -> Result<()> { - match event { - AcpNotification::AgentMessage(message) => { - delegate - .stream_assistant_message_chunk(acp_old::StreamAssistantMessageChunkParams { - chunk: acp_old::AssistantMessageChunk::Text { - text: message.message, - }, - }) - .await?; - } - AcpNotification::AgentReasoning(message) => { - delegate - .stream_assistant_message_chunk(acp_old::StreamAssistantMessageChunkParams { - chunk: acp_old::AssistantMessageChunk::Thought { - thought: message.text, - }, - }) - .await? - } - AcpNotification::McpToolCallBegin(event) => { - let result = delegate - .push_tool_call(acp_old::PushToolCallParams { - label: format!("`{}: {}`", event.server, event.tool), - icon: acp_old::Icon::Hammer, - content: event.arguments.and_then(|args| { - Some(acp_old::ToolCallContent::Markdown { - markdown: md_codeblock( - "json", - &serde_json::to_string_pretty(&args).ok()?, - ), - }) - }), - locations: vec![], - }) - .await?; - - tool_id_map.borrow_mut().insert(event.call_id, result.id); - } - AcpNotification::McpToolCallEnd(event) => { - let acp_call_id = tool_id_map - .borrow_mut() - .remove(&event.call_id) - .context("Missing tool call")?; - - let (status, content) = match event.result { - Ok(value) => { - if let Ok(response) = - serde_json::from_value::(value) - { - ( - acp_old::ToolCallStatus::Finished, - mcp_tool_content_to_acp(response.content), - ) - } else { - ( - acp_old::ToolCallStatus::Error, - Some(acp_old::ToolCallContent::Markdown { - markdown: "Failed to parse tool response".to_string(), - }), - ) - } - } - Err(error) => ( - acp_old::ToolCallStatus::Error, - Some(acp_old::ToolCallContent::Markdown { markdown: error }), - ), - }; - - delegate - .update_tool_call(acp_old::UpdateToolCallParams { - tool_call_id: acp_call_id, - status, - content, - }) - .await?; - } - AcpNotification::ExecCommandBegin(event) => { - let inner_command = strip_bash_lc_and_escape(&event.command); - - let result = delegate - .push_tool_call(acp_old::PushToolCallParams { - label: format!("`{}`", inner_command), - icon: acp_old::Icon::Terminal, - content: None, - locations: vec![], - }) - .await?; - - tool_id_map.borrow_mut().insert(event.call_id, result.id); - } - AcpNotification::ExecCommandEnd(event) => { - let acp_call_id = tool_id_map - .borrow_mut() - .remove(&event.call_id) - .context("Missing tool call")?; - - let mut content = String::new(); - if !event.stdout.is_empty() { - use std::fmt::Write; - writeln!( - &mut content, - "### Output\n\n{}", - md_codeblock("", &event.stdout) - ) - .unwrap(); - } - if !event.stdout.is_empty() && !event.stderr.is_empty() { - use std::fmt::Write; - writeln!(&mut content).unwrap(); - } - if !event.stderr.is_empty() { - use std::fmt::Write; - writeln!( - &mut content, - "### Error\n\n{}", - md_codeblock("", &event.stderr) - ) - .unwrap(); - } - let success = event.exit_code == 0; - if !success { - use std::fmt::Write; - writeln!(&mut content, "\nExit code: `{}`", event.exit_code).unwrap(); - } - - delegate - .update_tool_call(acp_old::UpdateToolCallParams { - tool_call_id: acp_call_id, - status: if success { - acp_old::ToolCallStatus::Finished - } else { - acp_old::ToolCallStatus::Error - }, - content: Some(acp_old::ToolCallContent::Markdown { markdown: content }), - }) - .await?; - } - AcpNotification::ExecApprovalRequest(event) => { - let inner_command = strip_bash_lc_and_escape(&event.command); - let root_command = inner_command - .split(" ") - .next() - .map(|s| s.to_string()) - .unwrap_or_default(); - - let response = delegate - .request_tool_call_confirmation(acp_old::RequestToolCallConfirmationParams { - tool_call: acp_old::PushToolCallParams { - label: format!("`{}`", inner_command), - icon: acp_old::Icon::Terminal, - content: None, - locations: vec![], - }, - confirmation: acp_old::ToolCallConfirmation::Execute { - command: inner_command, - root_command, - description: event.reason, - }, - }) - .await?; - - tool_id_map.borrow_mut().insert(event.call_id, response.id); - - // todo! approval - } - AcpNotification::Other => {} - } - - Ok(()) - } -} - -/// todo! use types from h2a crate when we have one - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "kebab-case")] -pub(crate) struct CodexToolCallParam { - pub prompt: String, - pub cwd: PathBuf, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct CodexEvent { - pub msg: AcpNotification, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum AcpNotification { - AgentMessage(AgentMessageEvent), - AgentReasoning(AgentReasoningEvent), - McpToolCallBegin(McpToolCallBeginEvent), - McpToolCallEnd(McpToolCallEndEvent), - ExecCommandBegin(ExecCommandBeginEvent), - ExecCommandEnd(ExecCommandEndEvent), - ExecApprovalRequest(ExecApprovalRequestEvent), - #[serde(other)] - Other, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct AgentMessageEvent { - pub message: String, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct AgentReasoningEvent { - pub text: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct McpToolCallBeginEvent { - pub call_id: String, - pub server: String, - pub tool: String, - pub arguments: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct McpToolCallEndEvent { - pub call_id: String, - pub result: Result, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ExecCommandBeginEvent { - pub call_id: String, - pub command: Vec, - pub cwd: PathBuf, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ExecCommandEndEvent { - pub call_id: String, - pub stdout: String, - pub stderr: String, - pub exit_code: i32, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ExecApprovalRequestEvent { - pub call_id: String, - pub command: Vec, - pub cwd: PathBuf, - #[serde(skip_serializing_if = "Option::is_none")] - pub reason: Option, -} - -// Helper functions -fn md_codeblock(lang: &str, content: &str) -> String { - if content.ends_with('\n') { - format!("```{}\n{}```", lang, content) - } else { - format!("```{}\n{}\n```", lang, content) - } -} - -fn strip_bash_lc_and_escape(command: &[String]) -> String { - match command { - // exactly three items - [first, second, third] - // first two must be "bash", "-lc" - if first == "bash" && second == "-lc" => - { - third.clone() - } - _ => escape_command(command), - } -} - -fn escape_command(command: &[String]) -> String { - shlex::try_join(command.iter().map(|s| s.as_str())).unwrap_or_else(|_| command.join(" ")) -} - -fn mcp_tool_content_to_acp(chunks: Vec) -> Option { - let mut content = String::new(); - - for chunk in chunks { - match chunk { - ToolResponseContent::Text { text } => content.push_str(&text), - ToolResponseContent::Image { .. } => { - // todo! - } - ToolResponseContent::Audio { .. } => { - // todo! - } - ToolResponseContent::Resource { .. } => { - // todo! - } - } - } - - if !content.is_empty() { - Some(acp_old::ToolCallContent::Markdown { markdown: content }) - } else { - None + _root_dir: &Path, + _project: &Entity, + _cx: &mut App, + ) -> Task>> { + // re-implement using ACP + todo!() } } #[cfg(test)] pub mod tests { + use crate::AgentServerCommand; + use super::*; crate::common_e2e_tests!(Codex);