1use std::{path::Path, sync::Arc};
2
3use acp_thread::AcpThreadEvent;
4use agent::{ThreadStore, ZED_AGENT_ID};
5use agent_client_protocol as acp;
6use anyhow::Context as _;
7use chrono::{DateTime, Utc};
8use collections::{HashMap, HashSet};
9use db::{
10 sqlez::{
11 bindable::Column, domain::Domain, statement::Statement,
12 thread_safe_connection::ThreadSafeConnection,
13 },
14 sqlez_macros::sql,
15};
16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
17use futures::{FutureExt as _, future::Shared};
18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
19use project::AgentId;
20use ui::{App, Context, SharedString};
21use util::ResultExt as _;
22use workspace::PathList;
23
24use crate::DEFAULT_THREAD_TITLE;
25
26pub fn init(cx: &mut App) {
27 ThreadMetadataStore::init_global(cx);
28
29 if cx.has_flag::<AgentV2FeatureFlag>() {
30 migrate_thread_metadata(cx);
31 }
32 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
33 if has_flag {
34 migrate_thread_metadata(cx);
35 }
36 })
37 .detach();
38}
39
40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
41/// We skip migrating threads that do not have a project.
42///
43/// TODO: Remove this after N weeks of shipping the sidebar
44fn migrate_thread_metadata(cx: &mut App) {
45 let store = ThreadMetadataStore::global(cx);
46 let db = store.read(cx).db.clone();
47
48 cx.spawn(async move |cx| {
49 let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
50
51 let is_first_migration = existing_entries.is_empty();
52
53 let mut to_migrate = store.read_with(cx, |_store, cx| {
54 ThreadStore::global(cx)
55 .read(cx)
56 .entries()
57 .filter_map(|entry| {
58 if existing_entries.contains(&entry.id.0) {
59 return None;
60 }
61
62 Some(ThreadMetadata {
63 session_id: entry.id,
64 agent_id: ZED_AGENT_ID.clone(),
65 title: entry.title,
66 updated_at: entry.updated_at,
67 created_at: entry.created_at,
68 folder_paths: entry.folder_paths,
69 main_worktree_paths: PathList::default(),
70 archived: true,
71 })
72 })
73 .collect::<Vec<_>>()
74 });
75
76 if to_migrate.is_empty() {
77 return anyhow::Ok(());
78 }
79
80 // On the first migration (no entries in DB yet), keep the 5 most
81 // recent threads per project unarchived.
82 if is_first_migration {
83 let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
84 for entry in &mut to_migrate {
85 if entry.folder_paths.is_empty() {
86 continue;
87 }
88 per_project
89 .entry(entry.folder_paths.clone())
90 .or_default()
91 .push(entry);
92 }
93 for entries in per_project.values_mut() {
94 entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
95 for entry in entries.iter_mut().take(5) {
96 entry.archived = false;
97 }
98 }
99 }
100
101 log::info!("Migrating {} thread store entries", to_migrate.len());
102
103 // Manually save each entry to the database and call reload, otherwise
104 // we'll end up triggering lots of reloads after each save
105 for entry in to_migrate {
106 db.save(entry).await?;
107 }
108
109 log::info!("Finished migrating thread store entries");
110
111 let _ = store.update(cx, |store, cx| store.reload(cx));
112 anyhow::Ok(())
113 })
114 .detach_and_log_err(cx);
115}
116
117struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
118impl Global for GlobalThreadMetadataStore {}
119
120/// Lightweight metadata for any thread (native or ACP), enough to populate
121/// the sidebar list and route to the correct load path when clicked.
122#[derive(Debug, Clone, PartialEq)]
123pub struct ThreadMetadata {
124 pub session_id: acp::SessionId,
125 pub agent_id: AgentId,
126 pub title: SharedString,
127 pub updated_at: DateTime<Utc>,
128 pub created_at: Option<DateTime<Utc>>,
129 pub folder_paths: PathList,
130 pub main_worktree_paths: PathList,
131 pub archived: bool,
132}
133
134impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
135 fn from(meta: &ThreadMetadata) -> Self {
136 Self {
137 session_id: meta.session_id.clone(),
138 work_dirs: Some(meta.folder_paths.clone()),
139 title: Some(meta.title.clone()),
140 updated_at: Some(meta.updated_at),
141 created_at: meta.created_at,
142 meta: None,
143 }
144 }
145}
146
147/// The store holds all metadata needed to show threads in the sidebar/the archive.
148///
149/// Automatically listens to AcpThread events and updates metadata if it has changed.
150pub struct ThreadMetadataStore {
151 db: ThreadMetadataDb,
152 threads: HashMap<acp::SessionId, ThreadMetadata>,
153 threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
154 threads_by_main_paths: HashMap<PathList, HashSet<acp::SessionId>>,
155 reload_task: Option<Shared<Task<()>>>,
156 session_subscriptions: HashMap<acp::SessionId, Subscription>,
157 pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
158 _db_operations_task: Task<()>,
159}
160
161#[derive(Debug, PartialEq)]
162enum DbOperation {
163 Upsert(ThreadMetadata),
164 Delete(acp::SessionId),
165}
166
167impl DbOperation {
168 fn id(&self) -> &acp::SessionId {
169 match self {
170 DbOperation::Upsert(thread) => &thread.session_id,
171 DbOperation::Delete(session_id) => session_id,
172 }
173 }
174}
175
176impl ThreadMetadataStore {
177 #[cfg(not(any(test, feature = "test-support")))]
178 pub fn init_global(cx: &mut App) {
179 if cx.has_global::<Self>() {
180 return;
181 }
182
183 let db = ThreadMetadataDb::global(cx);
184 let thread_store = cx.new(|cx| Self::new(db, cx));
185 cx.set_global(GlobalThreadMetadataStore(thread_store));
186 }
187
188 #[cfg(any(test, feature = "test-support"))]
189 pub fn init_global(cx: &mut App) {
190 let thread = std::thread::current();
191 let test_name = thread.name().unwrap_or("unknown_test");
192 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
193 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
194 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
195 cx.set_global(GlobalThreadMetadataStore(thread_store));
196 }
197
198 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
199 cx.try_global::<GlobalThreadMetadataStore>()
200 .map(|store| store.0.clone())
201 }
202
203 pub fn global(cx: &App) -> Entity<Self> {
204 cx.global::<GlobalThreadMetadataStore>().0.clone()
205 }
206
207 pub fn is_empty(&self) -> bool {
208 self.threads.is_empty()
209 }
210
211 /// Returns all thread IDs.
212 pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
213 self.threads.keys().cloned()
214 }
215
216 /// Returns the metadata for a specific thread, if it exists.
217 pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
218 self.threads.get(session_id)
219 }
220
221 /// Returns all threads.
222 pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
223 self.threads.values()
224 }
225
226 /// Returns all archived threads.
227 pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
228 self.entries().filter(|t| t.archived)
229 }
230
231 /// Returns all threads for the given path list, excluding archived threads.
232 pub fn entries_for_path(
233 &self,
234 path_list: &PathList,
235 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
236 self.threads_by_paths
237 .get(path_list)
238 .into_iter()
239 .flatten()
240 .filter_map(|s| self.threads.get(s))
241 .filter(|s| !s.archived)
242 }
243
244 /// Returns threads whose `main_worktree_paths` matches the given path list,
245 /// excluding archived threads. This finds threads that were opened in a
246 /// linked worktree but are associated with the given main worktree.
247 pub fn entries_for_main_worktree_path(
248 &self,
249 path_list: &PathList,
250 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
251 self.threads_by_main_paths
252 .get(path_list)
253 .into_iter()
254 .flatten()
255 .filter_map(|s| self.threads.get(s))
256 .filter(|s| !s.archived)
257 }
258
259 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
260 let db = self.db.clone();
261 self.reload_task.take();
262
263 let list_task = cx
264 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
265
266 let reload_task = cx
267 .spawn(async move |this, cx| {
268 let Some(rows) = list_task.await.log_err() else {
269 return;
270 };
271
272 this.update(cx, |this, cx| {
273 this.threads.clear();
274 this.threads_by_paths.clear();
275 this.threads_by_main_paths.clear();
276
277 for row in rows {
278 this.threads_by_paths
279 .entry(row.folder_paths.clone())
280 .or_default()
281 .insert(row.session_id.clone());
282 if !row.main_worktree_paths.is_empty() {
283 this.threads_by_main_paths
284 .entry(row.main_worktree_paths.clone())
285 .or_default()
286 .insert(row.session_id.clone());
287 }
288 this.threads.insert(row.session_id.clone(), row);
289 }
290
291 cx.notify();
292 })
293 .ok();
294 })
295 .shared();
296 self.reload_task = Some(reload_task.clone());
297 reload_task
298 }
299
300 pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
301 if !cx.has_flag::<AgentV2FeatureFlag>() {
302 return;
303 }
304
305 for metadata in metadata {
306 self.save_internal(metadata);
307 }
308 cx.notify();
309 }
310
311 #[cfg(any(test, feature = "test-support"))]
312 pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
313 self.save(metadata, cx)
314 }
315
316 fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
317 if !cx.has_flag::<AgentV2FeatureFlag>() {
318 return;
319 }
320
321 self.save_internal(metadata);
322 cx.notify();
323 }
324
325 fn save_internal(&mut self, metadata: ThreadMetadata) {
326 if let Some(thread) = self.threads.get(&metadata.session_id) {
327 if thread.folder_paths != metadata.folder_paths {
328 if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
329 session_ids.remove(&metadata.session_id);
330 }
331 }
332 if thread.main_worktree_paths != metadata.main_worktree_paths
333 && !thread.main_worktree_paths.is_empty()
334 {
335 if let Some(session_ids) = self
336 .threads_by_main_paths
337 .get_mut(&thread.main_worktree_paths)
338 {
339 session_ids.remove(&metadata.session_id);
340 }
341 }
342 }
343
344 self.threads
345 .insert(metadata.session_id.clone(), metadata.clone());
346
347 self.threads_by_paths
348 .entry(metadata.folder_paths.clone())
349 .or_default()
350 .insert(metadata.session_id.clone());
351
352 if !metadata.main_worktree_paths.is_empty() {
353 self.threads_by_main_paths
354 .entry(metadata.main_worktree_paths.clone())
355 .or_default()
356 .insert(metadata.session_id.clone());
357 }
358
359 self.pending_thread_ops_tx
360 .try_send(DbOperation::Upsert(metadata))
361 .log_err();
362 }
363
364 pub fn update_working_directories(
365 &mut self,
366 session_id: &acp::SessionId,
367 work_dirs: PathList,
368 cx: &mut Context<Self>,
369 ) {
370 if !cx.has_flag::<AgentV2FeatureFlag>() {
371 return;
372 }
373
374 if let Some(thread) = self.threads.get(session_id) {
375 self.save_internal(ThreadMetadata {
376 folder_paths: work_dirs,
377 ..thread.clone()
378 });
379 cx.notify();
380 }
381 }
382
383 pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
384 self.update_archived(session_id, true, cx);
385 }
386
387 pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
388 self.update_archived(session_id, false, cx);
389 }
390
391 fn update_archived(
392 &mut self,
393 session_id: &acp::SessionId,
394 archived: bool,
395 cx: &mut Context<Self>,
396 ) {
397 if !cx.has_flag::<AgentV2FeatureFlag>() {
398 return;
399 }
400
401 if let Some(thread) = self.threads.get(session_id) {
402 self.save_internal(ThreadMetadata {
403 archived,
404 ..thread.clone()
405 });
406 cx.notify();
407 }
408 }
409
410 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
411 if !cx.has_flag::<AgentV2FeatureFlag>() {
412 return;
413 }
414
415 if let Some(thread) = self.threads.get(&session_id) {
416 if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
417 session_ids.remove(&session_id);
418 }
419 if !thread.main_worktree_paths.is_empty() {
420 if let Some(session_ids) = self
421 .threads_by_main_paths
422 .get_mut(&thread.main_worktree_paths)
423 {
424 session_ids.remove(&session_id);
425 }
426 }
427 }
428 self.threads.remove(&session_id);
429 self.pending_thread_ops_tx
430 .try_send(DbOperation::Delete(session_id))
431 .log_err();
432 cx.notify();
433 }
434
435 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
436 let weak_store = cx.weak_entity();
437
438 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
439 // Don't track subagent threads in the sidebar.
440 if thread.parent_session_id().is_some() {
441 return;
442 }
443
444 let thread_entity = cx.entity();
445
446 cx.on_release({
447 let weak_store = weak_store.clone();
448 move |thread, cx| {
449 weak_store
450 .update(cx, |store, _cx| {
451 let session_id = thread.session_id().clone();
452 store.session_subscriptions.remove(&session_id);
453 })
454 .ok();
455 }
456 })
457 .detach();
458
459 weak_store
460 .update(cx, |this, cx| {
461 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
462 this.session_subscriptions
463 .insert(thread.session_id().clone(), subscription);
464 })
465 .ok();
466 })
467 .detach();
468
469 let (tx, rx) = smol::channel::unbounded();
470 let _db_operations_task = cx.background_spawn({
471 let db = db.clone();
472 async move {
473 while let Ok(first_update) = rx.recv().await {
474 let mut updates = vec![first_update];
475 while let Ok(update) = rx.try_recv() {
476 updates.push(update);
477 }
478 let updates = Self::dedup_db_operations(updates);
479 for operation in updates {
480 match operation {
481 DbOperation::Upsert(metadata) => {
482 db.save(metadata).await.log_err();
483 }
484 DbOperation::Delete(session_id) => {
485 db.delete(session_id).await.log_err();
486 }
487 }
488 }
489 }
490 }
491 });
492
493 let mut this = Self {
494 db,
495 threads: HashMap::default(),
496 threads_by_paths: HashMap::default(),
497 threads_by_main_paths: HashMap::default(),
498 reload_task: None,
499 session_subscriptions: HashMap::default(),
500 pending_thread_ops_tx: tx,
501 _db_operations_task,
502 };
503 let _ = this.reload(cx);
504 this
505 }
506
507 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
508 let mut ops = HashMap::default();
509 for operation in operations.into_iter().rev() {
510 if ops.contains_key(operation.id()) {
511 continue;
512 }
513 ops.insert(operation.id().clone(), operation);
514 }
515 ops.into_values().collect()
516 }
517
518 fn handle_thread_event(
519 &mut self,
520 thread: Entity<acp_thread::AcpThread>,
521 event: &AcpThreadEvent,
522 cx: &mut Context<Self>,
523 ) {
524 // Don't track subagent threads in the sidebar.
525 if thread.read(cx).parent_session_id().is_some() {
526 return;
527 }
528
529 match event {
530 AcpThreadEvent::NewEntry
531 | AcpThreadEvent::TitleUpdated
532 | AcpThreadEvent::EntryUpdated(_)
533 | AcpThreadEvent::EntriesRemoved(_)
534 | AcpThreadEvent::ToolAuthorizationRequested(_)
535 | AcpThreadEvent::ToolAuthorizationReceived(_)
536 | AcpThreadEvent::Retry(_)
537 | AcpThreadEvent::Stopped(_)
538 | AcpThreadEvent::Error
539 | AcpThreadEvent::LoadError(_)
540 | AcpThreadEvent::Refusal
541 | AcpThreadEvent::WorkingDirectoriesUpdated => {
542 let thread_ref = thread.read(cx);
543 if thread_ref.entries().is_empty() {
544 return;
545 }
546
547 let existing_thread = self.threads.get(thread_ref.session_id());
548 let session_id = thread_ref.session_id().clone();
549 let title = thread_ref
550 .title()
551 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
552
553 let updated_at = Utc::now();
554
555 let created_at = existing_thread
556 .and_then(|t| t.created_at)
557 .unwrap_or_else(|| updated_at);
558
559 let agent_id = thread_ref.connection().agent_id();
560
561 let folder_paths = {
562 let project = thread_ref.project().read(cx);
563 let paths: Vec<Arc<Path>> = project
564 .visible_worktrees(cx)
565 .map(|worktree| worktree.read(cx).abs_path())
566 .collect();
567 PathList::new(&paths)
568 };
569
570 let main_worktree_paths = thread_ref
571 .project()
572 .read(cx)
573 .project_group_key(cx)
574 .path_list()
575 .clone();
576
577 // Threads without a folder path (e.g. started in an empty
578 // window) are archived by default so they don't get lost,
579 // because they won't show up in the sidebar. Users can reload
580 // them from the archive.
581 let archived = existing_thread
582 .map(|t| t.archived)
583 .unwrap_or(folder_paths.is_empty());
584
585 let metadata = ThreadMetadata {
586 session_id,
587 agent_id,
588 title,
589 created_at: Some(created_at),
590 updated_at,
591 folder_paths,
592 main_worktree_paths,
593 archived,
594 };
595
596 self.save(metadata, cx);
597 }
598 AcpThreadEvent::TokenUsageUpdated
599 | AcpThreadEvent::SubagentSpawned(_)
600 | AcpThreadEvent::PromptCapabilitiesUpdated
601 | AcpThreadEvent::AvailableCommandsUpdated(_)
602 | AcpThreadEvent::ModeUpdated(_)
603 | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
604 }
605 }
606}
607
608impl Global for ThreadMetadataStore {}
609
610struct ThreadMetadataDb(ThreadSafeConnection);
611
612impl Domain for ThreadMetadataDb {
613 const NAME: &str = stringify!(ThreadMetadataDb);
614
615 const MIGRATIONS: &[&str] = &[
616 sql!(
617 CREATE TABLE IF NOT EXISTS sidebar_threads(
618 session_id TEXT PRIMARY KEY,
619 agent_id TEXT,
620 title TEXT NOT NULL,
621 updated_at TEXT NOT NULL,
622 created_at TEXT,
623 folder_paths TEXT,
624 folder_paths_order TEXT
625 ) STRICT;
626 ),
627 sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
628 sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths TEXT),
629 sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths_order TEXT),
630 ];
631}
632
633db::static_connection!(ThreadMetadataDb, []);
634
635impl ThreadMetadataDb {
636 pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
637 self.select::<Arc<str>>(
638 "SELECT session_id FROM sidebar_threads \
639 ORDER BY updated_at DESC",
640 )?()
641 }
642
643 /// List all sidebar thread metadata, ordered by updated_at descending.
644 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
645 self.select::<ThreadMetadata>(
646 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order \
647 FROM sidebar_threads \
648 ORDER BY updated_at DESC"
649 )?()
650 }
651
652 /// Upsert metadata for a thread.
653 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
654 let id = row.session_id.0.clone();
655 let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
656 None
657 } else {
658 Some(row.agent_id.to_string())
659 };
660 let title = row.title.to_string();
661 let updated_at = row.updated_at.to_rfc3339();
662 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
663 let serialized = row.folder_paths.serialize();
664 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
665 (None, None)
666 } else {
667 (Some(serialized.paths), Some(serialized.order))
668 };
669 let main_serialized = row.main_worktree_paths.serialize();
670 let (main_worktree_paths, main_worktree_paths_order) = if row.main_worktree_paths.is_empty()
671 {
672 (None, None)
673 } else {
674 (Some(main_serialized.paths), Some(main_serialized.order))
675 };
676 let archived = row.archived;
677
678 self.write(move |conn| {
679 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order) \
680 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) \
681 ON CONFLICT(session_id) DO UPDATE SET \
682 agent_id = excluded.agent_id, \
683 title = excluded.title, \
684 updated_at = excluded.updated_at, \
685 created_at = excluded.created_at, \
686 folder_paths = excluded.folder_paths, \
687 folder_paths_order = excluded.folder_paths_order, \
688 archived = excluded.archived, \
689 main_worktree_paths = excluded.main_worktree_paths, \
690 main_worktree_paths_order = excluded.main_worktree_paths_order";
691 let mut stmt = Statement::prepare(conn, sql)?;
692 let mut i = stmt.bind(&id, 1)?;
693 i = stmt.bind(&agent_id, i)?;
694 i = stmt.bind(&title, i)?;
695 i = stmt.bind(&updated_at, i)?;
696 i = stmt.bind(&created_at, i)?;
697 i = stmt.bind(&folder_paths, i)?;
698 i = stmt.bind(&folder_paths_order, i)?;
699 i = stmt.bind(&archived, i)?;
700 i = stmt.bind(&main_worktree_paths, i)?;
701 stmt.bind(&main_worktree_paths_order, i)?;
702 stmt.exec()
703 })
704 .await
705 }
706
707 /// Delete metadata for a single thread.
708 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
709 let id = session_id.0.clone();
710 self.write(move |conn| {
711 let mut stmt =
712 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
713 stmt.bind(&id, 1)?;
714 stmt.exec()
715 })
716 .await
717 }
718}
719
720impl Column for ThreadMetadata {
721 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
722 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
723 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
724 let (title, next): (String, i32) = Column::column(statement, next)?;
725 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
726 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
727 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
728 let (folder_paths_order_str, next): (Option<String>, i32) =
729 Column::column(statement, next)?;
730 let (archived, next): (bool, i32) = Column::column(statement, next)?;
731 let (main_worktree_paths_str, next): (Option<String>, i32) =
732 Column::column(statement, next)?;
733 let (main_worktree_paths_order_str, next): (Option<String>, i32) =
734 Column::column(statement, next)?;
735
736 let agent_id = agent_id
737 .map(|id| AgentId::new(id))
738 .unwrap_or(ZED_AGENT_ID.clone());
739
740 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
741 let created_at = created_at_str
742 .as_deref()
743 .map(DateTime::parse_from_rfc3339)
744 .transpose()?
745 .map(|dt| dt.with_timezone(&Utc));
746
747 let folder_paths = folder_paths_str
748 .map(|paths| {
749 PathList::deserialize(&util::path_list::SerializedPathList {
750 paths,
751 order: folder_paths_order_str.unwrap_or_default(),
752 })
753 })
754 .unwrap_or_default();
755
756 let main_worktree_paths = main_worktree_paths_str
757 .map(|paths| {
758 PathList::deserialize(&util::path_list::SerializedPathList {
759 paths,
760 order: main_worktree_paths_order_str.unwrap_or_default(),
761 })
762 })
763 .unwrap_or_default();
764
765 Ok((
766 ThreadMetadata {
767 session_id: acp::SessionId::new(id),
768 agent_id,
769 title: title.into(),
770 updated_at,
771 created_at,
772 folder_paths,
773 main_worktree_paths,
774 archived,
775 },
776 next,
777 ))
778 }
779}
780
781#[cfg(test)]
782mod tests {
783 use super::*;
784 use acp_thread::{AgentConnection, StubAgentConnection};
785 use action_log::ActionLog;
786 use agent::DbThread;
787 use agent_client_protocol as acp;
788 use feature_flags::FeatureFlagAppExt;
789 use gpui::TestAppContext;
790 use project::FakeFs;
791 use project::Project;
792 use std::path::Path;
793 use std::rc::Rc;
794
795 fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
796 DbThread {
797 title: title.to_string().into(),
798 messages: Vec::new(),
799 updated_at,
800 detailed_summary: None,
801 initial_project_snapshot: None,
802 cumulative_token_usage: Default::default(),
803 request_token_usage: Default::default(),
804 model: None,
805 profile: None,
806 imported: false,
807 subagent_context: None,
808 speed: None,
809 thinking_enabled: false,
810 thinking_effort: None,
811 draft_prompt: None,
812 ui_scroll_position: None,
813 }
814 }
815
816 fn make_metadata(
817 session_id: &str,
818 title: &str,
819 updated_at: DateTime<Utc>,
820 folder_paths: PathList,
821 ) -> ThreadMetadata {
822 ThreadMetadata {
823 archived: false,
824 session_id: acp::SessionId::new(session_id),
825 agent_id: agent::ZED_AGENT_ID.clone(),
826 title: title.to_string().into(),
827 updated_at,
828 created_at: Some(updated_at),
829 folder_paths,
830 main_worktree_paths: PathList::default(),
831 }
832 }
833
834 fn init_test(cx: &mut TestAppContext) {
835 cx.update(|cx| {
836 let settings_store = settings::SettingsStore::test(cx);
837 cx.set_global(settings_store);
838 cx.update_flags(true, vec!["agent-v2".to_string()]);
839 ThreadMetadataStore::init_global(cx);
840 ThreadStore::init_global(cx);
841 });
842 cx.run_until_parked();
843 }
844
845 #[gpui::test]
846 async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
847 let first_paths = PathList::new(&[Path::new("/project-a")]);
848 let second_paths = PathList::new(&[Path::new("/project-b")]);
849 let now = Utc::now();
850 let older = now - chrono::Duration::seconds(1);
851
852 let thread = std::thread::current();
853 let test_name = thread.name().unwrap_or("unknown_test");
854 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
855 let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
856 &db_name,
857 )));
858
859 db.save(make_metadata(
860 "session-1",
861 "First Thread",
862 now,
863 first_paths.clone(),
864 ))
865 .await
866 .unwrap();
867 db.save(make_metadata(
868 "session-2",
869 "Second Thread",
870 older,
871 second_paths.clone(),
872 ))
873 .await
874 .unwrap();
875
876 cx.update(|cx| {
877 let settings_store = settings::SettingsStore::test(cx);
878 cx.set_global(settings_store);
879 cx.update_flags(true, vec!["agent-v2".to_string()]);
880 ThreadMetadataStore::init_global(cx);
881 });
882
883 cx.run_until_parked();
884
885 cx.update(|cx| {
886 let store = ThreadMetadataStore::global(cx);
887 let store = store.read(cx);
888
889 let entry_ids = store
890 .entry_ids()
891 .map(|session_id| session_id.0.to_string())
892 .collect::<Vec<_>>();
893 assert_eq!(entry_ids.len(), 2);
894 assert!(entry_ids.contains(&"session-1".to_string()));
895 assert!(entry_ids.contains(&"session-2".to_string()));
896
897 let first_path_entries = store
898 .entries_for_path(&first_paths)
899 .map(|entry| entry.session_id.0.to_string())
900 .collect::<Vec<_>>();
901 assert_eq!(first_path_entries, vec!["session-1"]);
902
903 let second_path_entries = store
904 .entries_for_path(&second_paths)
905 .map(|entry| entry.session_id.0.to_string())
906 .collect::<Vec<_>>();
907 assert_eq!(second_path_entries, vec!["session-2"]);
908 });
909 }
910
911 #[gpui::test]
912 async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
913 init_test(cx);
914
915 let first_paths = PathList::new(&[Path::new("/project-a")]);
916 let second_paths = PathList::new(&[Path::new("/project-b")]);
917 let initial_time = Utc::now();
918 let updated_time = initial_time + chrono::Duration::seconds(1);
919
920 let initial_metadata = make_metadata(
921 "session-1",
922 "First Thread",
923 initial_time,
924 first_paths.clone(),
925 );
926
927 let second_metadata = make_metadata(
928 "session-2",
929 "Second Thread",
930 initial_time,
931 second_paths.clone(),
932 );
933
934 cx.update(|cx| {
935 let store = ThreadMetadataStore::global(cx);
936 store.update(cx, |store, cx| {
937 store.save(initial_metadata, cx);
938 store.save(second_metadata, cx);
939 });
940 });
941
942 cx.run_until_parked();
943
944 cx.update(|cx| {
945 let store = ThreadMetadataStore::global(cx);
946 let store = store.read(cx);
947
948 let first_path_entries = store
949 .entries_for_path(&first_paths)
950 .map(|entry| entry.session_id.0.to_string())
951 .collect::<Vec<_>>();
952 assert_eq!(first_path_entries, vec!["session-1"]);
953
954 let second_path_entries = store
955 .entries_for_path(&second_paths)
956 .map(|entry| entry.session_id.0.to_string())
957 .collect::<Vec<_>>();
958 assert_eq!(second_path_entries, vec!["session-2"]);
959 });
960
961 let moved_metadata = make_metadata(
962 "session-1",
963 "First Thread",
964 updated_time,
965 second_paths.clone(),
966 );
967
968 cx.update(|cx| {
969 let store = ThreadMetadataStore::global(cx);
970 store.update(cx, |store, cx| {
971 store.save(moved_metadata, cx);
972 });
973 });
974
975 cx.run_until_parked();
976
977 cx.update(|cx| {
978 let store = ThreadMetadataStore::global(cx);
979 let store = store.read(cx);
980
981 let entry_ids = store
982 .entry_ids()
983 .map(|session_id| session_id.0.to_string())
984 .collect::<Vec<_>>();
985 assert_eq!(entry_ids.len(), 2);
986 assert!(entry_ids.contains(&"session-1".to_string()));
987 assert!(entry_ids.contains(&"session-2".to_string()));
988
989 let first_path_entries = store
990 .entries_for_path(&first_paths)
991 .map(|entry| entry.session_id.0.to_string())
992 .collect::<Vec<_>>();
993 assert!(first_path_entries.is_empty());
994
995 let second_path_entries = store
996 .entries_for_path(&second_paths)
997 .map(|entry| entry.session_id.0.to_string())
998 .collect::<Vec<_>>();
999 assert_eq!(second_path_entries.len(), 2);
1000 assert!(second_path_entries.contains(&"session-1".to_string()));
1001 assert!(second_path_entries.contains(&"session-2".to_string()));
1002 });
1003
1004 cx.update(|cx| {
1005 let store = ThreadMetadataStore::global(cx);
1006 store.update(cx, |store, cx| {
1007 store.delete(acp::SessionId::new("session-2"), cx);
1008 });
1009 });
1010
1011 cx.run_until_parked();
1012
1013 cx.update(|cx| {
1014 let store = ThreadMetadataStore::global(cx);
1015 let store = store.read(cx);
1016
1017 let entry_ids = store
1018 .entry_ids()
1019 .map(|session_id| session_id.0.to_string())
1020 .collect::<Vec<_>>();
1021 assert_eq!(entry_ids, vec!["session-1"]);
1022
1023 let second_path_entries = store
1024 .entries_for_path(&second_paths)
1025 .map(|entry| entry.session_id.0.to_string())
1026 .collect::<Vec<_>>();
1027 assert_eq!(second_path_entries, vec!["session-1"]);
1028 });
1029 }
1030
1031 #[gpui::test]
1032 async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1033 init_test(cx);
1034
1035 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1036 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1037 let now = Utc::now();
1038
1039 let existing_metadata = ThreadMetadata {
1040 session_id: acp::SessionId::new("a-session-0"),
1041 agent_id: agent::ZED_AGENT_ID.clone(),
1042 title: "Existing Metadata".into(),
1043 updated_at: now - chrono::Duration::seconds(10),
1044 created_at: Some(now - chrono::Duration::seconds(10)),
1045 folder_paths: project_a_paths.clone(),
1046 main_worktree_paths: PathList::default(),
1047 archived: false,
1048 };
1049
1050 cx.update(|cx| {
1051 let store = ThreadMetadataStore::global(cx);
1052 store.update(cx, |store, cx| {
1053 store.save(existing_metadata, cx);
1054 });
1055 });
1056 cx.run_until_parked();
1057
1058 let threads_to_save = vec![
1059 (
1060 "a-session-0",
1061 "Thread A0 From Native Store",
1062 project_a_paths.clone(),
1063 now,
1064 ),
1065 (
1066 "a-session-1",
1067 "Thread A1",
1068 project_a_paths.clone(),
1069 now + chrono::Duration::seconds(1),
1070 ),
1071 (
1072 "b-session-0",
1073 "Thread B0",
1074 project_b_paths.clone(),
1075 now + chrono::Duration::seconds(2),
1076 ),
1077 (
1078 "projectless",
1079 "Projectless",
1080 PathList::default(),
1081 now + chrono::Duration::seconds(3),
1082 ),
1083 ];
1084
1085 for (session_id, title, paths, updated_at) in &threads_to_save {
1086 let save_task = cx.update(|cx| {
1087 let thread_store = ThreadStore::global(cx);
1088 let session_id = session_id.to_string();
1089 let title = title.to_string();
1090 let paths = paths.clone();
1091 thread_store.update(cx, |store, cx| {
1092 store.save_thread(
1093 acp::SessionId::new(session_id),
1094 make_db_thread(&title, *updated_at),
1095 paths,
1096 cx,
1097 )
1098 })
1099 });
1100 save_task.await.unwrap();
1101 cx.run_until_parked();
1102 }
1103
1104 cx.update(|cx| migrate_thread_metadata(cx));
1105 cx.run_until_parked();
1106
1107 let list = cx.update(|cx| {
1108 let store = ThreadMetadataStore::global(cx);
1109 store.read(cx).entries().cloned().collect::<Vec<_>>()
1110 });
1111
1112 assert_eq!(list.len(), 4);
1113 assert!(
1114 list.iter()
1115 .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1116 );
1117
1118 let existing_metadata = list
1119 .iter()
1120 .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1121 .unwrap();
1122 assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1123 assert!(!existing_metadata.archived);
1124
1125 let migrated_session_ids = list
1126 .iter()
1127 .map(|metadata| metadata.session_id.0.as_ref())
1128 .collect::<Vec<_>>();
1129 assert!(migrated_session_ids.contains(&"a-session-1"));
1130 assert!(migrated_session_ids.contains(&"b-session-0"));
1131 assert!(migrated_session_ids.contains(&"projectless"));
1132
1133 let migrated_entries = list
1134 .iter()
1135 .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1136 .collect::<Vec<_>>();
1137 assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1138 }
1139
1140 #[gpui::test]
1141 async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1142 cx: &mut TestAppContext,
1143 ) {
1144 init_test(cx);
1145
1146 let project_paths = PathList::new(&[Path::new("/project-a")]);
1147 let existing_updated_at = Utc::now();
1148
1149 let existing_metadata = ThreadMetadata {
1150 session_id: acp::SessionId::new("existing-session"),
1151 agent_id: agent::ZED_AGENT_ID.clone(),
1152 title: "Existing Metadata".into(),
1153 updated_at: existing_updated_at,
1154 created_at: Some(existing_updated_at),
1155 folder_paths: project_paths.clone(),
1156 main_worktree_paths: PathList::default(),
1157 archived: false,
1158 };
1159
1160 cx.update(|cx| {
1161 let store = ThreadMetadataStore::global(cx);
1162 store.update(cx, |store, cx| {
1163 store.save(existing_metadata, cx);
1164 });
1165 });
1166 cx.run_until_parked();
1167
1168 let save_task = cx.update(|cx| {
1169 let thread_store = ThreadStore::global(cx);
1170 thread_store.update(cx, |store, cx| {
1171 store.save_thread(
1172 acp::SessionId::new("existing-session"),
1173 make_db_thread(
1174 "Updated Native Thread Title",
1175 existing_updated_at + chrono::Duration::seconds(1),
1176 ),
1177 project_paths.clone(),
1178 cx,
1179 )
1180 })
1181 });
1182 save_task.await.unwrap();
1183 cx.run_until_parked();
1184
1185 cx.update(|cx| migrate_thread_metadata(cx));
1186 cx.run_until_parked();
1187
1188 let list = cx.update(|cx| {
1189 let store = ThreadMetadataStore::global(cx);
1190 store.read(cx).entries().cloned().collect::<Vec<_>>()
1191 });
1192
1193 assert_eq!(list.len(), 1);
1194 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1195 }
1196
1197 #[gpui::test]
1198 async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1199 cx: &mut TestAppContext,
1200 ) {
1201 init_test(cx);
1202
1203 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1204 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1205 let now = Utc::now();
1206
1207 // Create 7 threads for project A and 3 for project B
1208 let mut threads_to_save = Vec::new();
1209 for i in 0..7 {
1210 threads_to_save.push((
1211 format!("a-session-{i}"),
1212 format!("Thread A{i}"),
1213 project_a_paths.clone(),
1214 now + chrono::Duration::seconds(i as i64),
1215 ));
1216 }
1217 for i in 0..3 {
1218 threads_to_save.push((
1219 format!("b-session-{i}"),
1220 format!("Thread B{i}"),
1221 project_b_paths.clone(),
1222 now + chrono::Duration::seconds(i as i64),
1223 ));
1224 }
1225
1226 for (session_id, title, paths, updated_at) in &threads_to_save {
1227 let save_task = cx.update(|cx| {
1228 let thread_store = ThreadStore::global(cx);
1229 let session_id = session_id.to_string();
1230 let title = title.to_string();
1231 let paths = paths.clone();
1232 thread_store.update(cx, |store, cx| {
1233 store.save_thread(
1234 acp::SessionId::new(session_id),
1235 make_db_thread(&title, *updated_at),
1236 paths,
1237 cx,
1238 )
1239 })
1240 });
1241 save_task.await.unwrap();
1242 cx.run_until_parked();
1243 }
1244
1245 cx.update(|cx| migrate_thread_metadata(cx));
1246 cx.run_until_parked();
1247
1248 let list = cx.update(|cx| {
1249 let store = ThreadMetadataStore::global(cx);
1250 store.read(cx).entries().cloned().collect::<Vec<_>>()
1251 });
1252
1253 assert_eq!(list.len(), 10);
1254
1255 // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1256 let mut project_a_entries: Vec<_> = list
1257 .iter()
1258 .filter(|m| m.folder_paths == project_a_paths)
1259 .collect();
1260 assert_eq!(project_a_entries.len(), 7);
1261 project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1262
1263 for entry in &project_a_entries[..5] {
1264 assert!(
1265 !entry.archived,
1266 "Expected {} to be unarchived (top 5 most recent)",
1267 entry.session_id.0
1268 );
1269 }
1270 for entry in &project_a_entries[5..] {
1271 assert!(
1272 entry.archived,
1273 "Expected {} to be archived (older than top 5)",
1274 entry.session_id.0
1275 );
1276 }
1277
1278 // Project B: all 3 should be unarchived (under the limit)
1279 let project_b_entries: Vec<_> = list
1280 .iter()
1281 .filter(|m| m.folder_paths == project_b_paths)
1282 .collect();
1283 assert_eq!(project_b_entries.len(), 3);
1284 assert!(project_b_entries.iter().all(|m| !m.archived));
1285 }
1286
1287 #[gpui::test]
1288 async fn test_empty_thread_events_do_not_create_metadata(cx: &mut TestAppContext) {
1289 init_test(cx);
1290
1291 let fs = FakeFs::new(cx.executor());
1292 let project = Project::test(fs, None::<&Path>, cx).await;
1293 let connection = Rc::new(StubAgentConnection::new());
1294
1295 let thread = cx
1296 .update(|cx| {
1297 connection
1298 .clone()
1299 .new_session(project.clone(), PathList::default(), cx)
1300 })
1301 .await
1302 .unwrap();
1303 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1304
1305 cx.update(|cx| {
1306 thread.update(cx, |thread, cx| {
1307 thread.set_title("Draft Thread".into(), cx).detach();
1308 });
1309 });
1310 cx.run_until_parked();
1311
1312 let metadata_ids = cx.update(|cx| {
1313 ThreadMetadataStore::global(cx)
1314 .read(cx)
1315 .entry_ids()
1316 .collect::<Vec<_>>()
1317 });
1318 assert!(
1319 metadata_ids.is_empty(),
1320 "expected empty draft thread title updates to be ignored"
1321 );
1322
1323 cx.update(|cx| {
1324 thread.update(cx, |thread, cx| {
1325 thread.push_user_content_block(None, "Hello".into(), cx);
1326 });
1327 });
1328 cx.run_until_parked();
1329
1330 let metadata_ids = cx.update(|cx| {
1331 ThreadMetadataStore::global(cx)
1332 .read(cx)
1333 .entry_ids()
1334 .collect::<Vec<_>>()
1335 });
1336 assert_eq!(metadata_ids, vec![session_id]);
1337 }
1338
1339 #[gpui::test]
1340 async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1341 init_test(cx);
1342
1343 let fs = FakeFs::new(cx.executor());
1344 let project = Project::test(fs, None::<&Path>, cx).await;
1345 let connection = Rc::new(StubAgentConnection::new());
1346
1347 let thread = cx
1348 .update(|cx| {
1349 connection
1350 .clone()
1351 .new_session(project.clone(), PathList::default(), cx)
1352 })
1353 .await
1354 .unwrap();
1355 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1356
1357 cx.update(|cx| {
1358 thread.update(cx, |thread, cx| {
1359 thread.push_user_content_block(None, "Hello".into(), cx);
1360 });
1361 });
1362 cx.run_until_parked();
1363
1364 let metadata_ids = cx.update(|cx| {
1365 ThreadMetadataStore::global(cx)
1366 .read(cx)
1367 .entry_ids()
1368 .collect::<Vec<_>>()
1369 });
1370 assert_eq!(metadata_ids, vec![session_id.clone()]);
1371
1372 drop(thread);
1373 cx.update(|_| {});
1374 cx.run_until_parked();
1375
1376 let metadata_ids = cx.update(|cx| {
1377 ThreadMetadataStore::global(cx)
1378 .read(cx)
1379 .entry_ids()
1380 .collect::<Vec<_>>()
1381 });
1382 assert_eq!(metadata_ids, vec![session_id]);
1383 }
1384
1385 #[gpui::test]
1386 async fn test_threads_without_project_association_are_archived_by_default(
1387 cx: &mut TestAppContext,
1388 ) {
1389 init_test(cx);
1390
1391 let fs = FakeFs::new(cx.executor());
1392 let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
1393 let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
1394 let connection = Rc::new(StubAgentConnection::new());
1395
1396 let thread_without_worktree = cx
1397 .update(|cx| {
1398 connection.clone().new_session(
1399 project_without_worktree.clone(),
1400 PathList::default(),
1401 cx,
1402 )
1403 })
1404 .await
1405 .unwrap();
1406 let session_without_worktree =
1407 cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());
1408
1409 cx.update(|cx| {
1410 thread_without_worktree.update(cx, |thread, cx| {
1411 thread.push_user_content_block(None, "content".into(), cx);
1412 thread.set_title("No Project Thread".into(), cx).detach();
1413 });
1414 });
1415 cx.run_until_parked();
1416
1417 let thread_with_worktree = cx
1418 .update(|cx| {
1419 connection.clone().new_session(
1420 project_with_worktree.clone(),
1421 PathList::default(),
1422 cx,
1423 )
1424 })
1425 .await
1426 .unwrap();
1427 let session_with_worktree =
1428 cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());
1429
1430 cx.update(|cx| {
1431 thread_with_worktree.update(cx, |thread, cx| {
1432 thread.push_user_content_block(None, "content".into(), cx);
1433 thread.set_title("Project Thread".into(), cx).detach();
1434 });
1435 });
1436 cx.run_until_parked();
1437
1438 cx.update(|cx| {
1439 let store = ThreadMetadataStore::global(cx);
1440 let store = store.read(cx);
1441
1442 let without_worktree = store
1443 .entry(&session_without_worktree)
1444 .expect("missing metadata for thread without project association");
1445 assert!(without_worktree.folder_paths.is_empty());
1446 assert!(
1447 without_worktree.archived,
1448 "expected thread without project association to be archived"
1449 );
1450
1451 let with_worktree = store
1452 .entry(&session_with_worktree)
1453 .expect("missing metadata for thread with project association");
1454 assert_eq!(
1455 with_worktree.folder_paths,
1456 PathList::new(&[Path::new("/project-a")])
1457 );
1458 assert!(
1459 !with_worktree.archived,
1460 "expected thread with project association to remain unarchived"
1461 );
1462 });
1463 }
1464
1465 #[gpui::test]
1466 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1467 init_test(cx);
1468
1469 let fs = FakeFs::new(cx.executor());
1470 let project = Project::test(fs, None::<&Path>, cx).await;
1471 let connection = Rc::new(StubAgentConnection::new());
1472
1473 // Create a regular (non-subagent) AcpThread.
1474 let regular_thread = cx
1475 .update(|cx| {
1476 connection
1477 .clone()
1478 .new_session(project.clone(), PathList::default(), cx)
1479 })
1480 .await
1481 .unwrap();
1482
1483 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1484
1485 // Set a title on the regular thread to trigger a save via handle_thread_update.
1486 cx.update(|cx| {
1487 regular_thread.update(cx, |thread, cx| {
1488 thread.push_user_content_block(None, "content".into(), cx);
1489 thread.set_title("Regular Thread".into(), cx).detach();
1490 });
1491 });
1492 cx.run_until_parked();
1493
1494 // Create a subagent AcpThread
1495 let subagent_session_id = acp::SessionId::new("subagent-session");
1496 let subagent_thread = cx.update(|cx| {
1497 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1498 cx.new(|cx| {
1499 acp_thread::AcpThread::new(
1500 Some(regular_session_id.clone()),
1501 Some("Subagent Thread".into()),
1502 None,
1503 connection.clone(),
1504 project.clone(),
1505 action_log,
1506 subagent_session_id.clone(),
1507 watch::Receiver::constant(acp::PromptCapabilities::new()),
1508 cx,
1509 )
1510 })
1511 });
1512
1513 // Set a title on the subagent thread to trigger handle_thread_update.
1514 cx.update(|cx| {
1515 subagent_thread.update(cx, |thread, cx| {
1516 thread
1517 .set_title("Subagent Thread Title".into(), cx)
1518 .detach();
1519 });
1520 });
1521 cx.run_until_parked();
1522
1523 // List all metadata from the store cache.
1524 let list = cx.update(|cx| {
1525 let store = ThreadMetadataStore::global(cx);
1526 store.read(cx).entries().cloned().collect::<Vec<_>>()
1527 });
1528
1529 // The subagent thread should NOT appear in the sidebar metadata.
1530 // Only the regular thread should be listed.
1531 assert_eq!(
1532 list.len(),
1533 1,
1534 "Expected only the regular thread in sidebar metadata, \
1535 but found {} entries (subagent threads are leaking into the sidebar)",
1536 list.len(),
1537 );
1538 assert_eq!(list[0].session_id, regular_session_id);
1539 assert_eq!(list[0].title.as_ref(), "Regular Thread");
1540 }
1541
1542 #[test]
1543 fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1544 let now = Utc::now();
1545
1546 let operations = vec![
1547 DbOperation::Upsert(make_metadata(
1548 "session-1",
1549 "First Thread",
1550 now,
1551 PathList::default(),
1552 )),
1553 DbOperation::Delete(acp::SessionId::new("session-1")),
1554 ];
1555
1556 let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1557
1558 assert_eq!(deduped.len(), 1);
1559 assert_eq!(
1560 deduped[0],
1561 DbOperation::Delete(acp::SessionId::new("session-1"))
1562 );
1563 }
1564
1565 #[test]
1566 fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1567 let now = Utc::now();
1568 let later = now + chrono::Duration::seconds(1);
1569
1570 let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1571 let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1572
1573 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1574 DbOperation::Upsert(old_metadata),
1575 DbOperation::Upsert(new_metadata.clone()),
1576 ]);
1577
1578 assert_eq!(deduped.len(), 1);
1579 assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1580 }
1581
1582 #[test]
1583 fn test_dedup_db_operations_preserves_distinct_sessions() {
1584 let now = Utc::now();
1585
1586 let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1587 let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1588 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1589 DbOperation::Upsert(metadata1.clone()),
1590 DbOperation::Upsert(metadata2.clone()),
1591 ]);
1592
1593 assert_eq!(deduped.len(), 2);
1594 assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1595 assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1596 }
1597
1598 #[gpui::test]
1599 async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1600 init_test(cx);
1601
1602 let paths = PathList::new(&[Path::new("/project-a")]);
1603 let now = Utc::now();
1604 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1605
1606 cx.update(|cx| {
1607 let store = ThreadMetadataStore::global(cx);
1608 store.update(cx, |store, cx| {
1609 store.save(metadata, cx);
1610 });
1611 });
1612
1613 cx.run_until_parked();
1614
1615 cx.update(|cx| {
1616 let store = ThreadMetadataStore::global(cx);
1617 let store = store.read(cx);
1618
1619 let path_entries = store
1620 .entries_for_path(&paths)
1621 .map(|e| e.session_id.0.to_string())
1622 .collect::<Vec<_>>();
1623 assert_eq!(path_entries, vec!["session-1"]);
1624
1625 let archived = store
1626 .archived_entries()
1627 .map(|e| e.session_id.0.to_string())
1628 .collect::<Vec<_>>();
1629 assert!(archived.is_empty());
1630 });
1631
1632 cx.update(|cx| {
1633 let store = ThreadMetadataStore::global(cx);
1634 store.update(cx, |store, cx| {
1635 store.archive(&acp::SessionId::new("session-1"), cx);
1636 });
1637 });
1638
1639 cx.run_until_parked();
1640
1641 cx.update(|cx| {
1642 let store = ThreadMetadataStore::global(cx);
1643 let store = store.read(cx);
1644
1645 let path_entries = store
1646 .entries_for_path(&paths)
1647 .map(|e| e.session_id.0.to_string())
1648 .collect::<Vec<_>>();
1649 assert!(path_entries.is_empty());
1650
1651 let archived = store.archived_entries().collect::<Vec<_>>();
1652 assert_eq!(archived.len(), 1);
1653 assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1654 assert!(archived[0].archived);
1655 });
1656
1657 cx.update(|cx| {
1658 let store = ThreadMetadataStore::global(cx);
1659 store.update(cx, |store, cx| {
1660 store.unarchive(&acp::SessionId::new("session-1"), cx);
1661 });
1662 });
1663
1664 cx.run_until_parked();
1665
1666 cx.update(|cx| {
1667 let store = ThreadMetadataStore::global(cx);
1668 let store = store.read(cx);
1669
1670 let path_entries = store
1671 .entries_for_path(&paths)
1672 .map(|e| e.session_id.0.to_string())
1673 .collect::<Vec<_>>();
1674 assert_eq!(path_entries, vec!["session-1"]);
1675
1676 let archived = store
1677 .archived_entries()
1678 .map(|e| e.session_id.0.to_string())
1679 .collect::<Vec<_>>();
1680 assert!(archived.is_empty());
1681 });
1682 }
1683
1684 #[gpui::test]
1685 async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1686 init_test(cx);
1687
1688 let paths = PathList::new(&[Path::new("/project-a")]);
1689 let now = Utc::now();
1690
1691 let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1692 let metadata2 = make_metadata(
1693 "session-2",
1694 "Archived Thread",
1695 now - chrono::Duration::seconds(1),
1696 paths.clone(),
1697 );
1698
1699 cx.update(|cx| {
1700 let store = ThreadMetadataStore::global(cx);
1701 store.update(cx, |store, cx| {
1702 store.save(metadata1, cx);
1703 store.save(metadata2, cx);
1704 });
1705 });
1706
1707 cx.run_until_parked();
1708
1709 cx.update(|cx| {
1710 let store = ThreadMetadataStore::global(cx);
1711 store.update(cx, |store, cx| {
1712 store.archive(&acp::SessionId::new("session-2"), cx);
1713 });
1714 });
1715
1716 cx.run_until_parked();
1717
1718 cx.update(|cx| {
1719 let store = ThreadMetadataStore::global(cx);
1720 let store = store.read(cx);
1721
1722 let path_entries = store
1723 .entries_for_path(&paths)
1724 .map(|e| e.session_id.0.to_string())
1725 .collect::<Vec<_>>();
1726 assert_eq!(path_entries, vec!["session-1"]);
1727
1728 let all_entries = store
1729 .entries()
1730 .map(|e| e.session_id.0.to_string())
1731 .collect::<Vec<_>>();
1732 assert_eq!(all_entries.len(), 2);
1733 assert!(all_entries.contains(&"session-1".to_string()));
1734 assert!(all_entries.contains(&"session-2".to_string()));
1735
1736 let archived = store
1737 .archived_entries()
1738 .map(|e| e.session_id.0.to_string())
1739 .collect::<Vec<_>>();
1740 assert_eq!(archived, vec!["session-2"]);
1741 });
1742 }
1743
1744 #[gpui::test]
1745 async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1746 init_test(cx);
1747
1748 let paths = PathList::new(&[Path::new("/project-a")]);
1749 let now = Utc::now();
1750
1751 let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1752 let m2 = make_metadata(
1753 "session-2",
1754 "Thread Two",
1755 now - chrono::Duration::seconds(1),
1756 paths.clone(),
1757 );
1758 let m3 = make_metadata(
1759 "session-3",
1760 "Thread Three",
1761 now - chrono::Duration::seconds(2),
1762 paths,
1763 );
1764
1765 cx.update(|cx| {
1766 let store = ThreadMetadataStore::global(cx);
1767 store.update(cx, |store, cx| {
1768 store.save_all(vec![m1, m2, m3], cx);
1769 });
1770 });
1771
1772 cx.run_until_parked();
1773
1774 cx.update(|cx| {
1775 let store = ThreadMetadataStore::global(cx);
1776 let store = store.read(cx);
1777
1778 let all_entries = store
1779 .entries()
1780 .map(|e| e.session_id.0.to_string())
1781 .collect::<Vec<_>>();
1782 assert_eq!(all_entries.len(), 3);
1783 assert!(all_entries.contains(&"session-1".to_string()));
1784 assert!(all_entries.contains(&"session-2".to_string()));
1785 assert!(all_entries.contains(&"session-3".to_string()));
1786
1787 let entry_ids = store.entry_ids().collect::<Vec<_>>();
1788 assert_eq!(entry_ids.len(), 3);
1789 });
1790 }
1791
1792 #[gpui::test]
1793 async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1794 init_test(cx);
1795
1796 let paths = PathList::new(&[Path::new("/project-a")]);
1797 let now = Utc::now();
1798 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1799
1800 cx.update(|cx| {
1801 let store = ThreadMetadataStore::global(cx);
1802 store.update(cx, |store, cx| {
1803 store.save(metadata, cx);
1804 });
1805 });
1806
1807 cx.run_until_parked();
1808
1809 cx.update(|cx| {
1810 let store = ThreadMetadataStore::global(cx);
1811 store.update(cx, |store, cx| {
1812 store.archive(&acp::SessionId::new("session-1"), cx);
1813 });
1814 });
1815
1816 cx.run_until_parked();
1817
1818 cx.update(|cx| {
1819 let store = ThreadMetadataStore::global(cx);
1820 store.update(cx, |store, cx| {
1821 let _ = store.reload(cx);
1822 });
1823 });
1824
1825 cx.run_until_parked();
1826
1827 cx.update(|cx| {
1828 let store = ThreadMetadataStore::global(cx);
1829 let store = store.read(cx);
1830
1831 let thread = store
1832 .entries()
1833 .find(|e| e.session_id.0.as_ref() == "session-1")
1834 .expect("thread should exist after reload");
1835 assert!(thread.archived);
1836
1837 let path_entries = store
1838 .entries_for_path(&paths)
1839 .map(|e| e.session_id.0.to_string())
1840 .collect::<Vec<_>>();
1841 assert!(path_entries.is_empty());
1842
1843 let archived = store
1844 .archived_entries()
1845 .map(|e| e.session_id.0.to_string())
1846 .collect::<Vec<_>>();
1847 assert_eq!(archived, vec!["session-1"]);
1848 });
1849 }
1850
1851 #[gpui::test]
1852 async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1853 init_test(cx);
1854
1855 cx.run_until_parked();
1856
1857 cx.update(|cx| {
1858 let store = ThreadMetadataStore::global(cx);
1859 store.update(cx, |store, cx| {
1860 store.archive(&acp::SessionId::new("nonexistent"), cx);
1861 });
1862 });
1863
1864 cx.run_until_parked();
1865
1866 cx.update(|cx| {
1867 let store = ThreadMetadataStore::global(cx);
1868 let store = store.read(cx);
1869
1870 assert!(store.is_empty());
1871 assert_eq!(store.entries().count(), 0);
1872 assert_eq!(store.archived_entries().count(), 0);
1873 });
1874 }
1875
1876 #[gpui::test]
1877 async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1878 init_test(cx);
1879
1880 let paths = PathList::new(&[Path::new("/project-a")]);
1881 let now = Utc::now();
1882 let metadata = make_metadata("session-1", "Thread 1", now, paths);
1883 let session_id = metadata.session_id.clone();
1884
1885 cx.update(|cx| {
1886 let store = ThreadMetadataStore::global(cx);
1887 store.update(cx, |store, cx| {
1888 store.save(metadata.clone(), cx);
1889 store.archive(&session_id, cx);
1890 });
1891 });
1892
1893 cx.run_until_parked();
1894
1895 cx.update(|cx| {
1896 let store = ThreadMetadataStore::global(cx);
1897 let store = store.read(cx);
1898
1899 let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1900 pretty_assertions::assert_eq!(
1901 entries,
1902 vec![ThreadMetadata {
1903 archived: true,
1904 ..metadata
1905 }]
1906 );
1907 });
1908 }
1909}