clean up

Ben Brandt created

Change summary

Cargo.lock                        |   1 
crates/acp_tools/Cargo.toml       |   1 
crates/acp_tools/src/acp_tools.rs |  43 +++++-
crates/agent_servers/src/acp.rs   | 203 +++++++++++++++++---------------
4 files changed, 142 insertions(+), 106 deletions(-)

Detailed changes

Cargo.lock ๐Ÿ”—

@@ -54,6 +54,7 @@ dependencies = [
  "collections",
  "gpui",
  "language",
+ "log",
  "markdown",
  "project",
  "serde",

crates/acp_tools/Cargo.toml ๐Ÿ”—

@@ -18,6 +18,7 @@ agent-client-protocol.workspace = true
 collections.workspace = true
 gpui.workspace = true
 language.workspace= true
+log.workspace = true
 markdown.workspace = true
 project.workspace = true
 serde.workspace = true

crates/acp_tools/src/acp_tools.rs ๐Ÿ”—

@@ -17,7 +17,7 @@ use workspace::{
     Item, ItemHandle, ToolbarItemEvent, ToolbarItemLocation, ToolbarItemView, Workspace,
 };
 
-pub type RequestId = String;
+pub type RequestId = serde_json::Value;
 
 #[derive(Clone)]
 pub enum StreamMessageDirection {
@@ -56,7 +56,7 @@ impl StreamMessage {
         let message = if let Some(method) = obj.get("method").and_then(|m| m.as_str()) {
             if let Some(id) = obj.get("id") {
                 StreamMessageContent::Request {
-                    id: id.to_string(),
+                    id: id.clone(),
                     method: method.into(),
                     params: obj.get("params").cloned(),
                 }
@@ -68,15 +68,18 @@ impl StreamMessage {
             }
         } else if let Some(id) = obj.get("id") {
             if let Some(error) = obj.get("error") {
-                let acp_err = serde_json::from_value::<acp::Error>(error.clone())
-                    .unwrap_or_else(|_| acp::Error::internal_error());
+                let acp_err =
+                    serde_json::from_value::<acp::Error>(error.clone()).unwrap_or_else(|err| {
+                        log::warn!("Failed to deserialize ACP error: {err}");
+                        acp::Error::internal_error()
+                    });
                 StreamMessageContent::Response {
-                    id: id.to_string(),
+                    id: id.clone(),
                     result: Err(acp_err),
                 }
             } else {
                 StreamMessageContent::Response {
-                    id: id.to_string(),
+                    id: id.clone(),
                     result: Ok(obj.get("result").cloned()),
                 }
             }
@@ -107,6 +110,13 @@ struct GlobalAcpConnectionRegistry(Entity<AcpConnectionRegistry>);
 
 impl Global for GlobalAcpConnectionRegistry {}
 
+/// A raw JSON-RPC line captured from the transport, tagged with direction.
+/// Deserialization into [`StreamMessage`] is deferred until a subscriber is listening.
+pub struct RawStreamLine {
+    pub direction: StreamMessageDirection,
+    pub line: Arc<str>,
+}
+
 #[derive(Default)]
 pub struct AcpConnectionRegistry {
     active_agent_id: Option<AgentId>,
@@ -129,7 +139,7 @@ impl AcpConnectionRegistry {
     pub fn set_active_connection(
         &mut self,
         agent_id: AgentId,
-        messages_rx: smol::channel::Receiver<StreamMessage>,
+        raw_rx: smol::channel::Receiver<RawStreamLine>,
         cx: &mut Context<Self>,
     ) {
         self.active_agent_id = Some(agent_id);
@@ -137,10 +147,21 @@ impl AcpConnectionRegistry {
         self.subscribers.clear();
 
         self._broadcast_task = Some(cx.spawn(async move |this, cx| {
-            while let Ok(message) = messages_rx.recv().await {
+            while let Ok(raw) = raw_rx.recv().await {
                 this.update(cx, |this, _cx| {
-                    this.subscribers
-                        .retain(|sender| sender.try_send(message.clone()).is_ok());
+                    if this.subscribers.is_empty() {
+                        return;
+                    }
+
+                    let Some(message) = StreamMessage::from_json_line(raw.direction, &raw.line)
+                    else {
+                        return;
+                    };
+
+                    this.subscribers.retain(|sender| !sender.is_closed());
+                    for sender in &this.subscribers {
+                        sender.try_send(message.clone()).ok();
+                    }
                 })
                 .ok();
             }
@@ -150,7 +171,7 @@ impl AcpConnectionRegistry {
     }
 
     pub fn subscribe(&mut self) -> smol::channel::Receiver<StreamMessage> {
-        let (sender, receiver) = smol::channel::bounded(4096);
+        let (sender, receiver) = smol::channel::unbounded();
         self.subscribers.push(sender);
         receiver
     }

crates/agent_servers/src/acp.rs ๐Ÿ”—

@@ -2,7 +2,7 @@ use acp_thread::{
     AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest,
     AgentSessionListResponse,
 };
-use acp_tools::{AcpConnectionRegistry, StreamMessage, StreamMessageDirection};
+use acp_tools::{AcpConnectionRegistry, RawStreamLine, StreamMessageDirection};
 use action_log::ActionLog;
 use agent_client_protocol::schema::{self as acp, ErrorCode};
 use agent_client_protocol::{
@@ -26,6 +26,7 @@ use util::process::Child;
 use std::path::PathBuf;
 use std::process::Stdio;
 use std::rc::Rc;
+use std::sync::Arc;
 use std::{any::Any, cell::RefCell};
 use thiserror::Error;
 
@@ -228,11 +229,18 @@ macro_rules! dispatch_request_handler {
     ($dispatch_tx:expr, $handler:expr) => {{
         let dispatch_tx = $dispatch_tx.clone();
         async move |args, responder, _connection| {
-            dispatch_tx
-                .unbounded_send(Box::new(move |cx, ctx| {
-                    $handler(args, responder, cx, ctx);
-                }))
-                .log_err();
+            if dispatch_tx.is_closed() {
+                log::error!("dispatch channel closed, cannot handle request");
+                responder
+                    .respond_with_error(acp::Error::internal_error())
+                    .log_err();
+            } else {
+                dispatch_tx
+                    .unbounded_send(Box::new(move |cx, ctx| {
+                        $handler(args, responder, cx, ctx);
+                    }))
+                    .log_err();
+            }
             Ok(())
         }
     }};
@@ -310,43 +318,40 @@ impl AcpConnection {
         let (dispatch_tx, dispatch_rx) = mpsc::unbounded::<ForegroundWork>();
 
         // Build a tapped transport that intercepts raw JSON-RPC lines for
-        // the ACP logs panel. We replicate the ByteStreamsโ†’Lines conversion
-        // manually so we can wrap the stream and sink with inspection.
-        let (stream_tap_tx, stream_tap_rx) = smol::channel::bounded::<StreamMessage>(4096);
+        // the ACP logs panel. Raw lines are sent without parsing โ€” deserialization
+        // is deferred until a subscriber is actually listening.
+        let (stream_tap_tx, stream_tap_rx) = smol::channel::unbounded::<RawStreamLine>();
 
         let incoming_lines = futures::io::BufReader::new(stdout).lines();
         let tapped_incoming = incoming_lines.inspect({
             let tap_tx = stream_tap_tx.clone();
             move |result| {
                 if let Ok(line) = result {
-                    if let Some(msg) =
-                        StreamMessage::from_json_line(StreamMessageDirection::Incoming, line)
-                    {
-                        tap_tx.try_send(msg).ok();
-                    }
+                    tap_tx
+                        .try_send(RawStreamLine {
+                            direction: StreamMessageDirection::Incoming,
+                            line: Arc::from(line.as_str()),
+                        })
+                        .ok();
                 }
             }
         });
 
-        let outgoing_sink =
-            futures::sink::unfold(Box::pin(stdin), async move |mut writer, line: String| {
+        let tapped_outgoing = futures::sink::unfold(
+            (Box::pin(stdin), stream_tap_tx),
+            async move |(mut writer, tap_tx), line: String| {
                 use futures::AsyncWriteExt;
-                let mut bytes = line.into_bytes();
-                bytes.push(b'\n');
-                writer.write_all(&bytes).await?;
-                Ok::<_, std::io::Error>(writer)
-            });
-        let tapped_outgoing = futures::SinkExt::with(outgoing_sink, {
-            let tap_tx = stream_tap_tx;
-            move |line: String| {
-                if let Some(msg) =
-                    StreamMessage::from_json_line(StreamMessageDirection::Outgoing, &line)
-                {
-                    tap_tx.try_send(msg).ok();
-                }
-                futures::future::ok::<String, std::io::Error>(line)
-            }
-        });
+                writer.write_all(line.as_bytes()).await?;
+                writer.write_all(b"\n").await?;
+                tap_tx
+                    .try_send(RawStreamLine {
+                        direction: StreamMessageDirection::Outgoing,
+                        line: Arc::from(line.as_str()),
+                    })
+                    .ok();
+                Ok::<_, std::io::Error>((writer, tap_tx))
+            },
+        );
 
         let transport = Lines::new(tapped_outgoing, tapped_incoming);
 
@@ -402,7 +407,11 @@ impl AcpConnection {
                 .connect_with(
                     transport,
                     move |connection: ConnectionTo<Agent>| async move {
-                        connection_tx.send(connection.clone()).ok();
+                        if connection_tx.send(connection.clone()).is_err() {
+                            log::error!(
+                                "failed to send ACP connection handle โ€” receiver was dropped"
+                            );
+                        }
                         // Keep the connection alive until the transport closes.
                         futures::future::pending::<Result<(), acp::Error>>().await
                     },
@@ -1768,7 +1777,8 @@ fn handle_session_notification(
     cx: &mut AsyncApp,
     ctx: &ClientContext,
 ) {
-    let (thread, update_clone) = {
+    // Extract everything we need from the session while briefly borrowing.
+    let (thread, session_modes, config_opts_data, update_clone) = {
         let sessions = ctx.sessions.borrow();
         let Some(session) = sessions.get(&notification.session_id) else {
             log::warn!(
@@ -1777,77 +1787,80 @@ fn handle_session_notification(
             );
             return;
         };
+        (
+            session.thread.clone(),
+            session.session_modes.clone(),
+            session
+                .config_options
+                .as_ref()
+                .map(|opts| (opts.config_options.clone(), opts.tx.clone())),
+            notification.update.clone(),
+        )
+    };
+    // Borrow is dropped here.
 
-        if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
-            current_mode_id,
-            ..
-        }) = &notification.update
-        {
-            if let Some(session_modes) = &session.session_modes {
-                session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
-            }
-        }
-
-        if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
-            config_options,
-            ..
-        }) = &notification.update
-        {
-            if let Some(opts) = &session.config_options {
-                *opts.config_options.borrow_mut() = config_options.clone();
-                opts.tx.borrow_mut().send(()).ok();
-            }
+    // Apply mode/config/session_list updates without holding the borrow.
+    if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate {
+        current_mode_id, ..
+    }) = &notification.update
+    {
+        if let Some(session_modes) = &session_modes {
+            session_modes.borrow_mut().current_mode_id = current_mode_id.clone();
         }
+    }
 
-        if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
-            && let Some(session_list) = ctx.session_list.borrow().as_ref()
-        {
-            session_list.send_info_update(notification.session_id.clone(), info_update.clone());
+    if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate {
+        config_options, ..
+    }) = &notification.update
+    {
+        if let Some((config_opts_cell, tx_cell)) = &config_opts_data {
+            *config_opts_cell.borrow_mut() = config_options.clone();
+            tx_cell.borrow_mut().send(()).ok();
         }
+    }
 
-        let update_clone = notification.update.clone();
-        let thread = session.thread.clone();
+    if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
+        && let Some(session_list) = ctx.session_list.borrow().as_ref()
+    {
+        session_list.send_info_update(notification.session_id.clone(), info_update.clone());
+    }
 
-        // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
-        if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
-            if let Some(meta) = &tc.meta {
-                if let Some(terminal_info) = meta.get("terminal_info") {
-                    if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str())
-                    {
-                        let terminal_id = acp::TerminalId::new(id_str);
-                        let cwd = terminal_info
-                            .get("cwd")
-                            .and_then(|v| v.as_str().map(PathBuf::from));
+    // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
+    if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
+        if let Some(meta) = &tc.meta {
+            if let Some(terminal_info) = meta.get("terminal_info") {
+                if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) {
+                    let terminal_id = acp::TerminalId::new(id_str);
+                    let cwd = terminal_info
+                        .get("cwd")
+                        .and_then(|v| v.as_str().map(PathBuf::from));
 
-                        let _ = thread.update(cx, |thread, cx| {
-                            let builder = TerminalBuilder::new_display_only(
-                                CursorShape::default(),
-                                AlternateScroll::On,
-                                None,
-                                0,
-                                cx.background_executor(),
-                                thread.project().read(cx).path_style(cx),
-                            )?;
-                            let lower = cx.new(|cx| builder.subscribe(cx));
-                            thread.on_terminal_provider_event(
-                                TerminalProviderEvent::Created {
-                                    terminal_id,
-                                    label: tc.title.clone(),
-                                    cwd,
-                                    output_byte_limit: None,
-                                    terminal: lower,
-                                },
-                                cx,
-                            );
-                            anyhow::Ok(())
-                        });
-                    }
+                    let _ = thread.update(cx, |thread, cx| {
+                        let builder = TerminalBuilder::new_display_only(
+                            CursorShape::default(),
+                            AlternateScroll::On,
+                            None,
+                            0,
+                            cx.background_executor(),
+                            thread.project().read(cx).path_style(cx),
+                        )?;
+                        let lower = cx.new(|cx| builder.subscribe(cx));
+                        thread.on_terminal_provider_event(
+                            TerminalProviderEvent::Created {
+                                terminal_id,
+                                label: tc.title.clone(),
+                                cwd,
+                                output_byte_limit: None,
+                                terminal: lower,
+                            },
+                            cx,
+                        );
+                        anyhow::Ok(())
+                    });
                 }
             }
         }
-
-        (thread, update_clone)
-    };
+    }
 
     // Forward the update to the acp_thread as usual.
     if let Err(err) = thread