agent_ui: Fix session equality check for reloading retained threads (#53992)

Nathan Sobo , Ben Brandt , and Bennet Bo Fenner created

## Summary

Adds three tests that exercise session disassociation ("Session not
found" after reopening a thread). The third test **fails**, reproducing
the bug.

### Root cause

A retained `ConversationView` with a `thread_error` receives
`AgentServersUpdated`, which triggers `reset()` → `close_all_sessions` →
state transitions to `Loading`. When the user then reopens the same
session via `open_thread`, `load_agent_thread`'s `has_session` check
returns `false` (because the retained view is in `Loading` state, not
`Connected`), so a **second** `ConversationView` is created for the same
session. Both views' async load tasks complete and both transition to
`Connected` with the same session ID. When the retained view is later
cleaned up, its `on_release` → `close_all_sessions` removes the session
from the connection, leaving the active view stranded with a dead
session.

### Tests

| Test | Status | What it verifies |
|------|--------|-----------------|
| `test_open_thread_on_visible_session_does_not_disassociate_it` | ✅
Pass | `load_agent_thread` short-circuits when session is already active
|
| `test_retained_thread_cleanup_does_not_disassociate_reopened_session`
| ✅ Pass | Eviction + reopen cycle works (close-then-reload) |
| `test_retained_thread_reset_race_disassociates_session` | ❌ **Fail** |
Reproduces the race described above |

cc @bennetbo @ConradIrwin

Release Notes:

- N/A

---------

Co-authored-by: Ben Brandt <benjamin.j.brandt@gmail.com>
Co-authored-by: Bennet Bo Fenner <bennetbo@gmx.de>

Change summary

crates/agent_ui/src/agent_panel.rs       | 383 +++++++++++++++++++++++++
crates/agent_ui/src/conversation_view.rs |   4 
2 files changed, 383 insertions(+), 4 deletions(-)

Detailed changes

crates/agent_ui/src/agent_panel.rs 🔗

@@ -2520,8 +2520,9 @@ impl AgentPanel {
 
         let has_session = |cv: &Entity<ConversationView>| -> bool {
             cv.read(cx)
-                .active_thread()
-                .is_some_and(|tv| tv.read(cx).thread.read(cx).session_id() == &session_id)
+                .root_session_id
+                .as_ref()
+                .is_some_and(|id| id == &session_id)
         };
 
         // Check if the active view already has this session.
@@ -8058,4 +8059,382 @@ mod tests {
             );
         });
     }
