agent: Reduce DB load in ThreadMetadataStore (#52313)

Bennet Bo Fenner and Gaauwe Rombouts created

## Context

We now batch multiple db operations together if they happen very
frequently (similar to how we handle it in `thread_history.rs`)

## Self-Review Checklist

<!-- Check before requesting review: -->
- [x] I've reviewed my own diff for quality, security, and reliability
- [x] Unsafe blocks (if any) have justifying comments
- [x] The content is consistent with the [UI/UX
checklist](https://github.com/zed-industries/zed/blob/main/CONTRIBUTING.md#uiux-checklist)
- [x] Tests cover the new/changed behavior
- [x] Performance impact has been considered and is acceptable

Release Notes:

- N/A

---------

Co-authored-by: Gaauwe Rombouts <mail@grombouts.nl>

Change summary

crates/agent_ui/src/conversation_view.rs     |   4 
crates/agent_ui/src/thread_metadata_store.rs | 160 +++++++++++++++++----
crates/sidebar/src/sidebar.rs                |  21 +-
crates/zed/src/zed.rs                        |  28 +++
4 files changed, 166 insertions(+), 47 deletions(-)

Detailed changes

crates/agent_ui/src/conversation_view.rs 🔗

@@ -2546,9 +2546,7 @@ impl ConversationView {
         task.detach_and_log_err(cx);
 
         if let Some(store) = SidebarThreadMetadataStore::try_global(cx) {
-            store
-                .update(cx, |store, cx| store.delete(session_id.clone(), cx))
-                .detach_and_log_err(cx);
+            store.update(cx, |store, cx| store.delete(session_id.clone(), cx));
         }
     }
 }

crates/agent_ui/src/thread_metadata_store.rs 🔗

@@ -3,7 +3,7 @@ use std::{path::Path, sync::Arc};
 use acp_thread::AgentSessionInfo;
 use agent::{ThreadStore, ZED_AGENT_ID};
 use agent_client_protocol as acp;
-use anyhow::{Context as _, Result};
+use anyhow::Context as _;
 use chrono::{DateTime, Utc};
 use collections::HashMap;
 use db::{
@@ -104,7 +104,7 @@ impl Global for GlobalThreadMetadataStore {}
 
 /// Lightweight metadata for any thread (native or ACP), enough to populate
 /// the sidebar list and route to the correct load path when clicked.
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, PartialEq)]
 pub struct ThreadMetadata {
     pub session_id: acp::SessionId,
     /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
@@ -183,6 +183,23 @@ pub struct SidebarThreadMetadataStore {
     threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
     reload_task: Option<Shared<Task<()>>>,
     session_subscriptions: HashMap<acp::SessionId, Subscription>,
+    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
+    _db_operations_task: Task<()>,
+}
+
+#[derive(Debug, PartialEq)]
+enum DbOperation {
+    Insert(ThreadMetadata),
+    Delete(acp::SessionId),
+}
+
+impl DbOperation {
+    fn id(&self) -> &acp::SessionId {
+        match self {
+            DbOperation::Insert(thread) => &thread.session_id,
+            DbOperation::Delete(session_id) => session_id,
+        }
+    }
 }
 
 impl SidebarThreadMetadataStore {
@@ -273,36 +290,24 @@ impl SidebarThreadMetadataStore {
         reload_task
     }
 
-    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
+    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
         if !cx.has_flag::<AgentV2FeatureFlag>() {
-            return Task::ready(Ok(()));
+            return;
         }
 
-        let db = self.db.clone();
-        cx.spawn(async move |this, cx| {
-            db.save(metadata).await?;
-            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
-            reload_task.await;
-            Ok(())
-        })
+        self.pending_thread_ops_tx
+            .try_send(DbOperation::Insert(metadata))
+            .log_err();
     }
 
-    pub fn delete(
-        &mut self,
-        session_id: acp::SessionId,
-        cx: &mut Context<Self>,
-    ) -> Task<Result<()>> {
+    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
         if !cx.has_flag::<AgentV2FeatureFlag>() {
-            return Task::ready(Ok(()));
+            return;
         }
 
-        let db = self.db.clone();
-        cx.spawn(async move |this, cx| {
-            db.delete(session_id).await?;
-            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
-            reload_task.await;
-            Ok(())
-        })
+        self.pending_thread_ops_tx
+            .try_send(DbOperation::Delete(session_id))
+            .log_err();
     }
 
     fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
@@ -338,17 +343,56 @@ impl SidebarThreadMetadataStore {
         })
         .detach();
 
