diff --git a/crates/acp_tools/src/acp_tools.rs b/crates/acp_tools/src/acp_tools.rs index 94d8cff0c8d4d30db29babb31d84dfcffca9b82b..c048c1cf5fda20cab0a9bcb89b6777d4a068b67e 100644 --- a/crates/acp_tools/src/acp_tools.rs +++ b/crates/acp_tools/src/acp_tools.rs @@ -160,10 +160,10 @@ impl AcpConnectionRegistry { this.subscribers.retain(|sender| !sender.is_closed()); for sender in &this.subscribers { - sender.try_send(message.clone()).ok(); + sender.try_send(message.clone()).log_err(); } }) - .ok(); + .log_err(); } // The transport closed — clear state so observers (e.g. the ACP @@ -173,7 +173,7 @@ impl AcpConnectionRegistry { this.subscribers.clear(); cx.notify(); }) - .ok(); + .log_err(); })); cx.notify(); @@ -252,7 +252,7 @@ impl AcpTools { this.update(cx, |this, cx| { this.push_stream_message(message, cx); }) - .ok(); + .log_err(); } }); diff --git a/crates/agent_servers/src/acp.rs b/crates/agent_servers/src/acp.rs index 7fc6855d19a8f4ab4da8ff7448210cbd7539a0b9..0eb047fe2592be9a531b451caa63fc178b514d15 100644 --- a/crates/agent_servers/src/acp.rs +++ b/crates/agent_servers/src/acp.rs @@ -229,14 +229,14 @@ macro_rules! dispatch_request_handler { ($dispatch_tx:expr, $handler:expr) => {{ let dispatch_tx = $dispatch_tx.clone(); async move |args, responder, _connection| { - if let Err(_) = dispatch_tx.unbounded_send(Box::new(move |cx, ctx| { - $handler(args, responder, cx, ctx); - })) { - // The dispatch channel is closed — the AcpConnection is being - // torn down and the child process is being killed. The responder - // inside the work item drops without responding; the agent won't - // be around to notice since the transport is also closing. - log::error!("dispatch channel closed, cannot handle request"); + if dispatch_tx.is_closed() { + respond_err(responder, acp::Error::internal_error()); + } else { + dispatch_tx + .unbounded_send(Box::new(move |cx, ctx| { + $handler(args, responder, cx, ctx); + })) + .log_err(); } Ok(()) } @@ -329,7 +329,7 @@ impl AcpConnection { direction: StreamMessageDirection::Incoming, line: Arc::from(line.as_str()), }) - .ok(); + .log_err(); } } }); @@ -338,14 +338,15 @@ impl AcpConnection { (Box::pin(stdin), stream_tap_tx), async move |(mut writer, tap_tx), line: String| { use futures::AsyncWriteExt; - writer.write_all(line.as_bytes()).await?; - writer.write_all(b"\n").await?; tap_tx .try_send(RawStreamLine { direction: StreamMessageDirection::Outgoing, line: Arc::from(line.as_str()), }) - .ok(); + .log_err(); + let mut bytes = line.into_bytes(); + bytes.push(b'\n'); + writer.write_all(&bytes).await?; Ok::<_, std::io::Error>((writer, tap_tx)) }, ); @@ -1713,12 +1714,13 @@ fn handle_write_text_file( cx.spawn(async move |cx| { let result: Result<_, acp::Error> = async { - let task = thread + thread .update(cx, |thread, cx| { thread.write_text_file(args.path, args.content, cx) }) + .map_err(acp::Error::from)? + .await .map_err(acp::Error::from)?; - task.await.map_err(acp::Error::from)?; Ok(()) } .await; @@ -1775,7 +1777,7 @@ fn handle_session_notification( ctx: &ClientContext, ) { // Extract everything we need from the session while briefly borrowing. - let (thread, session_modes, config_opts_data, update_clone) = { + let (thread, session_modes, config_opts_data) = { let sessions = ctx.sessions.borrow(); let Some(session) = sessions.get(¬ification.session_id) else { log::warn!( @@ -1791,7 +1793,6 @@ fn handle_session_notification( .config_options .as_ref() .map(|opts| (opts.config_options.clone(), opts.tx.clone())), - notification.update.clone(), ) }; // Borrow is dropped here. @@ -1823,7 +1824,7 @@ fn handle_session_notification( } // Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal. - if let acp::SessionUpdate::ToolCall(tc) = &update_clone { + if let acp::SessionUpdate::ToolCall(tc) = ¬ification.update { if let Some(meta) = &tc.meta { if let Some(terminal_info) = meta.get("terminal_info") { if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) { @@ -1875,7 +1876,7 @@ fn handle_session_notification( } // Post-handle: stream terminal output/exit if present on ToolCallUpdate meta. - if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone { + if let acp::SessionUpdate::ToolCallUpdate(tcu) = ¬ification.update { if let Some(meta) = &tcu.meta { if let Some(term_out) = meta.get("terminal_output") { if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {