1use std::{path::Path, sync::Arc};
2
3use acp_thread::AgentSessionInfo;
4use agent::{ThreadStore, ZED_AGENT_ID};
5use agent_client_protocol as acp;
6use anyhow::Context as _;
7use chrono::{DateTime, Utc};
8use collections::HashMap;
9use db::{
10 sqlez::{
11 bindable::Column, domain::Domain, statement::Statement,
12 thread_safe_connection::ThreadSafeConnection,
13 },
14 sqlez_macros::sql,
15};
16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
17use futures::{FutureExt as _, future::Shared};
18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
19use project::AgentId;
20use ui::{App, Context, SharedString};
21use util::ResultExt as _;
22use workspace::PathList;
23
24use crate::DEFAULT_THREAD_TITLE;
25
26pub fn init(cx: &mut App) {
27 SidebarThreadMetadataStore::init_global(cx);
28
29 if cx.has_flag::<AgentV2FeatureFlag>() {
30 migrate_thread_metadata(cx);
31 }
32 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
33 if has_flag {
34 migrate_thread_metadata(cx);
35 }
36 })
37 .detach();
38}
39
40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
41/// We migrate the last 10 threads per project and skip threads that do not have a project.
42///
43/// TODO: Remove this after N weeks of shipping the sidebar
44fn migrate_thread_metadata(cx: &mut App) {
45 const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
46
47 let store = SidebarThreadMetadataStore::global(cx);
48 let db = store.read(cx).db.clone();
49
50 cx.spawn(async move |cx| {
51 if !db.is_empty()? {
52 return Ok::<(), anyhow::Error>(());
53 }
54
55 let metadata = store.read_with(cx, |_store, app| {
56 let mut migrated_threads_per_project = HashMap::default();
57
58 ThreadStore::global(app)
59 .read(app)
60 .entries()
61 .filter_map(|entry| {
62 if entry.folder_paths.is_empty() {
63 return None;
64 }
65
66 let migrated_thread_count = migrated_threads_per_project
67 .entry(entry.folder_paths.clone())
68 .or_insert(0);
69 if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
70 return None;
71 }
72 *migrated_thread_count += 1;
73
74 Some(ThreadMetadata {
75 session_id: entry.id,
76 agent_id: None,
77 title: entry.title,
78 updated_at: entry.updated_at,
79 created_at: entry.created_at,
80 folder_paths: entry.folder_paths,
81 })
82 })
83 .collect::<Vec<_>>()
84 });
85
86 log::info!("Migrating {} thread store entries", metadata.len());
87
88 // Manually save each entry to the database and call reload, otherwise
89 // we'll end up triggering lots of reloads after each save
90 for entry in metadata {
91 db.save(entry).await?;
92 }
93
94 log::info!("Finished migrating thread store entries");
95
96 let _ = store.update(cx, |store, cx| store.reload(cx));
97 Ok(())
98 })
99 .detach_and_log_err(cx);
100}
101
102struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
103impl Global for GlobalThreadMetadataStore {}
104
105/// Lightweight metadata for any thread (native or ACP), enough to populate
106/// the sidebar list and route to the correct load path when clicked.
107#[derive(Debug, Clone, PartialEq)]
108pub struct ThreadMetadata {
109 pub session_id: acp::SessionId,
110 /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
111 pub agent_id: Option<AgentId>,
112 pub title: SharedString,
113 pub updated_at: DateTime<Utc>,
114 pub created_at: Option<DateTime<Utc>>,
115 pub folder_paths: PathList,
116}
117
118impl ThreadMetadata {
119 pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
120 let session_id = session.session_id.clone();
121 let title = session.title.clone().unwrap_or_default();
122 let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
123 let created_at = session.created_at.unwrap_or(updated_at);
124 let folder_paths = session.work_dirs.clone().unwrap_or_default();
125 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
126 None
127 } else {
128 Some(agent_id)
129 };
130 Self {
131 session_id,
132 agent_id,
133 title,
134 updated_at,
135 created_at: Some(created_at),
136 folder_paths,
137 }
138 }
139
140 pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
141 let thread_ref = thread.read(cx);
142 let session_id = thread_ref.session_id().clone();
143 let title = thread_ref
144 .title()
145 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
146 let updated_at = Utc::now();
147
148 let agent_id = thread_ref.connection().agent_id();
149
150 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
151 None
152 } else {
153 Some(agent_id)
154 };
155
156 let folder_paths = {
157 let project = thread_ref.project().read(cx);
158 let paths: Vec<Arc<Path>> = project
159 .visible_worktrees(cx)
160 .map(|worktree| worktree.read(cx).abs_path())
161 .collect();
162 PathList::new(&paths)
163 };
164
165 Self {
166 session_id,
167 agent_id,
168 title,
169 created_at: Some(updated_at), // handled by db `ON CONFLICT`
170 updated_at,
171 folder_paths,
172 }
173 }
174}
175
176/// The store holds all metadata needed to show threads in the sidebar.
177/// Effectively, all threads stored in here are "non-archived".
178///
179/// Automatically listens to AcpThread events and updates metadata if it has changed.
180pub struct SidebarThreadMetadataStore {
181 db: ThreadMetadataDb,
182 threads: Vec<ThreadMetadata>,
183 threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
184 reload_task: Option<Shared<Task<()>>>,
185 session_subscriptions: HashMap<acp::SessionId, Subscription>,
186 pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
187 _db_operations_task: Task<()>,
188}
189
190#[derive(Debug, PartialEq)]
191enum DbOperation {
192 Insert(ThreadMetadata),
193 Delete(acp::SessionId),
194}
195
196impl DbOperation {
197 fn id(&self) -> &acp::SessionId {
198 match self {
199 DbOperation::Insert(thread) => &thread.session_id,
200 DbOperation::Delete(session_id) => session_id,
201 }
202 }
203}
204
205impl SidebarThreadMetadataStore {
206 #[cfg(not(any(test, feature = "test-support")))]
207 pub fn init_global(cx: &mut App) {
208 if cx.has_global::<Self>() {
209 return;
210 }
211
212 let db = ThreadMetadataDb::global(cx);
213 let thread_store = cx.new(|cx| Self::new(db, cx));
214 cx.set_global(GlobalThreadMetadataStore(thread_store));
215 }
216
217 #[cfg(any(test, feature = "test-support"))]
218 pub fn init_global(cx: &mut App) {
219 let thread = std::thread::current();
220 let test_name = thread.name().unwrap_or("unknown_test");
221 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
222 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
223 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
224 cx.set_global(GlobalThreadMetadataStore(thread_store));
225 }
226
227 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
228 cx.try_global::<GlobalThreadMetadataStore>()
229 .map(|store| store.0.clone())
230 }
231
232 pub fn global(cx: &App) -> Entity<Self> {
233 cx.global::<GlobalThreadMetadataStore>().0.clone()
234 }
235
236 pub fn is_empty(&self) -> bool {
237 self.threads.is_empty()
238 }
239
240 pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
241 self.threads.iter().cloned()
242 }
243
244 pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
245 self.threads.iter().map(|thread| thread.session_id.clone())
246 }
247
248 pub fn entries_for_path(
249 &self,
250 path_list: &PathList,
251 ) -> impl Iterator<Item = ThreadMetadata> + '_ {
252 self.threads_by_paths
253 .get(path_list)
254 .into_iter()
255 .flatten()
256 .cloned()
257 }
258
259 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
260 let db = self.db.clone();
261 self.reload_task.take();
262
263 let list_task = cx
264 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
265
266 let reload_task = cx
267 .spawn(async move |this, cx| {
268 let Some(rows) = list_task.await.log_err() else {
269 return;
270 };
271
272 this.update(cx, |this, cx| {
273 this.threads.clear();
274 this.threads_by_paths.clear();
275
276 for row in rows {
277 this.threads_by_paths
278 .entry(row.folder_paths.clone())
279 .or_default()
280 .push(row.clone());
281 this.threads.push(row);
282 }
283
284 cx.notify();
285 })
286 .ok();
287 })
288 .shared();
289 self.reload_task = Some(reload_task.clone());
290 reload_task
291 }
292
293 pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
294 if !cx.has_flag::<AgentV2FeatureFlag>() {
295 return;
296 }
297
298 self.pending_thread_ops_tx
299 .try_send(DbOperation::Insert(metadata))
300 .log_err();
301 }
302
303 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
304 if !cx.has_flag::<AgentV2FeatureFlag>() {
305 return;
306 }
307
308 self.pending_thread_ops_tx
309 .try_send(DbOperation::Delete(session_id))
310 .log_err();
311 }
312
313 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
314 let weak_store = cx.weak_entity();
315
316 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
317 // Don't track subagent threads in the sidebar.
318 if thread.parent_session_id().is_some() {
319 return;
320 }
321
322 let thread_entity = cx.entity();
323
324 cx.on_release({
325 let weak_store = weak_store.clone();
326 move |thread, cx| {
327 weak_store
328 .update(cx, |store, _cx| {
329 store.session_subscriptions.remove(thread.session_id());
330 })
331 .ok();
332 }
333 })
334 .detach();
335
336 weak_store
337 .update(cx, |this, cx| {
338 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
339 this.session_subscriptions
340 .insert(thread.session_id().clone(), subscription);
341 })
342 .ok();
343 })
344 .detach();
345
346 let (tx, rx) = smol::channel::unbounded();
347 let _db_operations_task = cx.spawn({
348 let db = db.clone();
349 async move |this, cx| {
350 while let Ok(first_update) = rx.recv().await {
351 let mut updates = vec![first_update];
352 while let Ok(update) = rx.try_recv() {
353 updates.push(update);
354 }
355 let updates = Self::dedup_db_operations(updates);
356 for operation in updates {
357 match operation {
358 DbOperation::Insert(metadata) => {
359 db.save(metadata).await.log_err();
360 }
361 DbOperation::Delete(session_id) => {
362 db.delete(session_id).await.log_err();
363 }
364 }
365 }
366
367 this.update(cx, |this, cx| this.reload(cx)).ok();
368 }
369 }
370 });
371
372 let mut this = Self {
373 db,
374 threads: Vec::new(),
375 threads_by_paths: HashMap::default(),
376 reload_task: None,
377 session_subscriptions: HashMap::default(),
378 pending_thread_ops_tx: tx,
379 _db_operations_task,
380 };
381 let _ = this.reload(cx);
382 this
383 }
384
385 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
386 let mut ops = HashMap::default();
387 for operation in operations.into_iter().rev() {
388 if ops.contains_key(operation.id()) {
389 continue;
390 }
391 ops.insert(operation.id().clone(), operation);
392 }
393 ops.into_values().collect()
394 }
395
396 fn handle_thread_update(
397 &mut self,
398 thread: Entity<acp_thread::AcpThread>,
399 event: &acp_thread::AcpThreadEvent,
400 cx: &mut Context<Self>,
401 ) {
402 // Don't track subagent threads in the sidebar.
403 if thread.read(cx).parent_session_id().is_some() {
404 return;
405 }
406
407 match event {
408 acp_thread::AcpThreadEvent::NewEntry
409 | acp_thread::AcpThreadEvent::TitleUpdated
410 | acp_thread::AcpThreadEvent::EntryUpdated(_)
411 | acp_thread::AcpThreadEvent::EntriesRemoved(_)
412 | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
413 | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
414 | acp_thread::AcpThreadEvent::Retry(_)
415 | acp_thread::AcpThreadEvent::Stopped(_)
416 | acp_thread::AcpThreadEvent::Error
417 | acp_thread::AcpThreadEvent::LoadError(_)
418 | acp_thread::AcpThreadEvent::Refusal => {
419 let metadata = ThreadMetadata::from_thread(&thread, cx);
420 self.save(metadata, cx);
421 }
422 _ => {}
423 }
424 }
425}
426
427impl Global for SidebarThreadMetadataStore {}
428
429struct ThreadMetadataDb(ThreadSafeConnection);
430
431impl Domain for ThreadMetadataDb {
432 const NAME: &str = stringify!(ThreadMetadataDb);
433
434 const MIGRATIONS: &[&str] = &[sql!(
435 CREATE TABLE IF NOT EXISTS sidebar_threads(
436 session_id TEXT PRIMARY KEY,
437 agent_id TEXT,
438 title TEXT NOT NULL,
439 updated_at TEXT NOT NULL,
440 created_at TEXT,
441 folder_paths TEXT,
442 folder_paths_order TEXT
443 ) STRICT;
444 )];
445}
446
447db::static_connection!(ThreadMetadataDb, []);
448
449impl ThreadMetadataDb {
450 pub fn is_empty(&self) -> anyhow::Result<bool> {
451 self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
452 .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
453 }
454
455 /// List all sidebar thread metadata, ordered by updated_at descending.
456 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
457 self.select::<ThreadMetadata>(
458 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
459 FROM sidebar_threads \
460 ORDER BY updated_at DESC"
461 )?()
462 }
463
464 /// Upsert metadata for a thread.
465 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
466 let id = row.session_id.0.clone();
467 let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
468 let title = row.title.to_string();
469 let updated_at = row.updated_at.to_rfc3339();
470 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
471 let serialized = row.folder_paths.serialize();
472 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
473 (None, None)
474 } else {
475 (Some(serialized.paths), Some(serialized.order))
476 };
477
478 self.write(move |conn| {
479 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
480 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
481 ON CONFLICT(session_id) DO UPDATE SET \
482 agent_id = excluded.agent_id, \
483 title = excluded.title, \
484 updated_at = excluded.updated_at, \
485 folder_paths = excluded.folder_paths, \
486 folder_paths_order = excluded.folder_paths_order";
487 let mut stmt = Statement::prepare(conn, sql)?;
488 let mut i = stmt.bind(&id, 1)?;
489 i = stmt.bind(&agent_id, i)?;
490 i = stmt.bind(&title, i)?;
491 i = stmt.bind(&updated_at, i)?;
492 i = stmt.bind(&created_at, i)?;
493 i = stmt.bind(&folder_paths, i)?;
494 stmt.bind(&folder_paths_order, i)?;
495 stmt.exec()
496 })
497 .await
498 }
499
500 /// Delete metadata for a single thread.
501 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
502 let id = session_id.0.clone();
503 self.write(move |conn| {
504 let mut stmt =
505 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
506 stmt.bind(&id, 1)?;
507 stmt.exec()
508 })
509 .await
510 }
511}
512
513impl Column for ThreadMetadata {
514 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
515 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
516 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
517 let (title, next): (String, i32) = Column::column(statement, next)?;
518 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
519 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
520 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
521 let (folder_paths_order_str, next): (Option<String>, i32) =
522 Column::column(statement, next)?;
523
524 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
525 let created_at = created_at_str
526 .as_deref()
527 .map(DateTime::parse_from_rfc3339)
528 .transpose()?
529 .map(|dt| dt.with_timezone(&Utc));
530
531 let folder_paths = folder_paths_str
532 .map(|paths| {
533 PathList::deserialize(&util::path_list::SerializedPathList {
534 paths,
535 order: folder_paths_order_str.unwrap_or_default(),
536 })
537 })
538 .unwrap_or_default();
539
540 Ok((
541 ThreadMetadata {
542 session_id: acp::SessionId::new(id),
543 agent_id: agent_id.map(|id| AgentId::new(id)),
544 title: title.into(),
545 updated_at,
546 created_at,
547 folder_paths,
548 },
549 next,
550 ))
551 }
552}
553
554#[cfg(test)]
555mod tests {
556 use super::*;
557 use acp_thread::{AgentConnection, StubAgentConnection};
558 use action_log::ActionLog;
559 use agent::DbThread;
560 use agent_client_protocol as acp;
561 use feature_flags::FeatureFlagAppExt;
562 use gpui::TestAppContext;
563 use project::FakeFs;
564 use project::Project;
565 use std::path::Path;
566 use std::rc::Rc;
567
568 fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
569 DbThread {
570 title: title.to_string().into(),
571 messages: Vec::new(),
572 updated_at,
573 detailed_summary: None,
574 initial_project_snapshot: None,
575 cumulative_token_usage: Default::default(),
576 request_token_usage: Default::default(),
577 model: None,
578 profile: None,
579 imported: false,
580 subagent_context: None,
581 speed: None,
582 thinking_enabled: false,
583 thinking_effort: None,
584 draft_prompt: None,
585 ui_scroll_position: None,
586 }
587 }
588
589 fn make_metadata(
590 session_id: &str,
591 title: &str,
592 updated_at: DateTime<Utc>,
593 folder_paths: PathList,
594 ) -> ThreadMetadata {
595 ThreadMetadata {
596 session_id: acp::SessionId::new(session_id),
597 agent_id: None,
598 title: title.to_string().into(),
599 updated_at,
600 created_at: Some(updated_at),
601 folder_paths,
602 }
603 }
604
605 #[gpui::test]
606 async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
607 let first_paths = PathList::new(&[Path::new("/project-a")]);
608 let second_paths = PathList::new(&[Path::new("/project-b")]);
609 let now = Utc::now();
610 let older = now - chrono::Duration::seconds(1);
611
612 let thread = std::thread::current();
613 let test_name = thread.name().unwrap_or("unknown_test");
614 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
615 let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
616 &db_name,
617 )));
618
619 db.save(make_metadata(
620 "session-1",
621 "First Thread",
622 now,
623 first_paths.clone(),
624 ))
625 .await
626 .unwrap();
627 db.save(make_metadata(
628 "session-2",
629 "Second Thread",
630 older,
631 second_paths.clone(),
632 ))
633 .await
634 .unwrap();
635
636 cx.update(|cx| {
637 let settings_store = settings::SettingsStore::test(cx);
638 cx.set_global(settings_store);
639 cx.update_flags(true, vec!["agent-v2".to_string()]);
640 SidebarThreadMetadataStore::init_global(cx);
641 });
642
643 cx.run_until_parked();
644
645 cx.update(|cx| {
646 let store = SidebarThreadMetadataStore::global(cx);
647 let store = store.read(cx);
648
649 let entry_ids = store
650 .entry_ids()
651 .map(|session_id| session_id.0.to_string())
652 .collect::<Vec<_>>();
653 assert_eq!(entry_ids, vec!["session-1", "session-2"]);
654
655 let first_path_entries = store
656 .entries_for_path(&first_paths)
657 .map(|entry| entry.session_id.0.to_string())
658 .collect::<Vec<_>>();
659 assert_eq!(first_path_entries, vec!["session-1"]);
660
661 let second_path_entries = store
662 .entries_for_path(&second_paths)
663 .map(|entry| entry.session_id.0.to_string())
664 .collect::<Vec<_>>();
665 assert_eq!(second_path_entries, vec!["session-2"]);
666 });
667 }
668
669 #[gpui::test]
670 async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
671 cx.update(|cx| {
672 let settings_store = settings::SettingsStore::test(cx);
673 cx.set_global(settings_store);
674 cx.update_flags(true, vec!["agent-v2".to_string()]);
675 SidebarThreadMetadataStore::init_global(cx);
676 });
677
678 let first_paths = PathList::new(&[Path::new("/project-a")]);
679 let second_paths = PathList::new(&[Path::new("/project-b")]);
680 let initial_time = Utc::now();
681 let updated_time = initial_time + chrono::Duration::seconds(1);
682
683 let initial_metadata = make_metadata(
684 "session-1",
685 "First Thread",
686 initial_time,
687 first_paths.clone(),
688 );
689
690 let second_metadata = make_metadata(
691 "session-2",
692 "Second Thread",
693 initial_time,
694 second_paths.clone(),
695 );
696
697 cx.update(|cx| {
698 let store = SidebarThreadMetadataStore::global(cx);
699 store.update(cx, |store, cx| {
700 store.save(initial_metadata, cx);
701 store.save(second_metadata, cx);
702 });
703 });
704
705 cx.run_until_parked();
706
707 cx.update(|cx| {
708 let store = SidebarThreadMetadataStore::global(cx);
709 let store = store.read(cx);
710
711 let first_path_entries = store
712 .entries_for_path(&first_paths)
713 .map(|entry| entry.session_id.0.to_string())
714 .collect::<Vec<_>>();
715 assert_eq!(first_path_entries, vec!["session-1"]);
716
717 let second_path_entries = store
718 .entries_for_path(&second_paths)
719 .map(|entry| entry.session_id.0.to_string())
720 .collect::<Vec<_>>();
721 assert_eq!(second_path_entries, vec!["session-2"]);
722 });
723
724 let moved_metadata = make_metadata(
725 "session-1",
726 "First Thread",
727 updated_time,
728 second_paths.clone(),
729 );
730
731 cx.update(|cx| {
732 let store = SidebarThreadMetadataStore::global(cx);
733 store.update(cx, |store, cx| {
734 store.save(moved_metadata, cx);
735 });
736 });
737
738 cx.run_until_parked();
739
740 cx.update(|cx| {
741 let store = SidebarThreadMetadataStore::global(cx);
742 let store = store.read(cx);
743
744 let entry_ids = store
745 .entry_ids()
746 .map(|session_id| session_id.0.to_string())
747 .collect::<Vec<_>>();
748 assert_eq!(entry_ids, vec!["session-1", "session-2"]);
749
750 let first_path_entries = store
751 .entries_for_path(&first_paths)
752 .map(|entry| entry.session_id.0.to_string())
753 .collect::<Vec<_>>();
754 assert!(first_path_entries.is_empty());
755
756 let second_path_entries = store
757 .entries_for_path(&second_paths)
758 .map(|entry| entry.session_id.0.to_string())
759 .collect::<Vec<_>>();
760 assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
761 });
762
763 cx.update(|cx| {
764 let store = SidebarThreadMetadataStore::global(cx);
765 store.update(cx, |store, cx| {
766 store.delete(acp::SessionId::new("session-2"), cx);
767 });
768 });
769
770 cx.run_until_parked();
771
772 cx.update(|cx| {
773 let store = SidebarThreadMetadataStore::global(cx);
774 let store = store.read(cx);
775
776 let entry_ids = store
777 .entry_ids()
778 .map(|session_id| session_id.0.to_string())
779 .collect::<Vec<_>>();
780 assert_eq!(entry_ids, vec!["session-1"]);
781
782 let second_path_entries = store
783 .entries_for_path(&second_paths)
784 .map(|entry| entry.session_id.0.to_string())
785 .collect::<Vec<_>>();
786 assert_eq!(second_path_entries, vec!["session-1"]);
787 });
788 }
789
790 #[gpui::test]
791 async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
792 cx.update(|cx| {
793 ThreadStore::init_global(cx);
794 SidebarThreadMetadataStore::init_global(cx);
795 });
796
797 // Verify the cache is empty before migration
798 let list = cx.update(|cx| {
799 let store = SidebarThreadMetadataStore::global(cx);
800 store.read(cx).entries().collect::<Vec<_>>()
801 });
802 assert_eq!(list.len(), 0);
803
804 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
805 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
806 let now = Utc::now();
807
808 for index in 0..12 {
809 let updated_at = now + chrono::Duration::seconds(index as i64);
810 let session_id = format!("project-a-session-{index}");
811 let title = format!("Project A Thread {index}");
812
813 let save_task = cx.update(|cx| {
814 let thread_store = ThreadStore::global(cx);
815 let session_id = session_id.clone();
816 let title = title.clone();
817 let project_a_paths = project_a_paths.clone();
818 thread_store.update(cx, |store, cx| {
819 store.save_thread(
820 acp::SessionId::new(session_id),
821 make_db_thread(&title, updated_at),
822 project_a_paths,
823 cx,
824 )
825 })
826 });
827 save_task.await.unwrap();
828 cx.run_until_parked();
829 }
830
831 for index in 0..3 {
832 let updated_at = now + chrono::Duration::seconds(100 + index as i64);
833 let session_id = format!("project-b-session-{index}");
834 let title = format!("Project B Thread {index}");
835
836 let save_task = cx.update(|cx| {
837 let thread_store = ThreadStore::global(cx);
838 let session_id = session_id.clone();
839 let title = title.clone();
840 let project_b_paths = project_b_paths.clone();
841 thread_store.update(cx, |store, cx| {
842 store.save_thread(
843 acp::SessionId::new(session_id),
844 make_db_thread(&title, updated_at),
845 project_b_paths,
846 cx,
847 )
848 })
849 });
850 save_task.await.unwrap();
851 cx.run_until_parked();
852 }
853
854 let save_projectless = cx.update(|cx| {
855 let thread_store = ThreadStore::global(cx);
856 thread_store.update(cx, |store, cx| {
857 store.save_thread(
858 acp::SessionId::new("projectless-session"),
859 make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
860 PathList::default(),
861 cx,
862 )
863 })
864 });
865 save_projectless.await.unwrap();
866 cx.run_until_parked();
867
868 // Run migration
869 cx.update(|cx| {
870 migrate_thread_metadata(cx);
871 });
872
873 cx.run_until_parked();
874
875 // Verify the metadata was migrated, limited to 10 per project, and
876 // projectless threads were skipped.
877 let list = cx.update(|cx| {
878 let store = SidebarThreadMetadataStore::global(cx);
879 store.read(cx).entries().collect::<Vec<_>>()
880 });
881 assert_eq!(list.len(), 13);
882
883 assert!(
884 list.iter()
885 .all(|metadata| !metadata.folder_paths.is_empty())
886 );
887 assert!(
888 list.iter()
889 .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
890 );
891
892 let project_a_entries = list
893 .iter()
894 .filter(|metadata| metadata.folder_paths == project_a_paths)
895 .collect::<Vec<_>>();
896 assert_eq!(project_a_entries.len(), 10);
897 assert_eq!(
898 project_a_entries
899 .iter()
900 .map(|metadata| metadata.session_id.0.as_ref())
901 .collect::<Vec<_>>(),
902 vec![
903 "project-a-session-11",
904 "project-a-session-10",
905 "project-a-session-9",
906 "project-a-session-8",
907 "project-a-session-7",
908 "project-a-session-6",
909 "project-a-session-5",
910 "project-a-session-4",
911 "project-a-session-3",
912 "project-a-session-2",
913 ]
914 );
915 assert!(
916 project_a_entries
917 .iter()
918 .all(|metadata| metadata.agent_id.is_none())
919 );
920
921 let project_b_entries = list
922 .iter()
923 .filter(|metadata| metadata.folder_paths == project_b_paths)
924 .collect::<Vec<_>>();
925 assert_eq!(project_b_entries.len(), 3);
926 assert_eq!(
927 project_b_entries
928 .iter()
929 .map(|metadata| metadata.session_id.0.as_ref())
930 .collect::<Vec<_>>(),
931 vec![
932 "project-b-session-2",
933 "project-b-session-1",
934 "project-b-session-0",
935 ]
936 );
937 assert!(
938 project_b_entries
939 .iter()
940 .all(|metadata| metadata.agent_id.is_none())
941 );
942 }
943
944 #[gpui::test]
945 async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
946 cx.update(|cx| {
947 ThreadStore::init_global(cx);
948 SidebarThreadMetadataStore::init_global(cx);
949 });
950
951 // Pre-populate the metadata store with existing data
952 let existing_metadata = ThreadMetadata {
953 session_id: acp::SessionId::new("existing-session"),
954 agent_id: None,
955 title: "Existing Thread".into(),
956 updated_at: Utc::now(),
957 created_at: Some(Utc::now()),
958 folder_paths: PathList::default(),
959 };
960
961 cx.update(|cx| {
962 let store = SidebarThreadMetadataStore::global(cx);
963 store.update(cx, |store, cx| {
964 store.save(existing_metadata, cx);
965 });
966 });
967
968 cx.run_until_parked();
969
970 // Add an entry to native thread store that should NOT be migrated
971 let save_task = cx.update(|cx| {
972 let thread_store = ThreadStore::global(cx);
973 thread_store.update(cx, |store, cx| {
974 store.save_thread(
975 acp::SessionId::new("native-session"),
976 make_db_thread("Native Thread", Utc::now()),
977 PathList::default(),
978 cx,
979 )
980 })
981 });
982 save_task.await.unwrap();
983 cx.run_until_parked();
984
985 // Run migration - should skip because metadata store is not empty
986 cx.update(|cx| {
987 migrate_thread_metadata(cx);
988 });
989
990 cx.run_until_parked();
991
992 // Verify only the existing metadata is present (migration was skipped)
993 let list = cx.update(|cx| {
994 let store = SidebarThreadMetadataStore::global(cx);
995 store.read(cx).entries().collect::<Vec<_>>()
996 });
997 assert_eq!(list.len(), 1);
998 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
999 }
1000
1001 #[gpui::test]
1002 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1003 cx.update(|cx| {
1004 let settings_store = settings::SettingsStore::test(cx);
1005 cx.set_global(settings_store);
1006 cx.update_flags(true, vec!["agent-v2".to_string()]);
1007 ThreadStore::init_global(cx);
1008 SidebarThreadMetadataStore::init_global(cx);
1009 });
1010
1011 let fs = FakeFs::new(cx.executor());
1012 let project = Project::test(fs, None::<&Path>, cx).await;
1013 let connection = Rc::new(StubAgentConnection::new());
1014
1015 // Create a regular (non-subagent) AcpThread.
1016 let regular_thread = cx
1017 .update(|cx| {
1018 connection
1019 .clone()
1020 .new_session(project.clone(), PathList::default(), cx)
1021 })
1022 .await
1023 .unwrap();
1024
1025 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1026
1027 // Set a title on the regular thread to trigger a save via handle_thread_update.
1028 cx.update(|cx| {
1029 regular_thread.update(cx, |thread, cx| {
1030 thread.set_title("Regular Thread".into(), cx).detach();
1031 });
1032 });
1033 cx.run_until_parked();
1034
1035 // Create a subagent AcpThread
1036 let subagent_session_id = acp::SessionId::new("subagent-session");
1037 let subagent_thread = cx.update(|cx| {
1038 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1039 cx.new(|cx| {
1040 acp_thread::AcpThread::new(
1041 Some(regular_session_id.clone()),
1042 Some("Subagent Thread".into()),
1043 None,
1044 connection.clone(),
1045 project.clone(),
1046 action_log,
1047 subagent_session_id.clone(),
1048 watch::Receiver::constant(acp::PromptCapabilities::new()),
1049 cx,
1050 )
1051 })
1052 });
1053
1054 // Set a title on the subagent thread to trigger handle_thread_update.
1055 cx.update(|cx| {
1056 subagent_thread.update(cx, |thread, cx| {
1057 thread
1058 .set_title("Subagent Thread Title".into(), cx)
1059 .detach();
1060 });
1061 });
1062 cx.run_until_parked();
1063
1064 // List all metadata from the store cache.
1065 let list = cx.update(|cx| {
1066 let store = SidebarThreadMetadataStore::global(cx);
1067 store.read(cx).entries().collect::<Vec<_>>()
1068 });
1069
1070 // The subagent thread should NOT appear in the sidebar metadata.
1071 // Only the regular thread should be listed.
1072 assert_eq!(
1073 list.len(),
1074 1,
1075 "Expected only the regular thread in sidebar metadata, \
1076 but found {} entries (subagent threads are leaking into the sidebar)",
1077 list.len(),
1078 );
1079 assert_eq!(list[0].session_id, regular_session_id);
1080 assert_eq!(list[0].title.as_ref(), "Regular Thread");
1081 }
1082
1083 #[test]
1084 fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1085 let now = Utc::now();
1086
1087 let operations = vec![
1088 DbOperation::Insert(make_metadata(
1089 "session-1",
1090 "First Thread",
1091 now,
1092 PathList::default(),
1093 )),
1094 DbOperation::Delete(acp::SessionId::new("session-1")),
1095 ];
1096
1097 let deduped = SidebarThreadMetadataStore::dedup_db_operations(operations);
1098
1099 assert_eq!(deduped.len(), 1);
1100 assert_eq!(
1101 deduped[0],
1102 DbOperation::Delete(acp::SessionId::new("session-1"))
1103 );
1104 }
1105
1106 #[test]
1107 fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1108 let now = Utc::now();
1109 let later = now + chrono::Duration::seconds(1);
1110
1111 let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1112 let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1113
1114 let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![
1115 DbOperation::Insert(old_metadata),
1116 DbOperation::Insert(new_metadata.clone()),
1117 ]);
1118
1119 assert_eq!(deduped.len(), 1);
1120 assert_eq!(deduped[0], DbOperation::Insert(new_metadata));
1121 }
1122
1123 #[test]
1124 fn test_dedup_db_operations_preserves_distinct_sessions() {
1125 let now = Utc::now();
1126
1127 let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1128 let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1129 let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![
1130 DbOperation::Insert(metadata1.clone()),
1131 DbOperation::Insert(metadata2.clone()),
1132 ]);
1133
1134 assert_eq!(deduped.len(), 2);
1135 assert!(deduped.contains(&DbOperation::Insert(metadata1)));
1136 assert!(deduped.contains(&DbOperation::Insert(metadata2)));
1137 }
1138}