acp: Support the session_info_update notification (#47363)

Ben Brandt created

Release Notes:

- N/A

Change summary

Cargo.lock                                |   1 
crates/acp_thread/src/connection.rs       |  11 
crates/agent/src/agent.rs                 |  11 
crates/agent_servers/Cargo.toml           |   1 
crates/agent_servers/src/acp.rs           |  38 ++
crates/agent_ui/src/acp/thread_history.rs | 338 ++++++++++++++++++++++++
6 files changed, 384 insertions(+), 16 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -292,6 +292,7 @@ dependencies = [
  "serde",
  "serde_json",
  "settings",
+ "smol",
  "task",
  "tempfile",
  "terminal",

crates/acp_thread/src/connection.rs 🔗

@@ -226,6 +226,15 @@ impl AgentSessionInfo {
     }
 }
 
+#[derive(Debug, Clone)]
+pub enum SessionListUpdate {
+    Refresh,
+    SessionInfo {
+        session_id: acp::SessionId,
+        update: acp::SessionInfoUpdate,
+    },
+}
+
 pub trait AgentSessionList {
     fn list_sessions(
         &self,
@@ -245,7 +254,7 @@ pub trait AgentSessionList {
         Task::ready(Err(anyhow::anyhow!("delete_sessions not supported")))
     }
 
-    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
+    fn watch(&self, _cx: &mut App) -> Option<smol::channel::Receiver<SessionListUpdate>> {
         None
     }
 

crates/agent/src/agent.rs 🔗

@@ -1399,15 +1399,15 @@ impl acp_thread::AgentTelemetry for NativeAgentConnection {
 
 pub struct NativeAgentSessionList {
     thread_store: Entity<ThreadStore>,
-    updates_rx: watch::Receiver<()>,
+    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
     _subscription: Subscription,
 }
 
 impl NativeAgentSessionList {
     fn new(thread_store: Entity<ThreadStore>, cx: &mut App) -> Self {
-        let (mut tx, rx) = watch::channel(());
+        let (tx, rx) = smol::channel::unbounded();
         let subscription = cx.observe(&thread_store, move |_, _| {
-            tx.send(()).ok();
+            tx.try_send(acp_thread::SessionListUpdate::Refresh).ok();
         });
         Self {
             thread_store,
@@ -1460,7 +1460,10 @@ impl AgentSessionList for NativeAgentSessionList {
             .update(cx, |store, cx| store.delete_threads(cx))
     }
 
-    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
+    fn watch(
+        &self,
+        _cx: &mut App,
+    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
         Some(self.updates_rx.clone())
     }
 

crates/agent_servers/Cargo.toml 🔗

@@ -43,6 +43,7 @@ reqwest_client = { workspace = true, optional = true }
 serde.workspace = true
 serde_json.workspace = true
 settings.workspace = true
+smol.workspace = true
 task.workspace = true
 tempfile.workspace = true
 thiserror.workspace = true

crates/agent_servers/src/acp.rs 🔗

@@ -80,22 +80,30 @@ pub struct AcpSession {
 
 pub struct AcpSessionList {
     connection: Rc<acp::ClientSideConnection>,
-    updates_tx: Rc<RefCell<watch::Sender<()>>>,
-    updates_rx: watch::Receiver<()>,
+    updates_tx: smol::channel::Sender<acp_thread::SessionListUpdate>,
+    updates_rx: smol::channel::Receiver<acp_thread::SessionListUpdate>,
 }
 
 impl AcpSessionList {
     fn new(connection: Rc<acp::ClientSideConnection>) -> Self {
-        let (tx, rx) = watch::channel(());
+        let (tx, rx) = smol::channel::unbounded();
         Self {
             connection,
-            updates_tx: Rc::new(RefCell::new(tx)),
+            updates_tx: tx,
             updates_rx: rx,
         }
     }
 
     fn notify_update(&self) {
-        self.updates_tx.borrow_mut().send(()).ok();
+        self.updates_tx
+            .try_send(acp_thread::SessionListUpdate::Refresh)
+            .log_err();
+    }
+
+    fn send_info_update(&self, session_id: acp::SessionId, update: acp::SessionInfoUpdate) {
+        self.updates_tx
+            .try_send(acp_thread::SessionListUpdate::SessionInfo { session_id, update })
+            .log_err();
     }
 }
 
@@ -133,7 +141,10 @@ impl AgentSessionList for AcpSessionList {
         })
     }
 
-    fn watch(&self, _cx: &mut App) -> Option<watch::Receiver<()>> {
+    fn watch(
+        &self,
+        _cx: &mut App,
+    ) -> Option<smol::channel::Receiver<acp_thread::SessionListUpdate>> {
         Some(self.updates_rx.clone())
     }
 
@@ -209,8 +220,12 @@ impl AcpConnection {
             )
         });
 
+        let client_session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>> =
+            Rc::new(RefCell::new(None));
+
         let client = ClientDelegate {
             sessions: sessions.clone(),
+            session_list: client_session_list.clone(),
             cx: cx.clone(),
         };
         let (connection, io_task) = acp::ClientSideConnection::new(client, stdin, stdout, {
@@ -300,7 +315,9 @@ impl AcpConnection {
             .list
             .is_some()
         {
-            Some(Rc::new(AcpSessionList::new(connection.clone())))
+            let list = Rc::new(AcpSessionList::new(connection.clone()));
+            *client_session_list.borrow_mut() = Some(list.clone());
+            Some(list)
         } else {
             None
         };
@@ -1045,6 +1062,7 @@ impl acp_thread::AgentSessionConfigOptions for AcpSessionConfigOptions {
 
 struct ClientDelegate {
     sessions: Rc<RefCell<HashMap<acp::SessionId, AcpSession>>>,
+    session_list: Rc<RefCell<Option<Rc<AcpSessionList>>>>,
     cx: AsyncApp,
 }
 
@@ -1151,6 +1169,12 @@ impl acp::Client for ClientDelegate {
             }
         }
 
+        if let acp::SessionUpdate::SessionInfoUpdate(info_update) = &notification.update
+            && let Some(session_list) = self.session_list.borrow().as_ref()
+        {
+            session_list.send_info_update(notification.session_id.clone(), info_update.clone());
+        }
+
         // Clone so we can inspect meta both before and after handing off to the thread
         let update_clone = notification.update.clone();
 

crates/agent_ui/src/acp/thread_history.rs 🔗

@@ -1,6 +1,6 @@
 use crate::acp::AcpThreadView;
 use crate::{AgentPanel, RemoveHistory, RemoveSelectedThread};
-use acp_thread::{AgentSessionInfo, AgentSessionList, AgentSessionListRequest};
+use acp_thread::{AgentSessionInfo, AgentSessionList, AgentSessionListRequest, SessionListUpdate};
 use agent_client_protocol as acp;
 use chrono::{Datelike as _, Local, NaiveDate, TimeDelta, Utc};
 use editor::{Editor, EditorEvent};
@@ -173,11 +173,31 @@ impl AcpThreadHistory {
         self.refresh_sessions(false, cx);
 
         self._watch_task = self.session_list.as_ref().and_then(|session_list| {
-            let mut rx = session_list.watch(cx)?;
+            let rx = session_list.watch(cx)?;
             Some(cx.spawn(async move |this, cx| {
-                while let Ok(()) = rx.recv().await {
+                while let Ok(first_update) = rx.recv().await {
+                    let mut updates = vec![first_update];
+                    while let Ok(update) = rx.try_recv() {
+                        updates.push(update);
+                    }
+
+                    let needs_refresh = updates
+                        .iter()
+                        .any(|u| matches!(u, SessionListUpdate::Refresh));
+
                     this.update(cx, |this, cx| {
-                        this.refresh_sessions(true, cx);
+                        // We will refresh the whole list anyway, so no need to apply incremental updates or do several refreshes
+                        if needs_refresh {
+                            this.refresh_sessions(true, cx);
+                        } else {
+                            for update in updates {
+                                if let SessionListUpdate::SessionInfo { session_id, update } =
+                                    update
+                                {
+                                    this.apply_info_update(session_id, update, cx);
+                                }
+                            }
+                        }
                     })
                     .ok();
                 }
@@ -185,6 +205,47 @@ impl AcpThreadHistory {
         });
     }
 
+    fn apply_info_update(
+        &mut self,
+        session_id: acp::SessionId,
+        info_update: acp::SessionInfoUpdate,
+        cx: &mut Context<Self>,
+    ) {
+        let Some(session) = self
+            .sessions
+            .iter_mut()
+            .find(|s| s.session_id == session_id)
+        else {
+            return;
+        };
+
+        match info_update.title {
+            acp::MaybeUndefined::Value(title) => {
+                session.title = Some(title.into());
+            }
+            acp::MaybeUndefined::Null => {
+                session.title = None;
+            }
+            acp::MaybeUndefined::Undefined => {}
+        }
+        match info_update.updated_at {
+            acp::MaybeUndefined::Value(date_str) => {
+                if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&date_str) {
+                    session.updated_at = Some(dt.with_timezone(&chrono::Utc));
+                }
+            }
+            acp::MaybeUndefined::Null => {
+                session.updated_at = None;
+            }
+            acp::MaybeUndefined::Undefined => {}
+        }
+        if let Some(meta) = info_update.meta {
+            session.meta = Some(meta);
+        }
+
+        self.update_visible_items(true, cx);
+    }
+
     fn refresh_sessions(&mut self, preserve_selected_item: bool, cx: &mut Context<Self>) {
         let Some(session_list) = self.session_list.clone() else {
             self.update_visible_items(preserve_selected_item, cx);
@@ -976,7 +1037,276 @@ impl Display for TimeBucket {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use acp_thread::AgentSessionListResponse;
     use chrono::NaiveDate;
+    use gpui::TestAppContext;
+    use std::any::Any;
+
+    fn init_test(cx: &mut TestAppContext) {
+        cx.update(|cx| {
+            let settings_store = settings::SettingsStore::test(cx);
+            cx.set_global(settings_store);
+            theme::init(theme::LoadThemes::JustBase, cx);
+        });
+    }
+
+    #[derive(Clone)]
+    struct TestSessionList {
+        sessions: Vec<AgentSessionInfo>,
+        updates_tx: smol::channel::Sender<SessionListUpdate>,
+        updates_rx: smol::channel::Receiver<SessionListUpdate>,
+    }
+
+    impl TestSessionList {
+        fn new(sessions: Vec<AgentSessionInfo>) -> Self {
+            let (tx, rx) = smol::channel::unbounded();
+            Self {
+                sessions,
+                updates_tx: tx,
+                updates_rx: rx,
+            }
+        }
+
+        fn send_update(&self, update: SessionListUpdate) {
+            self.updates_tx.try_send(update).ok();
+        }
+    }
+
+    impl AgentSessionList for TestSessionList {
+        fn list_sessions(
+            &self,
+            _request: AgentSessionListRequest,
+            _cx: &mut App,
+        ) -> Task<anyhow::Result<AgentSessionListResponse>> {
+            Task::ready(Ok(AgentSessionListResponse::new(self.sessions.clone())))
+        }
+
+        fn watch(&self, _cx: &mut App) -> Option<smol::channel::Receiver<SessionListUpdate>> {
+            Some(self.updates_rx.clone())
+        }
+
+        fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
+            self
+        }
+    }
+
+    #[gpui::test]
+    async fn test_apply_info_update_title(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let session_id = acp::SessionId::new("test-session");
+        let sessions = vec![AgentSessionInfo {
+            session_id: session_id.clone(),
+            cwd: None,
+            title: Some("Original Title".into()),
+            updated_at: None,
+            meta: None,
+        }];
+        let session_list = Rc::new(TestSessionList::new(sessions));
+
+        let (history, cx) = cx.add_window_view(|window, cx| {
+            AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+        });
+        cx.run_until_parked();
+
+        // Send a title update
+        session_list.send_update(SessionListUpdate::SessionInfo {
+            session_id: session_id.clone(),
+            update: acp::SessionInfoUpdate::new().title("New Title"),
+        });
+        cx.run_until_parked();
+
+        // Check that the title was updated
+        history.update(cx, |history, _cx| {
+            let session = history.sessions.iter().find(|s| s.session_id == session_id);
+            assert_eq!(
+                session.unwrap().title.as_ref().map(|s| s.as_ref()),
+                Some("New Title")
+            );
+        });
+    }
+
+    #[gpui::test]
+    async fn test_apply_info_update_clears_title_with_null(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let session_id = acp::SessionId::new("test-session");
+        let sessions = vec![AgentSessionInfo {
+            session_id: session_id.clone(),
+            cwd: None,
+            title: Some("Original Title".into()),
+            updated_at: None,
+            meta: None,
+        }];
+        let session_list = Rc::new(TestSessionList::new(sessions));
+
+        let (history, cx) = cx.add_window_view(|window, cx| {
+            AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+        });
+        cx.run_until_parked();
+
+        // Send an update that clears the title (null)
+        session_list.send_update(SessionListUpdate::SessionInfo {
+            session_id: session_id.clone(),
+            update: acp::SessionInfoUpdate::new().title(None::<String>),
+        });
+        cx.run_until_parked();
+
+        // Check that the title was cleared
+        history.update(cx, |history, _cx| {
+            let session = history.sessions.iter().find(|s| s.session_id == session_id);
+            assert_eq!(session.unwrap().title, None);
+        });
+    }
+
+    #[gpui::test]
+    async fn test_apply_info_update_ignores_undefined_fields(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let session_id = acp::SessionId::new("test-session");
+        let sessions = vec![AgentSessionInfo {
+            session_id: session_id.clone(),
+            cwd: None,
+            title: Some("Original Title".into()),
+            updated_at: None,
+            meta: None,
+        }];
+        let session_list = Rc::new(TestSessionList::new(sessions));
+
+        let (history, cx) = cx.add_window_view(|window, cx| {
+            AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+        });
+        cx.run_until_parked();
+
+        // Send an update with no fields set (all undefined)
+        session_list.send_update(SessionListUpdate::SessionInfo {
+            session_id: session_id.clone(),
+            update: acp::SessionInfoUpdate::new(),
+        });
+        cx.run_until_parked();
+
+        // Check that the title is unchanged
+        history.update(cx, |history, _cx| {
+            let session = history.sessions.iter().find(|s| s.session_id == session_id);
+            assert_eq!(
+                session.unwrap().title.as_ref().map(|s| s.as_ref()),
+                Some("Original Title")
+            );
+        });
+    }
+
+    #[gpui::test]
+    async fn test_multiple_info_updates_applied_in_order(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let session_id = acp::SessionId::new("test-session");
+        let sessions = vec![AgentSessionInfo {
+            session_id: session_id.clone(),
+            cwd: None,
+            title: None,
+            updated_at: None,
+            meta: None,
+        }];
+        let session_list = Rc::new(TestSessionList::new(sessions));
+
+        let (history, cx) = cx.add_window_view(|window, cx| {
+            AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+        });
+        cx.run_until_parked();
+
+        // Send multiple updates before the executor runs
+        session_list.send_update(SessionListUpdate::SessionInfo {
+            session_id: session_id.clone(),
+            update: acp::SessionInfoUpdate::new().title("First Title"),
+        });
+        session_list.send_update(SessionListUpdate::SessionInfo {
+            session_id: session_id.clone(),
+            update: acp::SessionInfoUpdate::new().title("Second Title"),
+        });
+        cx.run_until_parked();
+
+        // Check that the final title is "Second Title" (both applied in order)
+        history.update(cx, |history, _cx| {
+            let session = history.sessions.iter().find(|s| s.session_id == session_id);
+            assert_eq!(
+                session.unwrap().title.as_ref().map(|s| s.as_ref()),
+                Some("Second Title")
+            );
+        });
+    }
+
+    #[gpui::test]
+    async fn test_refresh_supersedes_info_updates(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let session_id = acp::SessionId::new("test-session");
+        let sessions = vec![AgentSessionInfo {
+            session_id: session_id.clone(),
+            cwd: None,
+            title: Some("Server Title".into()),
+            updated_at: None,
+            meta: None,
+        }];
+        let session_list = Rc::new(TestSessionList::new(sessions));
+
+        let (history, cx) = cx.add_window_view(|window, cx| {
+            AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+        });
+        cx.run_until_parked();
+
+        // Send an info update followed by a refresh
+        session_list.send_update(SessionListUpdate::SessionInfo {
+            session_id: session_id.clone(),
+            update: acp::SessionInfoUpdate::new().title("Local Update"),
+        });
+        session_list.send_update(SessionListUpdate::Refresh);
+        cx.run_until_parked();
+
+        // The refresh should have fetched from server, getting "Server Title"
+        history.update(cx, |history, _cx| {
+            let session = history.sessions.iter().find(|s| s.session_id == session_id);
+            assert_eq!(
+                session.unwrap().title.as_ref().map(|s| s.as_ref()),
+                Some("Server Title")
+            );
+        });
+    }
+
+    #[gpui::test]
+    async fn test_info_update_for_unknown_session_is_ignored(cx: &mut TestAppContext) {
+        init_test(cx);
+
+        let session_id = acp::SessionId::new("known-session");
+        let sessions = vec![AgentSessionInfo {
+            session_id,
+            cwd: None,
+            title: Some("Original".into()),
+            updated_at: None,
+            meta: None,
+        }];
+        let session_list = Rc::new(TestSessionList::new(sessions));
+
+        let (history, cx) = cx.add_window_view(|window, cx| {
+            AcpThreadHistory::new(Some(session_list.clone()), window, cx)
+        });
+        cx.run_until_parked();
+
+        // Send an update for an unknown session
+        session_list.send_update(SessionListUpdate::SessionInfo {
+            session_id: acp::SessionId::new("unknown-session"),
+            update: acp::SessionInfoUpdate::new().title("Should Be Ignored"),
+        });
+        cx.run_until_parked();
+
+        // Check that the known session is unchanged and no crash occurred
+        history.update(cx, |history, _cx| {
+            assert_eq!(history.sessions.len(), 1);
+            assert_eq!(
+                history.sessions[0].title.as_ref().map(|s| s.as_ref()),
+                Some("Original")
+            );
+        });
+    }
 
     #[test]
     fn test_time_bucket_from_dates() {