+        let (tx, rx) = smol::channel::unbounded();
+        let _db_operations_task = cx.spawn({
+            let db = db.clone();
+            async move |this, cx| {
+                while let Ok(first_update) = rx.recv().await {
+                    let mut updates = vec![first_update];
+                    while let Ok(update) = rx.try_recv() {
+                        updates.push(update);
+                    }
+                    let updates = Self::dedup_db_operations(updates);
+                    for operation in updates {
+                        match operation {
+                            DbOperation::Insert(metadata) => {
+                                db.save(metadata).await.log_err();
+                            }
+                            DbOperation::Delete(session_id) => {
+                                db.delete(session_id).await.log_err();
+                            }
+                        }
+                    }
+
+                    this.update(cx, |this, cx| this.reload(cx)).ok();
+                }
+            }
+        });
+
         let mut this = Self {
             db,
             threads: Vec::new(),
             threads_by_paths: HashMap::default(),
             reload_task: None,
             session_subscriptions: HashMap::default(),
+            pending_thread_ops_tx: tx,
+            _db_operations_task,
         };
         let _ = this.reload(cx);
         this
     }
 
+    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
+        let mut ops = HashMap::default();
+        for operation in operations.into_iter().rev() {
+            if ops.contains_key(operation.id()) {
+                continue;
+            }
+            ops.insert(operation.id().clone(), operation);
+        }
+        ops.into_values().collect()
+    }
+
     fn handle_thread_update(
         &mut self,
         thread: Entity<acp_thread::AcpThread>,
@@ -373,7 +417,7 @@ impl SidebarThreadMetadataStore {
             | acp_thread::AcpThreadEvent::LoadError(_)
             | acp_thread::AcpThreadEvent::Refusal => {
                 let metadata = ThreadMetadata::from_thread(&thread, cx);
-                self.save(metadata, cx).detach_and_log_err(cx);
+                self.save(metadata, cx);
             }
             _ => {}
         }
@@ -653,8 +697,8 @@ mod tests {
         cx.update(|cx| {
             let store = SidebarThreadMetadataStore::global(cx);
             store.update(cx, |store, cx| {
-                store.save(initial_metadata, cx).detach();
-                store.save(second_metadata, cx).detach();
+                store.save(initial_metadata, cx);
+                store.save(second_metadata, cx);
             });
         });
 
@@ -687,7 +731,7 @@ mod tests {
         cx.update(|cx| {
             let store = SidebarThreadMetadataStore::global(cx);
             store.update(cx, |store, cx| {
-                store.save(moved_metadata, cx).detach();
+                store.save(moved_metadata, cx);
             });
         });
 
@@ -719,7 +763,7 @@ mod tests {
         cx.update(|cx| {
             let store = SidebarThreadMetadataStore::global(cx);
             store.update(cx, |store, cx| {
-                store.delete(acp::SessionId::new("session-2"), cx).detach();
+                store.delete(acp::SessionId::new("session-2"), cx);
             });
         });
 
@@ -917,7 +961,7 @@ mod tests {
         cx.update(|cx| {
             let store = SidebarThreadMetadataStore::global(cx);
             store.update(cx, |store, cx| {
-                store.save(existing_metadata, cx).detach();
+                store.save(existing_metadata, cx);
             });
         });
 
@@ -1035,4 +1079,60 @@ mod tests {
         assert_eq!(list[0].session_id, regular_session_id);
         assert_eq!(list[0].title.as_ref(), "Regular Thread");
     }
+
+    #[test]
+    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
+        let now = Utc::now();
+
+        let operations = vec![
+            DbOperation::Insert(make_metadata(
+                "session-1",
+                "First Thread",
+                now,
+                PathList::default(),
+            )),
+            DbOperation::Delete(acp::SessionId::new("session-1")),
+        ];
+
+        let deduped = SidebarThreadMetadataStore::dedup_db_operations(operations);
+
+        assert_eq!(deduped.len(), 1);
+        assert_eq!(
+            deduped[0],
+            DbOperation::Delete(acp::SessionId::new("session-1"))
+        );
+    }
+
+    #[test]
+    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
+        let now = Utc::now();
+        let later = now + chrono::Duration::seconds(1);
+
+        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
+        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
+
+        let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![
+            DbOperation::Insert(old_metadata),
+            DbOperation::Insert(new_metadata.clone()),
+        ]);
+
+        assert_eq!(deduped.len(), 1);
+        assert_eq!(deduped[0], DbOperation::Insert(new_metadata));
+    }
+
+    #[test]
+    fn test_dedup_db_operations_preserves_distinct_sessions() {
+        let now = Utc::now();
+
+        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
+        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
+        let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![
+            DbOperation::Insert(metadata1.clone()),
+            DbOperation::Insert(metadata2.clone()),
+        ]);
+
+        assert_eq!(deduped.len(), 2);
+        assert!(deduped.contains(&DbOperation::Insert(metadata1)));
+        assert!(deduped.contains(&DbOperation::Insert(metadata2)));
+    }
 }

