Fix acp logs

Ben Brandt created

Change summary

Cargo.lock                        |  1 
crates/acp_tools/Cargo.toml       |  1 
crates/acp_tools/src/acp_tools.rs | 89 +++++++++++++++++++++++++-------
crates/agent_servers/src/acp.rs   | 57 +++++++++++++++++---
4 files changed, 117 insertions(+), 31 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -59,6 +59,7 @@ dependencies = [
  "serde",
  "serde_json",
  "settings",
+ "smol",
  "theme_settings",
  "ui",
  "util",

crates/acp_tools/Cargo.toml 🔗

@@ -22,6 +22,7 @@ markdown.workspace = true
 project.workspace = true
 serde.workspace = true
 serde_json.workspace = true
+smol.workspace = true
 settings.workspace = true
 theme_settings.workspace = true
 ui.workspace = true

crates/acp_tools/src/acp_tools.rs 🔗

@@ -17,21 +17,16 @@ use workspace::{
     Item, ItemHandle, ToolbarItemEvent, ToolbarItemLocation, ToolbarItemView, Workspace,
 };
 
-// Stub types for the old streaming API which is not available in agent-client-protocol-core.
-// These allow the ACP debug panel code to compile but the streaming won't be active.
-#[allow(dead_code)]
-type RequestId = String;
+pub type RequestId = String;
 
 #[derive(Clone)]
-#[allow(dead_code)]
-enum StreamMessageDirection {
+pub enum StreamMessageDirection {
     Incoming,
     Outgoing,
 }
 
 #[derive(Clone)]
-#[allow(dead_code)]
-enum StreamMessageContent {
+pub enum StreamMessageContent {
     Request {
         id: RequestId,
         method: Arc<str>,
@@ -47,10 +42,49 @@ enum StreamMessageContent {
     },
 }
 
-#[allow(dead_code)]
-struct StreamMessage {
-    direction: StreamMessageDirection,
-    message: StreamMessageContent,
+pub struct StreamMessage {
+    pub direction: StreamMessageDirection,
+    pub message: StreamMessageContent,
+}
+
+impl StreamMessage {
+    pub fn from_json_line(direction: StreamMessageDirection, line: &str) -> Option<Self> {
+        let value: serde_json::Value = serde_json::from_str(line).ok()?;
+        let obj = value.as_object()?;
+
+        let message = if let Some(method) = obj.get("method").and_then(|m| m.as_str()) {
+            if let Some(id) = obj.get("id") {
+                StreamMessageContent::Request {
+                    id: id.to_string(),
+                    method: method.into(),
+                    params: obj.get("params").cloned(),
+                }
+            } else {
+                StreamMessageContent::Notification {
+                    method: method.into(),
+                    params: obj.get("params").cloned(),
+                }
+            }
+        } else if let Some(id) = obj.get("id") {
+            if let Some(error) = obj.get("error") {
+                let acp_err = serde_json::from_value::<acp::Error>(error.clone())
+                    .unwrap_or_else(|_| acp::Error::internal_error());
+                StreamMessageContent::Response {
+                    id: id.to_string(),
+                    result: Err(acp_err),
+                }
+            } else {
+                StreamMessageContent::Response {
+                    id: id.to_string(),
+                    result: Ok(obj.get("result").cloned()),
+                }
+            }
+        } else {
+            return None;
+        };
+
+        Some(StreamMessage { direction, message })
+    }
 }
 
 actions!(dev, [OpenAcpLogs]);
@@ -79,6 +113,7 @@ pub struct AcpConnectionRegistry {
 
 struct ActiveConnection {
     agent_id: AgentId,
+    messages_rx: smol::channel::Receiver<StreamMessage>,
 }
 
 impl AcpConnectionRegistry {
@@ -92,9 +127,16 @@ impl AcpConnectionRegistry {
         }
     }
 
-    pub fn set_active_connection(&self, agent_id: AgentId, cx: &mut Context<Self>) {
-        self.active_connection
-            .replace(Some(ActiveConnection { agent_id }));
+    pub fn set_active_connection(
+        &self,
+        agent_id: AgentId,
+        messages_rx: smol::channel::Receiver<StreamMessage>,
+        cx: &mut Context<Self>,
+    ) {
+        self.active_connection.replace(Some(ActiveConnection {
+            agent_id,
+            messages_rx,
+        }));
         cx.notify();
     }
 }
@@ -112,9 +154,7 @@ struct WatchedConnection {
     agent_id: AgentId,
     messages: Vec<WatchedConnectionMessage>,
     list_state: ListState,
-    #[allow(dead_code)]
     incoming_request_methods: HashMap<RequestId, Arc<str>>,
-    #[allow(dead_code)]
     outgoing_request_methods: HashMap<RequestId, Arc<str>>,
     _task: Task<()>,
 }
@@ -152,17 +192,26 @@ impl AcpTools {
             }
         }
 
+        let messages_rx = active_connection.messages_rx.clone();
+        let task = cx.spawn(async move |this, cx| {
+            while let Ok(message) = messages_rx.recv().await {
+                this.update(cx, |this, cx| {
+                    this.push_stream_message(message, cx);
+                })
+                .ok();
+            }
+        });
+
         self.watched_connection = Some(WatchedConnection {
             agent_id: active_connection.agent_id.clone(),
             messages: vec![],
             list_state: ListState::new(0, ListAlignment::Bottom, px(2048.)),
             incoming_request_methods: HashMap::default(),
             outgoing_request_methods: HashMap::default(),
-            _task: Task::ready(()),
+            _task: task,
         });
     }
 
-    #[allow(dead_code)]
     fn push_stream_message(&mut self, stream_message: StreamMessage, cx: &mut Context<Self>) {
         let Some(connection) = self.watched_connection.as_mut() else {
             return;
@@ -427,7 +476,6 @@ impl WatchedConnectionMessage {
     }
 }
 
-#[allow(dead_code)]
 fn collapsed_params_md(
     params: &serde_json::Value,
     language_registry: &Arc<LanguageRegistry>,
@@ -460,7 +508,6 @@ fn expanded_params_md(
     cx.new(|cx| Markdown::new(params_md.into(), Some(language_registry.clone()), None, cx))
 }
 
-#[allow(dead_code)]
 enum MessageType {
     Request,
     Response,

crates/agent_servers/src/acp.rs 🔗

@@ -2,9 +2,10 @@ use acp_thread::{
     AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
     AgentSessionListResponse,
 };
+use acp_tools::{AcpConnectionRegistry, StreamMessage, StreamMessageDirection};
 use action_log::ActionLog;
 use agent_client_protocol_core::schema::{self as acp, ErrorCode};
-use agent_client_protocol_core::{ByteStreams, ConnectionTo};
+use agent_client_protocol_core::{ConnectionTo, Lines};
 use anyhow::anyhow;
 use collections::HashMap;
 use feature_flags::{AcpBetaFeatureFlag, FeatureFlagAppExt as _};
@@ -259,8 +260,46 @@ impl AcpConnection {
         // closures to the !Send foreground thread.
         let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();
 
-        // Build the transport from child stdio
-        let transport = ByteStreams::new(stdin, stdout);
+        // Build a tapped transport that intercepts raw JSON-RPC lines for
+        // the ACP logs panel. We replicate the ByteStreams→Lines conversion
+        // manually so we can wrap the stream and sink with inspection.
+        let (stream_tap_tx, stream_tap_rx) = smol::channel::unbounded::<StreamMessage>();
+
+        let incoming_lines = futures::io::BufReader::new(stdout).lines();
+        let tapped_incoming = incoming_lines.inspect({
+            let tap_tx = stream_tap_tx.clone();
+            move |result| {
+                if let Ok(line) = result {
+                    if let Some(msg) =
+                        StreamMessage::from_json_line(StreamMessageDirection::Incoming, line)
+                    {
+                        tap_tx.try_send(msg).ok();
+                    }
+                }
+            }
+        });
+
+        let outgoing_sink =
+            futures::sink::unfold(Box::pin(stdin), async move |mut writer, line: String| {
+                use futures::AsyncWriteExt;
+                let mut bytes = line.into_bytes();
+                bytes.push(b'\n');
+                writer.write_all(&bytes).await?;
+                Ok::<_, std::io::Error>(writer)
+            });
+        let tapped_outgoing = futures::SinkExt::with(outgoing_sink, {
+            let tap_tx = stream_tap_tx;
+            move |line: String| {
+                if let Some(msg) =
+                    StreamMessage::from_json_line(StreamMessageDirection::Outgoing, &line)
+                {
+                    tap_tx.try_send(msg).ok();
+                }
+                futures::future::ok::<String, std::io::Error>(line)
+            }
+        });
+
+        let transport = Lines::new(tapped_outgoing, tapped_incoming);
 
         // Use a oneshot channel to extract the ConnectionTo<Agent> from the
         // connect_with closure.
@@ -505,13 +544,11 @@ impl AcpConnection {
             }
         });
 
-        // TODO: Update AcpConnectionRegistry to support ConnectionTo<Agent>
-        // The old streaming/subscribe API is not available in agent-client-protocol-core.
-        // cx.update(|cx| {
-        //     AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
-        //         registry.set_active_connection(agent_id.clone(), &connection, cx)
-        //     });
-        // });
+        cx.update(|cx| {
+            AcpConnectionRegistry::default_global(cx).update(cx, |registry, cx| {
+                registry.set_active_connection(agent_id.clone(), stream_tap_rx, cx)
+            });
+        });
 
         let response = connection
             .send_request(