diff --git a/crates/agent_ui/src/conversation_view.rs b/crates/agent_ui/src/conversation_view.rs index 1f218c6cccebb288d161c6d09077109d491b5a04..740beabce22ab6eb476b8c60b281c3ebc9d9df12 100644 --- a/crates/agent_ui/src/conversation_view.rs +++ b/crates/agent_ui/src/conversation_view.rs @@ -2546,9 +2546,7 @@ impl ConversationView { task.detach_and_log_err(cx); if let Some(store) = SidebarThreadMetadataStore::try_global(cx) { - store - .update(cx, |store, cx| store.delete(session_id.clone(), cx)) - .detach_and_log_err(cx); + store.update(cx, |store, cx| store.delete(session_id.clone(), cx)); } } } diff --git a/crates/agent_ui/src/thread_metadata_store.rs b/crates/agent_ui/src/thread_metadata_store.rs index ae6c5b0b74240b95e61a3c1a6ee9f0533aaed5bb..03b3f645c5e0eecd6d71bd2f69c545d7a7d23522 100644 --- a/crates/agent_ui/src/thread_metadata_store.rs +++ b/crates/agent_ui/src/thread_metadata_store.rs @@ -3,7 +3,7 @@ use std::{path::Path, sync::Arc}; use acp_thread::AgentSessionInfo; use agent::{ThreadStore, ZED_AGENT_ID}; use agent_client_protocol as acp; -use anyhow::{Context as _, Result}; +use anyhow::Context as _; use chrono::{DateTime, Utc}; use collections::HashMap; use db::{ @@ -104,7 +104,7 @@ impl Global for GlobalThreadMetadataStore {} /// Lightweight metadata for any thread (native or ACP), enough to populate /// the sidebar list and route to the correct load path when clicked. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct ThreadMetadata { pub session_id: acp::SessionId, /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents. @@ -183,6 +183,23 @@ pub struct SidebarThreadMetadataStore { threads_by_paths: HashMap>, reload_task: Option>>, session_subscriptions: HashMap, + pending_thread_ops_tx: smol::channel::Sender, + _db_operations_task: Task<()>, +} + +#[derive(Debug, PartialEq)] +enum DbOperation { + Insert(ThreadMetadata), + Delete(acp::SessionId), +} + +impl DbOperation { + fn id(&self) -> &acp::SessionId { + match self { + DbOperation::Insert(thread) => &thread.session_id, + DbOperation::Delete(session_id) => session_id, + } + } } impl SidebarThreadMetadataStore { @@ -273,36 +290,24 @@ impl SidebarThreadMetadataStore { reload_task } - pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context) -> Task> { + pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context) { if !cx.has_flag::() { - return Task::ready(Ok(())); + return; } - let db = self.db.clone(); - cx.spawn(async move |this, cx| { - db.save(metadata).await?; - let reload_task = this.update(cx, |this, cx| this.reload(cx))?; - reload_task.await; - Ok(()) - }) + self.pending_thread_ops_tx + .try_send(DbOperation::Insert(metadata)) + .log_err(); } - pub fn delete( - &mut self, - session_id: acp::SessionId, - cx: &mut Context, - ) -> Task> { + pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context) { if !cx.has_flag::() { - return Task::ready(Ok(())); + return; } - let db = self.db.clone(); - cx.spawn(async move |this, cx| { - db.delete(session_id).await?; - let reload_task = this.update(cx, |this, cx| this.reload(cx))?; - reload_task.await; - Ok(()) - }) + self.pending_thread_ops_tx + .try_send(DbOperation::Delete(session_id)) + .log_err(); } fn new(db: ThreadMetadataDb, cx: &mut Context) -> Self { @@ -338,17 +343,56 @@ impl SidebarThreadMetadataStore { }) .detach(); + let (tx, rx) = smol::channel::unbounded(); + let _db_operations_task = cx.spawn({ + let db = db.clone(); + async move |this, cx| { + while let Ok(first_update) = rx.recv().await { + let mut updates = vec![first_update]; + while let Ok(update) = rx.try_recv() { + updates.push(update); + } + let updates = Self::dedup_db_operations(updates); + for operation in updates { + match operation { + DbOperation::Insert(metadata) => { + db.save(metadata).await.log_err(); + } + DbOperation::Delete(session_id) => { + db.delete(session_id).await.log_err(); + } + } + } + + this.update(cx, |this, cx| this.reload(cx)).ok(); + } + } + }); + let mut this = Self { db, threads: Vec::new(), threads_by_paths: HashMap::default(), reload_task: None, session_subscriptions: HashMap::default(), + pending_thread_ops_tx: tx, + _db_operations_task, }; let _ = this.reload(cx); this } + fn dedup_db_operations(operations: Vec) -> Vec { + let mut ops = HashMap::default(); + for operation in operations.into_iter().rev() { + if ops.contains_key(operation.id()) { + continue; + } + ops.insert(operation.id().clone(), operation); + } + ops.into_values().collect() + } + fn handle_thread_update( &mut self, thread: Entity, @@ -373,7 +417,7 @@ impl SidebarThreadMetadataStore { | acp_thread::AcpThreadEvent::LoadError(_) | acp_thread::AcpThreadEvent::Refusal => { let metadata = ThreadMetadata::from_thread(&thread, cx); - self.save(metadata, cx).detach_and_log_err(cx); + self.save(metadata, cx); } _ => {} } @@ -653,8 +697,8 @@ mod tests { cx.update(|cx| { let store = SidebarThreadMetadataStore::global(cx); store.update(cx, |store, cx| { - store.save(initial_metadata, cx).detach(); - store.save(second_metadata, cx).detach(); + store.save(initial_metadata, cx); + store.save(second_metadata, cx); }); }); @@ -687,7 +731,7 @@ mod tests { cx.update(|cx| { let store = SidebarThreadMetadataStore::global(cx); store.update(cx, |store, cx| { - store.save(moved_metadata, cx).detach(); + store.save(moved_metadata, cx); }); }); @@ -719,7 +763,7 @@ mod tests { cx.update(|cx| { let store = SidebarThreadMetadataStore::global(cx); store.update(cx, |store, cx| { - store.delete(acp::SessionId::new("session-2"), cx).detach(); + store.delete(acp::SessionId::new("session-2"), cx); }); }); @@ -917,7 +961,7 @@ mod tests { cx.update(|cx| { let store = SidebarThreadMetadataStore::global(cx); store.update(cx, |store, cx| { - store.save(existing_metadata, cx).detach(); + store.save(existing_metadata, cx); }); }); @@ -1035,4 +1079,60 @@ mod tests { assert_eq!(list[0].session_id, regular_session_id); assert_eq!(list[0].title.as_ref(), "Regular Thread"); } + + #[test] + fn test_dedup_db_operations_keeps_latest_operation_for_session() { + let now = Utc::now(); + + let operations = vec![ + DbOperation::Insert(make_metadata( + "session-1", + "First Thread", + now, + PathList::default(), + )), + DbOperation::Delete(acp::SessionId::new("session-1")), + ]; + + let deduped = SidebarThreadMetadataStore::dedup_db_operations(operations); + + assert_eq!(deduped.len(), 1); + assert_eq!( + deduped[0], + DbOperation::Delete(acp::SessionId::new("session-1")) + ); + } + + #[test] + fn test_dedup_db_operations_keeps_latest_insert_for_same_session() { + let now = Utc::now(); + let later = now + chrono::Duration::seconds(1); + + let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default()); + let new_metadata = make_metadata("session-1", "New Title", later, PathList::default()); + + let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![ + DbOperation::Insert(old_metadata), + DbOperation::Insert(new_metadata.clone()), + ]); + + assert_eq!(deduped.len(), 1); + assert_eq!(deduped[0], DbOperation::Insert(new_metadata)); + } + + #[test] + fn test_dedup_db_operations_preserves_distinct_sessions() { + let now = Utc::now(); + + let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default()); + let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default()); + let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![ + DbOperation::Insert(metadata1.clone()), + DbOperation::Insert(metadata2.clone()), + ]); + + assert_eq!(deduped.len(), 2); + assert!(deduped.contains(&DbOperation::Insert(metadata1))); + assert!(deduped.contains(&DbOperation::Insert(metadata2))); + } } diff --git a/crates/sidebar/src/sidebar.rs b/crates/sidebar/src/sidebar.rs index 53eebc4d2604e91caa4fc0fec57f27345becbae0..da78c634764fedf0643a7b225190ae5cb28f8e58 100644 --- a/crates/sidebar/src/sidebar.rs +++ b/crates/sidebar/src/sidebar.rs @@ -2193,14 +2193,12 @@ impl Sidebar { cx: &mut Context, ) { // Eagerly save thread metadata so that the sidebar is updated immediately - SidebarThreadMetadataStore::global(cx) - .update(cx, |store, cx| { - store.save( - ThreadMetadata::from_session_info(agent.id(), &session_info), - cx, - ) - }) - .detach_and_log_err(cx); + SidebarThreadMetadataStore::global(cx).update(cx, |store, cx| { + store.save( + ThreadMetadata::from_session_info(agent.id(), &session_info), + cx, + ) + }); if let Some(path_list) = &session_info.work_dirs { if let Some(workspace) = self.find_current_workspace_for_path_list(path_list, cx) { @@ -2476,8 +2474,7 @@ impl Sidebar { } SidebarThreadMetadataStore::global(cx) - .update(cx, |store, cx| store.delete(session_id.clone(), cx)) - .detach_and_log_err(cx); + .update(cx, |store, cx| store.delete(session_id.clone(), cx)); } fn remove_selected_thread( @@ -3347,10 +3344,10 @@ mod tests { created_at: None, folder_paths: path_list, }; - let task = cx.update(|cx| { + cx.update(|cx| { SidebarThreadMetadataStore::global(cx).update(cx, |store, cx| store.save(metadata, cx)) }); - task.await.unwrap(); + cx.run_until_parked(); } fn open_and_focus_sidebar(sidebar: &Entity, cx: &mut gpui::VisualTestContext) { diff --git a/crates/zed/src/zed.rs b/crates/zed/src/zed.rs index 96a93bbff571c392085405087791f9b3e306fd62..66f0239de655492176861e289cafb5fa5a3475de 100644 --- a/crates/zed/src/zed.rs +++ b/crates/zed/src/zed.rs @@ -2265,6 +2265,29 @@ mod tests { open_new, open_paths, pane, }; + async fn flush_workspace_serialization( + window: &WindowHandle, + cx: &mut TestAppContext, + ) { + let all_tasks = window + .update(cx, |multi_workspace, window, cx| { + let mut tasks = multi_workspace + .workspaces() + .iter() + .map(|workspace| { + workspace.update(cx, |workspace, cx| { + workspace.flush_serialization(window, cx) + }) + }) + .collect::>(); + tasks.push(multi_workspace.flush_serialization()); + tasks + }) + .unwrap(); + + futures::future::join_all(all_tasks).await; + } + #[gpui::test] async fn test_open_non_existing_file(cx: &mut TestAppContext) { let app_state = init_test(cx); @@ -5822,8 +5845,9 @@ mod tests { }) .unwrap(); - // --- Flush serialization --- - cx.executor().advance_clock(SERIALIZATION_THROTTLE_TIME); + cx.run_until_parked(); + flush_workspace_serialization(&window_a, cx).await; + flush_workspace_serialization(&window_b, cx).await; cx.run_until_parked(); // Verify all workspaces retained their session_ids.