1use collections::HashMap;
2use context_server::types::requests::CallTool;
3use context_server::types::{CallToolParams, ToolResponseContent};
4use context_server::{ContextServer, ContextServerCommand, ContextServerId};
5use futures::channel::{mpsc, oneshot};
6use itertools::Itertools;
7use project::Project;
8use serde::de::DeserializeOwned;
9use settings::SettingsStore;
10use smol::stream::StreamExt;
11use std::cell::RefCell;
12use std::path::{Path, PathBuf};
13use std::rc::Rc;
14use std::sync::Arc;
15
16use agentic_coding_protocol::{self as acp_old, Client as _};
17use anyhow::{Context, Result, anyhow};
18use futures::future::LocalBoxFuture;
19use futures::{AsyncWriteExt, FutureExt, SinkExt as _};
20use gpui::{App, AppContext, Entity, Task};
21use serde::{Deserialize, Serialize};
22use util::ResultExt;
23
24use crate::mcp_server::{McpConfig, ZedMcpServer};
25use crate::{AgentServer, AgentServerCommand, AllAgentServersSettings};
26use acp_thread::{AcpThread, AgentConnection, OldAcpClientDelegate};
27
28#[derive(Clone)]
29pub struct Codex;
30
31pub struct CodexApproval;
32impl context_server::types::Request for CodexApproval {
33 type Params = CodexElicitation;
34 type Response = CodexApprovalResponse;
35 const METHOD: &'static str = "elicitation/create";
36}
37
38#[derive(Debug, Serialize, Deserialize)]
39pub struct ExecApprovalRequest {
40 // These fields are required so that `params`
41 // conforms to ElicitRequestParams.
42 pub message: String,
43 // #[serde(rename = "requestedSchema")]
44 // pub requested_schema: ElicitRequestParamsRequestedSchema,
45
46 // // These are additional fields the client can use to
47 // // correlate the request with the codex tool call.
48 pub codex_mcp_tool_call_id: String,
49 // pub codex_event_id: String,
50 pub codex_command: Vec<String>,
51 pub codex_cwd: PathBuf,
52}
53
54#[derive(Debug, Serialize, Deserialize)]
55pub struct PatchApprovalRequest {
56 pub message: String,
57 // #[serde(rename = "requestedSchema")]
58 // pub requested_schema: ElicitRequestParamsRequestedSchema,
59 pub codex_mcp_tool_call_id: String,
60 pub codex_event_id: String,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 pub codex_reason: Option<String>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub codex_grant_root: Option<PathBuf>,
65 pub codex_changes: HashMap<PathBuf, FileChange>,
66}
67
68#[derive(Debug, Serialize, Deserialize)]
69#[serde(tag = "codex_elicitation", rename_all = "snake_case")]
70enum CodexElicitation {
71 ExecApproval(ExecApprovalRequest),
72 PatchApproval(PatchApprovalRequest),
73}
74
75#[derive(Debug, Clone, Deserialize, Serialize)]
76#[serde(rename_all = "snake_case")]
77pub enum FileChange {
78 Add {
79 content: String,
80 },
81 Delete,
82 Update {
83 unified_diff: String,
84 move_path: Option<PathBuf>,
85 },
86}
87
88#[derive(Debug, Serialize, Deserialize)]
89pub struct CodexApprovalResponse {
90 pub decision: ReviewDecision,
91}
92
93/// User's decision in response to an ExecApprovalRequest.
94#[derive(Debug, Default, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
95#[serde(rename_all = "snake_case")]
96pub enum ReviewDecision {
97 /// User has approved this command and the agent should execute it.
98 Approved,
99
100 /// User has approved this command and wants to automatically approve any
101 /// future identical instances (`command` and `cwd` match exactly) for the
102 /// remainder of the session.
103 ApprovedForSession,
104
105 /// User has denied this command and the agent should not execute it, but
106 /// it should continue the session and try something else.
107 #[default]
108 Denied,
109
110 /// User has denied this command and the agent should not do anything until
111 /// the user's next command.
112 Abort,
113}
114
115impl AgentServer for Codex {
116 fn name(&self) -> &'static str {
117 "Codex"
118 }
119
120 fn empty_state_headline(&self) -> &'static str {
121 self.name()
122 }
123
124 fn empty_state_message(&self) -> &'static str {
125 ""
126 }
127
128 fn logo(&self) -> ui::IconName {
129 ui::IconName::AiOpenAi
130 }
131
132 fn supports_always_allow(&self) -> bool {
133 false
134 }
135
136 fn new_thread(
137 &self,
138 root_dir: &Path,
139 project: &Entity<Project>,
140 cx: &mut App,
141 ) -> Task<Result<Entity<AcpThread>>> {
142 let project = project.clone();
143 let root_dir = root_dir.to_path_buf();
144 let title = self.name().into();
145 cx.spawn(async move |cx| {
146 let (mut delegate_tx, delegate_rx) = watch::channel(None);
147 let tool_id_map = Rc::new(RefCell::new(HashMap::default()));
148
149 let zed_mcp_server = ZedMcpServer::new(delegate_rx, tool_id_map.clone(), cx).await?;
150 let mcp_server_config = zed_mcp_server.server_config()?;
151 // https://github.com/openai/codex/blob/main/codex-rs/config.md
152 let cli_server_config = format!(
153 "mcp_servers.{}={{command = \"{}\", args = [{}]}}",
154 crate::mcp_server::SERVER_NAME,
155 mcp_server_config.command.display(),
156 mcp_server_config
157 .args
158 .iter()
159 .map(|arg| format!("\"{}\"", arg))
160 .join(", ")
161 );
162
163 let settings = cx.read_global(|settings: &SettingsStore, _| {
164 settings.get::<AllAgentServersSettings>(None).codex.clone()
165 })?;
166
167 let Some(mut command) =
168 AgentServerCommand::resolve("codex", &["mcp"], settings, &project, cx).await
169 else {
170 anyhow::bail!("Failed to find codex binary");
171 };
172
173 command
174 .args
175 .extend(["--config".to_string(), cli_server_config]);
176
177 let codex_mcp_client: Arc<ContextServer> = ContextServer::stdio(
178 ContextServerId("codex-mcp-server".into()),
179 ContextServerCommand {
180 path: command.path,
181 args: command.args,
182 env: command.env,
183 },
184 )
185 .into();
186
187 ContextServer::start(codex_mcp_client.clone(), cx).await?;
188 // todo! stop
189
190 let (notification_tx, mut notification_rx) = mpsc::unbounded();
191 let (request_tx, mut request_rx) = mpsc::unbounded();
192
193 let client = codex_mcp_client
194 .client()
195 .context("Failed to subscribe to server")?;
196 client.on_notification("codex/event", {
197 move |event, cx| {
198 let mut notification_tx = notification_tx.clone();
199 cx.background_spawn(async move {
200 log::trace!("Notification: {:?}", event);
201 if let Some(event) = serde_json::from_value::<CodexEvent>(event).log_err() {
202 notification_tx.send(event.msg).await.log_err();
203 }
204 })
205 .detach();
206 }
207 });
208
209 client.on_request::<CodexApproval, _>({
210 let delegate = delegate.clone();
211 {
212 move |elicitation, cx| {
213 let (tx, rx) = oneshot::channel::<Result<CodexApprovalResponse>>();
214 request_tx.send((elicitation, tx));
215 cx.foreground_executor().spawn(rx)
216 }
217 }
218 });
219
220 cx.new(|cx| {
221 let delegate = OldAcpClientDelegate::new(cx.entity().downgrade(), cx.to_async());
222 delegate_tx.send(Some(delegate.clone())).log_err();
223
224 let handler_task = cx.spawn({
225 let delegate = delegate.clone();
226 let tool_id_map = tool_id_map.clone();
227 async move |_, _cx| {
228 while let Some(notification) = notification_rx.next().await {
229 CodexAgentConnection::handle_acp_notification(
230 &delegate,
231 notification,
232 &tool_id_map,
233 )
234 .await
235 .log_err();
236 }
237 }
238 });
239
240 let request_task = cx.spawn({
241 let delegate = delegate.clone();
242 let tool_id_map = tool_id_map.clone();
243 async move |_, _cx| {
244 while let Some((elicitation, respond)) = request_tx.next().await {
245 let confirmation = match elicitation {
246 CodexElicitation::ExecApproval(exec) => {
247 let inner_command =
248 strip_bash_lc_and_escape(&exec.codex_command);
249
250 acp_old::RequestToolCallConfirmationParams {
251 tool_call: acp_old::PushToolCallParams {
252 label: todo!(),
253 icon: acp_old::Icon::Terminal,
254 content: None,
255 locations: vec![],
256 },
257 confirmation: acp_old::ToolCallConfirmation::Execute {
258 root_command: inner_command
259 .split(" ")
260 .next()
261 .unwrap_or_default()
262 .to_string(),
263 command: inner_command,
264 description: Some(exec.message),
265 },
266 }
267 }
268 CodexElicitation::PatchApproval(patch) => {
269 acp_old::RequestToolCallConfirmationParams {
270 tool_call: acp_old::PushToolCallParams {
271 label: "Edit".to_string(),
272 icon: acp_old::Icon::Pencil,
273 content: None, // todo!()
274 locations: patch
275 .codex_changes
276 .keys()
277 .map(|path| acp_old::ToolCallLocation {
278 path: path.clone(),
279 line: None,
280 })
281 .collect(),
282 },
283 confirmation: acp_old::ToolCallConfirmation::Edit {
284 description: Some(patch.message),
285 },
286 }
287 }
288 };
289
290 let task = cx.spawn(async move |cx| {
291 let response = delegate
292 .request_tool_call_confirmation(confirmation)
293 .await?;
294
295 let decision = match response.outcome {
296 acp_old::ToolCallConfirmationOutcome::Allow => {
297 ReviewDecision::Approved
298 }
299 acp_old::ToolCallConfirmationOutcome::AlwaysAllow
300 | acp_old::ToolCallConfirmationOutcome::AlwaysAllowMcpServer
301 | acp_old::ToolCallConfirmationOutcome::AlwaysAllowTool => {
302 ReviewDecision::ApprovedForSession
303 }
304 acp_old::ToolCallConfirmationOutcome::Reject => {
305 ReviewDecision::Denied
306 }
307 acp_old::ToolCallConfirmationOutcome::Cancel => {
308 ReviewDecision::Abort
309 }
310 };
311
312 Ok(CodexApprovalResponse { decision })
313 });
314 }
315
316 cx.spawn(async move |cx| {
317 tx.send(task.await).ok();
318 })
319 }
320 });
321
322 let connection = CodexAgentConnection {
323 root_dir,
324 codex_mcp: codex_mcp_client,
325 cancel_request_tx: Default::default(),
326 tool_id_map: tool_id_map.clone(),
327 _handler_task: handler_task,
328 _request_task: request_task,
329 _zed_mcp: zed_mcp_server,
330 };
331
332 acp_thread::AcpThread::new(connection, title, None, project.clone(), cx)
333 })
334 })
335 }
336}
337
338impl AgentConnection for CodexAgentConnection {
339 /// Send a request to the agent and wait for a response.
340 fn request_any(
341 &self,
342 params: acp_old::AnyAgentRequest,
343 ) -> LocalBoxFuture<'static, Result<acp_old::acp_old::AnyAgentResult>> {
344 let client = self.codex_mcp.client();
345 let root_dir = self.root_dir.clone();
346 let cancel_request_tx = self.cancel_request_tx.clone();
347 async move {
348 let client = client.context("Codex MCP server is not initialized")?;
349
350 match params {
351 // todo: consider sending an empty request so we get the init response?
352 acp_old::AnyAgentRequest::InitializeParams(_) => Ok(
353 acp_old::AnyAgentResult::InitializeResponse(acp_old::InitializeResponse {
354 is_authenticated: true,
355 protocol_version: acp_old::ProtocolVersion::latest(),
356 }),
357 ),
358 acp_old::AnyAgentRequest::AuthenticateParams(_) => {
359 Err(anyhow!("Authentication not supported"))
360 }
361 acp_old::AnyAgentRequest::SendUserMessageParams(message) => {
362 let (new_cancel_tx, cancel_rx) = oneshot::channel();
363 cancel_request_tx.borrow_mut().replace(new_cancel_tx);
364
365 client
366 .cancellable_request::<CallTool>(
367 CallToolParams {
368 name: "codex".into(),
369 arguments: Some(serde_json::to_value(CodexToolCallParam {
370 prompt: message
371 .chunks
372 .into_iter()
373 .filter_map(|chunk| match chunk {
374 acp_old::UserMessageChunk::Text { text } => Some(text),
375 acp_old::UserMessageChunk::Path { .. } => {
376 // todo!
377 None
378 }
379 })
380 .collect(),
381 cwd: root_dir,
382 })?),
383 meta: None,
384 },
385 cancel_rx,
386 )
387 .await?;
388
389 Ok(acp_old::AnyAgentResult::SendUserMessageResponse(
390 acp_old::SendUserMessageResponse,
391 ))
392 }
393 acp_old::AnyAgentRequest::CancelSendMessageParams(_) => {
394 if let Ok(mut borrow) = cancel_request_tx.try_borrow_mut() {
395 if let Some(cancel_tx) = borrow.take() {
396 cancel_tx.send(()).ok();
397 }
398 }
399
400 Ok(acp_old::AnyAgentResult::CancelSendMessageResponse(
401 acp_old::CancelSendMessageResponse,
402 ))
403 }
404 }
405 }
406 .boxed_local()
407 }
408}
409
410struct CodexAgentConnection {
411 codex_mcp: Arc<context_server::ContextServer>,
412 root_dir: PathBuf,
413 cancel_request_tx: Rc<RefCell<Option<oneshot::Sender<()>>>>,
414 tool_id_map: Rc<RefCell<HashMap<String, acp_old::ToolCallId>>>,
415 _handler_task: Task<()>,
416 _request_task: Task<()>,
417 _zed_mcp: ZedMcpServer,
418}
419
420impl CodexAgentConnection {
421 async fn handle_acp_notification(
422 delegate: &OldAcpClientDelegate,
423 event: AcpNotification,
424 tool_id_map: &Rc<RefCell<HashMap<String, acp_old::ToolCallId>>>,
425 ) -> Result<()> {
426 match event {
427 AcpNotification::AgentMessage(message) => {
428 delegate
429 .stream_assistant_message_chunk(acp_old::StreamAssistantMessageChunkParams {
430 chunk: acp_old::AssistantMessageChunk::Text {
431 text: message.message,
432 },
433 })
434 .await?;
435 }
436 AcpNotification::AgentReasoning(message) => {
437 delegate
438 .stream_assistant_message_chunk(acp_old::StreamAssistantMessageChunkParams {
439 chunk: acp_old::AssistantMessageChunk::Thought {
440 thought: message.text,
441 },
442 })
443 .await?
444 }
445 AcpNotification::McpToolCallBegin(event) => {
446 let result = delegate
447 .push_tool_call(acp_old::PushToolCallParams {
448 label: format!("`{}: {}`", event.server, event.tool),
449 icon: acp_old::Icon::Hammer,
450 content: event.arguments.and_then(|args| {
451 Some(acp_old::ToolCallContent::Markdown {
452 markdown: md_codeblock(
453 "json",
454 &serde_json::to_string_pretty(&args).ok()?,
455 ),
456 })
457 }),
458 locations: vec![],
459 })
460 .await?;
461
462 tool_id_map.borrow_mut().insert(event.call_id, result.id);
463 }
464 AcpNotification::McpToolCallEnd(event) => {
465 let acp_call_id = tool_id_map
466 .borrow_mut()
467 .remove(&event.call_id)
468 .context("Missing tool call")?;
469
470 let (status, content) = match event.result {
471 Ok(value) => {
472 if let Ok(response) =
473 serde_json::from_value::<context_server::types::CallToolResponse>(value)
474 {
475 (
476 acp_old::ToolCallStatus::Finished,
477 mcp_tool_content_to_acp(response.content),
478 )
479 } else {
480 (
481 acp_old::ToolCallStatus::Error,
482 Some(acp_old::ToolCallContent::Markdown {
483 markdown: "Failed to parse tool response".to_string(),
484 }),
485 )
486 }
487 }
488 Err(error) => (
489 acp_old::ToolCallStatus::Error,
490 Some(acp_old::ToolCallContent::Markdown { markdown: error }),
491 ),
492 };
493
494 delegate
495 .update_tool_call(acp_old::UpdateToolCallParams {
496 tool_call_id: acp_call_id,
497 status,
498 content,
499 })
500 .await?;
501 }
502 AcpNotification::ExecCommandBegin(event) => {
503 let inner_command = strip_bash_lc_and_escape(&event.command);
504
505 let result = delegate
506 .push_tool_call(acp_old::PushToolCallParams {
507 label: format!("`{}`", inner_command),
508 icon: acp_old::Icon::Terminal,
509 content: None,
510 locations: vec![],
511 })
512 .await?;
513
514 tool_id_map.borrow_mut().insert(event.call_id, result.id);
515 }
516 AcpNotification::ExecCommandEnd(event) => {
517 let acp_call_id = tool_id_map
518 .borrow_mut()
519 .remove(&event.call_id)
520 .context("Missing tool call")?;
521
522 let mut content = String::new();
523 if !event.stdout.is_empty() {
524 use std::fmt::Write;
525 writeln!(
526 &mut content,
527 "### Output\n\n{}",
528 md_codeblock("", &event.stdout)
529 )
530 .unwrap();
531 }
532 if !event.stdout.is_empty() && !event.stderr.is_empty() {
533 use std::fmt::Write;
534 writeln!(&mut content).unwrap();
535 }
536 if !event.stderr.is_empty() {
537 use std::fmt::Write;
538 writeln!(
539 &mut content,
540 "### Error\n\n{}",
541 md_codeblock("", &event.stderr)
542 )
543 .unwrap();
544 }
545 let success = event.exit_code == 0;
546 if !success {
547 use std::fmt::Write;
548 writeln!(&mut content, "\nExit code: `{}`", event.exit_code).unwrap();
549 }
550
551 delegate
552 .update_tool_call(acp_old::UpdateToolCallParams {
553 tool_call_id: acp_call_id,
554 status: if success {
555 acp_old::ToolCallStatus::Finished
556 } else {
557 acp_old::ToolCallStatus::Error
558 },
559 content: Some(acp_old::ToolCallContent::Markdown { markdown: content }),
560 })
561 .await?;
562 }
563 AcpNotification::ExecApprovalRequest(event) => {
564 let inner_command = strip_bash_lc_and_escape(&event.command);
565 let root_command = inner_command
566 .split(" ")
567 .next()
568 .map(|s| s.to_string())
569 .unwrap_or_default();
570
571 let response = delegate
572 .request_tool_call_confirmation(acp_old::RequestToolCallConfirmationParams {
573 tool_call: acp_old::PushToolCallParams {
574 label: format!("`{}`", inner_command),
575 icon: acp_old::Icon::Terminal,
576 content: None,
577 locations: vec![],
578 },
579 confirmation: acp_old::ToolCallConfirmation::Execute {
580 command: inner_command,
581 root_command,
582 description: event.reason,
583 },
584 })
585 .await?;
586
587 tool_id_map.borrow_mut().insert(event.call_id, response.id);
588
589 // todo! approval
590 }
591 AcpNotification::Other => {}
592 }
593
594 Ok(())
595 }
596}
597
598/// todo! use types from h2a crate when we have one
599
600#[derive(Debug, Clone, Serialize, Deserialize)]
601#[serde(rename_all = "kebab-case")]
602pub(crate) struct CodexToolCallParam {
603 pub prompt: String,
604 pub cwd: PathBuf,
605}
606
607#[derive(Debug, Clone, Serialize, Deserialize)]
608struct CodexEvent {
609 pub msg: AcpNotification,
610}
611
612#[derive(Debug, Clone, Serialize, Deserialize)]
613#[serde(tag = "type", rename_all = "snake_case")]
614pub enum AcpNotification {
615 AgentMessage(AgentMessageEvent),
616 AgentReasoning(AgentReasoningEvent),
617 McpToolCallBegin(McpToolCallBeginEvent),
618 McpToolCallEnd(McpToolCallEndEvent),
619 ExecCommandBegin(ExecCommandBeginEvent),
620 ExecCommandEnd(ExecCommandEndEvent),
621 ExecApprovalRequest(ExecApprovalRequestEvent),
622 #[serde(other)]
623 Other,
624}
625
626#[derive(Debug, Clone, Serialize, Deserialize)]
627pub struct AgentMessageEvent {
628 pub message: String,
629}
630
631#[derive(Debug, Clone, Deserialize, Serialize)]
632pub struct AgentReasoningEvent {
633 pub text: String,
634}
635
636#[derive(Debug, Clone, Serialize, Deserialize)]
637pub struct McpToolCallBeginEvent {
638 pub call_id: String,
639 pub server: String,
640 pub tool: String,
641 pub arguments: Option<serde_json::Value>,
642}
643
644#[derive(Debug, Clone, Serialize, Deserialize)]
645pub struct McpToolCallEndEvent {
646 pub call_id: String,
647 pub result: Result<serde_json::Value, String>,
648}
649
650#[derive(Debug, Clone, Serialize, Deserialize)]
651pub struct ExecCommandBeginEvent {
652 pub call_id: String,
653 pub command: Vec<String>,
654 pub cwd: PathBuf,
655}
656
657#[derive(Debug, Clone, Serialize, Deserialize)]
658pub struct ExecCommandEndEvent {
659 pub call_id: String,
660 pub stdout: String,
661 pub stderr: String,
662 pub exit_code: i32,
663}
664
665#[derive(Debug, Clone, Serialize, Deserialize)]
666pub struct ExecApprovalRequestEvent {
667 pub call_id: String,
668 pub command: Vec<String>,
669 pub cwd: PathBuf,
670 #[serde(skip_serializing_if = "Option::is_none")]
671 pub reason: Option<String>,
672}
673
674// Helper functions
675fn md_codeblock(lang: &str, content: &str) -> String {
676 if content.ends_with('\n') {
677 format!("```{}\n{}```", lang, content)
678 } else {
679 format!("```{}\n{}\n```", lang, content)
680 }
681}
682
683fn strip_bash_lc_and_escape(command: &[String]) -> String {
684 match command {
685 // exactly three items
686 [first, second, third]
687 // first two must be "bash", "-lc"
688 if first == "bash" && second == "-lc" =>
689 {
690 third.clone()
691 }
692 _ => escape_command(command),
693 }
694}
695
696fn escape_command(command: &[String]) -> String {
697 shlex::try_join(command.iter().map(|s| s.as_str())).unwrap_or_else(|_| command.join(" "))
698}
699
700fn mcp_tool_content_to_acp(chunks: Vec<ToolResponseContent>) -> Option<acp_old::ToolCallContent> {
701 let mut content = String::new();
702
703 for chunk in chunks {
704 match chunk {
705 ToolResponseContent::Text { text } => content.push_str(&text),
706 ToolResponseContent::Image { .. } => {
707 // todo!
708 }
709 ToolResponseContent::Audio { .. } => {
710 // todo!
711 }
712 ToolResponseContent::Resource { .. } => {
713 // todo!
714 }
715 }
716 }
717
718 if !content.is_empty() {
719 Some(acp_old::ToolCallContent::Markdown { markdown: content })
720 } else {
721 None
722 }
723}
724
725#[cfg(test)]
726pub mod tests {
727 use super::*;
728
729 crate::common_e2e_tests!(Codex);
730
731 pub fn local_command() -> AgentServerCommand {
732 let cli_path =
733 Path::new(env!("CARGO_MANIFEST_DIR")).join("../../../codex/code-rs/target/debug/codex");
734
735 AgentServerCommand {
736 path: cli_path,
737 args: vec!["mcp".into()],
738 env: None,
739 }
740 }
741}