agent_ui: Reuse visible sessions when reopening threads (#53905)

Nathan Sobo created

When reopening a thread that is already visible in the agent panel,
`open_thread` could recreate the session instead of reusing the existing
visible thread. That left the UI holding a thread whose backend session
could be closed out from under it.

This change routes `AgentPanel::open_thread` through
`load_agent_thread`, which already handles the right behavior for
existing sessions:
- reuse the active thread when it already matches
- promote a retained thread when it matches
- only load when the session is not already present

The regression coverage is now a focused behavioral test at the
`AgentPanel::open_thread` boundary that verifies reopening an
already-visible session keeps the thread usable.

cc @bennetbo

Release Notes:

- Fixed reopening an already-visible agent thread so it reuses the
existing session instead of creating a broken duplicate.

Change summary

crates/agent_ui/src/agent_panel.rs | 217 +++++++++++++++++++++++++++++++
1 file changed, 211 insertions(+), 6 deletions(-)

Detailed changes

crates/agent_ui/src/agent_panel.rs 🔗

@@ -1287,12 +1287,11 @@ impl AgentPanel {
         window: &mut Window,
         cx: &mut Context<Self>,
     ) {
-        self.external_thread(
-            Some(crate::Agent::NativeAgent),
-            Some(session_id),
+        self.load_agent_thread(
+            crate::Agent::NativeAgent,
+            session_id,
             work_dirs,
             title,
-            None,
             true,
             window,
             cx,
@@ -5273,18 +5272,169 @@ mod tests {
         active_session_id, active_thread_id, open_thread_with_connection,
         open_thread_with_custom_connection, send_message,
     };
-    use acp_thread::{StubAgentConnection, ThreadStatus};
+    use acp_thread::{AgentConnection, StubAgentConnection, ThreadStatus, UserMessageId};
+    use action_log::ActionLog;
     use agent_servers::CODEX_ID;
+    use anyhow::Result;
     use feature_flags::FeatureFlagAppExt;
     use fs::FakeFs;
-    use gpui::{TestAppContext, VisualTestContext};
+    use gpui::{App, TestAppContext, VisualTestContext};
+    use parking_lot::Mutex;
     use project::Project;
+    use std::any::Any;
 
     use serde_json::json;
     use std::path::Path;
+    use std::sync::Arc;
     use std::time::Instant;
     use workspace::MultiWorkspace;
 
+    #[derive(Clone, Default)]
+    struct SessionTrackingConnection {
+        next_session_number: Arc<Mutex<usize>>,
+        sessions: Arc<Mutex<HashSet<acp::SessionId>>>,
+    }
+
+    impl SessionTrackingConnection {
+        fn new() -> Self {
+            Self::default()
+        }
+
+        fn create_session(
+            self: Rc<Self>,
+            session_id: acp::SessionId,
+            project: Entity<Project>,
+            work_dirs: PathList,
+            title: Option<SharedString>,
+            cx: &mut App,
+        ) -> Entity<AcpThread> {
+            self.sessions.lock().insert(session_id.clone());
+
+            let action_log = cx.new(|_| ActionLog::new(project.clone()));
+            cx.new(|cx| {
+                AcpThread::new(
+                    None,
+                    title,
+                    Some(work_dirs),
+                    self,
+                    project,
+                    action_log,
+                    session_id,
+                    watch::Receiver::constant(
+                        acp::PromptCapabilities::new()
+                            .image(true)
+                            .audio(true)
+                            .embedded_context(true),
+                    ),
+                    cx,
+                )
+            })
+        }
+    }
+
+    impl AgentConnection for SessionTrackingConnection {
+        fn agent_id(&self) -> AgentId {
+            agent::ZED_AGENT_ID.clone()
+        }
+
+        fn telemetry_id(&self) -> SharedString {
+            "session-tracking-test".into()
+        }
+
+        fn new_session(
+            self: Rc<Self>,
+            project: Entity<Project>,
+            work_dirs: PathList,
+            cx: &mut App,
+        ) -> Task<Result<Entity<AcpThread>>> {
+            let session_id = {
+                let mut next_session_number = self.next_session_number.lock();
+                let session_id = acp::SessionId::new(format!(
+                    "session-tracking-session-{}",
+                    *next_session_number
+                ));
+                *next_session_number += 1;
+                session_id
+            };
+            let thread = self.create_session(session_id, project, work_dirs, None, cx);
+            Task::ready(Ok(thread))
+        }
+
+        fn supports_load_session(&self) -> bool {
+            true
+        }
+
+        fn load_session(
+            self: Rc<Self>,
+            session_id: acp::SessionId,
+            project: Entity<Project>,
+            work_dirs: PathList,
+            title: Option<SharedString>,
+            cx: &mut App,
+        ) -> Task<Result<Entity<AcpThread>>> {
+            let thread = self.create_session(session_id, project, work_dirs, title, cx);
+            thread.update(cx, |thread, cx| {
+                thread
+                    .handle_session_update(
+                        acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(
+                            "Restored user message".into(),
+                        )),
+                        cx,
+                    )
+                    .expect("restored user message should be applied");
+                thread
+                    .handle_session_update(
+                        acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
+                            "Restored assistant message".into(),
+                        )),
+                        cx,
+                    )
+                    .expect("restored assistant message should be applied");
+            });
+            Task::ready(Ok(thread))
+        }
+
+        fn supports_close_session(&self) -> bool {
+            true
+        }
+
+        fn close_session(
+            self: Rc<Self>,
+            session_id: &acp::SessionId,
+            _cx: &mut App,
+        ) -> Task<Result<()>> {
+            self.sessions.lock().remove(session_id);
+            Task::ready(Ok(()))
+        }
+
+        fn auth_methods(&self) -> &[acp::AuthMethod] {
+            &[]
+        }
+
+        fn authenticate(&self, _method_id: acp::AuthMethodId, _cx: &mut App) -> Task<Result<()>> {
+            Task::ready(Ok(()))
+        }
+
+        fn prompt(
+            &self,
+            _id: UserMessageId,
+            params: acp::PromptRequest,
+            _cx: &mut App,
+        ) -> Task<Result<acp::PromptResponse>> {
+            if !self.sessions.lock().contains(&params.session_id) {
+                return Task::ready(Err(anyhow!("Session not found")));
+            }
+
+            Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
+        }
+
+        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
+
+        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
+            self
+        }
+    }
+
     #[gpui::test]
     async fn test_active_thread_serialize_and_load_round_trip(cx: &mut TestAppContext) {
         init_test(cx);
@@ -5969,6 +6119,61 @@ mod tests {
         });
     }
 
