1use std::{path::Path, sync::Arc};
2
3use acp_thread::AgentSessionInfo;
4use agent::{ThreadStore, ZED_AGENT_ID};
5use agent_client_protocol as acp;
6use anyhow::{Context as _, Result};
7use chrono::{DateTime, Utc};
8use collections::HashMap;
9use db::{
10 sqlez::{
11 bindable::Column, domain::Domain, statement::Statement,
12 thread_safe_connection::ThreadSafeConnection,
13 },
14 sqlez_macros::sql,
15};
16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
17use futures::{FutureExt as _, future::Shared};
18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
19use project::AgentId;
20use ui::{App, Context, SharedString};
21use util::ResultExt as _;
22use workspace::PathList;
23
24use crate::DEFAULT_THREAD_TITLE;
25
26pub fn init(cx: &mut App) {
27 SidebarThreadMetadataStore::init_global(cx);
28
29 if cx.has_flag::<AgentV2FeatureFlag>() {
30 migrate_thread_metadata(cx);
31 }
32 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
33 if has_flag {
34 migrate_thread_metadata(cx);
35 }
36 })
37 .detach();
38}
39
40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
41/// We migrate the last 10 threads per project and skip threads that do not have a project.
42///
43/// TODO: Remove this after N weeks of shipping the sidebar
44fn migrate_thread_metadata(cx: &mut App) {
45 const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
46
47 let store = SidebarThreadMetadataStore::global(cx);
48 let db = store.read(cx).db.clone();
49
50 cx.spawn(async move |cx| {
51 if !db.is_empty()? {
52 return Ok::<(), anyhow::Error>(());
53 }
54
55 let metadata = store.read_with(cx, |_store, app| {
56 let mut migrated_threads_per_project = HashMap::default();
57
58 ThreadStore::global(app)
59 .read(app)
60 .entries()
61 .filter_map(|entry| {
62 if entry.folder_paths.is_empty() {
63 return None;
64 }
65
66 let migrated_thread_count = migrated_threads_per_project
67 .entry(entry.folder_paths.clone())
68 .or_insert(0);
69 if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
70 return None;
71 }
72 *migrated_thread_count += 1;
73
74 Some(ThreadMetadata {
75 session_id: entry.id,
76 agent_id: None,
77 title: entry.title,
78 updated_at: entry.updated_at,
79 created_at: entry.created_at,
80 folder_paths: entry.folder_paths,
81 })
82 })
83 .collect::<Vec<_>>()
84 });
85
86 log::info!("Migrating {} thread store entries", metadata.len());
87
88 // Manually save each entry to the database and call reload, otherwise
89 // we'll end up triggering lots of reloads after each save
90 for entry in metadata {
91 db.save(entry).await?;
92 }
93
94 log::info!("Finished migrating thread store entries");
95
96 let _ = store.update(cx, |store, cx| store.reload(cx));
97 Ok(())
98 })
99 .detach_and_log_err(cx);
100}
101
/// Wrapper around the shared `SidebarThreadMetadataStore` entity; this is the
/// type that `init_global` registers as a global and that `global` /
/// `try_global` look up.
struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
// Marker impl with no items.
impl Global for GlobalThreadMetadataStore {}
104
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone)]
pub struct ThreadMetadata {
    /// Session identifier; stored as the primary key of the `sidebar_threads` table.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title of the thread.
    pub title: SharedString,
    /// Time of the last update; `ThreadMetadataDb::list` orders rows by this
    /// column, newest first.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known. The upsert in `ThreadMetadataDb::save` does not
    /// overwrite this column for an existing row.
    pub created_at: Option<DateTime<Utc>>,
    /// Folders associated with the thread; stored as NULL when empty.
    pub folder_paths: PathList,
}
117
118impl ThreadMetadata {
119 pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
120 let session_id = session.session_id.clone();
121 let title = session.title.clone().unwrap_or_default();
122 let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
123 let created_at = session.created_at.unwrap_or(updated_at);
124 let folder_paths = session.work_dirs.clone().unwrap_or_default();
125 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
126 None
127 } else {
128 Some(agent_id)
129 };
130 Self {
131 session_id,
132 agent_id,
133 title,
134 updated_at,
135 created_at: Some(created_at),
136 folder_paths,
137 }
138 }
139
140 pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
141 let thread_ref = thread.read(cx);
142 let session_id = thread_ref.session_id().clone();
143 let title = thread_ref
144 .title()
145 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
146 let updated_at = Utc::now();
147
148 let agent_id = thread_ref.connection().agent_id();
149
150 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
151 None
152 } else {
153 Some(agent_id)
154 };
155
156 let folder_paths = {
157 let project = thread_ref.project().read(cx);
158 let paths: Vec<Arc<Path>> = project
159 .visible_worktrees(cx)
160 .map(|worktree| worktree.read(cx).abs_path())
161 .collect();
162 PathList::new(&paths)
163 };
164
165 Self {
166 session_id,
167 agent_id,
168 title,
169 created_at: Some(updated_at), // handled by db `ON CONFLICT`
170 updated_at,
171 folder_paths,
172 }
173 }
174}
175
/// The store holds all metadata needed to show threads in the sidebar.
/// Effectively, all threads stored in here are "non-archived".
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct SidebarThreadMetadataStore {
    /// Database that persists the metadata rows.
    db: ThreadMetadataDb,
    /// Cached rows, in the order returned by `ThreadMetadataDb::list`.
    threads: Vec<ThreadMetadata>,
    /// The same cached rows grouped by their `folder_paths`.
    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
    /// The most recently started reload, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed threads, keyed by session id; an entry
    /// is removed when its thread is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
}
187
188impl SidebarThreadMetadataStore {
189 #[cfg(not(any(test, feature = "test-support")))]
190 pub fn init_global(cx: &mut App) {
191 if cx.has_global::<Self>() {
192 return;
193 }
194
195 let db = ThreadMetadataDb::global(cx);
196 let thread_store = cx.new(|cx| Self::new(db, cx));
197 cx.set_global(GlobalThreadMetadataStore(thread_store));
198 }
199
200 #[cfg(any(test, feature = "test-support"))]
201 pub fn init_global(cx: &mut App) {
202 let thread = std::thread::current();
203 let test_name = thread.name().unwrap_or("unknown_test");
204 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
205 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
206 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
207 cx.set_global(GlobalThreadMetadataStore(thread_store));
208 }
209
210 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
211 cx.try_global::<GlobalThreadMetadataStore>()
212 .map(|store| store.0.clone())
213 }
214
215 pub fn global(cx: &App) -> Entity<Self> {
216 cx.global::<GlobalThreadMetadataStore>().0.clone()
217 }
218
219 pub fn is_empty(&self) -> bool {
220 self.threads.is_empty()
221 }
222
223 pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
224 self.threads.iter().cloned()
225 }
226
227 pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
228 self.threads.iter().map(|thread| thread.session_id.clone())
229 }
230
231 pub fn entries_for_path(
232 &self,
233 path_list: &PathList,
234 ) -> impl Iterator<Item = ThreadMetadata> + '_ {
235 self.threads_by_paths
236 .get(path_list)
237 .into_iter()
238 .flatten()
239 .cloned()
240 }
241
242 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
243 let db = self.db.clone();
244 self.reload_task.take();
245
246 let list_task = cx
247 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
248
249 let reload_task = cx
250 .spawn(async move |this, cx| {
251 let Some(rows) = list_task.await.log_err() else {
252 return;
253 };
254
255 this.update(cx, |this, cx| {
256 this.threads.clear();
257 this.threads_by_paths.clear();
258
259 for row in rows {
260 this.threads_by_paths
261 .entry(row.folder_paths.clone())
262 .or_default()
263 .push(row.clone());
264 this.threads.push(row);
265 }
266
267 cx.notify();
268 })
269 .ok();
270 })
271 .shared();
272 self.reload_task = Some(reload_task.clone());
273 reload_task
274 }
275
276 pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
277 if !cx.has_flag::<AgentV2FeatureFlag>() {
278 return Task::ready(Ok(()));
279 }
280
281 let db = self.db.clone();
282 cx.spawn(async move |this, cx| {
283 db.save(metadata).await?;
284 let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
285 reload_task.await;
286 Ok(())
287 })
288 }
289
290 pub fn delete(
291 &mut self,
292 session_id: acp::SessionId,
293 cx: &mut Context<Self>,
294 ) -> Task<Result<()>> {
295 if !cx.has_flag::<AgentV2FeatureFlag>() {
296 return Task::ready(Ok(()));
297 }
298
299 let db = self.db.clone();
300 cx.spawn(async move |this, cx| {
301 db.delete(session_id).await?;
302 let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
303 reload_task.await;
304 Ok(())
305 })
306 }
307
308 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
309 let weak_store = cx.weak_entity();
310
311 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
312 // Don't track subagent threads in the sidebar.
313 if thread.parent_session_id().is_some() {
314 return;
315 }
316
317 let thread_entity = cx.entity();
318
319 cx.on_release({
320 let weak_store = weak_store.clone();
321 move |thread, cx| {
322 weak_store
323 .update(cx, |store, _cx| {
324 store.session_subscriptions.remove(thread.session_id());
325 })
326 .ok();
327 }
328 })
329 .detach();
330
331 weak_store
332 .update(cx, |this, cx| {
333 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
334 this.session_subscriptions
335 .insert(thread.session_id().clone(), subscription);
336 })
337 .ok();
338 })
339 .detach();
340
341 let mut this = Self {
342 db,
343 threads: Vec::new(),
344 threads_by_paths: HashMap::default(),
345 reload_task: None,
346 session_subscriptions: HashMap::default(),
347 };
348 let _ = this.reload(cx);
349 this
350 }
351
352 fn handle_thread_update(
353 &mut self,
354 thread: Entity<acp_thread::AcpThread>,
355 event: &acp_thread::AcpThreadEvent,
356 cx: &mut Context<Self>,
357 ) {
358 // Don't track subagent threads in the sidebar.
359 if thread.read(cx).parent_session_id().is_some() {
360 return;
361 }
362
363 match event {
364 acp_thread::AcpThreadEvent::NewEntry
365 | acp_thread::AcpThreadEvent::TitleUpdated
366 | acp_thread::AcpThreadEvent::EntryUpdated(_)
367 | acp_thread::AcpThreadEvent::EntriesRemoved(_)
368 | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
369 | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
370 | acp_thread::AcpThreadEvent::Retry(_)
371 | acp_thread::AcpThreadEvent::Stopped(_)
372 | acp_thread::AcpThreadEvent::Error
373 | acp_thread::AcpThreadEvent::LoadError(_)
374 | acp_thread::AcpThreadEvent::Refusal => {
375 let metadata = ThreadMetadata::from_thread(&thread, cx);
376 self.save(metadata, cx).detach_and_log_err(cx);
377 }
378 _ => {}
379 }
380 }
381}
382
// Lets `SidebarThreadMetadataStore` itself be named in global APIs. Note that
// the shared entity is registered under `GlobalThreadMetadataStore`, not under
// this type.
impl Global for SidebarThreadMetadataStore {}
384
/// Database handle for sidebar thread metadata; wraps a `ThreadSafeConnection`.
struct ThreadMetadataDb(ThreadSafeConnection);

