1use std::{path::Path, sync::Arc};
2
3use acp_thread::AcpThreadEvent;
4use agent::{ThreadStore, ZED_AGENT_ID};
5use agent_client_protocol as acp;
6use anyhow::Context as _;
7use chrono::{DateTime, Utc};
8use collections::{HashMap, HashSet};
9use db::{
10 sqlez::{
11 bindable::Column, domain::Domain, statement::Statement,
12 thread_safe_connection::ThreadSafeConnection,
13 },
14 sqlez_macros::sql,
15};
16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
17use futures::{FutureExt as _, future::Shared};
18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
19use project::AgentId;
20use ui::{App, Context, SharedString};
21use util::ResultExt as _;
22use workspace::PathList;
23
24use crate::DEFAULT_THREAD_TITLE;
25
26pub fn init(cx: &mut App) {
27 ThreadMetadataStore::init_global(cx);
28
29 if cx.has_flag::<AgentV2FeatureFlag>() {
30 migrate_thread_metadata(cx);
31 }
32 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
33 if has_flag {
34 migrate_thread_metadata(cx);
35 }
36 })
37 .detach();
38}
39
40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
41/// We skip migrating threads that do not have a project.
42///
43/// TODO: Remove this after N weeks of shipping the sidebar
44fn migrate_thread_metadata(cx: &mut App) {
45 let store = ThreadMetadataStore::global(cx);
46 let db = store.read(cx).db.clone();
47
48 cx.spawn(async move |cx| {
49 let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
50
51 let is_first_migration = existing_entries.is_empty();
52
53 let mut to_migrate = store.read_with(cx, |_store, cx| {
54 ThreadStore::global(cx)
55 .read(cx)
56 .entries()
57 .filter_map(|entry| {
58 if existing_entries.contains(&entry.id.0) {
59 return None;
60 }
61
62 Some(ThreadMetadata {
63 session_id: entry.id,
64 agent_id: ZED_AGENT_ID.clone(),
65 title: entry.title,
66 updated_at: entry.updated_at,
67 created_at: entry.created_at,
68 folder_paths: entry.folder_paths,
69 main_worktree_paths: PathList::default(),
70 archived: true,
71 })
72 })
73 .collect::<Vec<_>>()
74 });
75
76 if to_migrate.is_empty() {
77 return anyhow::Ok(());
78 }
79
80 // On the first migration (no entries in DB yet), keep the 5 most
81 // recent threads per project unarchived.
82 if is_first_migration {
83 let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
84 for entry in &mut to_migrate {
85 if entry.folder_paths.is_empty() {
86 continue;
87 }
88 per_project
89 .entry(entry.folder_paths.clone())
90 .or_default()
91 .push(entry);
92 }
93 for entries in per_project.values_mut() {
94 entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
95 for entry in entries.iter_mut().take(5) {
96 entry.archived = false;
97 }
98 }
99 }
100
101 log::info!("Migrating {} thread store entries", to_migrate.len());
102
103 // Manually save each entry to the database and call reload, otherwise
104 // we'll end up triggering lots of reloads after each save
105 for entry in to_migrate {
106 db.save(entry).await?;
107 }
108
109 log::info!("Finished migrating thread store entries");
110
111 let _ = store.update(cx, |store, cx| store.reload(cx));
112 anyhow::Ok(())
113 })
114 .detach_and_log_err(cx);
115}
116
/// Newtype under which the [`ThreadMetadataStore`] entity is registered as an
/// app global (see `init_global`, `try_global` and `global`).
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
119
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
///
/// One value corresponds to one row of the `sidebar_threads` table.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Session id of the thread; primary key of the database row.
    pub session_id: acp::SessionId,
    /// Agent that owns the thread. Persisted as NULL when it equals
    /// `ZED_AGENT_ID`, and read back as `ZED_AGENT_ID` when NULL.
    pub agent_id: AgentId,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Time of the last recorded update; the store sets it to the current
    /// time whenever it handles a tracked thread event.
    pub updated_at: DateTime<Utc>,
    /// Creation time, when known.
    pub created_at: Option<DateTime<Utc>>,
    /// Paths the thread is filed under; the store derives them from the
    /// project's visible worktrees.
    pub folder_paths: PathList,
    /// Original repository paths for repositories that are linked worktrees;
    /// empty when there are none.
    pub main_worktree_paths: PathList,
    /// Archived threads are left out of the per-path lookups on the store.
    pub archived: bool,
}
133
134impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
135 fn from(meta: &ThreadMetadata) -> Self {
136 Self {
137 session_id: meta.session_id.clone(),
138 work_dirs: Some(meta.folder_paths.clone()),
139 title: Some(meta.title.clone()),
140 updated_at: Some(meta.updated_at),
141 created_at: meta.created_at,
142 meta: None,
143 }
144 }
145}
146
/// The store holds all metadata needed to show threads in the sidebar/the archive.
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct ThreadMetadataStore {
    /// Database handle; read by `reload` and cloned into the background writer.
    db: ThreadMetadataDb,
    /// In-memory cache of every known thread, keyed by session id.
    threads: HashMap<acp::SessionId, ThreadMetadata>,
    /// Index from `folder_paths` to the sessions filed under them.
    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// Index from `main_worktree_paths` to sessions; only threads with a
    /// non-empty `main_worktree_paths` are inserted.
    threads_by_main_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// The most recently started reload, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed threads, removed again when the
    /// thread is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Sending half of the queue of pending database writes.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Background task that drains the write queue.
    /// NOTE(review): presumably held here so the task is not cancelled when
    /// dropped — confirm against gpui `Task` semantics.
    _db_operations_task: Task<()>,
}
160
/// A database write queued by the store and applied by its background task.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Insert the row, or overwrite the existing row with the same session id.
    Upsert(ThreadMetadata),
    /// Remove the row with this session id.
    Delete(acp::SessionId),
}
166
167impl DbOperation {
168 fn id(&self) -> &acp::SessionId {
169 match self {
170 DbOperation::Upsert(thread) => &thread.session_id,
171 DbOperation::Delete(session_id) => session_id,
172 }
173 }
174}
175
176impl ThreadMetadataStore {
177 #[cfg(not(any(test, feature = "test-support")))]
178 pub fn init_global(cx: &mut App) {
179 if cx.has_global::<Self>() {
180 return;
181 }
182
183 let db = ThreadMetadataDb::global(cx);
184 let thread_store = cx.new(|cx| Self::new(db, cx));
185 cx.set_global(GlobalThreadMetadataStore(thread_store));
186 }
187
188 #[cfg(any(test, feature = "test-support"))]
189 pub fn init_global(cx: &mut App) {
190 let thread = std::thread::current();
191 let test_name = thread.name().unwrap_or("unknown_test");
192 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
193 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
194 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
195 cx.set_global(GlobalThreadMetadataStore(thread_store));
196 }
197
198 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
199 cx.try_global::<GlobalThreadMetadataStore>()
200 .map(|store| store.0.clone())
201 }
202
203 pub fn global(cx: &App) -> Entity<Self> {
204 cx.global::<GlobalThreadMetadataStore>().0.clone()
205 }
206
    /// Returns `true` when the in-memory cache holds no threads at all
    /// (archived threads count as present).
    pub fn is_empty(&self) -> bool {
        self.threads.is_empty()
    }
210
    /// Returns the session id of every cached thread, archived or not.
    /// The ids are cloned out of the cache's key set.
    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
        self.threads.keys().cloned()
    }
215
    /// Returns the cached metadata for `session_id`, or `None` when the
    /// thread is not in the cache. Archived threads are returned too.
    pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
        self.threads.get(session_id)
    }
220
    /// Returns every cached thread, archived or not.
    pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
        self.threads.values()
    }
225
226 /// Returns all archived threads.
227 pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
228 self.entries().filter(|t| t.archived)
229 }
230
231 /// Returns all threads for the given path list, excluding archived threads.
232 pub fn entries_for_path(
233 &self,
234 path_list: &PathList,
235 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
236 self.threads_by_paths
237 .get(path_list)
238 .into_iter()
239 .flatten()
240 .filter_map(|s| self.threads.get(s))
241 .filter(|s| !s.archived)
242 }
243
244 /// Returns threads whose `main_worktree_paths` matches the given path list,
245 /// excluding archived threads. This finds threads that were opened in a
246 /// linked worktree but are associated with the given main worktree.
247 pub fn entries_for_main_worktree_path(
248 &self,
249 path_list: &PathList,
250 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
251 self.threads_by_main_paths
252 .get(path_list)
253 .into_iter()
254 .flatten()
255 .filter_map(|s| self.threads.get(s))
256 .filter(|s| !s.archived)
257 }
258
259 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
260 let db = self.db.clone();
261 self.reload_task.take();
262
263 let list_task = cx
264 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
265
266 let reload_task = cx
267 .spawn(async move |this, cx| {
268 let Some(rows) = list_task.await.log_err() else {
269 return;
270 };
271
272 this.update(cx, |this, cx| {
273 this.threads.clear();
274 this.threads_by_paths.clear();
275 this.threads_by_main_paths.clear();
276
277 for row in rows {
278 this.threads_by_paths
279 .entry(row.folder_paths.clone())
280 .or_default()
281 .insert(row.session_id.clone());
282 if !row.main_worktree_paths.is_empty() {
283 this.threads_by_main_paths
284 .entry(row.main_worktree_paths.clone())
285 .or_default()
286 .insert(row.session_id.clone());
287 }
288 this.threads.insert(row.session_id.clone(), row);
289 }
290
291 cx.notify();
292 })
293 .ok();
294 })
295 .shared();
296 self.reload_task = Some(reload_task.clone());
297 reload_task
298 }
299
300 pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
301 if !cx.has_flag::<AgentV2FeatureFlag>() {
302 return;
303 }
304
305 for metadata in metadata {
306 self.save_internal(metadata);
307 }
308 cx.notify();
309 }
310
    /// Test-only public entry point that forwards to the private `save`, so
    /// tests can insert metadata without going through thread events.
    #[cfg(any(test, feature = "test-support"))]
    pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
        self.save(metadata, cx)
    }
315
316 fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
317 if !cx.has_flag::<AgentV2FeatureFlag>() {
318 return;
319 }
320
321 self.save_internal(metadata);
322 cx.notify();
323 }
324
325 fn save_internal(&mut self, metadata: ThreadMetadata) {
326 if let Some(thread) = self.threads.get(&metadata.session_id) {
327 if thread.folder_paths != metadata.folder_paths {
328 if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
329 session_ids.remove(&metadata.session_id);
330 }
331 }
332 if thread.main_worktree_paths != metadata.main_worktree_paths
333 && !thread.main_worktree_paths.is_empty()
334 {
335 if let Some(session_ids) = self
336 .threads_by_main_paths
337 .get_mut(&thread.main_worktree_paths)
338 {
339 session_ids.remove(&metadata.session_id);
340 }
341 }
342 }
343
344 self.threads
345 .insert(metadata.session_id.clone(), metadata.clone());
346
347 self.threads_by_paths
348 .entry(metadata.folder_paths.clone())
349 .or_default()
350 .insert(metadata.session_id.clone());
351
352 if !metadata.main_worktree_paths.is_empty() {
353 self.threads_by_main_paths
354 .entry(metadata.main_worktree_paths.clone())
355 .or_default()
356 .insert(metadata.session_id.clone());
357 }
358
359 self.pending_thread_ops_tx
360 .try_send(DbOperation::Upsert(metadata))
361 .log_err();
362 }
363
364 pub fn update_working_directories(
365 &mut self,
366 session_id: &acp::SessionId,
367 work_dirs: PathList,
368 cx: &mut Context<Self>,
369 ) {
370 if !cx.has_flag::<AgentV2FeatureFlag>() {
371 return;
372 }
373
374 if let Some(thread) = self.threads.get(session_id) {
375 self.save_internal(ThreadMetadata {
376 folder_paths: work_dirs,
377 ..thread.clone()
378 });
379 cx.notify();
380 }
381 }
382
    /// Marks the thread as archived. Forwards to `update_archived`, which
    /// ignores unknown session ids and is a no-op while the agent-v2 flag is off.
    pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
        self.update_archived(session_id, true, cx);
    }
386
    /// Clears the archived flag of the thread. Forwards to `update_archived`,
    /// which ignores unknown session ids and is a no-op while the agent-v2
    /// flag is off.
    pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
        self.update_archived(session_id, false, cx);
    }
