1use std::{path::Path, sync::Arc};
2
3use acp_thread::AgentSessionInfo;
4use agent::{ThreadStore, ZED_AGENT_ID};
5use agent_client_protocol as acp;
6use anyhow::{Context as _, Result};
7use chrono::{DateTime, Utc};
8use collections::HashMap;
9use db::{
10 sqlez::{
11 bindable::Column, domain::Domain, statement::Statement,
12 thread_safe_connection::ThreadSafeConnection,
13 },
14 sqlez_macros::sql,
15};
16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
17use futures::{FutureExt as _, future::Shared};
18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
19use project::AgentId;
20use ui::{App, Context, SharedString};
21use util::ResultExt as _;
22use workspace::PathList;
23
24use crate::DEFAULT_THREAD_TITLE;
25
26pub fn init(cx: &mut App) {
27 SidebarThreadMetadataStore::init_global(cx);
28
29 if cx.has_flag::<AgentV2FeatureFlag>() {
30 migrate_thread_metadata(cx);
31 }
32 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
33 if has_flag {
34 migrate_thread_metadata(cx);
35 }
36 })
37 .detach();
38}
39
40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
41/// We migrate the last 10 threads per project and skip threads that do not have a project.
42///
43/// TODO: Remove this after N weeks of shipping the sidebar
44fn migrate_thread_metadata(cx: &mut App) {
45 const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
46
47 let store = SidebarThreadMetadataStore::global(cx);
48 let db = store.read(cx).db.clone();
49
50 cx.spawn(async move |cx| {
51 if !db.is_empty()? {
52 return Ok::<(), anyhow::Error>(());
53 }
54
55 let metadata = store.read_with(cx, |_store, app| {
56 let mut migrated_threads_per_project = HashMap::default();
57
58 ThreadStore::global(app)
59 .read(app)
60 .entries()
61 .filter_map(|entry| {
62 if entry.folder_paths.is_empty() {
63 return None;
64 }
65
66 let migrated_thread_count = migrated_threads_per_project
67 .entry(entry.folder_paths.clone())
68 .or_insert(0);
69 if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
70 return None;
71 }
72 *migrated_thread_count += 1;
73
74 Some(ThreadMetadata {
75 session_id: entry.id,
76 agent_id: None,
77 title: entry.title,
78 updated_at: entry.updated_at,
79 created_at: entry.created_at,
80 folder_paths: entry.folder_paths,
81 })
82 })
83 .collect::<Vec<_>>()
84 });
85
86 log::info!("Migrating {} thread store entries", metadata.len());
87
88 // Manually save each entry to the database and call reload, otherwise
89 // we'll end up triggering lots of reloads after each save
90 for entry in metadata {
91 db.save(entry).await?;
92 }
93
94 log::info!("Finished migrating thread store entries");
95
96 let _ = store.update(cx, |store, cx| store.reload(cx));
97 Ok(())
98 })
99 .detach_and_log_err(cx);
100}
101
/// Global wrapper holding the single `SidebarThreadMetadataStore` entity.
/// `init_global` installs it and `global` / `try_global` read it back.
struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
104
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone)]
pub struct ThreadMetadata {
    /// Session identifier; stored as the primary key of the `sidebar_threads` table.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Time of the last recorded update; persisted as an RFC 3339 string.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known. Listing falls back to `updated_at` when this is `None`.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths the thread belongs to; used to group threads in the store.
    pub folder_paths: PathList,
}
117
118impl ThreadMetadata {
119 pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
120 let session_id = session.session_id.clone();
121 let title = session.title.clone().unwrap_or_default();
122 let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
123 let created_at = session.created_at.unwrap_or(updated_at);
124 let folder_paths = session.work_dirs.clone().unwrap_or_default();
125 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
126 None
127 } else {
128 Some(agent_id)
129 };
130 Self {
131 session_id,
132 agent_id,
133 title,
134 updated_at,
135 created_at: Some(created_at),
136 folder_paths,
137 }
138 }
139
140 pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
141 let thread_ref = thread.read(cx);
142 let session_id = thread_ref.session_id().clone();
143 let title = thread_ref
144 .title()
145 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
146 let updated_at = Utc::now();
147
148 let agent_id = thread_ref.connection().agent_id();
149
150 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
151 None
152 } else {
153 Some(agent_id)
154 };
155
156 let folder_paths = {
157 let project = thread_ref.project().read(cx);
158 let paths: Vec<Arc<Path>> = project
159 .visible_worktrees(cx)
160 .map(|worktree| worktree.read(cx).abs_path())
161 .collect();
162 PathList::new(&paths)
163 };
164
165 Self {
166 session_id,
167 agent_id,
168 title,
169 created_at: Some(updated_at), // handled by db `ON CONFLICT`
170 updated_at,
171 folder_paths,
172 }
173 }
174}
175
/// The store holds all metadata needed to show threads in the sidebar.
/// Effectively, all threads stored in here are "non-archived".
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct SidebarThreadMetadataStore {
    /// Database handle used for all reads and writes.
    db: ThreadMetadataDb,
    /// Cached rows, in the order returned by `ThreadMetadataDb::list`.
    threads: Vec<ThreadMetadata>,
    /// The same cached rows grouped by their `folder_paths`.
    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
    /// The most recently started reload of the cache, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// One event subscription per observed `AcpThread`, keyed by session id;
    /// the entry is removed when the thread entity is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
}
187
188impl SidebarThreadMetadataStore {
189 #[cfg(not(any(test, feature = "test-support")))]
190 pub fn init_global(cx: &mut App) {
191 if cx.has_global::<Self>() {
192 return;
193 }
194
195 let db = ThreadMetadataDb::global(cx);
196 let thread_store = cx.new(|cx| Self::new(db, cx));
197 cx.set_global(GlobalThreadMetadataStore(thread_store));
198 }
199
200 #[cfg(any(test, feature = "test-support"))]
201 pub fn init_global(cx: &mut App) {
202 let thread = std::thread::current();
203 let test_name = thread.name().unwrap_or("unknown_test");
204 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
205 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
206 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
207 cx.set_global(GlobalThreadMetadataStore(thread_store));
208 }
209
210 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
211 cx.try_global::<GlobalThreadMetadataStore>()
212 .map(|store| store.0.clone())
213 }
214
215 pub fn global(cx: &App) -> Entity<Self> {
216 cx.global::<GlobalThreadMetadataStore>().0.clone()
217 }
218
219 pub fn is_empty(&self) -> bool {
220 self.threads.is_empty()
221 }
222
223 pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
224 self.threads.iter().cloned()
225 }
226
227 pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
228 self.threads.iter().map(|thread| thread.session_id.clone())
229 }
230
231 pub fn has_entries_for_path(&self, path_list: &PathList) -> bool {
232 self.threads_by_paths
233 .get(path_list)
234 .is_some_and(|entries| !entries.is_empty())
235 }
236
237 pub fn entries_for_path(
238 &self,
239 path_list: &PathList,
240 ) -> impl Iterator<Item = ThreadMetadata> + '_ {
241 self.threads_by_paths
242 .get(path_list)
243 .into_iter()
244 .flatten()
245 .cloned()
246 }
247
248 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
249 let db = self.db.clone();
250 self.reload_task.take();
251
252 let list_task = cx
253 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
254
255 let reload_task = cx
256 .spawn(async move |this, cx| {
257 let Some(rows) = list_task.await.log_err() else {
258 return;
259 };
260
261 this.update(cx, |this, cx| {
262 this.threads.clear();
263 this.threads_by_paths.clear();
264
265 for row in rows {
266 this.threads_by_paths
267 .entry(row.folder_paths.clone())
268 .or_default()
269 .push(row.clone());
270 this.threads.push(row);
271 }
272
273 cx.notify();
274 })
275 .ok();
276 })
277 .shared();
278 self.reload_task = Some(reload_task.clone());
279 reload_task
280 }
281
282 pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
283 if !cx.has_flag::<AgentV2FeatureFlag>() {
284 return Task::ready(Ok(()));
285 }
286
287 let db = self.db.clone();
288 cx.spawn(async move |this, cx| {
289 db.save(metadata).await?;
290 let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
291 reload_task.await;
292 Ok(())
293 })
294 }
295
296 pub fn delete(
297 &mut self,
298 session_id: acp::SessionId,
299 cx: &mut Context<Self>,
300 ) -> Task<Result<()>> {
301 if !cx.has_flag::<AgentV2FeatureFlag>() {
302 return Task::ready(Ok(()));
303 }
304
305 let db = self.db.clone();
306 cx.spawn(async move |this, cx| {
307 db.delete(session_id).await?;
308 let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
309 reload_task.await;
310 Ok(())
311 })
312 }
313
314 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
315 let weak_store = cx.weak_entity();
316
317 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
318 // Don't track subagent threads in the sidebar.
319 if thread.parent_session_id().is_some() {
320 return;
321 }
322
323 let thread_entity = cx.entity();
324
325 cx.on_release({
326 let weak_store = weak_store.clone();
327 move |thread, cx| {
328 weak_store
329 .update(cx, |store, _cx| {
330 store.session_subscriptions.remove(thread.session_id());
331 })
332 .ok();
333 }
334 })
335 .detach();
336
337 weak_store
338 .update(cx, |this, cx| {
339 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
340 this.session_subscriptions
341 .insert(thread.session_id().clone(), subscription);
342 })
343 .ok();
344 })
345 .detach();
346
347 let mut this = Self {
348 db,
349 threads: Vec::new(),
350 threads_by_paths: HashMap::default(),
351 reload_task: None,
352 session_subscriptions: HashMap::default(),
353 };
354 let _ = this.reload(cx);
355 this
356 }
357
358 fn handle_thread_update(
359 &mut self,
360 thread: Entity<acp_thread::AcpThread>,
361 event: &acp_thread::AcpThreadEvent,
362 cx: &mut Context<Self>,
363 ) {
364 // Don't track subagent threads in the sidebar.
365 if thread.read(cx).parent_session_id().is_some() {
366 return;
367 }
368
369 match event {
370 acp_thread::AcpThreadEvent::NewEntry
371 | acp_thread::AcpThreadEvent::TitleUpdated
372 | acp_thread::AcpThreadEvent::EntryUpdated(_)
373 | acp_thread::AcpThreadEvent::EntriesRemoved(_)
374 | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
375 | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
376 | acp_thread::AcpThreadEvent::Retry(_)
377 | acp_thread::AcpThreadEvent::Stopped(_)
378 | acp_thread::AcpThreadEvent::Error
379 | acp_thread::AcpThreadEvent::LoadError(_)
380 | acp_thread::AcpThreadEvent::Refusal => {
381 let metadata = ThreadMetadata::from_thread(&thread, cx);
382 self.save(metadata, cx).detach_and_log_err(cx);
383 }
384 _ => {}
385 }
386 }
387}
388
// NOTE(review): nothing visible in this file installs the store itself with
// `set_global`; the shared instance is published through
// `GlobalThreadMetadataStore` — confirm whether this impl is still needed.
impl Global for SidebarThreadMetadataStore {}
390
/// Newtype over the `ThreadSafeConnection` that stores sidebar thread metadata.
struct ThreadMetadataDb(ThreadSafeConnection);

