diff --git a/Cargo.lock b/Cargo.lock index 6e4f810b452392db187c7f9bebeabdaea09e93ee..c5d25e5e0c7b3b334894855d01c1a6f2b96069c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -292,6 +292,7 @@ dependencies = [ "serde", "serde_json", "settings", + "smol", "task", "tempfile", "terminal", diff --git a/crates/acp_thread/src/connection.rs b/crates/acp_thread/src/connection.rs index cd8b40e30361a26721428af417fd309193b43d18..dd29293d8a34146b95bd9e0b7d56684dccfeb2cd 100644 --- a/crates/acp_thread/src/connection.rs +++ b/crates/acp_thread/src/connection.rs @@ -226,6 +226,15 @@ impl AgentSessionInfo { } } +#[derive(Debug, Clone)] +pub enum SessionListUpdate { + Refresh, + SessionInfo { + session_id: acp::SessionId, + update: acp::SessionInfoUpdate, + }, +} + pub trait AgentSessionList { fn list_sessions( &self, @@ -245,7 +254,7 @@ pub trait AgentSessionList { Task::ready(Err(anyhow::anyhow!("delete_sessions not supported"))) } - fn watch(&self, _cx: &mut App) -> Option> { + fn watch(&self, _cx: &mut App) -> Option> { None } diff --git a/crates/agent/src/agent.rs b/crates/agent/src/agent.rs index a2ff1910ea0e9718e98ba45aa458894ff0c6308f..3d0f45d1c90b2dc8c6af65f8fe3df59390a5a446 100644 --- a/crates/agent/src/agent.rs +++ b/crates/agent/src/agent.rs @@ -1399,15 +1399,15 @@ impl acp_thread::AgentTelemetry for NativeAgentConnection { pub struct NativeAgentSessionList { thread_store: Entity, - updates_rx: watch::Receiver<()>, + updates_rx: smol::channel::Receiver, _subscription: Subscription, } impl NativeAgentSessionList { fn new(thread_store: Entity, cx: &mut App) -> Self { - let (mut tx, rx) = watch::channel(()); + let (tx, rx) = smol::channel::unbounded(); let subscription = cx.observe(&thread_store, move |_, _| { - tx.send(()).ok(); + tx.try_send(acp_thread::SessionListUpdate::Refresh).ok(); }); Self { thread_store, @@ -1460,7 +1460,10 @@ impl AgentSessionList for NativeAgentSessionList { .update(cx, |store, cx| store.delete_threads(cx)) } - fn watch(&self, _cx: &mut App) -> Option> { + fn watch( + &self, + _cx: &mut App, + ) -> Option> { Some(self.updates_rx.clone()) } diff --git a/crates/agent_servers/Cargo.toml b/crates/agent_servers/Cargo.toml index 8fb0da2bc0de46351c5e7baf0f294095a031c8ed..6cad63fde98126f6c403deed36bcb50cd2a08166 100644 --- a/crates/agent_servers/Cargo.toml +++ b/crates/agent_servers/Cargo.toml @@ -43,6 +43,7 @@ reqwest_client = { workspace = true, optional = true } serde.workspace = true serde_json.workspace = true settings.workspace = true +smol.workspace = true task.workspace = true tempfile.workspace = true thiserror.workspace = true diff --git a/crates/agent_servers/src/acp.rs b/crates/agent_servers/src/acp.rs index f2a87879bffa3ac4f76582d310a90cfdcc9fb529..cf589b983a1bc5fd87bf4fc740f700b08b5da433 100644 --- a/crates/agent_servers/src/acp.rs +++ b/crates/agent_servers/src/acp.rs @@ -80,22 +80,30 @@ pub struct AcpSession { pub struct AcpSessionList { connection: Rc, - updates_tx: Rc>>, - updates_rx: watch::Receiver<()>, + updates_tx: smol::channel::Sender, + updates_rx: smol::channel::Receiver, } impl AcpSessionList { fn new(connection: Rc) -> Self { - let (tx, rx) = watch::channel(()); + let (tx, rx) = smol::channel::unbounded(); Self { connection, - updates_tx: Rc::new(RefCell::new(tx)), + updates_tx: tx, updates_rx: rx, } } fn notify_update(&self) { - self.updates_tx.borrow_mut().send(()).ok(); + self.updates_tx + .try_send(acp_thread::SessionListUpdate::Refresh) + .log_err(); + } + + fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) { + self.updates_tx + .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update }) + .log_err(); } } @@ -133,7 +141,10 @@ impl AgentSessionList for AcpSessionList { }) } - fn watch(&self, _cx: &mut App) -> Option> { + fn watch( + &self, + _cx: &mut App, + ) -> Option> { Some(self.updates_rx.clone()) } @@ -209,8 +220,12 @@ impl AcpConnection { ) }); + let client_session_list: Rc>>> = + Rc::new(RefCell::new(None)); + let client = ClientDelegate { sessions: sessions.clone(), + session_list: client_session_list.clone(), cx: cx.clone(), }; let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, { @@ -300,7 +315,9 @@ impl AcpConnection { .list .is_some() { - Some(Rc::new(AcpSessionList::new(connection.clone()))) + let list = Rc::new(AcpSessionList::new(connection.clone())); + *client_session_list.borrow_mut() = Some(list.clone()); + Some(list) } else { None }; @@ -1045,6 +1062,7 @@ impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions { struct ClientDelegate { sessions: Rc>>, + session_list: Rc>>>, cx: AsyncApp, } @@ -1151,6 +1169,12 @@ impl acp::Client for ClientDelegate { } } + if let acp::SessionUpdate::SessionInfoUpdate(info_update) = ¬ification.update + && let Some(session_list) = self.session_list.borrow().as_ref() + { + session_list.send_info_update(notification.session_id.clone(), info_update.clone()); + } + // Clone so we can inspect meta both before and after handing off to the thread let update_clone = notification.update.clone(); diff --git a/crates/agent_ui/src/acp/thread_history.rs b/crates/agent_ui/src/acp/thread_history.rs index 0a21a28b0e3c6c95701e678638948908e2e7a2fc..36521b25ae5e81613639ba9c98a0b176f3d798db 100644 --- a/crates/agent_ui/src/acp/thread_history.rs +++ b/crates/agent_ui/src/acp/thread_history.rs @@ -1,6 +1,6 @@ use crate::acp::AcpThreadView; use crate::{AgentPanel, RemoveHistory, RemoveSelectedThread}; -use acp_thread::{AgentSessionInfo, AgentSessionList, AgentSessionListRequest}; +use acp_thread::{AgentSessionInfo, AgentSessionList, AgentSessionListRequest, SessionListUpdate}; use agent_client_protocol as acp; use chrono::{Datelike as _, Local, NaiveDate, TimeDelta, Utc}; use editor::{Editor, EditorEvent}; @@ -173,11 +173,31 @@ impl AcpThreadHistory { self.refresh_sessions(false, cx); self._watch_task = self.session_list.as_ref().and_then(|session_list| { - let mut rx = session_list.watch(cx)?; + let rx = session_list.watch(cx)?; Some(cx.spawn(async move |this, cx| { - while let Ok(()) = rx.recv().await { + while let Ok(first_update) = rx.recv().await { + let mut updates = vec![first_update]; + while let Ok(update) = rx.try_recv() { + updates.push(update); + } + + let needs_refresh = updates + .iter() + .any(|u| matches!(u, SessionListUpdate::Refresh)); + this.update(cx, |this, cx| { - this.refresh_sessions(true, cx); + // We will refresh the whole list anyway, so no need to apply incremental updates or do several refreshes + if needs_refresh { + this.refresh_sessions(true, cx); + } else { + for update in updates { + if let SessionListUpdate::SessionInfo { session_id, update } = + update + { + this.apply_info_update(session_id, update, cx); + } + } + } }) .ok(); } @@ -185,6 +205,47 @@ impl AcpThreadHistory { }); } + fn apply_info_update( + &mut self, + session_id: acp::SessionId, + info_update: acp::SessionInfoUpdate, + cx: &mut Context, + ) { + let Some(session) = self + .sessions + .iter_mut() + .find(|s| s.session_id == session_id) + else { + return; + }; + + match info_update.title { + acp::MaybeUndefined::Value(title) => { + session.title = Some(title.into()); + } + acp::MaybeUndefined::Null => { + session.title = None; + } + acp::MaybeUndefined::Undefined => {} + } + match info_update.updated_at { + acp::MaybeUndefined::Value(date_str) => { + if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&date_str) { + session.updated_at = Some(dt.with_timezone(&chrono::Utc)); + } + } + acp::MaybeUndefined::Null => { + session.updated_at = None; + } + acp::MaybeUndefined::Undefined => {} + } + if let Some(meta) = info_update.meta { + session.meta = Some(meta); + } + + self.update_visible_items(true, cx); + } + fn refresh_sessions(&mut self, preserve_selected_item: bool, cx: &mut Context) { let Some(session_list) = self.session_list.clone() else { self.update_visible_items(preserve_selected_item, cx); @@ -976,7 +1037,276 @@ impl Display for TimeBucket { #[cfg(test)] mod tests { use super::*; + use acp_thread::AgentSessionListResponse; use chrono::NaiveDate; + use gpui::TestAppContext; + use std::any::Any; + + fn init_test(cx: &mut TestAppContext) { + cx.update(|cx| { + let settings_store = settings::SettingsStore::test(cx); + cx.set_global(settings_store); + theme::init(theme::LoadThemes::JustBase, cx); + }); + } + + #[derive(Clone)] + struct TestSessionList { + sessions: Vec, + updates_tx: smol::channel::Sender, + updates_rx: smol::channel::Receiver, + } + + impl TestSessionList { + fn new(sessions: Vec) -> Self { + let (tx, rx) = smol::channel::unbounded(); + Self { + sessions, + updates_tx: tx, + updates_rx: rx, + } + } + + fn send_update(&self, update: SessionListUpdate) { + self.updates_tx.try_send(update).ok(); + } + } + + impl AgentSessionList for TestSessionList { + fn list_sessions( + &self, + _request: AgentSessionListRequest, + _cx: &mut App, + ) -> Task> { + Task::ready(Ok(AgentSessionListResponse::new(self.sessions.clone()))) + } + + fn watch(&self, _cx: &mut App) -> Option> { + Some(self.updates_rx.clone()) + } + + fn into_any(self: Rc) -> Rc { + self + } + } + + #[gpui::test] + async fn test_apply_info_update_title(cx: &mut TestAppContext) { + init_test(cx); + + let session_id = acp::SessionId::new("test-session"); + let sessions = vec![AgentSessionInfo { + session_id: session_id.clone(), + cwd: None, + title: Some("Original Title".into()), + updated_at: None, + meta: None, + }]; + let session_list = Rc::new(TestSessionList::new(sessions)); + + let (history, cx) = cx.add_window_view(|window, cx| { + AcpThreadHistory::new(Some(session_list.clone()), window, cx) + }); + cx.run_until_parked(); + + // Send a title update + session_list.send_update(SessionListUpdate::SessionInfo { + session_id: session_id.clone(), + update: acp::SessionInfoUpdate::new().title("New Title"), + }); + cx.run_until_parked(); + + // Check that the title was updated + history.update(cx, |history, _cx| { + let session = history.sessions.iter().find(|s| s.session_id == session_id); + assert_eq!( + session.unwrap().title.as_ref().map(|s| s.as_ref()), + Some("New Title") + ); + }); + } + + #[gpui::test] + async fn test_apply_info_update_clears_title_with_null(cx: &mut TestAppContext) { + init_test(cx); + + let session_id = acp::SessionId::new("test-session"); + let sessions = vec![AgentSessionInfo { + session_id: session_id.clone(), + cwd: None, + title: Some("Original Title".into()), + updated_at: None, + meta: None, + }]; + let session_list = Rc::new(TestSessionList::new(sessions)); + + let (history, cx) = cx.add_window_view(|window, cx| { + AcpThreadHistory::new(Some(session_list.clone()), window, cx) + }); + cx.run_until_parked(); + + // Send an update that clears the title (null) + session_list.send_update(SessionListUpdate::SessionInfo { + session_id: session_id.clone(), + update: acp::SessionInfoUpdate::new().title(None::), + }); + cx.run_until_parked(); + + // Check that the title was cleared + history.update(cx, |history, _cx| { + let session = history.sessions.iter().find(|s| s.session_id == session_id); + assert_eq!(session.unwrap().title, None); + }); + } + + #[gpui::test] + async fn test_apply_info_update_ignores_undefined_fields(cx: &mut TestAppContext) { + init_test(cx); + + let session_id = acp::SessionId::new("test-session"); + let sessions = vec![AgentSessionInfo { + session_id: session_id.clone(), + cwd: None, + title: Some("Original Title".into()), + updated_at: None, + meta: None, + }]; + let session_list = Rc::new(TestSessionList::new(sessions)); + + let (history, cx) = cx.add_window_view(|window, cx| { + AcpThreadHistory::new(Some(session_list.clone()), window, cx) + }); + cx.run_until_parked(); + + // Send an update with no fields set (all undefined) + session_list.send_update(SessionListUpdate::SessionInfo { + session_id: session_id.clone(), + update: acp::SessionInfoUpdate::new(), + }); + cx.run_until_parked(); + + // Check that the title is unchanged + history.update(cx, |history, _cx| { + let session = history.sessions.iter().find(|s| s.session_id == session_id); + assert_eq!( + session.unwrap().title.as_ref().map(|s| s.as_ref()), + Some("Original Title") + ); + }); + } + + #[gpui::test] + async fn test_multiple_info_updates_applied_in_order(cx: &mut TestAppContext) { + init_test(cx); + + let session_id = acp::SessionId::new("test-session"); + let sessions = vec![AgentSessionInfo { + session_id: session_id.clone(), + cwd: None, + title: None, + updated_at: None, + meta: None, + }]; + let session_list = Rc::new(TestSessionList::new(sessions)); + + let (history, cx) = cx.add_window_view(|window, cx| { + AcpThreadHistory::new(Some(session_list.clone()), window, cx) + }); + cx.run_until_parked(); + + // Send multiple updates before the executor runs + session_list.send_update(SessionListUpdate::SessionInfo { + session_id: session_id.clone(), + update: acp::SessionInfoUpdate::new().title("First Title"), + }); + session_list.send_update(SessionListUpdate::SessionInfo { + session_id: session_id.clone(), + update: acp::SessionInfoUpdate::new().title("Second Title"), + }); + cx.run_until_parked(); + + // Check that the final title is "Second Title" (both applied in order) + history.update(cx, |history, _cx| { + let session = history.sessions.iter().find(|s| s.session_id == session_id); + assert_eq!( + session.unwrap().title.as_ref().map(|s| s.as_ref()), + Some("Second Title") + ); + }); + } + + #[gpui::test] + async fn test_refresh_supersedes_info_updates(cx: &mut TestAppContext) { + init_test(cx); + + let session_id = acp::SessionId::new("test-session"); + let sessions = vec![AgentSessionInfo { + session_id: session_id.clone(), + cwd: None, + title: Some("Server Title".into()), + updated_at: None, + meta: None, + }]; + let session_list = Rc::new(TestSessionList::new(sessions)); + + let (history, cx) = cx.add_window_view(|window, cx| { + AcpThreadHistory::new(Some(session_list.clone()), window, cx) + }); + cx.run_until_parked(); + + // Send an info update followed by a refresh + session_list.send_update(SessionListUpdate::SessionInfo { + session_id: session_id.clone(), + update: acp::SessionInfoUpdate::new().title("Local Update"), + }); + session_list.send_update(SessionListUpdate::Refresh); + cx.run_until_parked(); + + // The refresh should have fetched from server, getting "Server Title" + history.update(cx, |history, _cx| { + let session = history.sessions.iter().find(|s| s.session_id == session_id); + assert_eq!( + session.unwrap().title.as_ref().map(|s| s.as_ref()), + Some("Server Title") + ); + }); + } + + #[gpui::test] + async fn test_info_update_for_unknown_session_is_ignored(cx: &mut TestAppContext) { + init_test(cx); + + let session_id = acp::SessionId::new("known-session"); + let sessions = vec![AgentSessionInfo { + session_id, + cwd: None, + title: Some("Original".into()), + updated_at: None, + meta: None, + }]; + let session_list = Rc::new(TestSessionList::new(sessions)); + + let (history, cx) = cx.add_window_view(|window, cx| { + AcpThreadHistory::new(Some(session_list.clone()), window, cx) + }); + cx.run_until_parked(); + + // Send an update for an unknown session + session_list.send_update(SessionListUpdate::SessionInfo { + session_id: acp::SessionId::new("unknown-session"), + update: acp::SessionInfoUpdate::new().title("Should Be Ignored"), + }); + cx.run_until_parked(); + + // Check that the known session is unchanged and no crash occurred + history.update(cx, |history, _cx| { + assert_eq!(history.sessions.len(), 1); + assert_eq!( + history.sessions[0].title.as_ref().map(|s| s.as_ref()), + Some("Original") + ); + }); + } #[test] fn test_time_bucket_from_dates() {