1use std::{path::Path, sync::Arc};
2
3use acp_thread::AcpThreadEvent;
4use agent::{ThreadStore, ZED_AGENT_ID};
5use agent_client_protocol as acp;
6use anyhow::Context as _;
7use chrono::{DateTime, Utc};
8use collections::{HashMap, HashSet};
9use db::{
10 sqlez::{
11 bindable::Column, domain::Domain, statement::Statement,
12 thread_safe_connection::ThreadSafeConnection,
13 },
14 sqlez_macros::sql,
15};
16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
17use futures::{FutureExt as _, future::Shared};
18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
19use project::AgentId;
20use ui::{App, Context, SharedString};
21use util::ResultExt as _;
22use workspace::PathList;
23
24use crate::DEFAULT_THREAD_TITLE;
25
26pub fn init(cx: &mut App) {
27 ThreadMetadataStore::init_global(cx);
28
29 if cx.has_flag::<AgentV2FeatureFlag>() {
30 migrate_thread_metadata(cx);
31 }
32 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
33 if has_flag {
34 migrate_thread_metadata(cx);
35 }
36 })
37 .detach();
38}
39
/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
///
/// Only threads whose session id is not yet present in the metadata database
/// are copied. Every migrated thread starts out archived; on the very first
/// migration (empty metadata database) the 5 most recently updated threads of
/// each project are unarchived. Threads without a project are migrated as
/// well, but always stay archived.
///
/// The work runs in a detached task; errors are logged rather than returned.
///
/// TODO: Remove this after N weeks of shipping the sidebar
fn migrate_thread_metadata(cx: &mut App) {
    let store = ThreadMetadataStore::global(cx);
    let db = store.read(cx).db.clone();

    cx.spawn(async move |cx| {
        // Session ids that already have a metadata row; these are never overwritten.
        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();

        let is_first_migration = existing_entries.is_empty();

        let mut to_migrate = store.read_with(cx, |_store, cx| {
            ThreadStore::global(cx)
                .read(cx)
                .entries()
                .filter_map(|entry| {
                    if existing_entries.contains(&entry.id.0) {
                        return None;
                    }

                    Some(ThreadMetadata {
                        session_id: entry.id,
                        agent_id: ZED_AGENT_ID.clone(),
                        title: entry.title,
                        updated_at: entry.updated_at,
                        created_at: entry.created_at,
                        folder_paths: entry.folder_paths,
                        main_worktree_paths: PathList::default(),
                        // Archived by default; selectively cleared below.
                        archived: true,
                    })
                })
                .collect::<Vec<_>>()
        });

        if to_migrate.is_empty() {
            return anyhow::Ok(());
        }

        // On the first migration (no entries in DB yet), keep the 5 most
        // recent threads per project unarchived.
        if is_first_migration {
            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
            for entry in &mut to_migrate {
                // Projectless threads are never unarchived here.
                if entry.folder_paths.is_empty() {
                    continue;
                }
                per_project
                    .entry(entry.folder_paths.clone())
                    .or_default()
                    .push(entry);
            }
            for entries in per_project.values_mut() {
                // Newest first, so `take(5)` picks the most recently updated threads.
                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
                for entry in entries.iter_mut().take(5) {
                    entry.archived = false;
                }
            }
        }

        log::info!("Migrating {} thread store entries", to_migrate.len());

        // Manually save each entry to the database and call reload, otherwise
        // we'll end up triggering lots of reloads after each save
        for entry in to_migrate {
            db.save(entry).await?;
        }

        log::info!("Finished migrating thread store entries");

        // Refresh the in-memory cache once, now that all rows are written.
        let _ = store.update(cx, |store, cx| store.reload(cx));
        anyhow::Ok(())
    })
    .detach_and_log_err(cx);
}
116
/// Newtype under which the [`ThreadMetadataStore`] entity is registered as an
/// app global (see `init_global`, `try_global` and `global`).
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
119
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Session identifier; the primary key of the `sidebar_threads` table.
    pub session_id: acp::SessionId,
    /// Agent the thread belongs to. Persisted as `NULL` for the native Zed agent.
    pub agent_id: AgentId,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Time of the last recorded change; database listings are ordered by
    /// this value, newest first.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known.
    pub created_at: Option<DateTime<Utc>>,
    /// Project folders the thread is associated with; empty for threads
    /// without a project.
    pub folder_paths: PathList,
    /// Original repository paths for projects opened in linked git worktrees;
    /// empty when there are none (or none are known).
    pub main_worktree_paths: PathList,
    /// Archived threads are excluded from the per-path lookups of the store.
    pub archived: bool,
}
133
134impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
135 fn from(meta: &ThreadMetadata) -> Self {
136 Self {
137 session_id: meta.session_id.clone(),
138 work_dirs: Some(meta.folder_paths.clone()),
139 title: Some(meta.title.clone()),
140 updated_at: Some(meta.updated_at),
141 created_at: meta.created_at,
142 meta: None,
143 }
144 }
145}
146
/// The store holds all metadata needed to show threads in the sidebar/the archive.
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct ThreadMetadataStore {
    // Database handle used for the initial load and for reloads.
    db: ThreadMetadataDb,
    // In-memory cache of every known thread, keyed by session id.
    threads: HashMap<acp::SessionId, ThreadMetadata>,
    // Index: `folder_paths` -> sessions with exactly that path list.
    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    // Index: `main_worktree_paths` -> sessions; threads with an empty list are not indexed.
    threads_by_main_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    // Most recently started reload, if any.
    reload_task: Option<Shared<Task<()>>>,
    // Event subscriptions for live threads, removed when the thread is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    // Sending side of the queue drained by `_db_operations_task`.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    // Background task that applies queued operations to the database.
    _db_operations_task: Task<()>,
}
160
/// A pending write to the `sidebar_threads` table, queued by the store and
/// applied by its background task.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Insert the row, or overwrite the existing row with the same session id.
    Upsert(ThreadMetadata),
    /// Remove the row of the given session.
    Delete(acp::SessionId),
}
166
167impl DbOperation {
168 fn id(&self) -> &acp::SessionId {
169 match self {
170 DbOperation::Upsert(thread) => &thread.session_id,
171 DbOperation::Delete(session_id) => session_id,
172 }
173 }
174}
175
176impl ThreadMetadataStore {
177 #[cfg(not(any(test, feature = "test-support")))]
178 pub fn init_global(cx: &mut App) {
179 if cx.has_global::<Self>() {
180 return;
181 }
182
183 let db = ThreadMetadataDb::global(cx);
184 let thread_store = cx.new(|cx| Self::new(db, cx));
185 cx.set_global(GlobalThreadMetadataStore(thread_store));
186 }
187
/// Test variant of `init_global`: backs the store with a test database whose
/// name is derived from the current thread's name, and always (re)registers
/// the global.
#[cfg(any(test, feature = "test-support"))]
pub fn init_global(cx: &mut App) {
    let current_thread = std::thread::current();
    let db_name = format!(
        "THREAD_METADATA_DB_{}",
        current_thread.name().unwrap_or("unknown_test")
    );
    let connection = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
    let store = cx.new(|cx| Self::new(ThreadMetadataDb(connection), cx));
    cx.set_global(GlobalThreadMetadataStore(store));
}
197
198 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
199 cx.try_global::<GlobalThreadMetadataStore>()
200 .map(|store| store.0.clone())
201 }
202
203 pub fn global(cx: &App) -> Entity<Self> {
204 cx.global::<GlobalThreadMetadataStore>().0.clone()
205 }
206
/// Returns `true` when the in-memory cache holds no threads at all
/// (archived threads count as entries).
pub fn is_empty(&self) -> bool {
    self.threads.is_empty()
}
210
/// Returns all thread IDs, archived threads included, in the (unspecified)
/// iteration order of the underlying map.
pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
    self.threads.keys().cloned()
}
215
/// Returns the metadata for a specific thread, if it exists.
/// Archived threads are returned as well.
pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
    self.threads.get(session_id)
}
220
/// Returns all threads, archived or not, in the (unspecified) iteration
/// order of the underlying map.
pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
    self.threads.values()
}
225
226 /// Returns all archived threads.
227 pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
228 self.entries().filter(|t| t.archived)
229 }
230
231 /// Returns all threads for the given path list, excluding archived threads.
232 pub fn entries_for_path(
233 &self,
234 path_list: &PathList,
235 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
236 self.threads_by_paths
237 .get(path_list)
238 .into_iter()
239 .flatten()
240 .filter_map(|s| self.threads.get(s))
241 .filter(|s| !s.archived)
242 }
243
244 /// Returns threads whose `main_worktree_paths` matches the given path list,
245 /// excluding archived threads. This finds threads that were opened in a
246 /// linked worktree but are associated with the given main worktree.
247 pub fn entries_for_main_worktree_path(
248 &self,
249 path_list: &PathList,
250 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
251 self.threads_by_main_paths
252 .get(path_list)
253 .into_iter()
254 .flatten()
255 .filter_map(|s| self.threads.get(s))
256 .filter(|s| !s.archived)
257 }
258
/// Replaces the in-memory cache and both path indexes with the rows
/// currently stored in the database, then notifies observers.
///
/// Returns a shared handle to the reload task; the same handle is kept in
/// `self.reload_task`. If listing the rows fails, the error is logged and
/// the cache is left untouched.
fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
    let db = self.db.clone();
    // Drop the handle of any previous reload.
    // NOTE(review): presumably this cancels it unless another clone of the
    // shared task is still alive — confirm against gpui's `Task` semantics.
    self.reload_task.take();

    // Query the database off the main thread.
    let list_task = cx
        .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });

    let reload_task = cx
        .spawn(async move |this, cx| {
            let Some(rows) = list_task.await.log_err() else {
                return;
            };

            this.update(cx, |this, cx| {
                // Rebuild everything from scratch so removed rows disappear too.
                this.threads.clear();
                this.threads_by_paths.clear();
                this.threads_by_main_paths.clear();

                for row in rows {
                    this.threads_by_paths
                        .entry(row.folder_paths.clone())
                        .or_default()
                        .insert(row.session_id.clone());
                    // Threads without main worktree paths are not indexed by them.
                    if !row.main_worktree_paths.is_empty() {
                        this.threads_by_main_paths
                            .entry(row.main_worktree_paths.clone())
                            .or_default()
                            .insert(row.session_id.clone());
                    }
                    this.threads.insert(row.session_id.clone(), row);
                }

                cx.notify();
            })
            .ok();
        })
        .shared();
    self.reload_task = Some(reload_task.clone());
    reload_task
}
299
300 pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
301 if !cx.has_flag::<AgentV2FeatureFlag>() {
302 return;
303 }
304
305 for metadata in metadata {
306 self.save_internal(metadata);
307 }
308 cx.notify();
309 }
310
/// Test-only public entry point that forwards to the private `save`.
#[cfg(any(test, feature = "test-support"))]
pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
    self.save(metadata, cx)
}
315
316 fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
317 if !cx.has_flag::<AgentV2FeatureFlag>() {
318 return;
319 }
320
321 self.save_internal(metadata);
322 cx.notify();
323 }
324
325 fn save_internal(&mut self, metadata: ThreadMetadata) {
326 if let Some(thread) = self.threads.get(&metadata.session_id) {
327 if thread.folder_paths != metadata.folder_paths {
328 if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
329 session_ids.remove(&metadata.session_id);
330 }
331 }
332 if thread.main_worktree_paths != metadata.main_worktree_paths
333 && !thread.main_worktree_paths.is_empty()
334 {
335 if let Some(session_ids) = self
336 .threads_by_main_paths
337 .get_mut(&thread.main_worktree_paths)
338 {
339 session_ids.remove(&metadata.session_id);
340 }
341 }
342 }
343
344 self.threads
345 .insert(metadata.session_id.clone(), metadata.clone());
346
347 self.threads_by_paths
348 .entry(metadata.folder_paths.clone())
349 .or_default()
350 .insert(metadata.session_id.clone());
351
352 if !metadata.main_worktree_paths.is_empty() {
353 self.threads_by_main_paths
354 .entry(metadata.main_worktree_paths.clone())
355 .or_default()
356 .insert(metadata.session_id.clone());
357 }
358
359 self.pending_thread_ops_tx
360 .try_send(DbOperation::Upsert(metadata))
361 .log_err();
362 }
363
364 pub fn update_working_directories(
365 &mut self,
366 session_id: &acp::SessionId,
367 work_dirs: PathList,
368 cx: &mut Context<Self>,
369 ) {
370 if !cx.has_flag::<AgentV2FeatureFlag>() {
371 return;
372 }
373
374 if let Some(thread) = self.threads.get(session_id) {
375 self.save_internal(ThreadMetadata {
376 folder_paths: work_dirs,
377 ..thread.clone()
378 });
379 cx.notify();
380 }
381 }
382
/// Marks the thread as archived. See `update_archived` for the conditions
/// under which this is a no-op.
pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
    self.update_archived(session_id, true, cx);
}
386
/// Clears the archived flag of the thread. See `update_archived` for the
/// conditions under which this is a no-op.
pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
    self.update_archived(session_id, false, cx);
}
390
391 fn update_archived(
392 &mut self,
393 session_id: &acp::SessionId,
394 archived: bool,
395 cx: &mut Context<Self>,
396 ) {
397 if !cx.has_flag::<AgentV2FeatureFlag>() {
398 return;
399 }
400
401 if let Some(thread) = self.threads.get(session_id) {
402 self.save_internal(ThreadMetadata {
403 archived,
404 ..thread.clone()
405 });
406 cx.notify();
407 }
408 }
409
410 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
411 if !cx.has_flag::<AgentV2FeatureFlag>() {
412 return;
413 }
414
415 if let Some(thread) = self.threads.get(&session_id) {
416 if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
417 session_ids.remove(&session_id);
418 }
419 if !thread.main_worktree_paths.is_empty() {
420 if let Some(session_ids) = self
421 .threads_by_main_paths
422 .get_mut(&thread.main_worktree_paths)
423 {
424 session_ids.remove(&session_id);
425 }
426 }
427 }
428 self.threads.remove(&session_id);
429 self.pending_thread_ops_tx
430 .try_send(DbOperation::Delete(session_id))
431 .log_err();
432 cx.notify();
433 }
434
/// Builds the store: starts tracking newly created `AcpThread`s, spawns the
/// background task that applies queued database operations, and kicks off
/// the initial load from the database.
fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
    let weak_store = cx.weak_entity();

    cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
        // Don't track subagent threads in the sidebar.
        if thread.parent_session_id().is_some() {
            return;
        }

        let thread_entity = cx.entity();

        // When the thread entity is released, stop listening to it and
        // clean up metadata of threads that never got any entries.
        cx.on_release({
            let weak_store = weak_store.clone();
            move |thread, cx| {
                weak_store
                    .update(cx, |store, cx| {
                        let session_id = thread.session_id().clone();
                        store.session_subscriptions.remove(&session_id);
                        if thread.entries().is_empty() {
                            // Empty threads can be unloaded without ever being
                            // durably persisted by the underlying agent.
                            store.delete(session_id, cx);
                        }
                    })
                    .ok();
            }
        })
        .detach();

        // Route the thread's events to `handle_thread_event`; the
        // subscription is kept per session id until the thread is released.
        weak_store
            .update(cx, |this, cx| {
                let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
                this.session_subscriptions
                    .insert(thread.session_id().clone(), subscription);
            })
            .ok();
    })
    .detach();

    let (tx, rx) = smol::channel::unbounded();
    let _db_operations_task = cx.background_spawn({
        let db = db.clone();
        async move {
            // Wait for one operation, then drain everything else that is
            // already queued so the whole batch can be deduplicated.
            while let Ok(first_update) = rx.recv().await {
                let mut updates = vec![first_update];
                while let Ok(update) = rx.try_recv() {
                    updates.push(update);
                }
                // Only the latest operation per session is applied.
                let updates = Self::dedup_db_operations(updates);
                for operation in updates {
                    match operation {
                        DbOperation::Upsert(metadata) => {
                            db.save(metadata).await.log_err();
                        }
                        DbOperation::Delete(session_id) => {
                            db.delete(session_id).await.log_err();
                        }
                    }
                }
            }
        }
    });

    let mut this = Self {
        db,
        threads: HashMap::default(),
        threads_by_paths: HashMap::default(),
        threads_by_main_paths: HashMap::default(),
        reload_task: None,
        session_subscriptions: HashMap::default(),
        pending_thread_ops_tx: tx,
        _db_operations_task,
    };
    // Populate the cache; the returned handle is also kept in `reload_task`.
    let _ = this.reload(cx);
    this
}
511
512 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
513 let mut ops = HashMap::default();
514 for operation in operations.into_iter().rev() {
515 if ops.contains_key(operation.id()) {
516 continue;
517 }
518 ops.insert(operation.id().clone(), operation);
519 }
520 ops.into_values().collect()
521 }
522
/// Rebuilds and saves the metadata of `thread` in response to one of its
/// events.
///
/// Events in the first match arm refresh title, paths and `updated_at`;
/// events in the second arm are ignored. Subagent threads are never tracked.
fn handle_thread_event(
    &mut self,
    thread: Entity<acp_thread::AcpThread>,
    event: &AcpThreadEvent,
    cx: &mut Context<Self>,
) {
    // Don't track subagent threads in the sidebar.
    if thread.read(cx).parent_session_id().is_some() {
        return;
    }

    match event {
        AcpThreadEvent::NewEntry
        | AcpThreadEvent::TitleUpdated
        | AcpThreadEvent::EntryUpdated(_)
        | AcpThreadEvent::EntriesRemoved(_)
        | AcpThreadEvent::ToolAuthorizationRequested(_)
        | AcpThreadEvent::ToolAuthorizationReceived(_)
        | AcpThreadEvent::Retry(_)
        | AcpThreadEvent::Stopped(_)
        | AcpThreadEvent::Error
        | AcpThreadEvent::LoadError(_)
        | AcpThreadEvent::Refusal
        | AcpThreadEvent::WorkingDirectoriesUpdated => {
            let thread_ref = thread.read(cx);
            let existing_thread = self.threads.get(thread_ref.session_id());
            let session_id = thread_ref.session_id().clone();
            let title = thread_ref
                .title()
                .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());

            // Every handled event counts as an update.
            let updated_at = Utc::now();

            // Keep a known creation time; otherwise treat this moment as
            // the creation time.
            let created_at = existing_thread
                .and_then(|t| t.created_at)
                .unwrap_or_else(|| updated_at);

            let agent_id = thread_ref.connection().agent_id();

            // Absolute paths of the project's visible worktrees.
            let folder_paths = {
                let project = thread_ref.project().read(cx);
                let paths: Vec<Arc<Path>> = project
                    .visible_worktrees(cx)
                    .map(|worktree| worktree.read(cx).abs_path())
                    .collect();
                PathList::new(&paths)
            };

            // For repositories that are linked worktrees, the path of the
            // original repository (sorted, without duplicates).
            let main_worktree_paths = {
                let project = thread_ref.project().read(cx);
                let mut main_paths: Vec<Arc<Path>> = Vec::new();
                for repo in project.repositories(cx).values() {
                    let snapshot = repo.read(cx).snapshot();
                    if snapshot.is_linked_worktree() {
                        main_paths.push(snapshot.original_repo_abs_path.clone());
                    }
                }
                main_paths.sort();
                main_paths.dedup();
                PathList::new(&main_paths)
            };

            // Threads without a folder path (e.g. started in an empty
            // window) are archived by default so they don't get lost,
            // because they won't show up in the sidebar. Users can reload
            // them from the archive.
            // An already-known thread keeps its current archived state.
            let archived = existing_thread
                .map(|t| t.archived)
                .unwrap_or(folder_paths.is_empty());

            let metadata = ThreadMetadata {
                session_id,
                agent_id,
                title,
                created_at: Some(created_at),
                updated_at,
                folder_paths,
                main_worktree_paths,
                archived,
            };

            self.save(metadata, cx);
        }
        // These events do not affect anything stored in the metadata.
        AcpThreadEvent::TokenUsageUpdated
        | AcpThreadEvent::SubagentSpawned(_)
        | AcpThreadEvent::PromptCapabilitiesUpdated
        | AcpThreadEvent::AvailableCommandsUpdated(_)
        | AcpThreadEvent::ModeUpdated(_)
        | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
    }
}
614}
615
// NOTE(review): nothing visible in this file stores a `ThreadMetadataStore`
// value directly as a global; the entity is registered through
// `GlobalThreadMetadataStore`. Confirm whether this impl is still needed.
impl Global for ThreadMetadataStore {}
617
/// Connection wrapper for the `sidebar_threads` table, which persists
/// [`ThreadMetadata`] rows.
struct ThreadMetadataDb(ThreadSafeConnection);

impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    // Ordered schema migrations: the initial table, followed by one
    // `ALTER TABLE` per column added later.
    // NOTE(review): presumably entries that already shipped must stay
    // unchanged and new ones must be appended — confirm against `sqlez`.
    const MIGRATIONS: &[&str] = &[
        sql!(
            CREATE TABLE IF NOT EXISTS sidebar_threads(
                session_id TEXT PRIMARY KEY,
                agent_id TEXT,
                title TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                created_at TEXT,
                folder_paths TEXT,
                folder_paths_order TEXT
            ) STRICT;
        ),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths TEXT),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths_order TEXT),
    ];
}

// NOTE(review): presumably provides the shared connection behind
// `ThreadMetadataDb::global` — confirm against the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
642
643impl ThreadMetadataDb {
644 pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
645 self.select::<Arc<str>>(
646 "SELECT session_id FROM sidebar_threads \
647 ORDER BY updated_at DESC",
648 )?()
649 }
650
651 /// List all sidebar thread metadata, ordered by updated_at descending.
652 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
653 self.select::<ThreadMetadata>(
654 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order \
655 FROM sidebar_threads \
656 ORDER BY updated_at DESC"
657 )?()
658 }
659
660 /// Upsert metadata for a thread.
661 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
662 let id = row.session_id.0.clone();
663 let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
664 None
665 } else {
666 Some(row.agent_id.to_string())
667 };
668 let title = row.title.to_string();
669 let updated_at = row.updated_at.to_rfc3339();
670 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
671 let serialized = row.folder_paths.serialize();
672 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
673 (None, None)
674 } else {
675 (Some(serialized.paths), Some(serialized.order))
676 };
677 let main_serialized = row.main_worktree_paths.serialize();
678 let (main_worktree_paths, main_worktree_paths_order) = if row.main_worktree_paths.is_empty()
679 {
680 (None, None)
681 } else {
682 (Some(main_serialized.paths), Some(main_serialized.order))
683 };
684 let archived = row.archived;
685
686 self.write(move |conn| {
687 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order) \
688 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) \
689 ON CONFLICT(session_id) DO UPDATE SET \
690 agent_id = excluded.agent_id, \
691 title = excluded.title, \
692 updated_at = excluded.updated_at, \
693 created_at = excluded.created_at, \
694 folder_paths = excluded.folder_paths, \
695 folder_paths_order = excluded.folder_paths_order, \
696 archived = excluded.archived, \
697 main_worktree_paths = excluded.main_worktree_paths, \
698 main_worktree_paths_order = excluded.main_worktree_paths_order";
699 let mut stmt = Statement::prepare(conn, sql)?;
700 let mut i = stmt.bind(&id, 1)?;
701 i = stmt.bind(&agent_id, i)?;
702 i = stmt.bind(&title, i)?;
703 i = stmt.bind(&updated_at, i)?;
704 i = stmt.bind(&created_at, i)?;
705 i = stmt.bind(&folder_paths, i)?;
706 i = stmt.bind(&folder_paths_order, i)?;
707 i = stmt.bind(&archived, i)?;
708 i = stmt.bind(&main_worktree_paths, i)?;
709 stmt.bind(&main_worktree_paths_order, i)?;
710 stmt.exec()
711 })
712 .await
713 }
714
715 /// Delete metadata for a single thread.
716 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
717 let id = session_id.0.clone();
718 self.write(move |conn| {
719 let mut stmt =
720 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
721 stmt.bind(&id, 1)?;
722 stmt.exec()
723 })
724 .await
725 }
726}
727
728impl Column for ThreadMetadata {
729 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
730 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
731 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
732 let (title, next): (String, i32) = Column::column(statement, next)?;
733 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
734 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
735 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
736 let (folder_paths_order_str, next): (Option<String>, i32) =
737 Column::column(statement, next)?;
738 let (archived, next): (bool, i32) = Column::column(statement, next)?;
739 let (main_worktree_paths_str, next): (Option<String>, i32) =
740 Column::column(statement, next)?;
741 let (main_worktree_paths_order_str, next): (Option<String>, i32) =
742 Column::column(statement, next)?;
743
744 let agent_id = agent_id
745 .map(|id| AgentId::new(id))
746 .unwrap_or(ZED_AGENT_ID.clone());
747
748 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
749 let created_at = created_at_str
750 .as_deref()
751 .map(DateTime::parse_from_rfc3339)
752 .transpose()?
753 .map(|dt| dt.with_timezone(&Utc));
754
755 let folder_paths = folder_paths_str
756 .map(|paths| {
757 PathList::deserialize(&util::path_list::SerializedPathList {
758 paths,
759 order: folder_paths_order_str.unwrap_or_default(),
760 })
761 })
762 .unwrap_or_default();
763
764 let main_worktree_paths = main_worktree_paths_str
765 .map(|paths| {
766 PathList::deserialize(&util::path_list::SerializedPathList {
767 paths,
768 order: main_worktree_paths_order_str.unwrap_or_default(),
769 })
770 })
771 .unwrap_or_default();
772
773 Ok((
774 ThreadMetadata {
775 session_id: acp::SessionId::new(id),
776 agent_id,
777 title: title.into(),
778 updated_at,
779 created_at,
780 folder_paths,
781 main_worktree_paths,
782 archived,
783 },
784 next,
785 ))
786 }
787}
788
789#[cfg(test)]
790mod tests {
791 use super::*;
792 use acp_thread::{AgentConnection, StubAgentConnection};
793 use action_log::ActionLog;
794 use agent::DbThread;
795 use agent_client_protocol as acp;
796 use feature_flags::FeatureFlagAppExt;
797 use gpui::TestAppContext;
798 use project::FakeFs;
799 use project::Project;
800 use std::path::Path;
801 use std::rc::Rc;
802
// Builds a `DbThread` with the given title and timestamp and no messages;
// every other field is left empty or at its default.
fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
    DbThread {
        title: title.to_string().into(),
        messages: Vec::new(),
        updated_at,
        detailed_summary: None,
        initial_project_snapshot: None,
        cumulative_token_usage: Default::default(),
        request_token_usage: Default::default(),
        model: None,
        profile: None,
        imported: false,
        subagent_context: None,
        speed: None,
        thinking_enabled: false,
        thinking_effort: None,
        draft_prompt: None,
        ui_scroll_position: None,
    }
}
823
824 fn make_metadata(
825 session_id: &str,
826 title: &str,
827 updated_at: DateTime<Utc>,
828 folder_paths: PathList,
829 ) -> ThreadMetadata {
830 ThreadMetadata {
831 archived: false,
832 session_id: acp::SessionId::new(session_id),
833 agent_id: agent::ZED_AGENT_ID.clone(),
834 title: title.to_string().into(),
835 updated_at,
836 created_at: Some(updated_at),
837 folder_paths,
838 main_worktree_paths: PathList::default(),
839 }
840 }
841
// Shared test setup: installs test settings, enables the "agent-v2" flag and
// registers both the metadata store and the native thread store, then lets
// pending work (such as the initial reload) run.
fn init_test(cx: &mut TestAppContext) {
    cx.update(|cx| {
        let settings_store = settings::SettingsStore::test(cx);
        cx.set_global(settings_store);
        cx.update_flags(true, vec!["agent-v2".to_string()]);
        ThreadMetadataStore::init_global(cx);
        ThreadStore::init_global(cx);
    });
    cx.run_until_parked();
}
852
// Rows written to the database before the store exists must show up in the
// store's cache and path index once it is initialised.
#[gpui::test]
async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
    let first_paths = PathList::new(&[Path::new("/project-a")]);
    let second_paths = PathList::new(&[Path::new("/project-b")]);
    let now = Utc::now();
    let older = now - chrono::Duration::seconds(1);

    // Open the database under the same name the test `init_global` derives.
    let thread = std::thread::current();
    let test_name = thread.name().unwrap_or("unknown_test");
    let db_name = format!("THREAD_METADATA_DB_{}", test_name);
    let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
        &db_name,
    )));

    db.save(make_metadata(
        "session-1",
        "First Thread",
        now,
        first_paths.clone(),
    ))
    .await
    .unwrap();
    db.save(make_metadata(
        "session-2",
        "Second Thread",
        older,
        second_paths.clone(),
    ))
    .await
    .unwrap();

    cx.update(|cx| {
        let settings_store = settings::SettingsStore::test(cx);
        cx.set_global(settings_store);
        cx.update_flags(true, vec!["agent-v2".to_string()]);
        ThreadMetadataStore::init_global(cx);
    });

    // Let the initial reload finish.
    cx.run_until_parked();

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let entry_ids = store
            .entry_ids()
            .map(|session_id| session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(entry_ids.len(), 2);
        assert!(entry_ids.contains(&"session-1".to_string()));
        assert!(entry_ids.contains(&"session-2".to_string()));

        let first_path_entries = store
            .entries_for_path(&first_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(first_path_entries, vec!["session-1"]);

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-2"]);
    });
}
918
// Saving, moving a thread to a different path list, and deleting must each
// keep the cache and the path index consistent.
#[gpui::test]
async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
    init_test(cx);

    let first_paths = PathList::new(&[Path::new("/project-a")]);
    let second_paths = PathList::new(&[Path::new("/project-b")]);
    let initial_time = Utc::now();
    let updated_time = initial_time + chrono::Duration::seconds(1);

    let initial_metadata = make_metadata(
        "session-1",
        "First Thread",
        initial_time,
        first_paths.clone(),
    );

    let second_metadata = make_metadata(
        "session-2",
        "Second Thread",
        initial_time,
        second_paths.clone(),
    );

    // Step 1: save one thread per project.
    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.save(initial_metadata, cx);
            store.save(second_metadata, cx);
        });
    });

    cx.run_until_parked();

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let first_path_entries = store
            .entries_for_path(&first_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(first_path_entries, vec!["session-1"]);

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-2"]);
    });

    // Step 2: re-save session-1 under the second project's paths.
    let moved_metadata = make_metadata(
        "session-1",
        "First Thread",
        updated_time,
        second_paths.clone(),
    );

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.save(moved_metadata, cx);
        });
    });

    cx.run_until_parked();

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let entry_ids = store
            .entry_ids()
            .map(|session_id| session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(entry_ids.len(), 2);
        assert!(entry_ids.contains(&"session-1".to_string()));
        assert!(entry_ids.contains(&"session-2".to_string()));

        // The old index bucket must no longer list the moved thread.
        let first_path_entries = store
            .entries_for_path(&first_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert!(first_path_entries.is_empty());

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries.len(), 2);
        assert!(second_path_entries.contains(&"session-1".to_string()));
        assert!(second_path_entries.contains(&"session-2".to_string()));
    });

    // Step 3: delete session-2; only session-1 may remain.
    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.delete(acp::SessionId::new("session-2"), cx);
        });
    });

    cx.run_until_parked();

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let entry_ids = store
            .entry_ids()
            .map(|session_id| session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(entry_ids, vec!["session-1"]);

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-1"]);
    });
}
1038
// The migration must leave existing metadata rows untouched and add only the
// native threads that have no row yet. Because the metadata database is not
// empty here, this is not a "first migration" and every migrated thread stays
// archived — including the one without a project.
#[gpui::test]
async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
    init_test(cx);

    let project_a_paths = PathList::new(&[Path::new("/project-a")]);
    let project_b_paths = PathList::new(&[Path::new("/project-b")]);
    let now = Utc::now();

    // Pre-existing metadata row that the migration must not overwrite.
    let existing_metadata = ThreadMetadata {
        session_id: acp::SessionId::new("a-session-0"),
        agent_id: agent::ZED_AGENT_ID.clone(),
        title: "Existing Metadata".into(),
        updated_at: now - chrono::Duration::seconds(10),
        created_at: Some(now - chrono::Duration::seconds(10)),
        folder_paths: project_a_paths.clone(),
        main_worktree_paths: PathList::default(),
        archived: false,
    };

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.save(existing_metadata, cx);
        });
    });
    cx.run_until_parked();

    // Threads in the native store: one that collides with the existing row,
    // two new ones with projects, and one without a project.
    let threads_to_save = vec![
        (
            "a-session-0",
            "Thread A0 From Native Store",
            project_a_paths.clone(),
            now,
        ),
        (
            "a-session-1",
            "Thread A1",
            project_a_paths.clone(),
            now + chrono::Duration::seconds(1),
        ),
        (
            "b-session-0",
            "Thread B0",
            project_b_paths.clone(),
            now + chrono::Duration::seconds(2),
        ),
        (
            "projectless",
            "Projectless",
            PathList::default(),
            now + chrono::Duration::seconds(3),
        ),
    ];

    for (session_id, title, paths, updated_at) in &threads_to_save {
        let save_task = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            let session_id = session_id.to_string();
            let title = title.to_string();
            let paths = paths.clone();
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new(session_id),
                    make_db_thread(&title, *updated_at),
                    paths,
                    cx,
                )
            })
        });
        save_task.await.unwrap();
        cx.run_until_parked();
    }

    cx.update(|cx| migrate_thread_metadata(cx));
    cx.run_until_parked();

    let list = cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        store.read(cx).entries().cloned().collect::<Vec<_>>()
    });

    assert_eq!(list.len(), 4);
    assert!(
        list.iter()
            .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
    );

    // The colliding row keeps its original title and archived state.
    let existing_metadata = list
        .iter()
        .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
        .unwrap();
    assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
    assert!(!existing_metadata.archived);

    let migrated_session_ids = list
        .iter()
        .map(|metadata| metadata.session_id.0.as_ref())
        .collect::<Vec<_>>();
    assert!(migrated_session_ids.contains(&"a-session-1"));
    assert!(migrated_session_ids.contains(&"b-session-0"));
    assert!(migrated_session_ids.contains(&"projectless"));

    let migrated_entries = list
        .iter()
        .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
        .collect::<Vec<_>>();
    assert!(migrated_entries.iter().all(|metadata| metadata.archived));
}
1147
1148 #[gpui::test]
1149 async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1150 cx: &mut TestAppContext,
1151 ) {
1152 init_test(cx);
1153
1154 let project_paths = PathList::new(&[Path::new("/project-a")]);
1155 let existing_updated_at = Utc::now();
1156
1157 let existing_metadata = ThreadMetadata {
1158 session_id: acp::SessionId::new("existing-session"),
1159 agent_id: agent::ZED_AGENT_ID.clone(),
1160 title: "Existing Metadata".into(),
1161 updated_at: existing_updated_at,
1162 created_at: Some(existing_updated_at),
1163 folder_paths: project_paths.clone(),
1164 main_worktree_paths: PathList::default(),
1165 archived: false,
1166 };
1167
1168 cx.update(|cx| {
1169 let store = ThreadMetadataStore::global(cx);
1170 store.update(cx, |store, cx| {
1171 store.save(existing_metadata, cx);
1172 });
1173 });
1174 cx.run_until_parked();
1175
1176 let save_task = cx.update(|cx| {
1177 let thread_store = ThreadStore::global(cx);
1178 thread_store.update(cx, |store, cx| {
1179 store.save_thread(
1180 acp::SessionId::new("existing-session"),
1181 make_db_thread(
1182 "Updated Native Thread Title",
1183 existing_updated_at + chrono::Duration::seconds(1),
1184 ),
1185 project_paths.clone(),
1186 cx,
1187 )
1188 })
1189 });
1190 save_task.await.unwrap();
1191 cx.run_until_parked();
1192
1193 cx.update(|cx| migrate_thread_metadata(cx));
1194 cx.run_until_parked();
1195
1196 let list = cx.update(|cx| {
1197 let store = ThreadMetadataStore::global(cx);
1198 store.read(cx).entries().cloned().collect::<Vec<_>>()
1199 });
1200
1201 assert_eq!(list.len(), 1);
1202 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1203 }
1204
1205 #[gpui::test]
1206 async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1207 cx: &mut TestAppContext,
1208 ) {
1209 init_test(cx);
1210
1211 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1212 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1213 let now = Utc::now();
1214
1215 // Create 7 threads for project A and 3 for project B
1216 let mut threads_to_save = Vec::new();
1217 for i in 0..7 {
1218 threads_to_save.push((
1219 format!("a-session-{i}"),
1220 format!("Thread A{i}"),
1221 project_a_paths.clone(),
1222 now + chrono::Duration::seconds(i as i64),
1223 ));
1224 }
1225 for i in 0..3 {
1226 threads_to_save.push((
1227 format!("b-session-{i}"),
1228 format!("Thread B{i}"),
1229 project_b_paths.clone(),
1230 now + chrono::Duration::seconds(i as i64),
1231 ));
1232 }
1233
1234 for (session_id, title, paths, updated_at) in &threads_to_save {
1235 let save_task = cx.update(|cx| {
1236 let thread_store = ThreadStore::global(cx);
1237 let session_id = session_id.to_string();
1238 let title = title.to_string();
1239 let paths = paths.clone();
1240 thread_store.update(cx, |store, cx| {
1241 store.save_thread(
1242 acp::SessionId::new(session_id),
1243 make_db_thread(&title, *updated_at),
1244 paths,
1245 cx,
1246 )
1247 })
1248 });
1249 save_task.await.unwrap();
1250 cx.run_until_parked();
1251 }
1252
1253 cx.update(|cx| migrate_thread_metadata(cx));
1254 cx.run_until_parked();
1255
1256 let list = cx.update(|cx| {
1257 let store = ThreadMetadataStore::global(cx);
1258 store.read(cx).entries().cloned().collect::<Vec<_>>()
1259 });
1260
1261 assert_eq!(list.len(), 10);
1262
1263 // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1264 let mut project_a_entries: Vec<_> = list
1265 .iter()
1266 .filter(|m| m.folder_paths == project_a_paths)
1267 .collect();
1268 assert_eq!(project_a_entries.len(), 7);
1269 project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1270
1271 for entry in &project_a_entries[..5] {
1272 assert!(
1273 !entry.archived,
1274 "Expected {} to be unarchived (top 5 most recent)",
1275 entry.session_id.0
1276 );
1277 }
1278 for entry in &project_a_entries[5..] {
1279 assert!(
1280 entry.archived,
1281 "Expected {} to be archived (older than top 5)",
1282 entry.session_id.0
1283 );
1284 }
1285
1286 // Project B: all 3 should be unarchived (under the limit)
1287 let project_b_entries: Vec<_> = list
1288 .iter()
1289 .filter(|m| m.folder_paths == project_b_paths)
1290 .collect();
1291 assert_eq!(project_b_entries.len(), 3);
1292 assert!(project_b_entries.iter().all(|m| !m.archived));
1293 }
1294
1295 #[gpui::test]
1296 async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1297 init_test(cx);
1298
1299 let fs = FakeFs::new(cx.executor());
1300 let project = Project::test(fs, None::<&Path>, cx).await;
1301 let connection = Rc::new(StubAgentConnection::new());
1302
1303 let thread = cx
1304 .update(|cx| {
1305 connection
1306 .clone()
1307 .new_session(project.clone(), PathList::default(), cx)
1308 })
1309 .await
1310 .unwrap();
1311 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1312
1313 cx.update(|cx| {
1314 thread.update(cx, |thread, cx| {
1315 thread.set_title("Draft Thread".into(), cx).detach();
1316 });
1317 });
1318 cx.run_until_parked();
1319
1320 let metadata_ids = cx.update(|cx| {
1321 ThreadMetadataStore::global(cx)
1322 .read(cx)
1323 .entry_ids()
1324 .collect::<Vec<_>>()
1325 });
1326 assert_eq!(metadata_ids, vec![session_id]);
1327
1328 drop(thread);
1329 cx.update(|_| {});
1330 cx.run_until_parked();
1331 cx.run_until_parked();
1332
1333 let metadata_ids = cx.update(|cx| {
1334 ThreadMetadataStore::global(cx)
1335 .read(cx)
1336 .entry_ids()
1337 .collect::<Vec<_>>()
1338 });
1339 assert!(
1340 metadata_ids.is_empty(),
1341 "expected empty draft thread metadata to be deleted on release"
1342 );
1343 }
1344
1345 #[gpui::test]
1346 async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1347 init_test(cx);
1348
1349 let fs = FakeFs::new(cx.executor());
1350 let project = Project::test(fs, None::<&Path>, cx).await;
1351 let connection = Rc::new(StubAgentConnection::new());
1352
1353 let thread = cx
1354 .update(|cx| {
1355 connection
1356 .clone()
1357 .new_session(project.clone(), PathList::default(), cx)
1358 })
1359 .await
1360 .unwrap();
1361 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1362
1363 cx.update(|cx| {
1364 thread.update(cx, |thread, cx| {
1365 thread.push_user_content_block(None, "Hello".into(), cx);
1366 });
1367 });
1368 cx.run_until_parked();
1369
1370 let metadata_ids = cx.update(|cx| {
1371 ThreadMetadataStore::global(cx)
1372 .read(cx)
1373 .entry_ids()
1374 .collect::<Vec<_>>()
1375 });
1376 assert_eq!(metadata_ids, vec![session_id.clone()]);
1377
1378 drop(thread);
1379 cx.update(|_| {});
1380 cx.run_until_parked();
1381
1382 let metadata_ids = cx.update(|cx| {
1383 ThreadMetadataStore::global(cx)
1384 .read(cx)
1385 .entry_ids()
1386 .collect::<Vec<_>>()
1387 });
1388 assert_eq!(metadata_ids, vec![session_id]);
1389 }
1390
1391 #[gpui::test]
1392 async fn test_threads_without_project_association_are_archived_by_default(
1393 cx: &mut TestAppContext,
1394 ) {
1395 init_test(cx);
1396
1397 let fs = FakeFs::new(cx.executor());
1398 let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
1399 let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
1400 let connection = Rc::new(StubAgentConnection::new());
1401
1402 let thread_without_worktree = cx
1403 .update(|cx| {
1404 connection.clone().new_session(
1405 project_without_worktree.clone(),
1406 PathList::default(),
1407 cx,
1408 )
1409 })
1410 .await
1411 .unwrap();
1412 let session_without_worktree =
1413 cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());
1414
1415 cx.update(|cx| {
1416 thread_without_worktree.update(cx, |thread, cx| {
1417 thread.set_title("No Project Thread".into(), cx).detach();
1418 });
1419 });
1420 cx.run_until_parked();
1421
1422 let thread_with_worktree = cx
1423 .update(|cx| {
1424 connection.clone().new_session(
1425 project_with_worktree.clone(),
1426 PathList::default(),
1427 cx,
1428 )
1429 })
1430 .await
1431 .unwrap();
1432 let session_with_worktree =
1433 cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());
1434
1435 cx.update(|cx| {
1436 thread_with_worktree.update(cx, |thread, cx| {
1437 thread.set_title("Project Thread".into(), cx).detach();
1438 });
1439 });
1440 cx.run_until_parked();
1441
1442 cx.update(|cx| {
1443 let store = ThreadMetadataStore::global(cx);
1444 let store = store.read(cx);
1445
1446 let without_worktree = store
1447 .entry(&session_without_worktree)
1448 .expect("missing metadata for thread without project association");
1449 assert!(without_worktree.folder_paths.is_empty());
1450 assert!(
1451 without_worktree.archived,
1452 "expected thread without project association to be archived"
1453 );
1454
1455 let with_worktree = store
1456 .entry(&session_with_worktree)
1457 .expect("missing metadata for thread with project association");
1458 assert_eq!(
1459 with_worktree.folder_paths,
1460 PathList::new(&[Path::new("/project-a")])
1461 );
1462 assert!(
1463 !with_worktree.archived,
1464 "expected thread with project association to remain unarchived"
1465 );
1466 });
1467 }
1468
1469 #[gpui::test]
1470 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1471 init_test(cx);
1472
1473 let fs = FakeFs::new(cx.executor());
1474 let project = Project::test(fs, None::<&Path>, cx).await;
1475 let connection = Rc::new(StubAgentConnection::new());
1476
1477 // Create a regular (non-subagent) AcpThread.
1478 let regular_thread = cx
1479 .update(|cx| {
1480 connection
1481 .clone()
1482 .new_session(project.clone(), PathList::default(), cx)
1483 })
1484 .await
1485 .unwrap();
1486
1487 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1488
1489 // Set a title on the regular thread to trigger a save via handle_thread_update.
1490 cx.update(|cx| {
1491 regular_thread.update(cx, |thread, cx| {
1492 thread.set_title("Regular Thread".into(), cx).detach();
1493 });
1494 });
1495 cx.run_until_parked();
1496
1497 // Create a subagent AcpThread
1498 let subagent_session_id = acp::SessionId::new("subagent-session");
1499 let subagent_thread = cx.update(|cx| {
1500 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1501 cx.new(|cx| {
1502 acp_thread::AcpThread::new(
1503 Some(regular_session_id.clone()),
1504 Some("Subagent Thread".into()),
1505 None,
1506 connection.clone(),
1507 project.clone(),
1508 action_log,
1509 subagent_session_id.clone(),
1510 watch::Receiver::constant(acp::PromptCapabilities::new()),
1511 cx,
1512 )
1513 })
1514 });
1515
1516 // Set a title on the subagent thread to trigger handle_thread_update.
1517 cx.update(|cx| {
1518 subagent_thread.update(cx, |thread, cx| {
1519 thread
1520 .set_title("Subagent Thread Title".into(), cx)
1521 .detach();
1522 });
1523 });
1524 cx.run_until_parked();
1525
1526 // List all metadata from the store cache.
1527 let list = cx.update(|cx| {
1528 let store = ThreadMetadataStore::global(cx);
1529 store.read(cx).entries().cloned().collect::<Vec<_>>()
1530 });
1531
1532 // The subagent thread should NOT appear in the sidebar metadata.
1533 // Only the regular thread should be listed.
1534 assert_eq!(
1535 list.len(),
1536 1,
1537 "Expected only the regular thread in sidebar metadata, \
1538 but found {} entries (subagent threads are leaking into the sidebar)",
1539 list.len(),
1540 );
1541 assert_eq!(list[0].session_id, regular_session_id);
1542 assert_eq!(list[0].title.as_ref(), "Regular Thread");
1543 }
1544
/// An upsert followed by a delete for the same session must collapse to just
/// the delete.
#[test]
fn test_dedup_db_operations_keeps_latest_operation_for_session() {
    let upsert = DbOperation::Upsert(make_metadata(
        "session-1",
        "First Thread",
        Utc::now(),
        PathList::default(),
    ));
    let delete = DbOperation::Delete(acp::SessionId::new("session-1"));

    let deduped = ThreadMetadataStore::dedup_db_operations(vec![upsert, delete]);

    assert_eq!(deduped.len(), 1);
    assert_eq!(
        deduped[0],
        DbOperation::Delete(acp::SessionId::new("session-1"))
    );
}
1567
/// Two upserts for the same session must collapse to the one queued last.
#[test]
fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
    let first_at = Utc::now();
    let second_at = first_at + chrono::Duration::seconds(1);

    let stale = make_metadata("session-1", "Old Title", first_at, PathList::default());
    let fresh = make_metadata("session-1", "New Title", second_at, PathList::default());

    let operations = vec![
        DbOperation::Upsert(stale),
        DbOperation::Upsert(fresh.clone()),
    ];
    let deduped = ThreadMetadataStore::dedup_db_operations(operations);

    assert_eq!(deduped.len(), 1);
    assert_eq!(deduped[0], DbOperation::Upsert(fresh));
}
1584
/// Upserts for different sessions must both survive deduplication.
#[test]
fn test_dedup_db_operations_preserves_distinct_sessions() {
    let now = Utc::now();
    let first = make_metadata("session-1", "First Thread", now, PathList::default());
    let second = make_metadata("session-2", "Second Thread", now, PathList::default());

    let operations = vec![
        DbOperation::Upsert(first.clone()),
        DbOperation::Upsert(second.clone()),
    ];
    let deduped = ThreadMetadataStore::dedup_db_operations(operations);

    assert_eq!(deduped.len(), 2);
    for expected in [DbOperation::Upsert(first), DbOperation::Upsert(second)] {
        assert!(deduped.contains(&expected));
    }
}
1600
1601 #[gpui::test]
1602 async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1603 init_test(cx);
1604
1605 let paths = PathList::new(&[Path::new("/project-a")]);
1606 let now = Utc::now();
1607 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1608
1609 cx.update(|cx| {
1610 let store = ThreadMetadataStore::global(cx);
1611 store.update(cx, |store, cx| {
1612 store.save(metadata, cx);
1613 });
1614 });
1615
1616 cx.run_until_parked();
1617
1618 cx.update(|cx| {
1619 let store = ThreadMetadataStore::global(cx);
1620 let store = store.read(cx);
1621
1622 let path_entries = store
1623 .entries_for_path(&paths)
1624 .map(|e| e.session_id.0.to_string())
1625 .collect::<Vec<_>>();
1626 assert_eq!(path_entries, vec!["session-1"]);
1627
1628 let archived = store
1629 .archived_entries()
1630 .map(|e| e.session_id.0.to_string())
1631 .collect::<Vec<_>>();
1632 assert!(archived.is_empty());
1633 });
1634
1635 cx.update(|cx| {
1636 let store = ThreadMetadataStore::global(cx);
1637 store.update(cx, |store, cx| {
1638 store.archive(&acp::SessionId::new("session-1"), cx);
1639 });
1640 });
1641
1642 cx.run_until_parked();
1643
1644 cx.update(|cx| {
1645 let store = ThreadMetadataStore::global(cx);
1646 let store = store.read(cx);
1647
1648 let path_entries = store
1649 .entries_for_path(&paths)
1650 .map(|e| e.session_id.0.to_string())
1651 .collect::<Vec<_>>();
1652 assert!(path_entries.is_empty());
1653
1654 let archived = store.archived_entries().collect::<Vec<_>>();
1655 assert_eq!(archived.len(), 1);
1656 assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1657 assert!(archived[0].archived);
1658 });
1659
1660 cx.update(|cx| {
1661 let store = ThreadMetadataStore::global(cx);
1662 store.update(cx, |store, cx| {
1663 store.unarchive(&acp::SessionId::new("session-1"), cx);
1664 });
1665 });
1666
1667 cx.run_until_parked();
1668
1669 cx.update(|cx| {
1670 let store = ThreadMetadataStore::global(cx);
1671 let store = store.read(cx);
1672
1673 let path_entries = store
1674 .entries_for_path(&paths)
1675 .map(|e| e.session_id.0.to_string())
1676 .collect::<Vec<_>>();
1677 assert_eq!(path_entries, vec!["session-1"]);
1678
1679 let archived = store
1680 .archived_entries()
1681 .map(|e| e.session_id.0.to_string())
1682 .collect::<Vec<_>>();
1683 assert!(archived.is_empty());
1684 });
1685 }
1686
1687 #[gpui::test]
1688 async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1689 init_test(cx);
1690
1691 let paths = PathList::new(&[Path::new("/project-a")]);
1692 let now = Utc::now();
1693
1694 let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1695 let metadata2 = make_metadata(
1696 "session-2",
1697 "Archived Thread",
1698 now - chrono::Duration::seconds(1),
1699 paths.clone(),
1700 );
1701
1702 cx.update(|cx| {
1703 let store = ThreadMetadataStore::global(cx);
1704 store.update(cx, |store, cx| {
1705 store.save(metadata1, cx);
1706 store.save(metadata2, cx);
1707 });
1708 });
1709
1710 cx.run_until_parked();
1711
1712 cx.update(|cx| {
1713 let store = ThreadMetadataStore::global(cx);
1714 store.update(cx, |store, cx| {
1715 store.archive(&acp::SessionId::new("session-2"), cx);
1716 });
1717 });
1718
1719 cx.run_until_parked();
1720
1721 cx.update(|cx| {
1722 let store = ThreadMetadataStore::global(cx);
1723 let store = store.read(cx);
1724
1725 let path_entries = store
1726 .entries_for_path(&paths)
1727 .map(|e| e.session_id.0.to_string())
1728 .collect::<Vec<_>>();
1729 assert_eq!(path_entries, vec!["session-1"]);
1730
1731 let all_entries = store
1732 .entries()
1733 .map(|e| e.session_id.0.to_string())
1734 .collect::<Vec<_>>();
1735 assert_eq!(all_entries.len(), 2);
1736 assert!(all_entries.contains(&"session-1".to_string()));
1737 assert!(all_entries.contains(&"session-2".to_string()));
1738
1739 let archived = store
1740 .archived_entries()
1741 .map(|e| e.session_id.0.to_string())
1742 .collect::<Vec<_>>();
1743 assert_eq!(archived, vec!["session-2"]);
1744 });
1745 }
1746
1747 #[gpui::test]
1748 async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1749 init_test(cx);
1750
1751 let paths = PathList::new(&[Path::new("/project-a")]);
1752 let now = Utc::now();
1753
1754 let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1755 let m2 = make_metadata(
1756 "session-2",
1757 "Thread Two",
1758 now - chrono::Duration::seconds(1),
1759 paths.clone(),
1760 );
1761 let m3 = make_metadata(
1762 "session-3",
1763 "Thread Three",
1764 now - chrono::Duration::seconds(2),
1765 paths,
1766 );
1767
1768 cx.update(|cx| {
1769 let store = ThreadMetadataStore::global(cx);
1770 store.update(cx, |store, cx| {
1771 store.save_all(vec![m1, m2, m3], cx);
1772 });
1773 });
1774
1775 cx.run_until_parked();
1776
1777 cx.update(|cx| {
1778 let store = ThreadMetadataStore::global(cx);
1779 let store = store.read(cx);
1780
1781 let all_entries = store
1782 .entries()
1783 .map(|e| e.session_id.0.to_string())
1784 .collect::<Vec<_>>();
1785 assert_eq!(all_entries.len(), 3);
1786 assert!(all_entries.contains(&"session-1".to_string()));
1787 assert!(all_entries.contains(&"session-2".to_string()));
1788 assert!(all_entries.contains(&"session-3".to_string()));
1789
1790 let entry_ids = store.entry_ids().collect::<Vec<_>>();
1791 assert_eq!(entry_ids.len(), 3);
1792 });
1793 }
1794
1795 #[gpui::test]
1796 async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1797 init_test(cx);
1798
1799 let paths = PathList::new(&[Path::new("/project-a")]);
1800 let now = Utc::now();
1801 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1802
1803 cx.update(|cx| {
1804 let store = ThreadMetadataStore::global(cx);
1805 store.update(cx, |store, cx| {
1806 store.save(metadata, cx);
1807 });
1808 });
1809
1810 cx.run_until_parked();
1811
1812 cx.update(|cx| {
1813 let store = ThreadMetadataStore::global(cx);
1814 store.update(cx, |store, cx| {
1815 store.archive(&acp::SessionId::new("session-1"), cx);
1816 });
1817 });
1818
1819 cx.run_until_parked();
1820
1821 cx.update(|cx| {
1822 let store = ThreadMetadataStore::global(cx);
1823 store.update(cx, |store, cx| {
1824 let _ = store.reload(cx);
1825 });
1826 });
1827
1828 cx.run_until_parked();
1829
1830 cx.update(|cx| {
1831 let store = ThreadMetadataStore::global(cx);
1832 let store = store.read(cx);
1833
1834 let thread = store
1835 .entries()
1836 .find(|e| e.session_id.0.as_ref() == "session-1")
1837 .expect("thread should exist after reload");
1838 assert!(thread.archived);
1839
1840 let path_entries = store
1841 .entries_for_path(&paths)
1842 .map(|e| e.session_id.0.to_string())
1843 .collect::<Vec<_>>();
1844 assert!(path_entries.is_empty());
1845
1846 let archived = store
1847 .archived_entries()
1848 .map(|e| e.session_id.0.to_string())
1849 .collect::<Vec<_>>();
1850 assert_eq!(archived, vec!["session-1"]);
1851 });
1852 }
1853
1854 #[gpui::test]
1855 async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1856 init_test(cx);
1857
1858 cx.run_until_parked();
1859
1860 cx.update(|cx| {
1861 let store = ThreadMetadataStore::global(cx);
1862 store.update(cx, |store, cx| {
1863 store.archive(&acp::SessionId::new("nonexistent"), cx);
1864 });
1865 });
1866
1867 cx.run_until_parked();
1868
1869 cx.update(|cx| {
1870 let store = ThreadMetadataStore::global(cx);
1871 let store = store.read(cx);
1872
1873 assert!(store.is_empty());
1874 assert_eq!(store.entries().count(), 0);
1875 assert_eq!(store.archived_entries().count(), 0);
1876 });
1877 }
1878
1879 #[gpui::test]
1880 async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1881 init_test(cx);
1882
1883 let paths = PathList::new(&[Path::new("/project-a")]);
1884 let now = Utc::now();
1885 let metadata = make_metadata("session-1", "Thread 1", now, paths);
1886 let session_id = metadata.session_id.clone();
1887
1888 cx.update(|cx| {
1889 let store = ThreadMetadataStore::global(cx);
1890 store.update(cx, |store, cx| {
1891 store.save(metadata.clone(), cx);
1892 store.archive(&session_id, cx);
1893 });
1894 });
1895
1896 cx.run_until_parked();
1897
1898 cx.update(|cx| {
1899 let store = ThreadMetadataStore::global(cx);
1900 let store = store.read(cx);
1901
1902 let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1903 pretty_assertions::assert_eq!(
1904 entries,
1905 vec![ThreadMetadata {
1906 archived: true,
1907 ..metadata
1908 }]
1909 );
1910 });
1911 }
1912}