diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index 5e8aa6ec5ce1f6a4b05981a8d9dc34246f7685c1..56b3531e6fd7281541c6cded995a6bc43376e393 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -567,11 +567,14 @@ mod test_support { ) } - #[derive(Clone, Default)] + #[derive(Clone)] pub struct StubAgentConnection { sessions: Arc>>, permission_requests: HashMap, next_prompt_updates: Arc>>, + supports_load_session: bool, + agent_id: AgentId, + telemetry_id: SharedString, } struct Session { @@ -579,12 +582,21 @@ mod test_support { response_tx: Option>, } + impl Default for StubAgentConnection { + fn default() -> Self { + Self::new() + } + } + impl StubAgentConnection { pub fn new() -> Self { Self { next_prompt_updates: Default::default(), permission_requests: HashMap::default(), sessions: Arc::default(), + supports_load_session: false, + agent_id: AgentId::new("stub"), + telemetry_id: "stub".into(), } } @@ -600,6 +612,59 @@ mod test_support { self } + pub fn with_supports_load_session(mut self, supports_load_session: bool) -> Self { + self.supports_load_session = supports_load_session; + self + } + + pub fn with_agent_id(mut self, agent_id: AgentId) -> Self { + self.agent_id = agent_id; + self + } + + pub fn with_telemetry_id(mut self, telemetry_id: SharedString) -> Self { + self.telemetry_id = telemetry_id; + self + } + + fn create_session( + self: Rc, + session_id: acp::SessionId, + project: Entity, + work_dirs: PathList, + title: Option, + cx: &mut gpui::App, + ) -> Entity { + let action_log = cx.new(|_| ActionLog::new(project.clone())); + let thread_title = title.unwrap_or_else(|| SharedString::new_static("Test")); + let thread = cx.new(|cx| { + AcpThread::new( + None, + thread_title, + Some(work_dirs), + self.clone(), + project, + action_log, + session_id.clone(), + watch::Receiver::constant( + acp::PromptCapabilities::new() + .image(true) + .audio(true) + .embedded_context(true), + ), + cx, + ) + }); + self.sessions.lock().insert( + session_id, + Session { + thread: thread.downgrade(), + response_tx: None, + }, + ); + thread + } + pub fn send_update( &self, session_id: acp::SessionId, @@ -637,11 +702,11 @@ mod test_support { impl AgentConnection for StubAgentConnection { fn agent_id(&self) -> AgentId { - AgentId::new("stub") + self.agent_id.clone() } fn telemetry_id(&self) -> SharedString { - "stub".into() + self.telemetry_id.clone() } fn auth_methods(&self) -> &[acp::AuthMethod] { @@ -664,32 +729,27 @@ mod test_support { static NEXT_SESSION_ID: AtomicUsize = AtomicUsize::new(0); let session_id = acp::SessionId::new(NEXT_SESSION_ID.fetch_add(1, Ordering::SeqCst).to_string()); - let action_log = cx.new(|_| ActionLog::new(project.clone())); - let thread = cx.new(|cx| { - AcpThread::new( - None, - "Test", - Some(work_dirs), - self.clone(), - project, - action_log, - session_id.clone(), - watch::Receiver::constant( - acp::PromptCapabilities::new() - .image(true) - .audio(true) - .embedded_context(true), - ), - cx, - ) - }); - self.sessions.lock().insert( - session_id, - Session { - thread: thread.downgrade(), - response_tx: None, - }, - ); + let thread = self.create_session(session_id, project, work_dirs, None, cx); + Task::ready(Ok(thread)) + } + + fn supports_load_session(&self) -> bool { + self.supports_load_session + } + + fn load_session( + self: Rc, + session_id: acp::SessionId, + project: Entity, + work_dirs: PathList, + title: Option, + cx: &mut App, + ) -> Task>> { + if !self.supports_load_session { + return Task::ready(Err(anyhow::Error::msg("Loading sessions is not supported"))); + } + + let thread = self.create_session(session_id, project, work_dirs, title, cx); Task::ready(Ok(thread)) } diff --git a/crates/agent_ui/src/agent_panel.rs b/crates/agent_ui/src/agent_panel.rs index d8d6f273d0d785fc77df390c98e1e0f2886bb8f5..d55bdb1a8af3c68c478227e040d40849fb47369a 100644 --- a/crates/agent_ui/src/agent_panel.rs +++ b/crates/agent_ui/src/agent_panel.rs @@ -1978,13 +1978,13 @@ impl AgentPanel { let mut views = Vec::new(); if let Some(server_view) = self.active_conversation_view() { - if let Some(thread_view) = server_view.read(cx).parent_thread(cx) { + if let Some(thread_view) = server_view.read(cx).root_thread(cx) { views.push(thread_view); } } for server_view in self.background_threads.values() { - if let Some(thread_view) = server_view.read(cx).parent_thread(cx) { + if let Some(thread_view) = server_view.read(cx).root_thread(cx) { views.push(thread_view); } } @@ -1997,22 +1997,46 @@ impl AgentPanel { return; }; - let Some(thread_view) = conversation_view.read(cx).parent_thread(cx) else { + let Some(thread_view) = conversation_view.read(cx).root_thread(cx) else { return; }; - let thread = &thread_view.read(cx).thread; - let (status, session_id) = { - let thread = thread.read(cx); - (thread.status(), thread.session_id().clone()) - }; + self.background_threads + .insert(thread_view.read(cx).id.clone(), conversation_view); + self.cleanup_background_threads(cx); + } - if status != ThreadStatus::Generating { - return; - } + /// We keep threads that are: + /// - Still running + /// - Do not support reloading the full session + /// - Have had the most recent events (up to 5 idle threads) + fn cleanup_background_threads(&mut self, cx: &App) { + let mut potential_removals = self + .background_threads + .iter() + .filter(|(_id, view)| { + let Some(thread_view) = view.read(cx).root_thread(cx) else { + return true; + }; + let thread = thread_view.read(cx).thread.read(cx); + thread.connection().supports_load_session() && thread.status() == ThreadStatus::Idle + }) + .collect::>(); - self.background_threads - .insert(session_id, conversation_view); + const MAX_IDLE_BACKGROUND_THREADS: usize = 5; + + potential_removals.sort_unstable_by_key(|(_, view)| view.read(cx).updated_at(cx)); + let n = potential_removals + .len() + .saturating_sub(MAX_IDLE_BACKGROUND_THREADS); + let to_remove = potential_removals + .into_iter() + .map(|(id, _)| id.clone()) + .take(n) + .collect::>(); + for id in to_remove { + self.background_threads.remove(&id); + } } pub(crate) fn active_native_agent_thread(&self, cx: &App) -> Option> { @@ -3186,12 +3210,12 @@ impl AgentPanel { ActiveView::AgentThread { conversation_view } => { let server_view_ref = conversation_view.read(cx); let is_generating_title = server_view_ref.as_native_thread(cx).is_some() - && server_view_ref.parent_thread(cx).map_or(false, |tv| { + && server_view_ref.root_thread(cx).map_or(false, |tv| { tv.read(cx).thread.read(cx).has_provisional_title() }); if let Some(title_editor) = server_view_ref - .parent_thread(cx) + .root_thread(cx) .map(|r| r.read(cx).title_editor.clone()) { if is_generating_title { @@ -4943,7 +4967,10 @@ impl AgentPanel { mod tests { use super::*; use crate::conversation_view::tests::{StubAgentServer, init_test}; - use crate::test_support::{active_session_id, open_thread_with_connection, send_message}; + use crate::test_support::{ + active_session_id, open_thread_with_connection, open_thread_with_custom_connection, + send_message, + }; use acp_thread::{StubAgentConnection, ThreadStatus}; use agent_servers::CODEX_ID; use assistant_text_thread::TextThreadStore; @@ -4952,6 +4979,7 @@ mod tests { use gpui::{TestAppContext, VisualTestContext}; use project::Project; use serde_json::json; + use std::time::Instant; use workspace::MultiWorkspace; #[gpui::test] @@ -5419,6 +5447,41 @@ mod tests { assert!(uri.contains("utils.rs"), "URI should encode the file path"); } + fn open_generating_thread_with_loadable_connection( + panel: &Entity, + connection: &StubAgentConnection, + cx: &mut VisualTestContext, + ) -> acp::SessionId { + open_thread_with_custom_connection(panel, connection.clone(), cx); + let session_id = active_session_id(panel, cx); + send_message(panel, cx); + cx.update(|_, cx| { + connection.send_update( + session_id.clone(), + acp::SessionUpdate::AgentMessageChunk(acp::ContentChunk::new("done".into())), + cx, + ); + }); + cx.run_until_parked(); + session_id + } + + fn open_idle_thread_with_non_loadable_connection( + panel: &Entity, + connection: &StubAgentConnection, + cx: &mut VisualTestContext, + ) -> acp::SessionId { + open_thread_with_custom_connection(panel, connection.clone(), cx); + let session_id = active_session_id(panel, cx); + + connection.set_next_prompt_updates(vec![acp::SessionUpdate::AgentMessageChunk( + acp::ContentChunk::new("done".into()), + )]); + send_message(panel, cx); + + session_id + } + async fn setup_panel(cx: &mut TestAppContext) -> (Entity, VisualTestContext) { init_test(cx); cx.update(|cx| { @@ -5492,7 +5555,7 @@ mod tests { } #[gpui::test] - async fn test_idle_thread_dropped_when_navigating_away(cx: &mut TestAppContext) { + async fn test_idle_non_loadable_thread_retained_when_navigating_away(cx: &mut TestAppContext) { let (panel, mut cx) = setup_panel(cx).await; let connection_a = StubAgentConnection::new(); @@ -5505,6 +5568,7 @@ mod tests { let weak_view_a = panel.read_with(&cx, |panel, _cx| { panel.active_conversation().unwrap().downgrade() }); + let session_id_a = active_session_id(&panel, &cx); // Thread A should be idle (auto-completed via set_next_prompt_updates). panel.read_with(&cx, |panel, cx| { @@ -5512,21 +5576,25 @@ mod tests { assert_eq!(thread.read(cx).status(), ThreadStatus::Idle); }); - // Open a new thread B — thread A should NOT be retained. + // Open a new thread B — thread A should be retained because it is not loadable. let connection_b = StubAgentConnection::new(); open_thread_with_connection(&panel, connection_b, &mut cx); panel.read_with(&cx, |panel, _cx| { + assert_eq!( + panel.background_threads.len(), + 1, + "Idle non-loadable thread A should be retained in background_views" + ); assert!( - panel.background_threads.is_empty(), - "Idle thread A should not be retained in background_views" + panel.background_threads.contains_key(&session_id_a), + "Background view should be keyed by thread A's session ID" ); }); - // Verify the old ConnectionView entity was dropped (no strong references remain). assert!( - weak_view_a.upgrade().is_none(), - "Idle ConnectionView should have been dropped" + weak_view_a.upgrade().is_some(), + "Idle non-loadable ConnectionView should still be retained" ); } @@ -5587,8 +5655,152 @@ mod tests { "Promoted thread A should no longer be in background_views" ); assert!( - !panel.background_threads.contains_key(&session_id_b), - "Thread B (idle) should not have been retained in background_views" + panel.background_threads.contains_key(&session_id_b), + "Thread B (idle, non-loadable) should remain retained in background_views" + ); + }); + } + + #[gpui::test] + async fn test_cleanup_background_threads_keeps_five_most_recent_idle_loadable_threads( + cx: &mut TestAppContext, + ) { + let (panel, mut cx) = setup_panel(cx).await; + let connection = StubAgentConnection::new() + .with_supports_load_session(true) + .with_agent_id("loadable-stub".into()) + .with_telemetry_id("loadable-stub".into()); + let mut session_ids = Vec::new(); + + for _ in 0..7 { + session_ids.push(open_generating_thread_with_loadable_connection( + &panel, + &connection, + &mut cx, + )); + } + + let base_time = Instant::now(); + + for session_id in session_ids.iter().take(6) { + connection.end_turn(session_id.clone(), acp::StopReason::EndTurn); + } + cx.run_until_parked(); + + panel.update(&mut cx, |panel, cx| { + for (index, session_id) in session_ids.iter().take(6).enumerate() { + let conversation_view = panel + .background_threads + .get(session_id) + .expect("background thread should exist") + .clone(); + conversation_view.update(cx, |view, cx| { + view.set_updated_at(base_time + Duration::from_secs(index as u64), cx); + }); + } + panel.cleanup_background_threads(cx); + }); + + panel.read_with(&cx, |panel, _cx| { + assert_eq!( + panel.background_threads.len(), + 5, + "cleanup should keep at most five idle loadable background threads" + ); + assert!( + !panel.background_threads.contains_key(&session_ids[0]), + "oldest idle loadable background thread should be removed" + ); + for session_id in &session_ids[1..6] { + assert!( + panel.background_threads.contains_key(session_id), + "more recent idle loadable background threads should be retained" + ); + } + assert!( + !panel.background_threads.contains_key(&session_ids[6]), + "the active thread should not also be stored as a background thread" + ); + }); + } + + #[gpui::test] + async fn test_cleanup_background_threads_preserves_idle_non_loadable_threads( + cx: &mut TestAppContext, + ) { + let (panel, mut cx) = setup_panel(cx).await; + + let non_loadable_connection = StubAgentConnection::new(); + let non_loadable_session_id = open_idle_thread_with_non_loadable_connection( + &panel, + &non_loadable_connection, + &mut cx, + ); + + let loadable_connection = StubAgentConnection::new() + .with_supports_load_session(true) + .with_agent_id("loadable-stub".into()) + .with_telemetry_id("loadable-stub".into()); + let mut loadable_session_ids = Vec::new(); + + for _ in 0..7 { + loadable_session_ids.push(open_generating_thread_with_loadable_connection( + &panel, + &loadable_connection, + &mut cx, + )); + } + + let base_time = Instant::now(); + + for session_id in loadable_session_ids.iter().take(6) { + loadable_connection.end_turn(session_id.clone(), acp::StopReason::EndTurn); + } + cx.run_until_parked(); + + panel.update(&mut cx, |panel, cx| { + for (index, session_id) in loadable_session_ids.iter().take(6).enumerate() { + let conversation_view = panel + .background_threads + .get(session_id) + .expect("background thread should exist") + .clone(); + conversation_view.update(cx, |view, cx| { + view.set_updated_at(base_time + Duration::from_secs(index as u64), cx); + }); + } + panel.cleanup_background_threads(cx); + }); + + panel.read_with(&cx, |panel, _cx| { + assert_eq!( + panel.background_threads.len(), + 6, + "cleanup should keep the non-loadable idle thread in addition to five loadable ones" + ); + assert!( + panel + .background_threads + .contains_key(&non_loadable_session_id), + "idle non-loadable background threads should not be cleanup candidates" + ); + assert!( + !panel + .background_threads + .contains_key(&loadable_session_ids[0]), + "oldest idle loadable background thread should still be removed" + ); + for session_id in &loadable_session_ids[1..6] { + assert!( + panel.background_threads.contains_key(session_id), + "more recent idle loadable background threads should be retained" + ); + } + assert!( + !panel + .background_threads + .contains_key(&loadable_session_ids[6]), + "the active loadable thread should not also be stored as a background thread" ); }); } diff --git a/crates/agent_ui/src/conversation_view.rs b/crates/agent_ui/src/conversation_view.rs index 3c6eff594c04e3af346dc9c233cfd9a901322923..1dbd3984b51ed6b997cb9453c76fcfb32b84d287 100644 --- a/crates/agent_ui/src/conversation_view.rs +++ b/crates/agent_ui/src/conversation_view.rs @@ -167,41 +167,45 @@ pub(crate) struct Conversation { /// Tracks the selected granularity index for each tool call's permission dropdown. /// The index corresponds to the position in the allow_options list. selected_permission_granularity: HashMap>, + updated_at: Option, } impl Conversation { pub fn register_thread(&mut self, thread: Entity, cx: &mut Context) { let session_id = thread.read(cx).session_id().clone(); - let subscription = cx.subscribe(&thread, move |this, _thread, event, _cx| match event { - AcpThreadEvent::ToolAuthorizationRequested(id) => { - this.permission_requests - .entry(session_id.clone()) - .or_default() - .push(id.clone()); - } - AcpThreadEvent::ToolAuthorizationReceived(id) => { - if let Some(tool_calls) = this.permission_requests.get_mut(&session_id) { - tool_calls.retain(|tool_call_id| tool_call_id != id); - if tool_calls.is_empty() { - this.permission_requests.shift_remove(&session_id); + let subscription = cx.subscribe(&thread, move |this, _thread, event, _cx| { + this.updated_at = Some(Instant::now()); + match event { + AcpThreadEvent::ToolAuthorizationRequested(id) => { + this.permission_requests + .entry(session_id.clone()) + .or_default() + .push(id.clone()); + } + AcpThreadEvent::ToolAuthorizationReceived(id) => { + if let Some(tool_calls) = this.permission_requests.get_mut(&session_id) { + tool_calls.retain(|tool_call_id| tool_call_id != id); + if tool_calls.is_empty() { + this.permission_requests.shift_remove(&session_id); + } } } + AcpThreadEvent::NewEntry + | AcpThreadEvent::TitleUpdated + | AcpThreadEvent::TokenUsageUpdated + | AcpThreadEvent::EntryUpdated(_) + | AcpThreadEvent::EntriesRemoved(_) + | AcpThreadEvent::Retry(_) + | AcpThreadEvent::SubagentSpawned(_) + | AcpThreadEvent::Stopped(_) + | AcpThreadEvent::Error + | AcpThreadEvent::LoadError(_) + | AcpThreadEvent::PromptCapabilitiesUpdated + | AcpThreadEvent::Refusal + | AcpThreadEvent::AvailableCommandsUpdated(_) + | AcpThreadEvent::ModeUpdated(_) + | AcpThreadEvent::ConfigOptionsUpdated(_) => {} } - AcpThreadEvent::NewEntry - | AcpThreadEvent::TitleUpdated - | AcpThreadEvent::TokenUsageUpdated - | AcpThreadEvent::EntryUpdated(_) - | AcpThreadEvent::EntriesRemoved(_) - | AcpThreadEvent::Retry(_) - | AcpThreadEvent::SubagentSpawned(_) - | AcpThreadEvent::Stopped(_) - | AcpThreadEvent::Error - | AcpThreadEvent::LoadError(_) - | AcpThreadEvent::PromptCapabilitiesUpdated - | AcpThreadEvent::Refusal - | AcpThreadEvent::AvailableCommandsUpdated(_) - | AcpThreadEvent::ModeUpdated(_) - | AcpThreadEvent::ConfigOptionsUpdated(_) => {} }); self.subscriptions.push(subscription); self.threads @@ -352,7 +356,7 @@ impl ConversationView { .pending_tool_call(id, cx) } - pub fn parent_thread(&self, cx: &App) -> Option> { + pub fn root_thread(&self, cx: &App) -> Option> { match &self.server_state { ServerState::Connected(connected) => { let mut current = connected.active_view()?; @@ -388,6 +392,11 @@ impl ConversationView { } } + pub fn updated_at(&self, cx: &App) -> Option { + self.as_connected() + .and_then(|connected| connected.conversation.read(cx).updated_at) + } + pub fn navigate_to_session( &mut self, session_id: acp::SessionId, @@ -1152,7 +1161,7 @@ impl ConversationView { pub fn parent_id(&self, cx: &App) -> Option { match &self.server_state { ServerState::Connected(_) => self - .parent_thread(cx) + .root_thread(cx) .map(|thread| thread.read(cx).id.clone()), ServerState::Loading(loading) => loading.read(cx).session_id.clone(), ServerState::LoadError { session_id, .. } => session_id.clone(), @@ -2595,6 +2604,17 @@ impl ConversationView { cx.notify(); } } + + #[cfg(any(test, feature = "test-support"))] + pub fn set_updated_at(&mut self, updated_at: Instant, cx: &mut Context) { + let Some(connected) = self.as_connected_mut() else { + return; + }; + + connected.conversation.update(cx, |conversation, _cx| { + conversation.updated_at = Some(updated_at); + }); + } } impl Render for ConversationView { diff --git a/crates/agent_ui/src/test_support.rs b/crates/agent_ui/src/test_support.rs index 49639136854c8228d027573ee22f423ce687c2d5..43efc85f02f581fe2d2b9d6b3efb7f332b1944e9 100644 --- a/crates/agent_ui/src/test_support.rs +++ b/crates/agent_ui/src/test_support.rs @@ -13,11 +13,23 @@ use crate::agent_panel; pub struct StubAgentServer { connection: C, + agent_id: AgentId, } -impl StubAgentServer { +impl StubAgentServer +where + C: AgentConnection, +{ pub fn new(connection: C) -> Self { - Self { connection } + Self { + connection, + agent_id: "Test".into(), + } + } + + pub fn with_connection_agent_id(mut self) -> Self { + self.agent_id = self.connection.agent_id(); + self } } @@ -40,7 +52,7 @@ where } fn agent_id(&self) -> AgentId { - "Test".into() + self.agent_id.clone() } fn connect( @@ -83,6 +95,23 @@ pub fn open_thread_with_connection( cx.run_until_parked(); } +pub fn open_thread_with_custom_connection( + panel: &Entity, + connection: C, + cx: &mut VisualTestContext, +) where + C: 'static + AgentConnection + Send + Clone, +{ + panel.update_in(cx, |panel, window, cx| { + panel.open_external_thread_with_server( + Rc::new(StubAgentServer::new(connection).with_connection_agent_id()), + window, + cx, + ); + }); + cx.run_until_parked(); +} + pub fn send_message(panel: &Entity, cx: &mut VisualTestContext) { let thread_view = panel.read_with(cx, |panel, cx| panel.active_thread_view(cx).unwrap()); let message_editor = thread_view.read_with(cx, |view, _cx| view.message_editor.clone());