+
+    /// Connection that tracks closed sessions and detects prompts against
+    /// sessions that no longer exist, used to reproduce session disassociation.
+    #[derive(Clone, Default)]
+    struct DisassociationTrackingConnection {
+        next_session_number: Arc<Mutex<usize>>,
+        sessions: Arc<Mutex<HashSet<acp::SessionId>>>,
+        closed_sessions: Arc<Mutex<Vec<acp::SessionId>>>,
+        missing_prompt_sessions: Arc<Mutex<Vec<acp::SessionId>>>,
+    }
+
+    impl DisassociationTrackingConnection {
+        fn new() -> Self {
+            Self::default()
+        }
+
+        fn create_session(
+            self: Rc<Self>,
+            session_id: acp::SessionId,
+            project: Entity<Project>,
+            work_dirs: PathList,
+            title: Option<SharedString>,
+            cx: &mut App,
+        ) -> Entity<AcpThread> {
+            self.sessions.lock().insert(session_id.clone());
+
+            let action_log = cx.new(|_| ActionLog::new(project.clone()));
+            cx.new(|cx| {
+                AcpThread::new(
+                    None,
+                    title,
+                    Some(work_dirs),
+                    self,
+                    project,
+                    action_log,
+                    session_id,
+                    watch::Receiver::constant(
+                        acp::PromptCapabilities::new()
+                            .image(true)
+                            .audio(true)
+                            .embedded_context(true),
+                    ),
+                    cx,
+                )
+            })
+        }
+    }
+
+    impl AgentConnection for DisassociationTrackingConnection {
+        fn agent_id(&self) -> AgentId {
+            agent::ZED_AGENT_ID.clone()
+        }
+
+        fn telemetry_id(&self) -> SharedString {
+            "disassociation-tracking-test".into()
+        }
+
+        fn new_session(
+            self: Rc<Self>,
+            project: Entity<Project>,
+            work_dirs: PathList,
+            cx: &mut App,
+        ) -> Task<Result<Entity<AcpThread>>> {
+            let session_id = {
+                let mut next_session_number = self.next_session_number.lock();
+                let session_id = acp::SessionId::new(format!(
+                    "disassociation-tracking-session-{}",
+                    *next_session_number
+                ));
+                *next_session_number += 1;
+                session_id
+            };
+            let thread = self.create_session(session_id, project, work_dirs, None, cx);
+            Task::ready(Ok(thread))
+        }
+
+        fn supports_load_session(&self) -> bool {
+            true
+        }
+
+        fn load_session(
+            self: Rc<Self>,
+            session_id: acp::SessionId,
+            project: Entity<Project>,
+            work_dirs: PathList,
+            title: Option<SharedString>,
+            cx: &mut App,
+        ) -> Task<Result<Entity<AcpThread>>> {
+            let thread = self.create_session(session_id, project, work_dirs, title, cx);
+            thread.update(cx, |thread, cx| {
+                thread
+                    .handle_session_update(
+                        acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new(
+                            "Restored user message".into(),
+                        )),
+                        cx,
+                    )
+                    .expect("restored user message should be applied");
+                thread
+                    .handle_session_update(
+                        acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new(
+                            "Restored assistant message".into(),
+                        )),
+                        cx,
+                    )
+                    .expect("restored assistant message should be applied");
+            });
+            Task::ready(Ok(thread))
+        }
+
+        fn supports_close_session(&self) -> bool {
+            true
+        }
+
+        fn close_session(
+            self: Rc<Self>,
+            session_id: &acp::SessionId,
+            _cx: &mut App,
+        ) -> Task<Result<()>> {
+            self.sessions.lock().remove(session_id);
+            self.closed_sessions.lock().push(session_id.clone());
+            Task::ready(Ok(()))
+        }
+
+        fn auth_methods(&self) -> &[acp::AuthMethod] {
+            &[]
+        }
+
+        fn authenticate(&self, _method_id: acp::AuthMethodId, _cx: &mut App) -> Task<Result<()>> {
+            Task::ready(Ok(()))
+        }
+
+        fn prompt(
+            &self,
+            _id: UserMessageId,
+            params: acp::PromptRequest,
+            _cx: &mut App,
+        ) -> Task<Result<acp::PromptResponse>> {
+            if !self.sessions.lock().contains(&params.session_id) {
+                self.missing_prompt_sessions.lock().push(params.session_id);
+                return Task::ready(Err(anyhow!("Session not found")));
+            }
+
+            Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn)))
+        }
+
+        fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {}
+
+        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
+            self
+        }
+    }
+
+    async fn setup_workspace_panel(
+        cx: &mut TestAppContext,
+    ) -> (Entity<Workspace>, Entity<AgentPanel>, VisualTestContext) {
+        init_test(cx);
+        cx.update(|cx| {
+            agent::ThreadStore::init_global(cx);
+            language_model::LanguageModelRegistry::test(cx);
+        });
+
+        let fs = FakeFs::new(cx.executor());
+        let project = Project::test(fs.clone(), [], cx).await;
+
+        let multi_workspace =
+            cx.add_window(|window, cx| MultiWorkspace::test_new(project.clone(), window, cx));
+
+        let workspace = multi_workspace
+            .read_with(cx, |mw, _cx| mw.workspace().clone())
+            .unwrap();
+
+        let mut cx = VisualTestContext::from_window(multi_workspace.into(), cx);
+
+        let panel = workspace.update_in(&mut cx, |workspace, window, cx| {
+            let panel = cx.new(|cx| AgentPanel::new(workspace, None, window, cx));
+            workspace.add_panel(panel.clone(), window, cx);
+            panel
+        });
+
+        (workspace, panel, cx)
+    }
+
+    /// Reproduces the retained-thread reset race:
+    ///
+    /// 1. Thread A is active and Connected.
+    /// 2. User switches to thread B → A goes to retained_threads.
+    /// 3. A thread_error is set on retained A's thread view.
+    /// 4. AgentServersUpdated fires → retained A's handle_agent_servers_updated
+    ///    sees has_thread_error=true → calls reset() → close_all_sessions →
+    ///    session X removed, state = Loading.
+    /// 5. User reopens thread X via open_thread → load_agent_thread checks
+    ///    retained A's has_session → returns false (state is Loading) →
+    ///    creates new ConversationView C.
+    /// 6. Both A's reload task and C's load task complete → both call
+    ///    load_session(X) → both get Connected with session X.
+    /// 7. A is eventually cleaned up → on_release → close_all_sessions →
+    ///    removes session X.
+    /// 8. C sends → "Session not found".
+    #[gpui::test]
+    async fn test_retained_thread_reset_race_disassociates_session(cx: &mut TestAppContext) {
+        let (_workspace, panel, mut cx) = setup_workspace_panel(cx).await;
+        cx.run_until_parked();
+
+        let connection = DisassociationTrackingConnection::new();
+        panel.update(&mut cx, |panel, cx| {
+            panel.connection_store.update(cx, |store, cx| {
+                store.restart_connection(
+                    Agent::Stub,
+                    Rc::new(StubAgentServer::new(connection.clone())),
+                    cx,
+                );
+            });
+        });
+        cx.run_until_parked();
+
+        // Step 1: Open thread A and send a message.
+        panel.update_in(&mut cx, |panel, window, cx| {
+            panel.external_thread(
+                Some(Agent::Stub),
+                None,
+                None,
+                None,
+                None,
+                true,
+                "agent_panel",
+                window,
+                cx,
+            );
+        });
+        cx.run_until_parked();
+        send_message(&panel, &mut cx);
+
+        let session_id_a = active_session_id(&panel, &cx);
+        let _thread_id_a = active_thread_id(&panel, &cx);
+
+        // Step 2: Open thread B → A goes to retained_threads.
+        panel.update_in(&mut cx, |panel, window, cx| {
+            panel.external_thread(
+                Some(Agent::Stub),
+                None,
+                None,
+                None,
+                None,
+                true,
+                "agent_panel",
+                window,
+                cx,
+            );
+        });
+        cx.run_until_parked();
+        send_message(&panel, &mut cx);
+
+        // Confirm A is retained.
+        panel.read_with(&cx, |panel, _cx| {
+            assert!(
+                panel.retained_threads.contains_key(&_thread_id_a),
+                "thread A should be in retained_threads after switching to B"
+            );
+        });
+
+        // Step 3: Set a thread_error on retained A's active thread view.
+        // This simulates an API error that occurred before the user switched
+        // away, or a transient failure.
+        let retained_conversation_a = panel.read_with(&cx, |panel, _cx| {
+            panel
+                .retained_threads
+                .get(&_thread_id_a)
+                .expect("thread A should be retained")
+                .clone()
+        });
+        retained_conversation_a.update(&mut cx, |conversation, cx| {
+            if let Some(thread_view) = conversation.active_thread() {
+                thread_view.update(cx, |view, cx| {
+                    view.handle_thread_error(
+                        crate::conversation_view::ThreadError::Other {
+                            message: "simulated error".into(),
+                            acp_error_code: None,
+                        },
+                        cx,
+                    );
+                });
+            }
+        });
+
+        // Confirm the thread error is set.
+        retained_conversation_a.read_with(&cx, |conversation, cx| {
+            let connected = conversation.as_connected().expect("should be connected");
+            assert!(
+                connected.has_thread_error(cx),
+                "retained A should have a thread error"
+            );
+        });
+
+        // Step 4: Emit AgentServersUpdated → retained A's
+        // handle_agent_servers_updated sees has_thread_error=true,
+        // calls reset(), which closes session X and sets state=Loading.
+        //
+        // Critically, we do NOT call run_until_parked between the emit
+        // and open_thread. The emit's synchronous effects (event delivery
+        // → reset() → close_all_sessions → state=Loading) happen during
+        // the update's flush_effects. But the async reload task spawned
+        // by initial_state has NOT been polled yet.
+        panel.update(&mut cx, |panel, cx| {
+            panel.project.update(cx, |project, cx| {
+                project
+                    .agent_server_store()
+                    .update(cx, |_store, cx| cx.emit(project::AgentServersUpdated));
+            });
+        });
+        // After this update returns, the retained ConversationView is in
+        // Loading state (reset ran synchronously), but its async reload
+        // task hasn't executed yet.
+
+        // Step 5: Immediately open thread X via open_thread, BEFORE
+        // the retained view's async reload completes. load_agent_thread
+        // checks retained A's has_session → returns false (state is
+        // Loading) → creates a NEW ConversationView C for session X.
+        panel.update_in(&mut cx, |panel, window, cx| {
+            panel.open_thread(session_id_a.clone(), None, None, window, cx);
+        });
+
+        // NOW settle everything: both async tasks (A's reload and C's load)
+        // complete, both register session X.
+        cx.run_until_parked();
+
+        // Verify session A is the active session via C.
+        panel.read_with(&cx, |panel, cx| {
+            let active_session = panel
+                .active_agent_thread(cx)
+                .map(|t| t.read(cx).session_id().clone());
+            assert_eq!(
+                active_session,
+                Some(session_id_a.clone()),
+                "session A should be the active session after open_thread"
+            );
+        });
+
+        // Step 6: Force the retained ConversationView A to be dropped
+        // while the active view (C) still has the same session.
+        // We can't use remove_thread because C shares the same ThreadId
+        // and remove_thread would kill the active view too. Instead,
+        // directly remove from retained_threads and drop the handle
+        // so on_release → close_all_sessions fires only on A.
+        drop(retained_conversation_a);
+        panel.update(&mut cx, |panel, _cx| {
+            panel.retained_threads.remove(&_thread_id_a);
+        });
+        cx.run_until_parked();
+
+        // The key assertion: sending messages on the ACTIVE view (C)
+        // must succeed. If the session was disassociated by A's cleanup,
+        // this will fail with "Session not found".
+        send_message(&panel, &mut cx);
+        send_message(&panel, &mut cx);
+
+        let missing = connection.missing_prompt_sessions.lock().clone();
+        assert!(
+            missing.is_empty(),
+            "session should not be disassociated after retained thread reset race, \
+             got missing prompt sessions: {:?}",
+            missing
+        );
+
+        panel.read_with(&cx, |panel, cx| {
+            let active_view = panel
+                .active_conversation_view()
+                .expect("conversation should remain open");
+            let connected = active_view
+                .read(cx)
+                .as_connected()
+                .expect("conversation should be connected");
+            assert!(
+                !connected.has_thread_error(cx),
+                "conversation should not have a thread error"
+            );
+        });
+    }
 }

crates/agent_ui/src/conversation_view.rs 🔗

@@ -433,7 +433,7 @@ pub struct ConversationView {
     thread_store: Option<Entity<ThreadStore>>,
     prompt_store: Option<Entity<PromptStore>>,
     pub(crate) thread_id: ThreadId,
-    root_session_id: Option<acp::SessionId>,
+    pub(crate) root_session_id: Option<acp::SessionId>,
     server_state: ServerState,
     focus_handle: FocusHandle,
     notifications: Vec<WindowHandle<AgentNotification>>,
@@ -703,7 +703,7 @@ impl ConversationView {
             thread_store,
             prompt_store,
             thread_id,
-            root_session_id: None,
+            root_session_id: resume_session_id.clone(),
             server_state: Self::initial_state(
                 agent.clone(),
                 connection_store,