@@ -160,10 +160,10 @@ impl AcpConnectionRegistry {
this.subscribers.retain(|sender| !sender.is_closed());
for sender in &this.subscribers {
- sender.try_send(message.clone()).ok();
+ sender.try_send(message.clone()).log_err();
}
})
- .ok();
+ .log_err();
}
// The transport closed — clear state so observers (e.g. the ACP
@@ -173,7 +173,7 @@ impl AcpConnectionRegistry {
this.subscribers.clear();
cx.notify();
})
- .ok();
+ .log_err();
}));
cx.notify();
@@ -252,7 +252,7 @@ impl AcpTools {
this.update(cx, |this, cx| {
this.push_stream_message(message, cx);
})
- .ok();
+ .log_err();
}
});
@@ -229,14 +229,14 @@ macro_rules! dispatch_request_handler {
($dispatch_tx:expr, $handler:expr) => {{
let dispatch_tx = $dispatch_tx.clone();
async move |args, responder, _connection| {
- if let Err(_) = dispatch_tx.unbounded_send(Box::new(move |cx, ctx| {
- $handler(args, responder, cx, ctx);
- })) {
- // The dispatch channel is closed — the AcpConnection is being
- // torn down and the child process is being killed. The responder
- // inside the work item drops without responding; the agent won't
- // be around to notice since the transport is also closing.
- log::error!("dispatch channel closed, cannot handle request");
+ if dispatch_tx.is_closed() {
+ respond_err(responder, acp::Error::internal_error());
+ } else {
+ dispatch_tx
+ .unbounded_send(Box::new(move |cx, ctx| {
+ $handler(args, responder, cx, ctx);
+ }))
+ .log_err();
}
Ok(())
}
@@ -329,7 +329,7 @@ impl AcpConnection {
direction: StreamMessageDirection::Incoming,
line: Arc::from(line.as_str()),
})
- .ok();
+ .log_err();
}
}
});
@@ -338,14 +338,15 @@ impl AcpConnection {
(Box::pin(stdin), stream_tap_tx),
async move |(mut writer, tap_tx), line: String| {
use futures::AsyncWriteExt;
- writer.write_all(line.as_bytes()).await?;
- writer.write_all(b"\n").await?;
tap_tx
.try_send(RawStreamLine {
direction: StreamMessageDirection::Outgoing,
line: Arc::from(line.as_str()),
})
- .ok();
+ .log_err();
+ let mut bytes = line.into_bytes();
+ bytes.push(b'\n');
+ writer.write_all(&bytes).await?;
Ok::<_, std::io::Error>((writer, tap_tx))
},
);
@@ -1713,12 +1714,13 @@ fn handle_write_text_file(
cx.spawn(async move |cx| {
let result: Result<_, acp::Error> = async {
- let task = thread
+ thread
.update(cx, |thread, cx| {
thread.write_text_file(args.path, args.content, cx)
})
+ .map_err(acp::Error::from)?
+ .await
.map_err(acp::Error::from)?;
- task.await.map_err(acp::Error::from)?;
Ok(())
}
.await;
@@ -1775,7 +1777,7 @@ fn handle_session_notification(
ctx: &ClientContext,
) {
// Extract everything we need from the session while briefly borrowing.
- let (thread, session_modes, config_opts_data, update_clone) = {
+ let (thread, session_modes, config_opts_data) = {
let sessions = ctx.sessions.borrow();
let Some(session) = sessions.get(¬ification.session_id) else {
log::warn!(
@@ -1791,7 +1793,6 @@ fn handle_session_notification(
.config_options
.as_ref()
.map(|opts| (opts.config_options.clone(), opts.tx.clone())),
- notification.update.clone(),
)
};
// Borrow is dropped here.
@@ -1823,7 +1824,7 @@ fn handle_session_notification(
}
// Pre-handle: if a ToolCall carries terminal_info, create/register a display-only terminal.
- if let acp::SessionUpdate::ToolCall(tc) = &update_clone {
+ if let acp::SessionUpdate::ToolCall(tc) = ¬ification.update {
if let Some(meta) = &tc.meta {
if let Some(terminal_info) = meta.get("terminal_info") {
if let Some(id_str) = terminal_info.get("terminal_id").and_then(|v| v.as_str()) {
@@ -1875,7 +1876,7 @@ fn handle_session_notification(
}
// Post-handle: stream terminal output/exit if present on ToolCallUpdate meta.
- if let acp::SessionUpdate::ToolCallUpdate(tcu) = &update_clone {
+ if let acp::SessionUpdate::ToolCallUpdate(tcu) = ¬ification.update {
if let Some(meta) = &tcu.meta {
if let Some(term_out) = meta.get("terminal_output") {
if let Some(id_str) = term_out.get("terminal_id").and_then(|v| v.as_str()) {