390
391 fn update_archived(
392 &mut self,
393 session_id: &acp::SessionId,
394 archived: bool,
395 cx: &mut Context<Self>,
396 ) {
397 if !cx.has_flag::<AgentV2FeatureFlag>() {
398 return;
399 }
400
401 if let Some(thread) = self.threads.get(session_id) {
402 self.save_internal(ThreadMetadata {
403 archived,
404 ..thread.clone()
405 });
406 cx.notify();
407 }
408 }
409
410 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
411 if !cx.has_flag::<AgentV2FeatureFlag>() {
412 return;
413 }
414
415 if let Some(thread) = self.threads.get(&session_id) {
416 if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
417 session_ids.remove(&session_id);
418 }
419 if !thread.main_worktree_paths.is_empty() {
420 if let Some(session_ids) = self
421 .threads_by_main_paths
422 .get_mut(&thread.main_worktree_paths)
423 {
424 session_ids.remove(&session_id);
425 }
426 }
427 }
428 self.threads.remove(&session_id);
429 self.pending_thread_ops_tx
430 .try_send(DbOperation::Delete(session_id))
431 .log_err();
432 cx.notify();
433 }
434
435 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
436 let weak_store = cx.weak_entity();
437
438 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
439 // Don't track subagent threads in the sidebar.
440 if thread.parent_session_id().is_some() {
441 return;
442 }
443
444 let thread_entity = cx.entity();
445
446 cx.on_release({
447 let weak_store = weak_store.clone();
448 move |thread, cx| {
449 weak_store
450 .update(cx, |store, _cx| {
451 let session_id = thread.session_id().clone();
452 store.session_subscriptions.remove(&session_id);
453 })
454 .ok();
455 }
456 })
457 .detach();
458
459 weak_store
460 .update(cx, |this, cx| {
461 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
462 this.session_subscriptions
463 .insert(thread.session_id().clone(), subscription);
464 })
465 .ok();
466 })
467 .detach();
468
469 let (tx, rx) = smol::channel::unbounded();
470 let _db_operations_task = cx.background_spawn({
471 let db = db.clone();
472 async move {
473 while let Ok(first_update) = rx.recv().await {
474 let mut updates = vec![first_update];
475 while let Ok(update) = rx.try_recv() {
476 updates.push(update);
477 }
478 let updates = Self::dedup_db_operations(updates);
479 for operation in updates {
480 match operation {
481 DbOperation::Upsert(metadata) => {
482 db.save(metadata).await.log_err();
483 }
484 DbOperation::Delete(session_id) => {
485 db.delete(session_id).await.log_err();
486 }
487 }
488 }
489 }
490 }
491 });
492
493 let mut this = Self {
494 db,
495 threads: HashMap::default(),
496 threads_by_paths: HashMap::default(),
497 threads_by_main_paths: HashMap::default(),
498 reload_task: None,
499 session_subscriptions: HashMap::default(),
500 pending_thread_ops_tx: tx,
501 _db_operations_task,
502 };
503 let _ = this.reload(cx);
504 this
505 }
506
507 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
508 let mut ops = HashMap::default();
509 for operation in operations.into_iter().rev() {
510 if ops.contains_key(operation.id()) {
511 continue;
512 }
513 ops.insert(operation.id().clone(), operation);
514 }
515 ops.into_values().collect()
516 }
517
518 fn handle_thread_event(
519 &mut self,
520 thread: Entity<acp_thread::AcpThread>,
521 event: &AcpThreadEvent,
522 cx: &mut Context<Self>,
523 ) {
524 // Don't track subagent threads in the sidebar.
525 if thread.read(cx).parent_session_id().is_some() {
526 return;
527 }
528
529 match event {
530 AcpThreadEvent::NewEntry
531 | AcpThreadEvent::TitleUpdated
532 | AcpThreadEvent::EntryUpdated(_)
533 | AcpThreadEvent::EntriesRemoved(_)
534 | AcpThreadEvent::ToolAuthorizationRequested(_)
535 | AcpThreadEvent::ToolAuthorizationReceived(_)
536 | AcpThreadEvent::Retry(_)
537 | AcpThreadEvent::Stopped(_)
538 | AcpThreadEvent::Error
539 | AcpThreadEvent::LoadError(_)
540 | AcpThreadEvent::Refusal
541 | AcpThreadEvent::WorkingDirectoriesUpdated => {
542 let thread_ref = thread.read(cx);
543 if thread_ref.entries().is_empty() {
544 return;
545 }
546
547 let existing_thread = self.threads.get(thread_ref.session_id());
548 let session_id = thread_ref.session_id().clone();
549 let title = thread_ref
550 .title()
551 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
552
553 let updated_at = Utc::now();
554
555 let created_at = existing_thread
556 .and_then(|t| t.created_at)
557 .unwrap_or_else(|| updated_at);
558
559 let agent_id = thread_ref.connection().agent_id();
560
561 let folder_paths = {
562 let project = thread_ref.project().read(cx);
563 let paths: Vec<Arc<Path>> = project
564 .visible_worktrees(cx)
565 .map(|worktree| worktree.read(cx).abs_path())
566 .collect();
567 PathList::new(&paths)
568 };
569
570 let main_worktree_paths = {
571 let project = thread_ref.project().read(cx);
572 let mut main_paths: Vec<Arc<Path>> = Vec::new();
573 for repo in project.repositories(cx).values() {
574 let snapshot = repo.read(cx).snapshot();
575 if snapshot.is_linked_worktree() {
576 main_paths.push(snapshot.original_repo_abs_path.clone());
577 }
578 }
579 main_paths.sort();
580 main_paths.dedup();
581 PathList::new(&main_paths)
582 };
583
584 // Threads without a folder path (e.g. started in an empty
585 // window) are archived by default so they don't get lost,
586 // because they won't show up in the sidebar. Users can reload
587 // them from the archive.
588 let archived = existing_thread
589 .map(|t| t.archived)
590 .unwrap_or(folder_paths.is_empty());
591
592 let metadata = ThreadMetadata {
593 session_id,
594 agent_id,
595 title,
596 created_at: Some(created_at),
597 updated_at,
598 folder_paths,
599 main_worktree_paths,
600 archived,
601 };
602
603 self.save(metadata, cx);
604 }
605 AcpThreadEvent::TokenUsageUpdated
606 | AcpThreadEvent::SubagentSpawned(_)
607 | AcpThreadEvent::PromptCapabilitiesUpdated
608 | AcpThreadEvent::AvailableCommandsUpdated(_)
609 | AcpThreadEvent::ModeUpdated(_)
610 | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
611 }
612 }
613}
614
// Lets `ThreadMetadataStore` itself be named in `Global`-bounded APIs.
// NOTE(review): the store entity is registered under
// `GlobalThreadMetadataStore`, not under this type, so no
// `ThreadMetadataStore` value is ever set as a global in this file.
impl Global for ThreadMetadataStore {}
616
/// Database handle for the `sidebar_threads` table, wrapping a
/// [`ThreadSafeConnection`].
struct ThreadMetadataDb(ThreadSafeConnection);

impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    // Schema history of `sidebar_threads`, oldest first.
    // NOTE(review): presumably entries that have shipped must never be edited,
    // only appended to — confirm against the sqlez migration rules.
    const MIGRATIONS: &[&str] = &[
        // Initial table, keyed by session id.
        sql!(
            CREATE TABLE IF NOT EXISTS sidebar_threads(
                session_id TEXT PRIMARY KEY,
                agent_id TEXT,
                title TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                created_at TEXT,
                folder_paths TEXT,
                folder_paths_order TEXT
            ) STRICT;
        ),
        // Archive flag; rows that predate the column read as not archived.
        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
        // Serialized main worktree paths and their order.
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths TEXT),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths_order TEXT),
    ];
}

