diff --git a/Cargo.lock b/Cargo.lock index 4f52f444867c7ad96b7cd9609fa7cdcb038cc893..bd82df022a65d6a652cc3acaa2541f1261f592c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,6 +54,7 @@ dependencies = [ "collections", "gpui", "language", + "log", "markdown", "project", "serde", diff --git a/crates/acp_tools/Cargo.toml b/crates/acp_tools/Cargo.toml index 15d928b664fd2612aa1c6f9b7253b406d0e70793..7f341008f5e523b7f8ff1f179c9abfdad43222a9 100644 --- a/crates/acp_tools/Cargo.toml +++ b/crates/acp_tools/Cargo.toml @@ -18,6 +18,7 @@ agent-client-protocol.workspace = true collections.workspace = true gpui.workspace = true language.workspace= true +log.workspace = true markdown.workspace = true project.workspace = true serde.workspace = true diff --git a/crates/acp_tools/src/acp_tools.rs b/crates/acp_tools/src/acp_tools.rs index d17b8346aa17332a1100bd4a55f373dd2ac5760d..4876456fcbcecc1acc9f846058782c3445e690a0 100644 --- a/crates/acp_tools/src/acp_tools.rs +++ b/crates/acp_tools/src/acp_tools.rs @@ -17,7 +17,7 @@ use workspace::{ Item, ItemHandle, ToolbarItemEvent, ToolbarItemLocation, ToolbarItemView, Workspace, }; -pub type RequestId = String; +pub type RequestId = serde_json::Value; #[derive(Clone)] pub enum StreamMessageDirection { @@ -56,7 +56,7 @@ impl StreamMessage { let message = if let Some(method) = obj.get("method").and_then(|m| m.as_str()) { if let Some(id) = obj.get("id") { StreamMessageContent::Request { - id: id.to_string(), + id: id.clone(), method: method.into(), params: obj.get("params").cloned(), } @@ -68,15 +68,18 @@ impl StreamMessage { } } else if let Some(id) = obj.get("id") { if let Some(error) = obj.get("error") { - let acp_err = serde_json::from_value::(error.clone()) - .unwrap_or_else(|_| acp::Error::internal_error()); + let acp_err = + serde_json::from_value::(error.clone()).unwrap_or_else(|err| { + log::warn!("Failed to deserialize ACP error: {err}"); + acp::Error::internal_error() + }); StreamMessageContent::Response { - id: id.to_string(), + id: id.clone(), result: Err(acp_err), } } else { StreamMessageContent::Response { - id: id.to_string(), + id: id.clone(), result: Ok(obj.get("result").cloned()), } } @@ -107,6 +110,13 @@ struct GlobalAcpConnectionRegistry(Entity); impl Global for GlobalAcpConnectionRegistry {} +/// A raw JSON-RPC line captured from the transport, tagged with direction. +/// Deserialization into [`StreamMessage`] is deferred until a subscriber is listening. +pub struct RawStreamLine { + pub direction: StreamMessageDirection, + pub line: Arc, +} + #[derive(Default)] pub struct AcpConnectionRegistry { active_agent_id: Option, @@ -129,7 +139,7 @@ impl AcpConnectionRegistry { pub fn set_active_connection( &mut self, agent_id: AgentId, - messages_rx: smol::channel::Receiver, + raw_rx: smol::channel::Receiver, cx: &mut Context, ) { self.active_agent_id = Some(agent_id); @@ -137,10 +147,21 @@ impl AcpConnectionRegistry { self.subscribers.clear(); self._broadcast_task = Some(cx.spawn(async move |this, cx| { - while let Ok(message) = messages_rx.recv().await { + while let Ok(raw) = raw_rx.recv().await { this.update(cx, |this, _cx| { - this.subscribers - .retain(|sender| sender.try_send(message.clone()).is_ok()); + if this.subscribers.is_empty() { + return; + } + + let Some(message) = StreamMessage::from_json_line(raw.direction, &raw.line) + else { + return; + }; + + this.subscribers.retain(|sender| !sender.is_closed()); + for sender in &this.subscribers { + sender.try_send(message.clone()).ok(); + } }) .ok(); } @@ -150,7 +171,7 @@ impl AcpConnectionRegistry { } pub fn subscribe(&mut self) -> smol::channel::Receiver { - let (sender, receiver) = smol::channel::bounded(4096); + let (sender, receiver) = smol::channel::unbounded(); self.subscribers.push(sender); receiver } diff --git a/crates/agent_servers/src/acp.rs b/crates/agent_servers/src/acp.rs index 99c49da208fd739246f75c2251c5a75285f2d4db..830d74546eeed064a72c828984c000adb8aabbe7 100644 --- a/crates/agent_servers/src/acp.rs +++ b/crates/agent_servers/src/acp.rs @@ -2,7 +2,7 @@ use acp_thread::{ AgentConnection, AgentSessionInfo, AgentSessionList, AgentSessionListRequest, AgentSessionListResponse, }; -use acp_tools::{AcpConnectionRegistry, StreamMessage, StreamMessageDirection}; +use acp_tools::{AcpConnectionRegistry, RawStreamLine, StreamMessageDirection}; use action_log::ActionLog; use agent_client_protocol::schema::{self as acp, ErrorCode}; use agent_client_protocol::{ @@ -26,6 +26,7 @@ use util::process::Child; use std::path::PathBuf; use std::process::Stdio; use std::rc::Rc; +use std::sync::Arc; use std::{any::Any, cell::RefCell}; use thiserror::Error; @@ -228,11 +229,18 @@ macro_rules! dispatch_request_handler { ($dispatch_tx:expr, $handler:expr) => {{ let dispatch_tx = $dispatch_tx.clone(); async move |args, responder, _connection| { - dispatch_tx - .unbounded_send(Box::new(move |cx, ctx| { - $handler(args, responder, cx, ctx); - })) - .log_err(); + if dispatch_tx.is_closed() { + log::error!("dispatch channel closed, cannot handle request"); + responder + .respond_with_error(acp::Error::internal_error()) + .log_err(); + } else { + dispatch_tx + .unbounded_send(Box::new(move |cx, ctx| { + $handler(args, responder, cx, ctx); + })) + .log_err(); + } Ok(()) } }}; @@ -310,43 +318,40 @@ impl AcpConnection { let (dispatch_tx, dispatch_rx) = mpsc::unbounded::(); // Build a tapped transport that intercepts raw JSON-RPC lines for - // the ACP logs panel. We replicate the ByteStreams→Lines conversion - // manually so we can wrap the stream and sink with inspection. - let (stream_tap_tx, stream_tap_rx) = smol::channel::bounded::(4096); + // the ACP logs panel. Raw lines are sent without parsing — deserialization + // is deferred until a subscriber is actually listening. + let (stream_tap_tx, stream_tap_rx) = smol::channel::unbounded::(); let incoming_lines = futures::io::BufReader::new(stdout).lines(); let tapped_incoming = incoming_lines.inspect({ let tap_tx = stream_tap_tx.clone(); move |result| { if let Ok(line) = result { - if let Some(msg) = - StreamMessage::from_json_line(StreamMessageDirection::Incoming, line) - { - tap_tx.try_send(msg).ok(); - } + tap_tx + .try_send(RawStreamLine { + direction: StreamMessageDirection::Incoming, + line: Arc::from(line.as_str()), + }) + .ok(); } } }); - let outgoing_sink = - futures::sink::unfold(Box::pin(stdin), async move |mut writer, line: String| { + let tapped_outgoing = futures::sink::unfold( + (Box::pin(stdin), stream_tap_tx), + async move |(mut writer, tap_tx), line: String| { use futures::AsyncWriteExt; - let mut bytes = line.into_bytes(); - bytes.push(b'\n'); - writer.write_all(&bytes).await?; - Ok::<_, std::io::Error>(writer) - }); - let tapped_outgoing = futures::SinkExt::with(outgoing_sink, { - let tap_tx = stream_tap_tx; - move |line: String| { - if let Some(msg) = - StreamMessage::from_json_line(StreamMessageDirection::Outgoing, &line) - { - tap_tx.try_send(msg).ok(); - } - futures::future::ok::(line) - } - }); + writer.write_all(line.as_bytes()).await?; + writer.write_all(b"\n").await?; + tap_tx + .try_send(RawStreamLine { + direction: StreamMessageDirection::Outgoing, + line: Arc::from(line.as_str()), + }) + .ok(); + Ok::<_, std::io::Error>((writer, tap_tx)) + }, + ); let transport = Lines::new(tapped_outgoing, tapped_incoming); @@ -402,7 +407,11 @@ impl AcpConnection { .connect_with( transport, move |connection: ConnectionTo| async move { - connection_tx.send(connection.clone()).ok(); + if connection_tx.send(connection.clone()).is_err() { + log::error!( + "failed to send ACP connection handle — receiver was dropped" + ); + } // Keep the connection alive until the transport closes. futures::future::pending::>().await }, @@ -1768,7 +1777,8 @@ fn handle_session_notification( cx: &mut AsyncApp, ctx: &ClientContext, ) { - let (thread, update_clone) = { + // Extract everything we need from the session while briefly borrowing. + let (thread, session_modes, config_opts_data, update_clone) = { let sessions = ctx.sessions.borrow(); let Some(session) = sessions.get(¬ification.session_id) else { log::warn!( @@ -1777,77 +1787,80 @@ fn handle_session_notification( ); return; }; + ( + session.thread.clone(), + session.session_modes.clone(), + session + .config_options + .as_ref() + .map(|opts| (opts.config_options.clone(), opts.tx.clone())), + notification.update.clone(), + ) + }; + // Borrow is dropped here. - if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate { - current_mode_id, - .. - }) = ¬ification.update - { - if let Some(session_modes) = &session.session_modes { - session_modes.borrow_mut().current_mode_id = current_mode_id.clone(); - } - } - - if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate { - config_options, - .. - }) = ¬ification.update - { - if let Some(opts) = &session.config_options { - *opts.config_options.borrow_mut() = config_options.clone(); - opts.tx.borrow_mut().send(()).ok(); - } + // Apply mode/config/session_list updates without holding the borrow. + if let acp::SessionUpdate::CurrentModeUpdate(acp::CurrentModeUpdate { + current_mode_id, .. + }) = ¬ification.update + { + if let Some(session_modes) = &session_modes { + session_modes.borrow_mut().current_mode_id = current_mode_id.clone(); } + } - if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update - && let Some(session_list) = ctx.session_list.borrow().as_ref() - { - session_list.send_info_update(notification.session_id.clone(), info_update.clone()); + if let acp::SessionUpdate::ConfigOptionUpdate(acp::ConfigOptionUpdate { + config_options, .. + }) = ¬ification.update + { + if let Some((config_opts_cell, tx_cell)) = &config_opts_data { + *config_opts_cell.borrow_mut() = config_options.clone(); + tx_cell.borrow_mut().send(()).ok(); } + } - let update_clone = notification.update.clone(); - let thread = session.thread.clone(); + if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update + && let Some(session_list) = ctx.session_list.borrow().as_ref() + { + session_list.send_info_update(notification.session_id.clone(), info_update.clone()); + } - // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal. - if let acp::SessionUpdate::ToolCall(tc) = &update_clone { - if let Some(meta) = &tc.meta { - if let Some(terminal_info) = meta.get("terminal_info") { - if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) - { - let terminal_id = acp::TerminalId::new(id_str); - let cwd = terminal_info - .get("cwd") - .and_then(|v| v.as_str().map(PathBuf::from)); + // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal. + if let acp::SessionUpdate::ToolCall(tc) = &update_clone { + if let Some(meta) = &tc.meta { + if let Some(terminal_info) = meta.get("terminal_info") { + if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) { + let terminal_id = acp::TerminalId::new(id_str); + let cwd = terminal_info + .get("cwd") + .and_then(|v| v.as_str().map(PathBuf::from)); - let _ = thread.update(cx, |thread, cx| { - let builder = TerminalBuilder::new_display_only( - CursorShape::default(), - AlternateScroll::On, - None, - 0, - cx.background_executor(), - thread.project().read(cx).path_style(cx), - )?; - let lower = cx.new(|cx| builder.subscribe(cx)); - thread.on_terminal_provider_event( - TerminalProviderEvent::Created { - terminal_id, - label: tc.title.clone(), - cwd, - output_byte_limit: None, - terminal: lower, - }, - cx, - ); - anyhow::Ok(()) - }); - } + let _ = thread.update(cx, |thread, cx| { + let builder = TerminalBuilder::new_display_only( + CursorShape::default(), + AlternateScroll::On, + None, + 0, + cx.background_executor(), + thread.project().read(cx).path_style(cx), + )?; + let lower = cx.new(|cx| builder.subscribe(cx)); + thread.on_terminal_provider_event( + TerminalProviderEvent::Created { + terminal_id, + label: tc.title.clone(), + cwd, + output_byte_limit: None, + terminal: lower, + }, + cx, + ); + anyhow::Ok(()) + }); } } } - - (thread, update_clone) - }; + } // Forward the update to the acp_thread as usual. if let Err(err) = thread