diff --git a/crates/agent_ui/src/agent_panel.rs b/crates/agent_ui/src/agent_panel.rs index d71ac3c73e8f7e3c298f397e612119c3586f624e..4a72132875b9b06dfbf00388c41b4e7d0f3b4180 100644 --- a/crates/agent_ui/src/agent_panel.rs +++ b/crates/agent_ui/src/agent_panel.rs @@ -2520,8 +2520,9 @@ impl AgentPanel { let has_session = |cv: &Entity| -> bool { cv.read(cx) - .active_thread() - .is_some_and(|tv| tv.read(cx).thread.read(cx).session_id() == &session_id) + .root_session_id + .as_ref() + .is_some_and(|id| id == &session_id) }; // Check if the active view already has this session. @@ -8058,4 +8059,382 @@ mod tests { ); }); } + + /// Connection that tracks closed sessions and detects prompts against + /// sessions that no longer exist, used to reproduce session disassociation. + #[derive(Clone, Default)] + struct DisassociationTrackingConnection { + next_session_number: Arc>, + sessions: Arc>>, + closed_sessions: Arc>>, + missing_prompt_sessions: Arc>>, + } + + impl DisassociationTrackingConnection { + fn new() -> Self { + Self::default() + } + + fn create_session( + self: Rc, + session_id: acp::SessionId, + project: Entity, + work_dirs: PathList, + title: Option, + cx: &mut App, + ) -> Entity { + self.sessions.lock().insert(session_id.clone()); + + let action_log = cx.new(|_| ActionLog::new(project.clone())); + cx.new(|cx| { + AcpThread::new( + None, + title, + Some(work_dirs), + self, + project, + action_log, + session_id, + watch::Receiver::constant( + acp::PromptCapabilities::new() + .image(true) + .audio(true) + .embedded_context(true), + ), + cx, + ) + }) + } + } + + impl AgentConnection for DisassociationTrackingConnection { + fn agent_id(&self) -> AgentId { + agent::ZED_AGENT_ID.clone() + } + + fn telemetry_id(&self) -> SharedString { + "disassociation-tracking-test".into() + } + + fn new_session( + self: Rc, + project: Entity, + work_dirs: PathList, + cx: &mut App, + ) -> Task>> { + let session_id = { + let mut next_session_number = self.next_session_number.lock(); + let session_id = acp::SessionId::new(format!( + "disassociation-tracking-session-{}", + *next_session_number + )); + *next_session_number += 1; + session_id + }; + let thread = self.create_session(session_id, project, work_dirs, None, cx); + Task::ready(Ok(thread)) + } + + fn supports_load_session(&self) -> bool { + true + } + + fn load_session( + self: Rc, + session_id: acp::SessionId, + project: Entity, + work_dirs: PathList, + title: Option, + cx: &mut App, + ) -> Task>> { + let thread = self.create_session(session_id, project, work_dirs, title, cx); + thread.update(cx, |thread, cx| { + thread + .handle_session_update( + acp::SessionUpdate::UserMessageChunk(acp::ContentChunk::new( + "Restored user message".into(), + )), + cx, + ) + .expect("restored user message should be applied"); + thread + .handle_session_update( + acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new( + "Restored assistant message".into(), + )), + cx, + ) + .expect("restored assistant message should be applied"); + }); + Task::ready(Ok(thread)) + } + + fn supports_close_session(&self) -> bool { + true + } + + fn close_session( + self: Rc, + session_id: &acp::SessionId, + _cx: &mut App, + ) -> Task> { + self.sessions.lock().remove(session_id); + self.closed_sessions.lock().push(session_id.clone()); + Task::ready(Ok(())) + } + + fn auth_methods(&self) -> &[acp::AuthMethod] { + &[] + } + + fn authenticate(&self, _method_id: acp::AuthMethodId, _cx: &mut App) -> Task> { + Task::ready(Ok(())) + } + + fn prompt( + &self, + _id: UserMessageId, + params: acp::PromptRequest, + _cx: &mut App, + ) -> Task> { + if !self.sessions.lock().contains(¶ms.session_id) { + self.missing_prompt_sessions.lock().push(params.session_id); + return Task::ready(Err(anyhow!("Session not found"))); + } + + Task::ready(Ok(acp::PromptResponse::new(acp::StopReason::EndTurn))) + } + + fn cancel(&self, _session_id: &acp::SessionId, _cx: &mut App) {} + + fn into_any(self: Rc) -> Rc { + self + } + } + + async fn setup_workspace_panel( + cx: &mut TestAppContext, + ) -> (Entity, Entity, VisualTestContext) { + init_test(cx); + cx.update(|cx| { + agent::ThreadStore::init_global(cx); + language_model::LanguageModelRegistry::test(cx); + }); + + let fs = FakeFs::new(cx.executor()); + let project = Project::test(fs.clone(), [], cx).await; + + let multi_workspace = + cx.add_window(|window, cx| MultiWorkspace::test_new(project.clone(), window, cx)); + + let workspace = multi_workspace + .read_with(cx, |mw, _cx| mw.workspace().clone()) + .unwrap(); + + let mut cx = VisualTestContext::from_window(multi_workspace.into(), cx); + + let panel = workspace.update_in(&mut cx, |workspace, window, cx| { + let panel = cx.new(|cx| AgentPanel::new(workspace, None, window, cx)); + workspace.add_panel(panel.clone(), window, cx); + panel + }); + + (workspace, panel, cx) + } + + /// Reproduces the retained-thread reset race: + /// + /// 1. Thread A is active and Connected. + /// 2. User switches to thread B → A goes to retained_threads. + /// 3. A thread_error is set on retained A's thread view. + /// 4. AgentServersUpdated fires → retained A's handle_agent_servers_updated + /// sees has_thread_error=true → calls reset() → close_all_sessions → + /// session X removed, state = Loading. + /// 5. User reopens thread X via open_thread → load_agent_thread checks + /// retained A's has_session → returns false (state is Loading) → + /// creates new ConversationView C. + /// 6. Both A's reload task and C's load task complete → both call + /// load_session(X) → both get Connected with session X. + /// 7. A is eventually cleaned up → on_release → close_all_sessions → + /// removes session X. + /// 8. C sends → "Session not found". + #[gpui::test] + async fn test_retained_thread_reset_race_disassociates_session(cx: &mut TestAppContext) { + let (_workspace, panel, mut cx) = setup_workspace_panel(cx).await; + cx.run_until_parked(); + + let connection = DisassociationTrackingConnection::new(); + panel.update(&mut cx, |panel, cx| { + panel.connection_store.update(cx, |store, cx| { + store.restart_connection( + Agent::Stub, + Rc::new(StubAgentServer::new(connection.clone())), + cx, + ); + }); + }); + cx.run_until_parked(); + + // Step 1: Open thread A and send a message. + panel.update_in(&mut cx, |panel, window, cx| { + panel.external_thread( + Some(Agent::Stub), + None, + None, + None, + None, + true, + "agent_panel", + window, + cx, + ); + }); + cx.run_until_parked(); + send_message(&panel, &mut cx); + + let session_id_a = active_session_id(&panel, &cx); + let _thread_id_a = active_thread_id(&panel, &cx); + + // Step 2: Open thread B → A goes to retained_threads. + panel.update_in(&mut cx, |panel, window, cx| { + panel.external_thread( + Some(Agent::Stub), + None, + None, + None, + None, + true, + "agent_panel", + window, + cx, + ); + }); + cx.run_until_parked(); + send_message(&panel, &mut cx); + + // Confirm A is retained. + panel.read_with(&cx, |panel, _cx| { + assert!( + panel.retained_threads.contains_key(&_thread_id_a), + "thread A should be in retained_threads after switching to B" + ); + }); + + // Step 3: Set a thread_error on retained A's active thread view. + // This simulates an API error that occurred before the user switched + // away, or a transient failure. + let retained_conversation_a = panel.read_with(&cx, |panel, _cx| { + panel + .retained_threads + .get(&_thread_id_a) + .expect("thread A should be retained") + .clone() + }); + retained_conversation_a.update(&mut cx, |conversation, cx| { + if let Some(thread_view) = conversation.active_thread() { + thread_view.update(cx, |view, cx| { + view.handle_thread_error( + crate::conversation_view::ThreadError::Other { + message: "simulated error".into(), + acp_error_code: None, + }, + cx, + ); + }); + } + }); + + // Confirm the thread error is set. + retained_conversation_a.read_with(&cx, |conversation, cx| { + let connected = conversation.as_connected().expect("should be connected"); + assert!( + connected.has_thread_error(cx), + "retained A should have a thread error" + ); + }); + + // Step 4: Emit AgentServersUpdated → retained A's + // handle_agent_servers_updated sees has_thread_error=true, + // calls reset(), which closes session X and sets state=Loading. + // + // Critically, we do NOT call run_until_parked between the emit + // and open_thread. The emit's synchronous effects (event delivery + // → reset() → close_all_sessions → state=Loading) happen during + // the update's flush_effects. But the async reload task spawned + // by initial_state has NOT been polled yet. + panel.update(&mut cx, |panel, cx| { + panel.project.update(cx, |project, cx| { + project + .agent_server_store() + .update(cx, |_store, cx| cx.emit(project::AgentServersUpdated)); + }); + }); + // After this update returns, the retained ConversationView is in + // Loading state (reset ran synchronously), but its async reload + // task hasn't executed yet. + + // Step 5: Immediately open thread X via open_thread, BEFORE + // the retained view's async reload completes. load_agent_thread + // checks retained A's has_session → returns false (state is + // Loading) → creates a NEW ConversationView C for session X. + panel.update_in(&mut cx, |panel, window, cx| { + panel.open_thread(session_id_a.clone(), None, None, window, cx); + }); + + // NOW settle everything: both async tasks (A's reload and C's load) + // complete, both register session X. + cx.run_until_parked(); + + // Verify session A is the active session via C. + panel.read_with(&cx, |panel, cx| { + let active_session = panel + .active_agent_thread(cx) + .map(|t| t.read(cx).session_id().clone()); + assert_eq!( + active_session, + Some(session_id_a.clone()), + "session A should be the active session after open_thread" + ); + }); + + // Step 6: Force the retained ConversationView A to be dropped + // while the active view (C) still has the same session. + // We can't use remove_thread because C shares the same ThreadId + // and remove_thread would kill the active view too. Instead, + // directly remove from retained_threads and drop the handle + // so on_release → close_all_sessions fires only on A. + drop(retained_conversation_a); + panel.update(&mut cx, |panel, _cx| { + panel.retained_threads.remove(&_thread_id_a); + }); + cx.run_until_parked(); + + // The key assertion: sending messages on the ACTIVE view (C) + // must succeed. If the session was disassociated by A's cleanup, + // this will fail with "Session not found". + send_message(&panel, &mut cx); + send_message(&panel, &mut cx); + + let missing = connection.missing_prompt_sessions.lock().clone(); + assert!( + missing.is_empty(), + "session should not be disassociated after retained thread reset race, \ + got missing prompt sessions: {:?}", + missing + ); + + panel.read_with(&cx, |panel, cx| { + let active_view = panel + .active_conversation_view() + .expect("conversation should remain open"); + let connected = active_view + .read(cx) + .as_connected() + .expect("conversation should be connected"); + assert!( + !connected.has_thread_error(cx), + "conversation should not have a thread error" + ); + }); + } } diff --git a/crates/agent_ui/src/conversation_view.rs b/crates/agent_ui/src/conversation_view.rs index 4607f49190f1517180a08f4816df88ebd6d05662..6f49361bc6a8cad1d14174256873fd5af8745cb1 100644 --- a/crates/agent_ui/src/conversation_view.rs +++ b/crates/agent_ui/src/conversation_view.rs @@ -433,7 +433,7 @@ pub struct ConversationView { thread_store: Option>, prompt_store: Option>, pub(crate) thread_id: ThreadId, - root_session_id: Option, + pub(crate) root_session_id: Option, server_state: ServerState, focus_handle: FocusHandle, notifications: Vec>, @@ -703,7 +703,7 @@ impl ConversationView { thread_store, prompt_store, thread_id, - root_session_id: None, + root_session_id: resume_session_id.clone(), server_state: Self::initial_state( agent.clone(), connection_store,