diff --git a/Cargo.lock b/Cargo.lock index 849129850d7df27c521bea00dcd3796c10369dea..2bc256c9adbe5fec01a2ca50e2c9eb6f1a7d16ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -59,6 +59,7 @@ dependencies = [ "serde", "serde_json", "settings", + "smol", "theme_settings", "ui", "util", diff --git a/crates/acp_tools/Cargo.toml b/crates/acp_tools/Cargo.toml index 43963b4034411c0daee0d88bd5b15ff16a1c9018..9124e0aa5503ddbe683e7c25d63d3c69a2e30565 100644 --- a/crates/acp_tools/Cargo.toml +++ b/crates/acp_tools/Cargo.toml @@ -22,6 +22,7 @@ markdown.workspace = true project.workspace = true serde.workspace = true serde_json.workspace = true +smol.workspace = true settings.workspace = true theme_settings.workspace = true ui.workspace = true diff --git a/crates/acp_tools/src/acp_tools.rs b/crates/acp_tools/src/acp_tools.rs index dfeaab42e14e8032721b4161472b974b9459b876..debc04d41b04476babcfff42fdc853eaaedcc1e3 100644 --- a/crates/acp_tools/src/acp_tools.rs +++ b/crates/acp_tools/src/acp_tools.rs @@ -17,21 +17,16 @@ use workspace::{ Item, ItemHandle, ToolbarItemEvent, ToolbarItemLocation, ToolbarItemView, Workspace, }; -// Stub types for the old streaming API which is not available in agent-client-protocol-core. -// These allow the ACP debug panel code to compile but the streaming won't be active. -#[allow(dead_code)] -type RequestId = String; +pub type RequestId = String; #[derive(Clone)] -#[allow(dead_code)] -enum StreamMessageDirection { +pub enum StreamMessageDirection { Incoming, Outgoing, } #[derive(Clone)] -#[allow(dead_code)] -enum StreamMessageContent { +pub enum StreamMessageContent { Request { id: RequestId, method: Arc, @@ -47,10 +42,49 @@ enum StreamMessageContent { }, } -#[allow(dead_code)] -struct StreamMessage { - direction: StreamMessageDirection, - message: StreamMessageContent, +pub struct StreamMessage { + pub direction: StreamMessageDirection, + pub message: StreamMessageContent, +} + +impl StreamMessage { + pub fn from_json_line(direction: StreamMessageDirection, line: &str) -> Option { + let value: serde_json::Value = serde_json::from_str(line).ok()?; + let obj = value.as_object()?; + + let message = if let Some(method) = obj.get("method").and_then(|m| m.as_str()) { + if let Some(id) = obj.get("id") { + StreamMessageContent::Request { + id: id.to_string(), + method: method.into(), + params: obj.get("params").cloned(), + } + } else { + StreamMessageContent::Notification { + method: method.into(), + params: obj.get("params").cloned(), + } + } + } else if let Some(id) = obj.get("id") { + if let Some(error) = obj.get("error") { + let acp_err = serde_json::from_value::(error.clone()) + .unwrap_or_else(|_| acp::Error::internal_error()); + StreamMessageContent::Response { + id: id.to_string(), + result: Err(acp_err), + } + } else { + StreamMessageContent::Response { + id: id.to_string(), + result: Ok(obj.get("result").cloned()), + } + } + } else { + return None; + }; + + Some(StreamMessage { direction, message }) + } } actions!(dev, [OpenAcpLogs]); @@ -79,6 +113,7 @@ pub struct AcpConnectionRegistry { struct ActiveConnection { agent_id: AgentId, + messages_rx: smol::channel::Receiver, } impl AcpConnectionRegistry { @@ -92,9 +127,16 @@ impl AcpConnectionRegistry { } } - pub fn set_active_connection(&self, agent_id: AgentId, cx: &mut Context) { - self.active_connection - .replace(Some(ActiveConnection { agent_id })); + pub fn set_active_connection( + &self, + agent_id: AgentId, + messages_rx: smol::channel::Receiver, + cx: &mut Context, + ) { + self.active_connection.replace(Some(ActiveConnection { + agent_id, + messages_rx, + })); cx.notify(); } } @@ -112,9 +154,7 @@ struct WatchedConnection { agent_id: AgentId, messages: Vec, list_state: ListState, - #[allow(dead_code)] incoming_request_methods: HashMap>, - #[allow(dead_code)] outgoing_request_methods: HashMap>, _task: Task<()>, } @@ -152,17 +192,26 @@ impl AcpTools { } } + let messages_rx = active_connection.messages_rx.clone(); + let task = cx.spawn(async move |this, cx| { + while let Ok(message) = messages_rx.recv().await { + this.update(cx, |this, cx| { + this.push_stream_message(message, cx); + }) + .ok(); + } + }); + self.watched_connection = Some(WatchedConnection { agent_id: active_connection.agent_id.clone(), messages: vec![], list_state: ListState::new(0, ListAlignment::Bottom, px(2048.)), incoming_request_methods: HashMap::default(), outgoing_request_methods: HashMap::default(), - _task: Task::ready(()), + _task: task, }); } - #[allow(dead_code)] fn push_stream_message(&mut self, stream_message: StreamMessage, cx: &mut Context) { let Some(connection) = self.watched_connection.as_mut() else { return; @@ -427,7 +476,6 @@ impl WatchedConnectionMessage { } } -#[allow(dead_code)] fn collapsed_params_md( params: &serde_json::Value, language_registry: &Arc, @@ -460,7 +508,6 @@ fn expanded_params_md( cx.new(|cx| Markdown::new(params_md.into(), Some(language_registry.clone()), None, cx)) } -#[allow(dead_code)] enum MessageType { Request, Response, diff --git a/crates/agent_servers/src/acp.rs b/crates/agent_servers/src/acp.rs index 2ef69c0fb0f3ab2262142aeb603e259c714f12a1..80a037a7e891466d85adb4f2ce559b4f4f4549e6 100644 --- a/crates/agent_servers/src/acp.rs +++ b/crates/agent_servers/src/acp.rs @@ -2,9 +2,10 @@ use acp_thread::{ AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest, AgentSessionListResponse, }; +use acp_tools::{AcpConnectionRegistry, StreamMessage, StreamMessageDirection}; use action_log::ActionLog; use agent_client_protocol_core::schema::{self as acp, ErrorCode}; -use agent_client_protocol_core::{ByteStreams, ConnectionTo}; +use agent_client_protocol_core::{ConnectionTo, Lines}; use anyhow::anyhow; use collections::HashMap; use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _}; @@ -259,8 +260,46 @@ impl AcpConnection { // closures to the !Send foreground thread. let (dispatch_tx, dispatch_rx) = mpsc::unbounded::(); - // Build the transport from child stdio - let transport = ByteStreams::new(stdin, stdout); + // Build a tapped transport that intercepts raw JSON-RPC lines for + // the ACP logs panel. We replicate the ByteStreams→Lines conversion + // manually so we can wrap the stream and sink with inspection. + let (stream_tap_tx, stream_tap_rx) = smol::channel::unbounded::(); + + let incoming_lines = futures::io::BufReader::new(stdout).lines(); + let tapped_incoming = incoming_lines.inspect({ + let tap_tx = stream_tap_tx.clone(); + move |result| { + if let Ok(line) = result { + if let Some(msg) = + StreamMessage::from_json_line(StreamMessageDirection::Incoming, line) + { + tap_tx.try_send(msg).ok(); + } + } + } + }); + + let outgoing_sink = + futures::sink::unfold(Box::pin(stdin), async move |mut writer, line: String| { + use futures::AsyncWriteExt; + let mut bytes = line.into_bytes(); + bytes.push(b'\n'); + writer.write_all(&bytes).await?; + Ok::<_, std::io::Error>(writer) + }); + let tapped_outgoing = futures::SinkExt::with(outgoing_sink, { + let tap_tx = stream_tap_tx; + move |line: String| { + if let Some(msg) = + StreamMessage::from_json_line(StreamMessageDirection::Outgoing, &line) + { + tap_tx.try_send(msg).ok(); + } + futures::future::ok::(line) + } + }); + + let transport = Lines::new(tapped_outgoing, tapped_incoming); // Use a oneshot channel to extract the ConnectionTo from the // connect_with closure. @@ -505,13 +544,11 @@ impl AcpConnection { } }); - // TODO: Update AcpConnectionRegistry to support ConnectionTo - // The old streaming/subscribe API is not available in agent-client-protocol-core. - // cx.update(|cx| { - // AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| { - // registry.set_active_connection(agent_id.clone(), &connection, cx) - // }); - // }); + cx.update(|cx| { + AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| { + registry.set_active_connection(agent_id.clone(), stream_tap_rx, cx) + }); + }); let response = connection .send_request(