+    #[gpui::test]
+    async fn test_reopening_visible_thread_keeps_thread_usable(cx: &mut TestAppContext) {
+        let (panel, mut cx) = setup_panel(cx).await;
+        cx.run_until_parked();
+
+        panel.update(&mut cx, |panel, cx| {
+            panel.connection_store.update(cx, |store, cx| {
+                store.restart_connection(
+                    Agent::NativeAgent,
+                    Rc::new(StubAgentServer::new(SessionTrackingConnection::new())),
+                    cx,
+                );
+            });
+        });
+        cx.run_until_parked();
+
+        panel.update_in(&mut cx, |panel, window, cx| {
+            panel.external_thread(
+                Some(Agent::NativeAgent),
+                None,
+                None,
+                None,
+                None,
+                true,
+                window,
+                cx,
+            );
+        });
+        cx.run_until_parked();
+        send_message(&panel, &mut cx);
+
+        let session_id = active_session_id(&panel, &cx);
+
+        panel.update_in(&mut cx, |panel, window, cx| {
+            panel.open_thread(session_id.clone(), None, None, window, cx);
+        });
+        cx.run_until_parked();
+
+        send_message(&panel, &mut cx);
+
+        panel.read_with(&cx, |panel, cx| {
+            let active_view = panel
+                .active_conversation_view()
+                .expect("visible conversation should remain open after reopening");
+            let connected = active_view
+                .read(cx)
+                .as_connected()
+                .expect("visible conversation should still be connected in the UI");
+            assert!(
+                !connected.has_thread_error(cx),
+                "reopening an already-visible session should keep the thread usable"
+            );
+        });
+    }
+
     #[gpui::test]
     async fn test_cleanup_retained_threads_keeps_five_most_recent_idle_loadable_threads(
         cx: &mut TestAppContext,