acp: Dedupe session/list calls (#47677)

Ben Brandt created

Noticed we were calling session/list twice now, this makes sure it only
happens once on load

Release Notes:

- N/A

Change summary

crates/acp_thread/src/connection.rs       |  2 
crates/agent_servers/src/acp.rs           | 16 +---
crates/agent_ui/src/acp/thread_history.rs | 78 ++++++++++++++----------
3 files changed, 52 insertions(+), 44 deletions(-)

Detailed changes

crates/agent_servers/src/acp.rs 🔗

@@ -148,6 +148,10 @@ impl AgentSessionList for AcpSessionList {
         Some(self.updates_rx.clone())
     }
 
+    fn notify_refresh(&self) {
+        self.notify_update();
+    }
+
     fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
         self
     }
@@ -578,10 +582,6 @@ impl AgentConnection for AcpConnection {
                 },
             );
 
-            if let Some(session_list) = &self.session_list {
-                session_list.notify_update();
-            }
-
             Ok(thread)
         })
     }
@@ -663,10 +663,6 @@ impl AgentConnection for AcpConnection {
                 session.config_options = config_options.map(ConfigOptions::new);
             }
 
-            if let Some(session_list) = &self.session_list {
-                session_list.notify_update();
-            }
-
             Ok(thread)
         })
     }
@@ -741,10 +737,6 @@ impl AgentConnection for AcpConnection {
                 session.config_options = config_options.map(ConfigOptions::new);
             }
 
-            if let Some(session_list) = &self.session_list {
-                session_list.notify_update();
-            }
-
             Ok(thread)
         })
     }

crates/agent_ui/src/acp/thread_history.rs 🔗

@@ -170,39 +170,47 @@ impl AcpThreadHistory {
         self.sessions.clear();
         self.visible_items.clear();
         self.selected_index = 0;
-        self.refresh_sessions(false, cx);
-
-        self._watch_task = self.session_list.as_ref().and_then(|session_list| {
-            let rx = session_list.watch(cx)?;
-            Some(cx.spawn(async move |this, cx| {
-                while let Ok(first_update) = rx.recv().await {
-                    let mut updates = vec![first_update];
-                    while let Ok(update) = rx.try_recv() {
-                        updates.push(update);
-                    }
 
-                    let needs_refresh = updates
-                        .iter()
-                        .any(|u| matches!(u, SessionListUpdate::Refresh));
-
-                    this.update(cx, |this, cx| {
-                        // We will refresh the whole list anyway, so no need to apply incremental updates or do several refreshes
-                        if needs_refresh {
-                            this.refresh_sessions(true, cx);
-                        } else {
-                            for update in updates {
-                                if let SessionListUpdate::SessionInfo { session_id, update } =
-                                    update
-                                {
-                                    this.apply_info_update(session_id, update, cx);
-                                }
+        let Some(session_list) = self.session_list.as_ref() else {
+            self._watch_task = None;
+            cx.notify();
+            return;
+        };
+        let Some(rx) = session_list.watch(cx) else {
+            // No watch support - do a one-time refresh
+            self._watch_task = None;
+            self.refresh_sessions(false, cx);
+            return;
+        };
+        session_list.notify_refresh();
+
+        self._watch_task = Some(cx.spawn(async move |this, cx| {
+            while let Ok(first_update) = rx.recv().await {
+                let mut updates = vec![first_update];
+                // Collect any additional updates that are already in the channel
+                while let Ok(update) = rx.try_recv() {
+                    updates.push(update);
+                }
+
+                let needs_refresh = updates
+                    .iter()
+                    .any(|u| matches!(u, SessionListUpdate::Refresh));
+
+                this.update(cx, |this, cx| {
+                    // We will refresh the whole list anyway, so no need to apply incremental updates or do several refreshes
+                    if needs_refresh {
+                        this.refresh_sessions(true, cx);
+                    } else {
+                        for update in updates {
+                            if let SessionListUpdate::SessionInfo { session_id, update } = update {
+                                this.apply_info_update(session_id, update, cx);
                             }
                         }
-                    })
-                    .ok();
-                }
-            }))
-        });
+                    }
+                })
+                .ok();
+            }
+        }));
     }
 
     fn apply_info_update(
@@ -311,8 +319,10 @@ impl AcpThreadHistory {
         self.session_list.is_some()
     }
 
-    pub fn refresh(&mut self, cx: &mut Context<Self>) {
-        self.refresh_sessions(true, cx);
+    pub fn refresh(&mut self, _cx: &mut Context<Self>) {
+        if let Some(session_list) = &self.session_list {
+            session_list.notify_refresh();
+        }
     }
 
     pub fn session_for_id(&self, session_id: &acp::SessionId) -> Option<AgentSessionInfo> {
@@ -1085,6 +1095,10 @@ mod tests {
             Some(self.updates_rx.clone())
         }
 
+        fn notify_refresh(&self) {
+            self.send_update(SessionListUpdate::Refresh);
+        }
+
         fn into_any(self: Rc<Self>) -> Rc<dyn Any> {
             self
         }