Fix ACP connection and thread leak (#35670)

Agus Zubiaga created

When you switched away from an ACP thread, the `AcpThreadView` entity
(and thus thread, and subprocess) was leaked. This happened because we
were using `cx.processor` for the `list` state callback, which uses a
strong reference.

This PR changes the callback so that it holds a weak reference, and adds
some tests and assertions at various levels to make sure we don't
reintroduce the leak in the future.

Release Notes:

- N/A

Change summary

Cargo.lock                             |  4 +-
Cargo.toml                             |  2 
crates/agent_servers/src/acp/v0.rs     |  1 
crates/agent_servers/src/acp/v1.rs     | 15 +++++++++----
crates/agent_servers/src/claude.rs     |  5 +--
crates/agent_servers/src/e2e_tests.rs  | 27 ++++++++++++++++++++++++++
crates/agent_ui/src/acp/thread_view.rs | 29 +++++++++++++++++++--------
crates/agent_ui/src/agent_panel.rs     | 17 +++++++++------
8 files changed, 73 insertions(+), 27 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -137,9 +137,9 @@ dependencies = [
 
 [[package]]
 name = "agent-client-protocol"
-version = "0.0.18"
+version = "0.0.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f8e4c1dccb35e69d32566f0d11948d902f9942fc3f038821816c1150cf5925f4"
+checksum = "12dbfec3d27680337ed9d3064eecafe97acf0b0f190148bb4e29d96707c9e403"
 dependencies = [
  "anyhow",
  "futures 0.3.31",

Cargo.toml 🔗

@@ -423,7 +423,7 @@ zlog_settings = { path = "crates/zlog_settings" }
 #
 
 agentic-coding-protocol = "0.0.10"
-agent-client-protocol = "0.0.18"
+agent-client-protocol = "0.0.20"
 aho-corasick = "1.1"
 alacritty_terminal = { git = "https://github.com/zed-industries/alacritty.git", branch = "add-hush-login-flag" }
 any_vec = "0.14"

crates/agent_servers/src/acp/v0.rs 🔗

@@ -380,6 +380,7 @@ impl AcpConnection {
 
             let stdin = child.stdin.take().unwrap();
             let stdout = child.stdout.take().unwrap();
+            log::trace!("Spawned (pid: {})", child.id());
 
             let foreground_executor = cx.foreground_executor().clone();
 

crates/agent_servers/src/acp/v1.rs 🔗

@@ -19,7 +19,6 @@ pub struct AcpConnection {
     sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
     auth_methods: Vec<acp::AuthMethod>,
     _io_task: Task<Result<()>>,
-    _child: smol::process::Child,
 }
 
 pub struct AcpSession {
@@ -47,6 +46,7 @@ impl AcpConnection {
 
         let stdout = child.stdout.take().expect("Failed to take stdout");
         let stdin = child.stdin.take().expect("Failed to take stdin");
+        log::trace!("Spawned (pid: {})", child.id());
 
         let sessions = Rc::new(RefCell::new(HashMap::default()));
 
@@ -61,7 +61,11 @@ impl AcpConnection {
             }
         });
 
-        let io_task = cx.background_spawn(io_task);
+        let io_task = cx.background_spawn(async move {
+            io_task.await?;
+            drop(child);
+            Ok(())
+        });
 
         let response = connection
             .initialize(acp::InitializeRequest {
@@ -84,7 +88,6 @@ impl AcpConnection {
             connection: connection.into(),
             server_name,
             sessions,
-            _child: child,
             _io_task: io_task,
         })
     }
@@ -155,8 +158,10 @@ impl AgentConnection for AcpConnection {
 
     fn prompt(&self, params: acp::PromptRequest, cx: &mut App) -> Task<Result<()>> {
         let conn = self.connection.clone();
-        cx.foreground_executor()
-            .spawn(async move { Ok(conn.prompt(params).await?) })
+        cx.foreground_executor().spawn(async move {
+            conn.prompt(params).await?;
+            Ok(())
+        })
     }
 
     fn cancel(&self, session_id: &acp::SessionId, cx: &mut App) {

crates/agent_servers/src/claude.rs 🔗

@@ -125,8 +125,7 @@ impl AgentConnection for ClaudeAgentConnection {
                         session_id.clone(),
                         &mcp_config_path,
                         &cwd,
-                    )
-                    .await?;
+                    )?;
 
                     let pid = child.id();
                     log::trace!("Spawned (pid: {})", pid);
@@ -262,7 +261,7 @@ enum ClaudeSessionMode {
     Resume,
 }
 
-async fn spawn_claude(
+fn spawn_claude(
     command: &AgentServerCommand,
     mode: ClaudeSessionMode,
     session_id: acp::SessionId,

crates/agent_servers/src/e2e_tests.rs 🔗

@@ -311,6 +311,27 @@ pub async fn test_cancel(server: impl AgentServer + 'static, cx: &mut TestAppCon
     });
 }
 
+pub async fn test_thread_drop(server: impl AgentServer + 'static, cx: &mut TestAppContext) {
+    let fs = init_test(cx).await;
+    let project = Project::test(fs, [], cx).await;
+    let thread = new_test_thread(server, project.clone(), "/private/tmp", cx).await;
+
+    thread
+        .update(cx, |thread, cx| thread.send_raw("Hello from test!", cx))
+        .await
+        .unwrap();
+
+    thread.read_with(cx, |thread, _| {
+        assert!(thread.entries().len() >= 2, "Expected at least 2 entries");
+    });
+
+    let weak_thread = thread.downgrade();
+    drop(thread);
+
+    cx.executor().run_until_parked();
+    assert!(!weak_thread.is_upgradable());
+}
+
 #[macro_export]
 macro_rules! common_e2e_tests {
     ($server:expr, allow_option_id = $allow_option_id:expr) => {
@@ -351,6 +372,12 @@ macro_rules! common_e2e_tests {
             async fn cancel(cx: &mut ::gpui::TestAppContext) {
                 $crate::e2e_tests::test_cancel($server, cx).await;
             }
+
+            #[::gpui::test]
+            #[cfg_attr(not(feature = "e2e"), ignore)]
+            async fn thread_drop(cx: &mut ::gpui::TestAppContext) {
+                $crate::e2e_tests::test_thread_drop($server, cx).await;
+            }
         }
     };
 }

crates/agent_ui/src/acp/thread_view.rs 🔗

@@ -169,12 +169,13 @@ impl AcpThreadView {
 
         let mention_set = mention_set.clone();
 
-        let list_state = ListState::new(
-            0,
-            gpui::ListAlignment::Bottom,
-            px(2048.0),
-            cx.processor({
-                move |this: &mut Self, index: usize, window, cx| {
+        let list_state = ListState::new(0, gpui::ListAlignment::Bottom, px(2048.0), {
+            let this = cx.entity().downgrade();
+            move |index: usize, window, cx| {
+                let Some(this) = this.upgrade() else {
+                    return Empty.into_any();
+                };
+                this.update(cx, |this, cx| {
                     let Some((entry, len)) = this.thread().and_then(|thread| {
                         let entries = &thread.read(cx).entries();
                         Some((entries.get(index)?, entries.len()))
@@ -182,9 +183,9 @@ impl AcpThreadView {
                         return Empty.into_any();
                     };
                     this.render_entry(index, len, entry, window, cx)
-                }
-            }),
-        );
+                })
+            }
+        });
 
         Self {
             agent: agent.clone(),
@@ -2719,6 +2720,16 @@ mod tests {
 
     use super::*;
 
+    #[gpui::test]
+    async fn test_drop(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let (thread_view, _cx) = setup_thread_view(StubAgentServer::default(), cx).await;
+        let weak_view = thread_view.downgrade();
+        drop(thread_view);
+        assert!(!weak_view.is_upgradable());
+    }
+
     #[gpui::test]
     async fn test_notification_for_stop_event(cx: &mut TestAppContext) {
         init_test(cx);

crates/agent_ui/src/agent_panel.rs 🔗

@@ -970,13 +970,7 @@ impl AgentPanel {
                     )
                 });
 
-                this.set_active_view(
-                    ActiveView::ExternalAgentThread {
-                        thread_view: thread_view.clone(),
-                    },
-                    window,
-                    cx,
-                );
+                this.set_active_view(ActiveView::ExternalAgentThread { thread_view }, window, cx);
             })
         })
         .detach_and_log_err(cx);
@@ -1477,6 +1471,7 @@ impl AgentPanel {
 
         let current_is_special = current_is_history || current_is_config;
         let new_is_special = new_is_history || new_is_config;
+        let mut old_acp_thread = None;
 
         match &self.active_view {
             ActiveView::Thread { thread, .. } => {
@@ -1488,6 +1483,9 @@ impl AgentPanel {
                     });
                 }
             }
+            ActiveView::ExternalAgentThread { thread_view } => {
+                old_acp_thread.replace(thread_view.downgrade());
+            }
             _ => {}
         }
 
@@ -1518,6 +1516,11 @@ impl AgentPanel {
             self.active_view = new_view;
         }
 
+        debug_assert!(
+            old_acp_thread.map_or(true, |thread| !thread.is_upgradable()),
+            "AcpThreadView leaked"
+        );
+
         self.acp_message_history.borrow_mut().reset_position();
 
         self.focus_handle(cx).focus(window);