// NOTE(review): presumably this defines the shared connection behind
// `ThreadMetadataDb::global` — confirm against `db::static_connection!`.
db::static_connection!(ThreadMetadataDb, []);
641
642impl ThreadMetadataDb {
643 pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
644 self.select::<Arc<str>>(
645 "SELECT session_id FROM sidebar_threads \
646 ORDER BY updated_at DESC",
647 )?()
648 }
649
650 /// List all sidebar thread metadata, ordered by updated_at descending.
651 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
652 self.select::<ThreadMetadata>(
653 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order \
654 FROM sidebar_threads \
655 ORDER BY updated_at DESC"
656 )?()
657 }
658
659 /// Upsert metadata for a thread.
660 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
661 let id = row.session_id.0.clone();
662 let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
663 None
664 } else {
665 Some(row.agent_id.to_string())
666 };
667 let title = row.title.to_string();
668 let updated_at = row.updated_at.to_rfc3339();
669 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
670 let serialized = row.folder_paths.serialize();
671 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
672 (None, None)
673 } else {
674 (Some(serialized.paths), Some(serialized.order))
675 };
676 let main_serialized = row.main_worktree_paths.serialize();
677 let (main_worktree_paths, main_worktree_paths_order) = if row.main_worktree_paths.is_empty()
678 {
679 (None, None)
680 } else {
681 (Some(main_serialized.paths), Some(main_serialized.order))
682 };
683 let archived = row.archived;
684
685 self.write(move |conn| {
686 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order) \
687 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) \
688 ON CONFLICT(session_id) DO UPDATE SET \
689 agent_id = excluded.agent_id, \
690 title = excluded.title, \
691 updated_at = excluded.updated_at, \
692 created_at = excluded.created_at, \
693 folder_paths = excluded.folder_paths, \
694 folder_paths_order = excluded.folder_paths_order, \
695 archived = excluded.archived, \
696 main_worktree_paths = excluded.main_worktree_paths, \
697 main_worktree_paths_order = excluded.main_worktree_paths_order";
698 let mut stmt = Statement::prepare(conn, sql)?;
699 let mut i = stmt.bind(&id, 1)?;
700 i = stmt.bind(&agent_id, i)?;
701 i = stmt.bind(&title, i)?;
702 i = stmt.bind(&updated_at, i)?;
703 i = stmt.bind(&created_at, i)?;
704 i = stmt.bind(&folder_paths, i)?;
705 i = stmt.bind(&folder_paths_order, i)?;
706 i = stmt.bind(&archived, i)?;
707 i = stmt.bind(&main_worktree_paths, i)?;
708 stmt.bind(&main_worktree_paths_order, i)?;
709 stmt.exec()
710 })
711 .await
712 }
713
714 /// Delete metadata for a single thread.
715 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
716 let id = session_id.0.clone();
717 self.write(move |conn| {
718 let mut stmt =
719 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
720 stmt.bind(&id, 1)?;
721 stmt.exec()
722 })
723 .await
724 }
725}
726
727impl Column for ThreadMetadata {
728 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
729 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
730 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
731 let (title, next): (String, i32) = Column::column(statement, next)?;
732 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
733 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
734 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
735 let (folder_paths_order_str, next): (Option<String>, i32) =
736 Column::column(statement, next)?;
737 let (archived, next): (bool, i32) = Column::column(statement, next)?;
738 let (main_worktree_paths_str, next): (Option<String>, i32) =
739 Column::column(statement, next)?;
740 let (main_worktree_paths_order_str, next): (Option<String>, i32) =
741 Column::column(statement, next)?;
742
743 let agent_id = agent_id
744 .map(|id| AgentId::new(id))
745 .unwrap_or(ZED_AGENT_ID.clone());
746
747 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
748 let created_at = created_at_str
749 .as_deref()
750 .map(DateTime::parse_from_rfc3339)
751 .transpose()?
752 .map(|dt| dt.with_timezone(&Utc));
753
754 let folder_paths = folder_paths_str
755 .map(|paths| {
756 PathList::deserialize(&util::path_list::SerializedPathList {
757 paths,
758 order: folder_paths_order_str.unwrap_or_default(),
759 })
760 })
761 .unwrap_or_default();
762
763 let main_worktree_paths = main_worktree_paths_str
764 .map(|paths| {
765 PathList::deserialize(&util::path_list::SerializedPathList {
766 paths,
767 order: main_worktree_paths_order_str.unwrap_or_default(),
768 })
769 })
770 .unwrap_or_default();
771
772 Ok((
773 ThreadMetadata {
774 session_id: acp::SessionId::new(id),
775 agent_id,
776 title: title.into(),
777 updated_at,
778 created_at,
779 folder_paths,
780 main_worktree_paths,
781 archived,
782 },
783 next,
784 ))
785 }
786}
787
788#[cfg(test)]
789mod tests {
790 use super::*;
791 use acp_thread::{AgentConnection, StubAgentConnection};
792 use action_log::ActionLog;
793 use agent::DbThread;
794 use agent_client_protocol as acp;
795 use feature_flags::FeatureFlagAppExt;
796 use gpui::TestAppContext;
797 use project::FakeFs;
798 use project::Project;
799 use std::path::Path;
800 use std::rc::Rc;
801
    /// Builds a `DbThread` fixture with the given title and update time.
    /// It has no messages, and every other field is `None`, `false` or its
    /// default value.
    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
        DbThread {
            title: title.to_string().into(),
            messages: Vec::new(),
            updated_at,
            detailed_summary: None,
            initial_project_snapshot: None,
            cumulative_token_usage: Default::default(),
            request_token_usage: Default::default(),
            model: None,
            profile: None,
            imported: false,
            subagent_context: None,
            speed: None,
            thinking_enabled: false,
            thinking_effort: None,
            draft_prompt: None,
            ui_scroll_position: None,
        }
    }