impl Domain for ThreadMetadataDb {
    // Evaluates to the string "ThreadMetadataDb".
    const NAME: &str = stringify!(ThreadMetadataDb);

    // A single table with one row per thread, keyed by `session_id`.
    // `ThreadMetadataDb::save` writes the timestamps as RFC 3339 text and
    // stores NULL in both folder path columns when the thread has no folders.
    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}

// NOTE(review): presumably this generates the shared connection used by
// `ThreadMetadataDb::global` — confirm against the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
404
405impl ThreadMetadataDb {
406 pub fn is_empty(&self) -> anyhow::Result<bool> {
407 self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
408 .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
409 }
410
411 /// List all sidebar thread metadata, ordered by updated_at descending.
412 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
413 self.select::<ThreadMetadata>(
414 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
415 FROM sidebar_threads \
416 ORDER BY updated_at DESC"
417 )?()
418 }
419
420 /// Upsert metadata for a thread.
421 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
422 let id = row.session_id.0.clone();
423 let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
424 let title = row.title.to_string();
425 let updated_at = row.updated_at.to_rfc3339();
426 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
427 let serialized = row.folder_paths.serialize();
428 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
429 (None, None)
430 } else {
431 (Some(serialized.paths), Some(serialized.order))
432 };
433
434 self.write(move |conn| {
435 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
436 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
437 ON CONFLICT(session_id) DO UPDATE SET \
438 agent_id = excluded.agent_id, \
439 title = excluded.title, \
440 updated_at = excluded.updated_at, \
441 folder_paths = excluded.folder_paths, \
442 folder_paths_order = excluded.folder_paths_order";
443 let mut stmt = Statement::prepare(conn, sql)?;
444 let mut i = stmt.bind(&id, 1)?;
445 i = stmt.bind(&agent_id, i)?;
446 i = stmt.bind(&title, i)?;
447 i = stmt.bind(&updated_at, i)?;
448 i = stmt.bind(&created_at, i)?;
449 i = stmt.bind(&folder_paths, i)?;
450 stmt.bind(&folder_paths_order, i)?;
451 stmt.exec()
452 })
453 .await
454 }
455
456 /// Delete metadata for a single thread.
457 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
458 let id = session_id.0.clone();
459 self.write(move |conn| {
460 let mut stmt =
461 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
462 stmt.bind(&id, 1)?;
463 stmt.exec()
464 })
465 .await
466 }
467}
468
469impl Column for ThreadMetadata {
470 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
471 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
472 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
473 let (title, next): (String, i32) = Column::column(statement, next)?;
474 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
475 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
476 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
477 let (folder_paths_order_str, next): (Option<String>, i32) =
478 Column::column(statement, next)?;
479
480 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
481 let created_at = created_at_str
482 .as_deref()
483 .map(DateTime::parse_from_rfc3339)
484 .transpose()?
485 .map(|dt| dt.with_timezone(&Utc));
486
487 let folder_paths = folder_paths_str
488 .map(|paths| {
489 PathList::deserialize(&util::path_list::SerializedPathList {
490 paths,
491 order: folder_paths_order_str.unwrap_or_default(),
492 })
493 })
494 .unwrap_or_default();
495
496 Ok((
497 ThreadMetadata {
498 session_id: acp::SessionId::new(id),
499 agent_id: agent_id.map(|id| AgentId::new(id)),
500 title: title.into(),
501 updated_at,
502 created_at,
503 folder_paths,
504 },
505 next,
506 ))
507 }
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use acp_thread::{AgentConnection, StubAgentConnection};
    use action_log::ActionLog;
    use agent::DbThread;
    use agent_client_protocol as acp;
    use feature_flags::FeatureFlagAppExt;
    use gpui::TestAppContext;
    use project::FakeFs;
    use project::Project;
    use std::path::Path;
    use std::rc::Rc;

    /// Builds a minimal native `DbThread` with the given title and update time.
    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
        DbThread {
            title: title.to_string().into(),
            messages: Vec::new(),
            updated_at,
            detailed_summary: None,
            initial_project_snapshot: None,
            cumulative_token_usage: Default::default(),
            request_token_usage: Default::default(),
            model: None,
            profile: None,
            imported: false,
            subagent_context: None,
            speed: None,
            thinking_enabled: false,
            thinking_effort: None,
            draft_prompt: None,
            ui_scroll_position: None,
        }
    }

    /// Builds native-thread metadata (`agent_id: None`) whose `created_at`
    /// equals `updated_at`.
    fn make_metadata(
        session_id: &str,
        title: &str,
        updated_at: DateTime<Utc>,
        folder_paths: PathList,
    ) -> ThreadMetadata {
        ThreadMetadata {
            session_id: acp::SessionId::new(session_id),
            agent_id: None,
            title: title.to_string().into(),
            updated_at,
            created_at: Some(updated_at),
            folder_paths,
        }
    }

    /// Rows written to the database before the store exists must show up in
    /// the store's cache after initialization.
    #[gpui::test]
    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
        let first_paths = PathList::new(&[Path::new("/project-a")]);
        let second_paths = PathList::new(&[Path::new("/project-b")]);
        let now = Utc::now();
        let older = now - chrono::Duration::seconds(1);

        // Use the same database name that the test `init_global` derives.
        let thread = std::thread::current();
        let test_name = thread.name().unwrap_or("unknown_test");
        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
            &db_name,
        )));

        db.save(make_metadata(
            "session-1",
            "First Thread",
            now,
            first_paths.clone(),
        ))
        .await
        .unwrap();
        db.save(make_metadata(
            "session-2",
            "Second Thread",
            older,
            second_paths.clone(),
        ))
        .await
        .unwrap();

        cx.update(|cx| {
            let settings_store = settings::SettingsStore::test(cx);
            cx.set_global(settings_store);
            cx.update_flags(true, vec!["agent-v2".to_string()]);
            SidebarThreadMetadataStore::init_global(cx);
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let entry_ids = store
                .entry_ids()
                .map(|session_id| session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(entry_ids, vec!["session-1", "session-2"]);

            let first_path_entries = store
                .entries_for_path(&first_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(first_path_entries, vec!["session-1"]);

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-2"]);
        });
    }

    /// Saving, re-saving with different folder paths, and deleting must each
    /// be reflected in both the flat cache and the per-path cache.
    #[gpui::test]
    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = settings::SettingsStore::test(cx);
            cx.set_global(settings_store);
            cx.update_flags(true, vec!["agent-v2".to_string()]);
            SidebarThreadMetadataStore::init_global(cx);
        });

        let first_paths = PathList::new(&[Path::new("/project-a")]);
        let second_paths = PathList::new(&[Path::new("/project-b")]);
        let initial_time = Utc::now();
        let updated_time = initial_time + chrono::Duration::seconds(1);

        let initial_metadata = make_metadata(
            "session-1",
            "First Thread",
            initial_time,
            first_paths.clone(),
        );

        let second_metadata = make_metadata(
            "session-2",
            "Second Thread",
            initial_time,
            second_paths.clone(),
        );

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(initial_metadata, cx).detach();
                store.save(second_metadata, cx).detach();
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let first_path_entries = store
                .entries_for_path(&first_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(first_path_entries, vec!["session-1"]);

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-2"]);
        });

        // Re-save session-1 under the second project with a newer timestamp.
        let moved_metadata = make_metadata(
            "session-1",
            "First Thread",
            updated_time,
            second_paths.clone(),
        );

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(moved_metadata, cx).detach();
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let entry_ids = store
                .entry_ids()
                .map(|session_id| session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(entry_ids, vec!["session-1", "session-2"]);

            let first_path_entries = store
                .entries_for_path(&first_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert!(first_path_entries.is_empty());

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
        });

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.delete(acp::SessionId::new("session-2"), cx).detach();
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let entry_ids = store
                .entry_ids()
                .map(|session_id| session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(entry_ids, vec!["session-1"]);

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-1"]);
        });
    }

    /// The migration must copy at most 10 threads per project and skip
    /// threads without a project.
    #[gpui::test]
    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
        cx.update(|cx| {
            ThreadStore::init_global(cx);
            SidebarThreadMetadataStore::init_global(cx);
        });

        // Verify the cache is empty before migration
        let list = cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.read(cx).entries().collect::<Vec<_>>()
        });
        assert_eq!(list.len(), 0);

        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
        let now = Utc::now();

        // Twelve threads for project A: two more than the per-project limit.
        for index in 0..12 {
            let updated_at = now + chrono::Duration::seconds(index as i64);
            let session_id = format!("project-a-session-{index}");
            let title = format!("Project A Thread {index}");

            let save_task = cx.update(|cx| {
                let thread_store = ThreadStore::global(cx);
                let session_id = session_id.clone();
                let title = title.clone();
                let project_a_paths = project_a_paths.clone();
                thread_store.update(cx, |store, cx| {
                    store.save_thread(
                        acp::SessionId::new(session_id),
                        make_db_thread(&title, updated_at),
                        project_a_paths,
                        cx,
                    )
                })
            });
            save_task.await.unwrap();
            cx.run_until_parked();
        }

        // Three threads for project B: below the limit.
        for index in 0..3 {
            let updated_at = now + chrono::Duration::seconds(100 + index as i64);
            let session_id = format!("project-b-session-{index}");
            let title = format!("Project B Thread {index}");

            let save_task = cx.update(|cx| {
                let thread_store = ThreadStore::global(cx);
                let session_id = session_id.clone();
                let title = title.clone();
                let project_b_paths = project_b_paths.clone();
                thread_store.update(cx, |store, cx| {
                    store.save_thread(
                        acp::SessionId::new(session_id),
                        make_db_thread(&title, updated_at),
                        project_b_paths,
                        cx,
                    )
                })
            });
            save_task.await.unwrap();
            cx.run_until_parked();
        }

        let save_projectless = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new("projectless-session"),
                    make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
                    PathList::default(),
                    cx,
                )
            })
        });
        save_projectless.await.unwrap();
        cx.run_until_parked();

        // Run migration
        cx.update(|cx| {
            migrate_thread_metadata(cx);
        });

        cx.run_until_parked();

        // Verify the metadata was migrated, limited to 10 per project, and
        // projectless threads were skipped.
        let list = cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.read(cx).entries().collect::<Vec<_>>()
        });
        assert_eq!(list.len(), 13);

        assert!(
            list.iter()
                .all(|metadata| !metadata.folder_paths.is_empty())
        );
        assert!(
            list.iter()
                .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
        );

        let project_a_entries = list
            .iter()
            .filter(|metadata| metadata.folder_paths == project_a_paths)
            .collect::<Vec<_>>();
        assert_eq!(project_a_entries.len(), 10);
        assert_eq!(
            project_a_entries
                .iter()
                .map(|metadata| metadata.session_id.0.as_ref())
                .collect::<Vec<_>>(),
            vec![
                "project-a-session-11",
                "project-a-session-10",
                "project-a-session-9",
                "project-a-session-8",
                "project-a-session-7",
                "project-a-session-6",
                "project-a-session-5",
                "project-a-session-4",
                "project-a-session-3",
                "project-a-session-2",
            ]
        );
        assert!(
            project_a_entries
                .iter()
                .all(|metadata| metadata.agent_id.is_none())
        );

        let project_b_entries = list
            .iter()
            .filter(|metadata| metadata.folder_paths == project_b_paths)
            .collect::<Vec<_>>();
        assert_eq!(project_b_entries.len(), 3);
        assert_eq!(
            project_b_entries
                .iter()
                .map(|metadata| metadata.session_id.0.as_ref())
                .collect::<Vec<_>>(),
            vec![
                "project-b-session-2",
                "project-b-session-1",
                "project-b-session-0",
            ]
        );
        assert!(
            project_b_entries
                .iter()
                .all(|metadata| metadata.agent_id.is_none())
        );
    }

    /// The migration must not run when the sidebar database already has rows.
    #[gpui::test]
    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
        cx.update(|cx| {
            ThreadStore::init_global(cx);
            SidebarThreadMetadataStore::init_global(cx);
        });

        // Pre-populate the metadata store with existing data
        let existing_metadata = ThreadMetadata {
            session_id: acp::SessionId::new("existing-session"),
            agent_id: None,
            title: "Existing Thread".into(),
            updated_at: Utc::now(),
            created_at: Some(Utc::now()),
            folder_paths: PathList::default(),
        };

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(existing_metadata, cx).detach();
            });
        });

        cx.run_until_parked();

        // Add an entry to native thread store that should NOT be migrated.
        // It is given a project path on purpose: projectless threads are
        // always skipped by the migration, so only a thread with a project
        // can show that the "database is not empty" guard is what stops it.
        let save_task = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new("native-session"),
                    make_db_thread("Native Thread", Utc::now()),
                    PathList::new(&[Path::new("/project-a")]),
                    cx,
                )
            })
        });
        save_task.await.unwrap();
        cx.run_until_parked();

        // Run migration - should skip because metadata store is not empty
        cx.update(|cx| {
            migrate_thread_metadata(cx);
        });

        cx.run_until_parked();

        // Verify only the existing metadata is present (migration was skipped)
        let list = cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.read(cx).entries().collect::<Vec<_>>()
        });
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
    }

    /// Threads that have a parent session must never be written to the
    /// sidebar metadata.
    #[gpui::test]
    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = settings::SettingsStore::test(cx);
            cx.set_global(settings_store);
            cx.update_flags(true, vec!["agent-v2".to_string()]);
            ThreadStore::init_global(cx);
            SidebarThreadMetadataStore::init_global(cx);
        });

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, None::<&Path>, cx).await;
        let connection = Rc::new(StubAgentConnection::new());

        // Create a regular (non-subagent) AcpThread.
        let regular_thread = cx
            .update(|cx| {
                connection
                    .clone()
                    .new_session(project.clone(), PathList::default(), cx)
            })
            .await
            .unwrap();

        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());

        // Set a title on the regular thread to trigger a save via handle_thread_update.
        cx.update(|cx| {
            regular_thread.update(cx, |thread, cx| {
                thread.set_title("Regular Thread".into(), cx).detach();
            });
        });
        cx.run_until_parked();

        // Create a subagent AcpThread
        let subagent_session_id = acp::SessionId::new("subagent-session");
        let subagent_thread = cx.update(|cx| {
            let action_log = cx.new(|_| ActionLog::new(project.clone()));
            cx.new(|cx| {
                acp_thread::AcpThread::new(
                    Some(regular_session_id.clone()),
                    Some("Subagent Thread".into()),
                    None,
                    connection.clone(),
                    project.clone(),
                    action_log,
                    subagent_session_id.clone(),
                    watch::Receiver::constant(acp::PromptCapabilities::new()),
                    cx,
                )
            })
        });

        // Set a title on the subagent thread to trigger handle_thread_update.
        cx.update(|cx| {
            subagent_thread.update(cx, |thread, cx| {
                thread
                    .set_title("Subagent Thread Title".into(), cx)
                    .detach();
            });
        });
        cx.run_until_parked();

        // List all metadata from the store cache.
        let list = cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.read(cx).entries().collect::<Vec<_>>()
        });

        // The subagent thread should NOT appear in the sidebar metadata.
        // Only the regular thread should be listed.
        assert_eq!(
            list.len(),
            1,
            "Expected only the regular thread in sidebar metadata, \
             but found {} entries (subagent threads are leaking into the sidebar)",
            list.len(),
        );
        assert_eq!(list[0].session_id, regular_session_id);
        assert_eq!(list[0].title.as_ref(), "Regular Thread");
    }
}