crates/sidebar/src/sidebar.rs 🔗

@@ -2193,14 +2193,12 @@ impl Sidebar {
         cx: &mut Context<Self>,
     ) {
         // Eagerly save thread metadata so that the sidebar is updated immediately
-        SidebarThreadMetadataStore::global(cx)
-            .update(cx, |store, cx| {
-                store.save(
-                    ThreadMetadata::from_session_info(agent.id(), &session_info),
-                    cx,
-                )
-            })
-            .detach_and_log_err(cx);
+        SidebarThreadMetadataStore::global(cx).update(cx, |store, cx| {
+            store.save(
+                ThreadMetadata::from_session_info(agent.id(), &session_info),
+                cx,
+            )
+        });
 
         if let Some(path_list) = &session_info.work_dirs {
             if let Some(workspace) = self.find_current_workspace_for_path_list(path_list, cx) {
@@ -2476,8 +2474,7 @@ impl Sidebar {
         }
 
         SidebarThreadMetadataStore::global(cx)
-            .update(cx, |store, cx| store.delete(session_id.clone(), cx))
-            .detach_and_log_err(cx);
+            .update(cx, |store, cx| store.delete(session_id.clone(), cx));
     }
 
     fn remove_selected_thread(
@@ -3347,10 +3344,10 @@ mod tests {
             created_at: None,
             folder_paths: path_list,
         };
-        let task = cx.update(|cx| {
+        cx.update(|cx| {
             SidebarThreadMetadataStore::global(cx).update(cx, |store, cx| store.save(metadata, cx))
         });
-        task.await.unwrap();
+        cx.run_until_parked();
     }
 
     fn open_and_focus_sidebar(sidebar: &Entity<Sidebar>, cx: &mut gpui::VisualTestContext) {

crates/zed/src/zed.rs 🔗

@@ -2265,6 +2265,29 @@ mod tests {
         open_new, open_paths, pane,
     };
 
+    async fn flush_workspace_serialization(
+        window: &WindowHandle<MultiWorkspace>,
+        cx: &mut TestAppContext,
+    ) {
+        let all_tasks = window
+            .update(cx, |multi_workspace, window, cx| {
+                let mut tasks = multi_workspace
+                    .workspaces()
+                    .iter()
+                    .map(|workspace| {
+                        workspace.update(cx, |workspace, cx| {
+                            workspace.flush_serialization(window, cx)
+                        })
+                    })
+                    .collect::<Vec<_>>();
+                tasks.push(multi_workspace.flush_serialization());
+                tasks
+            })
+            .unwrap();
+
+        futures::future::join_all(all_tasks).await;
+    }
+
     #[gpui::test]
     async fn test_open_non_existing_file(cx: &mut TestAppContext) {
         let app_state = init_test(cx);
@@ -5822,8 +5845,9 @@ mod tests {
             })
             .unwrap();
 
-        // --- Flush serialization ---
-        cx.executor().advance_clock(SERIALIZATION_THROTTLE_TIME);
+        cx.run_until_parked();
+        flush_workspace_serialization(&window_a, cx).await;
+        flush_workspace_serialization(&window_b, cx).await;
         cx.run_until_parked();
 
         // Verify all workspaces retained their session_ids.