// Declares the schema for this domain: a single `sidebar_threads` table with
// one row per session, keyed by `session_id`.
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    // Timestamps are TEXT columns (written as RFC 3339 strings by `save`);
    // `agent_id`, `created_at` and both folder-path columns are nullable.
    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}

// NOTE(review): presumably provides the shared connection behind
// `ThreadMetadataDb::global` used by `init_global` — confirm against the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
410
411impl ThreadMetadataDb {
412 pub fn is_empty(&self) -> anyhow::Result<bool> {
413 self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
414 .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
415 }
416
417 /// Returns all thread metadata, sorted by most recently created first.
418 /// When `created_at` is NULL, falls back to `updated_at`.
419 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
420 self.select::<ThreadMetadata>(
421 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
422 FROM sidebar_threads \
423 ORDER BY COALESCE(created_at, updated_at) DESC"
424 )?()
425 }
426
427 /// Upsert metadata for a thread.
428 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
429 let id = row.session_id.0.clone();
430 let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
431 let title = row.title.to_string();
432 let updated_at = row.updated_at.to_rfc3339();
433 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
434 let serialized = row.folder_paths.serialize();
435 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
436 (None, None)
437 } else {
438 (Some(serialized.paths), Some(serialized.order))
439 };
440
441 self.write(move |conn| {
442 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
443 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
444 ON CONFLICT(session_id) DO UPDATE SET \
445 agent_id = excluded.agent_id, \
446 title = excluded.title, \
447 updated_at = excluded.updated_at, \
448 folder_paths = excluded.folder_paths, \
449 folder_paths_order = excluded.folder_paths_order";
450 let mut stmt = Statement::prepare(conn, sql)?;
451 let mut i = stmt.bind(&id, 1)?;
452 i = stmt.bind(&agent_id, i)?;
453 i = stmt.bind(&title, i)?;
454 i = stmt.bind(&updated_at, i)?;
455 i = stmt.bind(&created_at, i)?;
456 i = stmt.bind(&folder_paths, i)?;
457 stmt.bind(&folder_paths_order, i)?;
458 stmt.exec()
459 })
460 .await
461 }
462
463 /// Delete metadata for a single thread.
464 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
465 let id = session_id.0.clone();
466 self.write(move |conn| {
467 let mut stmt =
468 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
469 stmt.bind(&id, 1)?;
470 stmt.exec()
471 })
472 .await
473 }
474}
475
476impl Column for ThreadMetadata {
477 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
478 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
479 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
480 let (title, next): (String, i32) = Column::column(statement, next)?;
481 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
482 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
483 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
484 let (folder_paths_order_str, next): (Option<String>, i32) =
485 Column::column(statement, next)?;
486
487 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
488 let created_at = created_at_str
489 .as_deref()
490 .map(DateTime::parse_from_rfc3339)
491 .transpose()?
492 .map(|dt| dt.with_timezone(&Utc));
493
494 let folder_paths = folder_paths_str
495 .map(|paths| {
496 PathList::deserialize(&util::path_list::SerializedPathList {
497 paths,
498 order: folder_paths_order_str.unwrap_or_default(),
499 })
500 })
501 .unwrap_or_default();
502
503 Ok((
504 ThreadMetadata {
505 session_id: acp::SessionId::new(id),
506 agent_id: agent_id.map(|id| AgentId::new(id)),
507 title: title.into(),
508 updated_at,
509 created_at,
510 folder_paths,
511 },
512 next,
513 ))
514 }
515}
516
517#[cfg(test)]
518mod tests {
519 use super::*;
520 use acp_thread::{AgentConnection, StubAgentConnection};
521 use action_log::ActionLog;
522 use agent::DbThread;
523 use agent_client_protocol as acp;
524 use feature_flags::FeatureFlagAppExt;
525 use gpui::TestAppContext;
526 use project::FakeFs;
527 use project::Project;
528 use std::path::Path;
529 use std::rc::Rc;
530
/// Builds a `DbThread` with the given title and `updated_at`; every other
/// field is empty, `None`, `false`, or its default.
fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
    DbThread {
        title: title.to_string().into(),
        messages: Vec::new(),
        updated_at,
        detailed_summary: None,
        initial_project_snapshot: None,
        cumulative_token_usage: Default::default(),
        request_token_usage: Default::default(),
        model: None,
        profile: None,
        imported: false,
        subagent_context: None,
        speed: None,
        thinking_enabled: false,
        thinking_effort: None,
        draft_prompt: None,
        ui_scroll_position: None,
    }
}
551
552 fn make_metadata(
553 session_id: &str,
554 title: &str,
555 updated_at: DateTime<Utc>,
556 folder_paths: PathList,
557 ) -> ThreadMetadata {
558 ThreadMetadata {
559 session_id: acp::SessionId::new(session_id),
560 agent_id: None,
561 title: title.to_string().into(),
562 updated_at,
563 created_at: Some(updated_at),
564 folder_paths,
565 }
566 }
567
568 #[gpui::test]
569 async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
570 let first_paths = PathList::new(&[Path::new("/project-a")]);
571 let second_paths = PathList::new(&[Path::new("/project-b")]);
572 let now = Utc::now();
573 let older = now - chrono::Duration::seconds(1);
574
575 let thread = std::thread::current();
576 let test_name = thread.name().unwrap_or("unknown_test");
577 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
578 let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
579 &db_name,
580 )));
581
582 db.save(make_metadata(
583 "session-1",
584 "First Thread",
585 now,
586 first_paths.clone(),
587 ))
588 .await
589 .unwrap();
590 db.save(make_metadata(
591 "session-2",
592 "Second Thread",
593 older,
594 second_paths.clone(),
595 ))
596 .await
597 .unwrap();
598
599 cx.update(|cx| {
600 let settings_store = settings::SettingsStore::test(cx);
601 cx.set_global(settings_store);
602 cx.update_flags(true, vec!["agent-v2".to_string()]);
603 SidebarThreadMetadataStore::init_global(cx);
604 });
605
606 cx.run_until_parked();
607
608 cx.update(|cx| {
609 let store = SidebarThreadMetadataStore::global(cx);
610 let store = store.read(cx);
611
612 let entry_ids = store
613 .entry_ids()
614 .map(|session_id| session_id.0.to_string())
615 .collect::<Vec<_>>();
616 assert_eq!(entry_ids, vec!["session-1", "session-2"]);
617
618 let first_path_entries = store
619 .entries_for_path(&first_paths)
620 .map(|entry| entry.session_id.0.to_string())
621 .collect::<Vec<_>>();
622 assert_eq!(first_path_entries, vec!["session-1"]);
623
624 let second_path_entries = store
625 .entries_for_path(&second_paths)
626 .map(|entry| entry.session_id.0.to_string())
627 .collect::<Vec<_>>();
628 assert_eq!(second_path_entries, vec!["session-2"]);
629 });
630 }
631
632 #[gpui::test]
633 async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
634 cx.update(|cx| {
635 let settings_store = settings::SettingsStore::test(cx);
636 cx.set_global(settings_store);
637 cx.update_flags(true, vec!["agent-v2".to_string()]);
638 SidebarThreadMetadataStore::init_global(cx);
639 });
640
641 let first_paths = PathList::new(&[Path::new("/project-a")]);
642 let second_paths = PathList::new(&[Path::new("/project-b")]);
643 let initial_time = Utc::now();
644 let updated_time = initial_time + chrono::Duration::seconds(1);
645
646 let initial_metadata = make_metadata(
647 "session-1",
648 "First Thread",
649 initial_time,
650 first_paths.clone(),
651 );
652
653 let second_metadata = make_metadata(
654 "session-2",
655 "Second Thread",
656 initial_time,
657 second_paths.clone(),
658 );
659
660 cx.update(|cx| {
661 let store = SidebarThreadMetadataStore::global(cx);
662 store.update(cx, |store, cx| {
663 store.save(initial_metadata, cx).detach();
664 store.save(second_metadata, cx).detach();
665 });
666 });
667
668 cx.run_until_parked();
669
670 cx.update(|cx| {
671 let store = SidebarThreadMetadataStore::global(cx);
672 let store = store.read(cx);
673
674 let first_path_entries = store
675 .entries_for_path(&first_paths)
676 .map(|entry| entry.session_id.0.to_string())
677 .collect::<Vec<_>>();
678 assert_eq!(first_path_entries, vec!["session-1"]);
679
680 let second_path_entries = store
681 .entries_for_path(&second_paths)
682 .map(|entry| entry.session_id.0.to_string())
683 .collect::<Vec<_>>();
684 assert_eq!(second_path_entries, vec!["session-2"]);
685 });
686
687 let moved_metadata = make_metadata(
688 "session-1",
689 "First Thread",
690 updated_time,
691 second_paths.clone(),
692 );
693
694 cx.update(|cx| {
695 let store = SidebarThreadMetadataStore::global(cx);
696 store.update(cx, |store, cx| {
697 store.save(moved_metadata, cx).detach();
698 });
699 });
700
701 cx.run_until_parked();
702
703 cx.update(|cx| {
704 let store = SidebarThreadMetadataStore::global(cx);
705 let store = store.read(cx);
706
707 let entry_ids = store
708 .entry_ids()
709 .map(|session_id| session_id.0.to_string())
710 .collect::<Vec<_>>();
711 assert_eq!(entry_ids, vec!["session-1", "session-2"]);
712
713 let first_path_entries = store
714 .entries_for_path(&first_paths)
715 .map(|entry| entry.session_id.0.to_string())
716 .collect::<Vec<_>>();
717 assert!(first_path_entries.is_empty());
718
719 let second_path_entries = store
720 .entries_for_path(&second_paths)
721 .map(|entry| entry.session_id.0.to_string())
722 .collect::<Vec<_>>();
723 assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
724 });
725
726 cx.update(|cx| {
727 let store = SidebarThreadMetadataStore::global(cx);
728 store.update(cx, |store, cx| {
729 store.delete(acp::SessionId::new("session-2"), cx).detach();
730 });
731 });
732
733 cx.run_until_parked();
734
735 cx.update(|cx| {
736 let store = SidebarThreadMetadataStore::global(cx);
737 let store = store.read(cx);
738
739 let entry_ids = store
740 .entry_ids()
741 .map(|session_id| session_id.0.to_string())
742 .collect::<Vec<_>>();
743 assert_eq!(entry_ids, vec!["session-1"]);
744
745 let second_path_entries = store
746 .entries_for_path(&second_paths)
747 .map(|entry| entry.session_id.0.to_string())
748 .collect::<Vec<_>>();
749 assert_eq!(second_path_entries, vec!["session-1"]);
750 });
751 }
752
/// Seeds the native thread store with 12 threads for project A, 3 for
/// project B and one thread without a project, runs the migration, and checks
/// that only the 10 newest project-A threads plus all project-B threads end
/// up in the sidebar store.
#[gpui::test]
async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
    cx.update(|cx| {
        ThreadStore::init_global(cx);
        SidebarThreadMetadataStore::init_global(cx);
    });

    // Verify the cache is empty before migration
    let list = cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.read(cx).entries().collect::<Vec<_>>()
    });
    assert_eq!(list.len(), 0);

    let project_a_paths = PathList::new(&[Path::new("/project-a")]);
    let project_b_paths = PathList::new(&[Path::new("/project-b")]);
    let now = Utc::now();

    // Project A: 12 threads, each one second newer than the previous, so two
    // of them exceed the per-project migration cap.
    for index in 0..12 {
        let updated_at = now + chrono::Duration::seconds(index as i64);
        let session_id = format!("project-a-session-{index}");
        let title = format!("Project A Thread {index}");

        let save_task = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            let session_id = session_id.clone();
            let title = title.clone();
            let project_a_paths = project_a_paths.clone();
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new(session_id),
                    make_db_thread(&title, updated_at),
                    project_a_paths,
                    cx,
                )
            })
        });
        save_task.await.unwrap();
        cx.run_until_parked();
    }

    // Project B: 3 threads, all newer than every project-A thread.
    for index in 0..3 {
        let updated_at = now + chrono::Duration::seconds(100 + index as i64);
        let session_id = format!("project-b-session-{index}");
        let title = format!("Project B Thread {index}");

        let save_task = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            let session_id = session_id.clone();
            let title = title.clone();
            let project_b_paths = project_b_paths.clone();
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new(session_id),
                    make_db_thread(&title, updated_at),
                    project_b_paths,
                    cx,
                )
            })
        });
        save_task.await.unwrap();
        cx.run_until_parked();
    }

    // One thread with an empty path list, which the migration must skip.
    let save_projectless = cx.update(|cx| {
        let thread_store = ThreadStore::global(cx);
        thread_store.update(cx, |store, cx| {
            store.save_thread(
                acp::SessionId::new("projectless-session"),
                make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
                PathList::default(),
                cx,
            )
        })
    });
    save_projectless.await.unwrap();
    cx.run_until_parked();

    // Run migration
    cx.update(|cx| {
        migrate_thread_metadata(cx);
    });

    cx.run_until_parked();

    // Verify the metadata was migrated, limited to 10 per project, and
    // projectless threads were skipped.
    let list = cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.read(cx).entries().collect::<Vec<_>>()
    });
    // 10 (capped project A) + 3 (project B).
    assert_eq!(list.len(), 13);

    assert!(
        list.iter()
            .all(|metadata| !metadata.folder_paths.is_empty())
    );
    assert!(
        list.iter()
            .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
    );

    let project_a_entries = list
        .iter()
        .filter(|metadata| metadata.folder_paths == project_a_paths)
        .collect::<Vec<_>>();
    assert_eq!(project_a_entries.len(), 10);
    // The two oldest project-A threads (0 and 1) are the ones left out.
    assert_eq!(
        project_a_entries
            .iter()
            .map(|metadata| metadata.session_id.0.as_ref())
            .collect::<Vec<_>>(),
        vec![
            "project-a-session-11",
            "project-a-session-10",
            "project-a-session-9",
            "project-a-session-8",
            "project-a-session-7",
            "project-a-session-6",
            "project-a-session-5",
            "project-a-session-4",
            "project-a-session-3",
            "project-a-session-2",
        ]
    );
    // Migrated native threads carry no agent id.
    assert!(
        project_a_entries
            .iter()
            .all(|metadata| metadata.agent_id.is_none())
    );

    let project_b_entries = list
        .iter()
        .filter(|metadata| metadata.folder_paths == project_b_paths)
        .collect::<Vec<_>>();
    assert_eq!(project_b_entries.len(), 3);
    assert_eq!(
        project_b_entries
            .iter()
            .map(|metadata| metadata.session_id.0.as_ref())
            .collect::<Vec<_>>(),
        vec![
            "project-b-session-2",
            "project-b-session-1",
            "project-b-session-0",
        ]
    );
    assert!(
        project_b_entries
            .iter()
            .all(|metadata| metadata.agent_id.is_none())
    );
}
906
907 #[gpui::test]
908 async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
909 cx.update(|cx| {
910 ThreadStore::init_global(cx);
911 SidebarThreadMetadataStore::init_global(cx);
912 });
913
914 // Pre-populate the metadata store with existing data
915 let existing_metadata = ThreadMetadata {
916 session_id: acp::SessionId::new("existing-session"),
917 agent_id: None,
918 title: "Existing Thread".into(),
919 updated_at: Utc::now(),
920 created_at: Some(Utc::now()),
921 folder_paths: PathList::default(),
922 };
923
924 cx.update(|cx| {
925 let store = SidebarThreadMetadataStore::global(cx);
926 store.update(cx, |store, cx| {
927 store.save(existing_metadata, cx).detach();
928 });
929 });
930
931 cx.run_until_parked();
932
933 // Add an entry to native thread store that should NOT be migrated
934 let save_task = cx.update(|cx| {
935 let thread_store = ThreadStore::global(cx);
936 thread_store.update(cx, |store, cx| {
937 store.save_thread(
938 acp::SessionId::new("native-session"),
939 make_db_thread("Native Thread", Utc::now()),
940 PathList::default(),
941 cx,
942 )
943 })
944 });
945 save_task.await.unwrap();
946 cx.run_until_parked();
947
948 // Run migration - should skip because metadata store is not empty
949 cx.update(|cx| {
950 migrate_thread_metadata(cx);
951 });
952
953 cx.run_until_parked();
954
955 // Verify only the existing metadata is present (migration was skipped)
956 let list = cx.update(|cx| {
957 let store = SidebarThreadMetadataStore::global(cx);
958 store.read(cx).entries().collect::<Vec<_>>()
959 });
960 assert_eq!(list.len(), 1);
961 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
962 }
963
/// A thread created with a parent session id (a subagent thread) must not be
/// recorded in the sidebar store, while a regular thread is.
#[gpui::test]
async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
    cx.update(|cx| {
        let settings_store = settings::SettingsStore::test(cx);
        cx.set_global(settings_store);
        cx.update_flags(true, vec!["agent-v2".to_string()]);
        ThreadStore::init_global(cx);
        SidebarThreadMetadataStore::init_global(cx);
    });

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, None::<&Path>, cx).await;
    let connection = Rc::new(StubAgentConnection::new());

    // Create a regular (non-subagent) AcpThread.
    let regular_thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::default(), cx)
        })
        .await
        .unwrap();

    let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());

    // Set a title on the regular thread to trigger a save via handle_thread_update.
    cx.update(|cx| {
        regular_thread.update(cx, |thread, cx| {
            thread.set_title("Regular Thread".into(), cx).detach();
        });
    });
    cx.run_until_parked();

    // Create a subagent AcpThread; the first argument is the parent session id,
    // which is what marks it as a subagent thread.
    let subagent_session_id = acp::SessionId::new("subagent-session");
    let subagent_thread = cx.update(|cx| {
        let action_log = cx.new(|_| ActionLog::new(project.clone()));
        cx.new(|cx| {
            acp_thread::AcpThread::new(
                Some(regular_session_id.clone()),
                Some("Subagent Thread".into()),
                None,
                connection.clone(),
                project.clone(),
                action_log,
                subagent_session_id.clone(),
                watch::Receiver::constant(acp::PromptCapabilities::new()),
                cx,
            )
        })
    });

    // Set a title on the subagent thread to trigger handle_thread_update.
    cx.update(|cx| {
        subagent_thread.update(cx, |thread, cx| {
            thread
                .set_title("Subagent Thread Title".into(), cx)
                .detach();
        });
    });
    cx.run_until_parked();

    // List all metadata from the store cache.
    let list = cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.read(cx).entries().collect::<Vec<_>>()
    });

    // The subagent thread should NOT appear in the sidebar metadata.
    // Only the regular thread should be listed.
    assert_eq!(
        list.len(),
        1,
        "Expected only the regular thread in sidebar metadata, \
         but found {} entries (subagent threads are leaking into the sidebar)",
        list.len(),
    );
    assert_eq!(list[0].session_id, regular_session_id);
    assert_eq!(list[0].title.as_ref(), "Regular Thread");
}
1045}