822
823 fn make_metadata(
824 session_id: &str,
825 title: &str,
826 updated_at: DateTime<Utc>,
827 folder_paths: PathList,
828 ) -> ThreadMetadata {
829 ThreadMetadata {
830 archived: false,
831 session_id: acp::SessionId::new(session_id),
832 agent_id: agent::ZED_AGENT_ID.clone(),
833 title: title.to_string().into(),
834 updated_at,
835 created_at: Some(updated_at),
836 folder_paths,
837 main_worktree_paths: PathList::default(),
838 }
839 }
840
841 fn init_test(cx: &mut TestAppContext) {
842 cx.update(|cx| {
843 let settings_store = settings::SettingsStore::test(cx);
844 cx.set_global(settings_store);
845 cx.update_flags(true, vec!["agent-v2".to_string()]);
846 ThreadMetadataStore::init_global(cx);
847 ThreadStore::init_global(cx);
848 });
849 cx.run_until_parked();
850 }
851
852 #[gpui::test]
853 async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
854 let first_paths = PathList::new(&[Path::new("/project-a")]);
855 let second_paths = PathList::new(&[Path::new("/project-b")]);
856 let now = Utc::now();
857 let older = now - chrono::Duration::seconds(1);
858
859 let thread = std::thread::current();
860 let test_name = thread.name().unwrap_or("unknown_test");
861 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
862 let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
863 &db_name,
864 )));
865
866 db.save(make_metadata(
867 "session-1",
868 "First Thread",
869 now,
870 first_paths.clone(),
871 ))
872 .await
873 .unwrap();
874 db.save(make_metadata(
875 "session-2",
876 "Second Thread",
877 older,
878 second_paths.clone(),
879 ))
880 .await
881 .unwrap();
882
883 cx.update(|cx| {
884 let settings_store = settings::SettingsStore::test(cx);
885 cx.set_global(settings_store);
886 cx.update_flags(true, vec!["agent-v2".to_string()]);
887 ThreadMetadataStore::init_global(cx);
888 });
889
890 cx.run_until_parked();
891
892 cx.update(|cx| {
893 let store = ThreadMetadataStore::global(cx);
894 let store = store.read(cx);
895
896 let entry_ids = store
897 .entry_ids()
898 .map(|session_id| session_id.0.to_string())
899 .collect::<Vec<_>>();
900 assert_eq!(entry_ids.len(), 2);
901 assert!(entry_ids.contains(&"session-1".to_string()));
902 assert!(entry_ids.contains(&"session-2".to_string()));
903
904 let first_path_entries = store
905 .entries_for_path(&first_paths)
906 .map(|entry| entry.session_id.0.to_string())
907 .collect::<Vec<_>>();
908 assert_eq!(first_path_entries, vec!["session-1"]);
909
910 let second_path_entries = store
911 .entries_for_path(&second_paths)
912 .map(|entry| entry.session_id.0.to_string())
913 .collect::<Vec<_>>();
914 assert_eq!(second_path_entries, vec!["session-2"]);
915 });
916 }
917
918 #[gpui::test]
919 async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
920 init_test(cx);
921
922 let first_paths = PathList::new(&[Path::new("/project-a")]);
923 let second_paths = PathList::new(&[Path::new("/project-b")]);
924 let initial_time = Utc::now();
925 let updated_time = initial_time + chrono::Duration::seconds(1);
926
927 let initial_metadata = make_metadata(
928 "session-1",
929 "First Thread",
930 initial_time,
931 first_paths.clone(),
932 );
933
934 let second_metadata = make_metadata(
935 "session-2",
936 "Second Thread",
937 initial_time,
938 second_paths.clone(),
939 );
940
941 cx.update(|cx| {
942 let store = ThreadMetadataStore::global(cx);
943 store.update(cx, |store, cx| {
944 store.save(initial_metadata, cx);
945 store.save(second_metadata, cx);
946 });
947 });
948
949 cx.run_until_parked();
950
951 cx.update(|cx| {
952 let store = ThreadMetadataStore::global(cx);
953 let store = store.read(cx);
954
955 let first_path_entries = store
956 .entries_for_path(&first_paths)
957 .map(|entry| entry.session_id.0.to_string())
958 .collect::<Vec<_>>();
959 assert_eq!(first_path_entries, vec!["session-1"]);
960
961 let second_path_entries = store
962 .entries_for_path(&second_paths)
963 .map(|entry| entry.session_id.0.to_string())
964 .collect::<Vec<_>>();
965 assert_eq!(second_path_entries, vec!["session-2"]);
966 });
967
968 let moved_metadata = make_metadata(
969 "session-1",
970 "First Thread",
971 updated_time,
972 second_paths.clone(),
973 );
974
975 cx.update(|cx| {
976 let store = ThreadMetadataStore::global(cx);
977 store.update(cx, |store, cx| {
978 store.save(moved_metadata, cx);
979 });
980 });
981
982 cx.run_until_parked();
983
984 cx.update(|cx| {
985 let store = ThreadMetadataStore::global(cx);
986 let store = store.read(cx);
987
988 let entry_ids = store
989 .entry_ids()
990 .map(|session_id| session_id.0.to_string())
991 .collect::<Vec<_>>();
992 assert_eq!(entry_ids.len(), 2);
993 assert!(entry_ids.contains(&"session-1".to_string()));
994 assert!(entry_ids.contains(&"session-2".to_string()));
995
996 let first_path_entries = store
997 .entries_for_path(&first_paths)
998 .map(|entry| entry.session_id.0.to_string())
999 .collect::<Vec<_>>();
1000 assert!(first_path_entries.is_empty());
1001
1002 let second_path_entries = store
1003 .entries_for_path(&second_paths)
1004 .map(|entry| entry.session_id.0.to_string())
1005 .collect::<Vec<_>>();
1006 assert_eq!(second_path_entries.len(), 2);
1007 assert!(second_path_entries.contains(&"session-1".to_string()));
1008 assert!(second_path_entries.contains(&"session-2".to_string()));
1009 });
1010
1011 cx.update(|cx| {
1012 let store = ThreadMetadataStore::global(cx);
1013 store.update(cx, |store, cx| {
1014 store.delete(acp::SessionId::new("session-2"), cx);
1015 });
1016 });
1017
1018 cx.run_until_parked();
1019
1020 cx.update(|cx| {
1021 let store = ThreadMetadataStore::global(cx);
1022 let store = store.read(cx);
1023
1024 let entry_ids = store
1025 .entry_ids()
1026 .map(|session_id| session_id.0.to_string())
1027 .collect::<Vec<_>>();
1028 assert_eq!(entry_ids, vec!["session-1"]);
1029
1030 let second_path_entries = store
1031 .entries_for_path(&second_paths)
1032 .map(|entry| entry.session_id.0.to_string())
1033 .collect::<Vec<_>>();
1034 assert_eq!(second_path_entries, vec!["session-1"]);
1035 });
1036 }
1037
1038 #[gpui::test]
1039 async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1040 init_test(cx);
1041
1042 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1043 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1044 let now = Utc::now();
1045
1046 let existing_metadata = ThreadMetadata {
1047 session_id: acp::SessionId::new("a-session-0"),
1048 agent_id: agent::ZED_AGENT_ID.clone(),
1049 title: "Existing Metadata".into(),
1050 updated_at: now - chrono::Duration::seconds(10),
1051 created_at: Some(now - chrono::Duration::seconds(10)),
1052 folder_paths: project_a_paths.clone(),
1053 main_worktree_paths: PathList::default(),
1054 archived: false,
1055 };
1056
1057 cx.update(|cx| {
1058 let store = ThreadMetadataStore::global(cx);
1059 store.update(cx, |store, cx| {
1060 store.save(existing_metadata, cx);
1061 });
1062 });
1063 cx.run_until_parked();
1064
1065 let threads_to_save = vec![
1066 (
1067 "a-session-0",
1068 "Thread A0 From Native Store",
1069 project_a_paths.clone(),
1070 now,
1071 ),
1072 (
1073 "a-session-1",
1074 "Thread A1",
1075 project_a_paths.clone(),
1076 now + chrono::Duration::seconds(1),
1077 ),
1078 (
1079 "b-session-0",
1080 "Thread B0",
1081 project_b_paths.clone(),
1082 now + chrono::Duration::seconds(2),
1083 ),
1084 (
1085 "projectless",
1086 "Projectless",
1087 PathList::default(),
1088 now + chrono::Duration::seconds(3),
1089 ),
1090 ];
1091
1092 for (session_id, title, paths, updated_at) in &threads_to_save {
1093 let save_task = cx.update(|cx| {
1094 let thread_store = ThreadStore::global(cx);
1095 let session_id = session_id.to_string();
1096 let title = title.to_string();
1097 let paths = paths.clone();
1098 thread_store.update(cx, |store, cx| {
1099 store.save_thread(
1100 acp::SessionId::new(session_id),
1101 make_db_thread(&title, *updated_at),
1102 paths,
1103 cx,
1104 )
1105 })
1106 });
1107 save_task.await.unwrap();
1108 cx.run_until_parked();
1109 }
1110
1111 cx.update(|cx| migrate_thread_metadata(cx));
1112 cx.run_until_parked();
1113
1114 let list = cx.update(|cx| {
1115 let store = ThreadMetadataStore::global(cx);
1116 store.read(cx).entries().cloned().collect::<Vec<_>>()
1117 });
1118
1119 assert_eq!(list.len(), 4);
1120 assert!(
1121 list.iter()
1122 .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1123 );
1124
1125 let existing_metadata = list
1126 .iter()
1127 .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1128 .unwrap();
1129 assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1130 assert!(!existing_metadata.archived);
1131
1132 let migrated_session_ids = list
1133 .iter()
1134 .map(|metadata| metadata.session_id.0.as_ref())
1135 .collect::<Vec<_>>();
1136 assert!(migrated_session_ids.contains(&"a-session-1"));
1137 assert!(migrated_session_ids.contains(&"b-session-0"));
1138 assert!(migrated_session_ids.contains(&"projectless"));
1139
1140 let migrated_entries = list
1141 .iter()
1142 .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1143 .collect::<Vec<_>>();
1144 assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1145 }
1146
1147 #[gpui::test]
1148 async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1149 cx: &mut TestAppContext,
1150 ) {
1151 init_test(cx);
1152
1153 let project_paths = PathList::new(&[Path::new("/project-a")]);
1154 let existing_updated_at = Utc::now();
1155
1156 let existing_metadata = ThreadMetadata {
1157 session_id: acp::SessionId::new("existing-session"),
1158 agent_id: agent::ZED_AGENT_ID.clone(),
1159 title: "Existing Metadata".into(),
1160 updated_at: existing_updated_at,
1161 created_at: Some(existing_updated_at),
1162 folder_paths: project_paths.clone(),
1163 main_worktree_paths: PathList::default(),
1164 archived: false,
1165 };
1166
1167 cx.update(|cx| {
1168 let store = ThreadMetadataStore::global(cx);
1169 store.update(cx, |store, cx| {
1170 store.save(existing_metadata, cx);
1171 });
1172 });
1173 cx.run_until_parked();
1174
1175 let save_task = cx.update(|cx| {
1176 let thread_store = ThreadStore::global(cx);
1177 thread_store.update(cx, |store, cx| {
1178 store.save_thread(
1179 acp::SessionId::new("existing-session"),
1180 make_db_thread(
1181 "Updated Native Thread Title",
1182 existing_updated_at + chrono::Duration::seconds(1),
1183 ),
1184 project_paths.clone(),
1185 cx,
1186 )
1187 })
1188 });
1189 save_task.await.unwrap();
1190 cx.run_until_parked();
1191
1192 cx.update(|cx| migrate_thread_metadata(cx));
1193 cx.run_until_parked();
1194
1195 let list = cx.update(|cx| {
1196 let store = ThreadMetadataStore::global(cx);
1197 store.read(cx).entries().cloned().collect::<Vec<_>>()
1198 });
1199
1200 assert_eq!(list.len(), 1);
1201 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1202 }
1203
1204 #[gpui::test]
1205 async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1206 cx: &mut TestAppContext,
1207 ) {
1208 init_test(cx);
1209
1210 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1211 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1212 let now = Utc::now();
1213
1214 // Create 7 threads for project A and 3 for project B
1215 let mut threads_to_save = Vec::new();
1216 for i in 0..7 {
1217 threads_to_save.push((
1218 format!("a-session-{i}"),
1219 format!("Thread A{i}"),
1220 project_a_paths.clone(),
1221 now + chrono::Duration::seconds(i as i64),
1222 ));
1223 }
1224 for i in 0..3 {
1225 threads_to_save.push((
1226 format!("b-session-{i}"),
1227 format!("Thread B{i}"),
1228 project_b_paths.clone(),
1229 now + chrono::Duration::seconds(i as i64),
1230 ));
1231 }
1232
1233 for (session_id, title, paths, updated_at) in &threads_to_save {
1234 let save_task = cx.update(|cx| {
1235 let thread_store = ThreadStore::global(cx);
1236 let session_id = session_id.to_string();
1237 let title = title.to_string();
1238 let paths = paths.clone();
1239 thread_store.update(cx, |store, cx| {
1240 store.save_thread(
1241 acp::SessionId::new(session_id),
1242 make_db_thread(&title, *updated_at),
1243 paths,
1244 cx,
1245 )
1246 })
1247 });
1248 save_task.await.unwrap();
1249 cx.run_until_parked();
1250 }
1251
1252 cx.update(|cx| migrate_thread_metadata(cx));
1253 cx.run_until_parked();
1254
1255 let list = cx.update(|cx| {
1256 let store = ThreadMetadataStore::global(cx);
1257 store.read(cx).entries().cloned().collect::<Vec<_>>()
1258 });
1259
1260 assert_eq!(list.len(), 10);
1261
1262 // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1263 let mut project_a_entries: Vec<_> = list
1264 .iter()
1265 .filter(|m| m.folder_paths == project_a_paths)
1266 .collect();
1267 assert_eq!(project_a_entries.len(), 7);
1268 project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1269
1270 for entry in &project_a_entries[..5] {
1271 assert!(
1272 !entry.archived,
1273 "Expected {} to be unarchived (top 5 most recent)",
1274 entry.session_id.0
1275 );
1276 }
1277 for entry in &project_a_entries[5..] {
1278 assert!(
1279 entry.archived,
1280 "Expected {} to be archived (older than top 5)",
1281 entry.session_id.0
1282 );
1283 }
1284
1285 // Project B: all 3 should be unarchived (under the limit)
1286 let project_b_entries: Vec<_> = list
1287 .iter()
1288 .filter(|m| m.folder_paths == project_b_paths)
1289 .collect();
1290 assert_eq!(project_b_entries.len(), 3);
1291 assert!(project_b_entries.iter().all(|m| !m.archived));
1292 }
1293
1294 #[gpui::test]
1295 async fn test_empty_thread_events_do_not_create_metadata(cx: &mut TestAppContext) {
1296 init_test(cx);
1297
1298 let fs = FakeFs::new(cx.executor());
1299 let project = Project::test(fs, None::<&Path>, cx).await;
1300 let connection = Rc::new(StubAgentConnection::new());
1301
1302 let thread = cx
1303 .update(|cx| {
1304 connection
1305 .clone()
1306 .new_session(project.clone(), PathList::default(), cx)
1307 })
1308 .await
1309 .unwrap();
1310 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1311
1312 cx.update(|cx| {
1313 thread.update(cx, |thread, cx| {
1314 thread.set_title("Draft Thread".into(), cx).detach();
1315 });
1316 });
1317 cx.run_until_parked();
1318
1319 let metadata_ids = cx.update(|cx| {
1320 ThreadMetadataStore::global(cx)
1321 .read(cx)
1322 .entry_ids()
1323 .collect::<Vec<_>>()
1324 });
1325 assert!(
1326 metadata_ids.is_empty(),
1327 "expected empty draft thread title updates to be ignored"
1328 );
1329
1330 cx.update(|cx| {
1331 thread.update(cx, |thread, cx| {
1332 thread.push_user_content_block(None, "Hello".into(), cx);
1333 });
1334 });
1335 cx.run_until_parked();
1336
1337 let metadata_ids = cx.update(|cx| {
1338 ThreadMetadataStore::global(cx)
1339 .read(cx)
1340 .entry_ids()
1341 .collect::<Vec<_>>()
1342 });
1343 assert_eq!(metadata_ids, vec![session_id]);
1344 }
1345
1346 #[gpui::test]
1347 async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1348 init_test(cx);
1349
1350 let fs = FakeFs::new(cx.executor());
1351 let project = Project::test(fs, None::<&Path>, cx).await;
1352 let connection = Rc::new(StubAgentConnection::new());
1353
1354 let thread = cx
1355 .update(|cx| {
1356 connection
1357 .clone()
1358 .new_session(project.clone(), PathList::default(), cx)
1359 })
1360 .await
1361 .unwrap();
1362 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1363
1364 cx.update(|cx| {
1365 thread.update(cx, |thread, cx| {
1366 thread.push_user_content_block(None, "Hello".into(), cx);
1367 });
1368 });
1369 cx.run_until_parked();
1370
1371 let metadata_ids = cx.update(|cx| {
1372 ThreadMetadataStore::global(cx)
1373 .read(cx)
1374 .entry_ids()
1375 .collect::<Vec<_>>()
1376 });
1377 assert_eq!(metadata_ids, vec![session_id.clone()]);
1378
1379 drop(thread);
1380 cx.update(|_| {});
1381 cx.run_until_parked();
1382
1383 let metadata_ids = cx.update(|cx| {
1384 ThreadMetadataStore::global(cx)
1385 .read(cx)
1386 .entry_ids()
1387 .collect::<Vec<_>>()
1388 });
1389 assert_eq!(metadata_ids, vec![session_id]);
1390 }
1391
1392 #[gpui::test]
1393 async fn test_threads_without_project_association_are_archived_by_default(
1394 cx: &mut TestAppContext,
1395 ) {
1396 init_test(cx);
1397
1398 let fs = FakeFs::new(cx.executor());
1399 let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
1400 let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
1401 let connection = Rc::new(StubAgentConnection::new());
1402
1403 let thread_without_worktree = cx
1404 .update(|cx| {
1405 connection.clone().new_session(
1406 project_without_worktree.clone(),
1407 PathList::default(),
1408 cx,
1409 )
1410 })
1411 .await
1412 .unwrap();
1413 let session_without_worktree =
1414 cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());
1415
1416 cx.update(|cx| {
1417 thread_without_worktree.update(cx, |thread, cx| {
1418 thread.push_user_content_block(None, "content".into(), cx);
1419 thread.set_title("No Project Thread".into(), cx).detach();
1420 });
1421 });
1422 cx.run_until_parked();
1423
1424 let thread_with_worktree = cx
1425 .update(|cx| {
1426 connection.clone().new_session(
1427 project_with_worktree.clone(),
1428 PathList::default(),
1429 cx,
1430 )
1431 })
1432 .await
1433 .unwrap();
1434 let session_with_worktree =
1435 cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());
1436
1437 cx.update(|cx| {
1438 thread_with_worktree.update(cx, |thread, cx| {
1439 thread.push_user_content_block(None, "content".into(), cx);
1440 thread.set_title("Project Thread".into(), cx).detach();
1441 });
1442 });
1443 cx.run_until_parked();
1444
1445 cx.update(|cx| {
1446 let store = ThreadMetadataStore::global(cx);
1447 let store = store.read(cx);
1448
1449 let without_worktree = store
1450 .entry(&session_without_worktree)
1451 .expect("missing metadata for thread without project association");
1452 assert!(without_worktree.folder_paths.is_empty());
1453 assert!(
1454 without_worktree.archived,
1455 "expected thread without project association to be archived"
1456 );
1457
1458 let with_worktree = store
1459 .entry(&session_with_worktree)
1460 .expect("missing metadata for thread with project association");
1461 assert_eq!(
1462 with_worktree.folder_paths,
1463 PathList::new(&[Path::new("/project-a")])
1464 );
1465 assert!(
1466 !with_worktree.archived,
1467 "expected thread with project association to remain unarchived"
1468 );
1469 });
1470 }
1471
1472 #[gpui::test]
1473 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1474 init_test(cx);
1475
1476 let fs = FakeFs::new(cx.executor());
1477 let project = Project::test(fs, None::<&Path>, cx).await;
1478 let connection = Rc::new(StubAgentConnection::new());
1479
1480 // Create a regular (non-subagent) AcpThread.
1481 let regular_thread = cx
1482 .update(|cx| {
1483 connection
1484 .clone()
1485 .new_session(project.clone(), PathList::default(), cx)
1486 })
1487 .await
1488 .unwrap();
1489
1490 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1491
1492 // Set a title on the regular thread to trigger a save via handle_thread_update.
1493 cx.update(|cx| {
1494 regular_thread.update(cx, |thread, cx| {
1495 thread.push_user_content_block(None, "content".into(), cx);
1496 thread.set_title("Regular Thread".into(), cx).detach();
1497 });
1498 });
1499 cx.run_until_parked();
1500
1501 // Create a subagent AcpThread
1502 let subagent_session_id = acp::SessionId::new("subagent-session");
1503 let subagent_thread = cx.update(|cx| {
1504 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1505 cx.new(|cx| {
1506 acp_thread::AcpThread::new(
1507 Some(regular_session_id.clone()),
1508 Some("Subagent Thread".into()),
1509 None,
1510 connection.clone(),
1511 project.clone(),
1512 action_log,
1513 subagent_session_id.clone(),
1514 watch::Receiver::constant(acp::PromptCapabilities::new()),
1515 cx,
1516 )
1517 })
1518 });
1519
1520 // Set a title on the subagent thread to trigger handle_thread_update.
1521 cx.update(|cx| {
1522 subagent_thread.update(cx, |thread, cx| {
1523 thread
1524 .set_title("Subagent Thread Title".into(), cx)
1525 .detach();
1526 });
1527 });
1528 cx.run_until_parked();
1529
1530 // List all metadata from the store cache.
1531 let list = cx.update(|cx| {
1532 let store = ThreadMetadataStore::global(cx);
1533 store.read(cx).entries().cloned().collect::<Vec<_>>()
1534 });
1535
1536 // The subagent thread should NOT appear in the sidebar metadata.
1537 // Only the regular thread should be listed.
1538 assert_eq!(
1539 list.len(),
1540 1,
1541 "Expected only the regular thread in sidebar metadata, \
1542 but found {} entries (subagent threads are leaking into the sidebar)",
1543 list.len(),
1544 );
1545 assert_eq!(list[0].session_id, regular_session_id);
1546 assert_eq!(list[0].title.as_ref(), "Regular Thread");
1547 }
1548
1549 #[test]
1550 fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1551 let now = Utc::now();
1552
1553 let operations = vec![
1554 DbOperation::Upsert(make_metadata(
1555 "session-1",
1556 "First Thread",
1557 now,
1558 PathList::default(),
1559 )),
1560 DbOperation::Delete(acp::SessionId::new("session-1")),
1561 ];
1562
1563 let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1564
1565 assert_eq!(deduped.len(), 1);
1566 assert_eq!(
1567 deduped[0],
1568 DbOperation::Delete(acp::SessionId::new("session-1"))
1569 );
1570 }
1571
1572 #[test]
1573 fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1574 let now = Utc::now();
1575 let later = now + chrono::Duration::seconds(1);
1576
1577 let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1578 let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1579
1580 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1581 DbOperation::Upsert(old_metadata),
1582 DbOperation::Upsert(new_metadata.clone()),
1583 ]);
1584
1585 assert_eq!(deduped.len(), 1);
1586 assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1587 }
1588
1589 #[test]
1590 fn test_dedup_db_operations_preserves_distinct_sessions() {
1591 let now = Utc::now();
1592
1593 let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1594 let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1595 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1596 DbOperation::Upsert(metadata1.clone()),
1597 DbOperation::Upsert(metadata2.clone()),
1598 ]);
1599
1600 assert_eq!(deduped.len(), 2);
1601 assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1602 assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1603 }
1604
1605 #[gpui::test]
1606 async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1607 init_test(cx);
1608
1609 let paths = PathList::new(&[Path::new("/project-a")]);
1610 let now = Utc::now();
1611 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1612
1613 cx.update(|cx| {
1614 let store = ThreadMetadataStore::global(cx);
1615 store.update(cx, |store, cx| {
1616 store.save(metadata, cx);
1617 });
1618 });
1619
1620 cx.run_until_parked();
1621
1622 cx.update(|cx| {
1623 let store = ThreadMetadataStore::global(cx);
1624 let store = store.read(cx);
1625
1626 let path_entries = store
1627 .entries_for_path(&paths)
1628 .map(|e| e.session_id.0.to_string())
1629 .collect::<Vec<_>>();
1630 assert_eq!(path_entries, vec!["session-1"]);
1631
1632 let archived = store
1633 .archived_entries()
1634 .map(|e| e.session_id.0.to_string())
1635 .collect::<Vec<_>>();
1636 assert!(archived.is_empty());
1637 });
1638
1639 cx.update(|cx| {
1640 let store = ThreadMetadataStore::global(cx);
1641 store.update(cx, |store, cx| {
1642 store.archive(&acp::SessionId::new("session-1"), cx);
1643 });
1644 });
1645
1646 cx.run_until_parked();
1647
1648 cx.update(|cx| {
1649 let store = ThreadMetadataStore::global(cx);
1650 let store = store.read(cx);
1651
1652 let path_entries = store
1653 .entries_for_path(&paths)
1654 .map(|e| e.session_id.0.to_string())
1655 .collect::<Vec<_>>();
1656 assert!(path_entries.is_empty());
1657
1658 let archived = store.archived_entries().collect::<Vec<_>>();
1659 assert_eq!(archived.len(), 1);
1660 assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1661 assert!(archived[0].archived);
1662 });
1663
1664 cx.update(|cx| {
1665 let store = ThreadMetadataStore::global(cx);
1666 store.update(cx, |store, cx| {
1667 store.unarchive(&acp::SessionId::new("session-1"), cx);
1668 });
1669 });
1670
1671 cx.run_until_parked();
1672
1673 cx.update(|cx| {
1674 let store = ThreadMetadataStore::global(cx);
1675 let store = store.read(cx);
1676
1677 let path_entries = store
1678 .entries_for_path(&paths)
1679 .map(|e| e.session_id.0.to_string())
1680 .collect::<Vec<_>>();
1681 assert_eq!(path_entries, vec!["session-1"]);
1682
1683 let archived = store
1684 .archived_entries()
1685 .map(|e| e.session_id.0.to_string())
1686 .collect::<Vec<_>>();
1687 assert!(archived.is_empty());
1688 });
1689 }
1690
1691 #[gpui::test]
1692 async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1693 init_test(cx);
1694
1695 let paths = PathList::new(&[Path::new("/project-a")]);
1696 let now = Utc::now();
1697
1698 let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1699 let metadata2 = make_metadata(
1700 "session-2",
1701 "Archived Thread",
1702 now - chrono::Duration::seconds(1),
1703 paths.clone(),
1704 );
1705
1706 cx.update(|cx| {
1707 let store = ThreadMetadataStore::global(cx);
1708 store.update(cx, |store, cx| {
1709 store.save(metadata1, cx);
1710 store.save(metadata2, cx);
1711 });
1712 });
1713
1714 cx.run_until_parked();
1715
1716 cx.update(|cx| {
1717 let store = ThreadMetadataStore::global(cx);
1718 store.update(cx, |store, cx| {
1719 store.archive(&acp::SessionId::new("session-2"), cx);
1720 });
1721 });
1722
1723 cx.run_until_parked();
1724
1725 cx.update(|cx| {
1726 let store = ThreadMetadataStore::global(cx);
1727 let store = store.read(cx);
1728
1729 let path_entries = store
1730 .entries_for_path(&paths)
1731 .map(|e| e.session_id.0.to_string())
1732 .collect::<Vec<_>>();
1733 assert_eq!(path_entries, vec!["session-1"]);
1734
1735 let all_entries = store
1736 .entries()
1737 .map(|e| e.session_id.0.to_string())
1738 .collect::<Vec<_>>();
1739 assert_eq!(all_entries.len(), 2);
1740 assert!(all_entries.contains(&"session-1".to_string()));
1741 assert!(all_entries.contains(&"session-2".to_string()));
1742
1743 let archived = store
1744 .archived_entries()
1745 .map(|e| e.session_id.0.to_string())
1746 .collect::<Vec<_>>();
1747 assert_eq!(archived, vec!["session-2"]);
1748 });
1749 }
1750
1751 #[gpui::test]
1752 async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1753 init_test(cx);
1754
1755 let paths = PathList::new(&[Path::new("/project-a")]);
1756 let now = Utc::now();
1757
1758 let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1759 let m2 = make_metadata(
1760 "session-2",
1761 "Thread Two",
1762 now - chrono::Duration::seconds(1),
1763 paths.clone(),
1764 );
1765 let m3 = make_metadata(
1766 "session-3",
1767 "Thread Three",
1768 now - chrono::Duration::seconds(2),
1769 paths,
1770 );
1771
1772 cx.update(|cx| {
1773 let store = ThreadMetadataStore::global(cx);
1774 store.update(cx, |store, cx| {
1775 store.save_all(vec![m1, m2, m3], cx);
1776 });
1777 });
1778
1779 cx.run_until_parked();
1780
1781 cx.update(|cx| {
1782 let store = ThreadMetadataStore::global(cx);
1783 let store = store.read(cx);
1784
1785 let all_entries = store
1786 .entries()
1787 .map(|e| e.session_id.0.to_string())
1788 .collect::<Vec<_>>();
1789 assert_eq!(all_entries.len(), 3);
1790 assert!(all_entries.contains(&"session-1".to_string()));
1791 assert!(all_entries.contains(&"session-2".to_string()));
1792 assert!(all_entries.contains(&"session-3".to_string()));
1793
1794 let entry_ids = store.entry_ids().collect::<Vec<_>>();
1795 assert_eq!(entry_ids.len(), 3);
1796 });
1797 }
1798
1799 #[gpui::test]
1800 async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1801 init_test(cx);
1802
1803 let paths = PathList::new(&[Path::new("/project-a")]);
1804 let now = Utc::now();
1805 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1806
1807 cx.update(|cx| {
1808 let store = ThreadMetadataStore::global(cx);
1809 store.update(cx, |store, cx| {
1810 store.save(metadata, cx);
1811 });
1812 });
1813
1814 cx.run_until_parked();
1815
1816 cx.update(|cx| {
1817 let store = ThreadMetadataStore::global(cx);
1818 store.update(cx, |store, cx| {
1819 store.archive(&acp::SessionId::new("session-1"), cx);
1820 });
1821 });
1822
1823 cx.run_until_parked();
1824
1825 cx.update(|cx| {
1826 let store = ThreadMetadataStore::global(cx);
1827 store.update(cx, |store, cx| {
1828 let _ = store.reload(cx);
1829 });
1830 });
1831
1832 cx.run_until_parked();
1833
1834 cx.update(|cx| {
1835 let store = ThreadMetadataStore::global(cx);
1836 let store = store.read(cx);
1837
1838 let thread = store
1839 .entries()
1840 .find(|e| e.session_id.0.as_ref() == "session-1")
1841 .expect("thread should exist after reload");
1842 assert!(thread.archived);
1843
1844 let path_entries = store
1845 .entries_for_path(&paths)
1846 .map(|e| e.session_id.0.to_string())
1847 .collect::<Vec<_>>();
1848 assert!(path_entries.is_empty());
1849
1850 let archived = store
1851 .archived_entries()
1852 .map(|e| e.session_id.0.to_string())
1853 .collect::<Vec<_>>();
1854 assert_eq!(archived, vec!["session-1"]);
1855 });
1856 }
1857
1858 #[gpui::test]
1859 async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1860 init_test(cx);
1861
1862 cx.run_until_parked();
1863
1864 cx.update(|cx| {
1865 let store = ThreadMetadataStore::global(cx);
1866 store.update(cx, |store, cx| {
1867 store.archive(&acp::SessionId::new("nonexistent"), cx);
1868 });
1869 });
1870
1871 cx.run_until_parked();
1872
1873 cx.update(|cx| {
1874 let store = ThreadMetadataStore::global(cx);
1875 let store = store.read(cx);
1876
1877 assert!(store.is_empty());
1878 assert_eq!(store.entries().count(), 0);
1879 assert_eq!(store.archived_entries().count(), 0);
1880 });
1881 }
1882
1883 #[gpui::test]
1884 async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1885 init_test(cx);
1886
1887 let paths = PathList::new(&[Path::new("/project-a")]);
1888 let now = Utc::now();
1889 let metadata = make_metadata("session-1", "Thread 1", now, paths);
1890 let session_id = metadata.session_id.clone();
1891
1892 cx.update(|cx| {
1893 let store = ThreadMetadataStore::global(cx);
1894 store.update(cx, |store, cx| {
1895 store.save(metadata.clone(), cx);
1896 store.archive(&session_id, cx);
1897 });
1898 });
1899
1900 cx.run_until_parked();
1901
1902 cx.update(|cx| {
1903 let store = ThreadMetadataStore::global(cx);
1904 let store = store.read(cx);
1905
1906 let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1907 pretty_assertions::assert_eq!(
1908 entries,
1909 vec![ThreadMetadata {
1910 archived: true,
1911 ..metadata
1912 }]
1913 );
1914 });
1915 }
1916}