1use std::{
2 path::{Path, PathBuf},
3 sync::Arc,
4};
5
6use acp_thread::AcpThreadEvent;
7use agent::{ThreadStore, ZED_AGENT_ID};
8use agent_client_protocol as acp;
9use anyhow::Context as _;
10use chrono::{DateTime, Utc};
11use collections::{HashMap, HashSet};
12use db::{
13 sqlez::{
14 bindable::Column, domain::Domain, statement::Statement,
15 thread_safe_connection::ThreadSafeConnection,
16 },
17 sqlez_macros::sql,
18};
19use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
20use futures::{FutureExt as _, future::Shared};
21use gpui::{AppContext as _, Entity, Global, Subscription, Task};
22use project::AgentId;
23use ui::{App, Context, SharedString};
24use util::ResultExt as _;
25use workspace::PathList;
26
27use crate::DEFAULT_THREAD_TITLE;
28
29pub fn init(cx: &mut App) {
30 ThreadMetadataStore::init_global(cx);
31
32 if cx.has_flag::<AgentV2FeatureFlag>() {
33 migrate_thread_metadata(cx);
34 }
35 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
36 if has_flag {
37 migrate_thread_metadata(cx);
38 }
39 })
40 .detach();
41}
42
43/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
44/// We skip migrating threads that do not have a project.
45///
46/// TODO: Remove this after N weeks of shipping the sidebar
47fn migrate_thread_metadata(cx: &mut App) {
48 let store = ThreadMetadataStore::global(cx);
49 let db = store.read(cx).db.clone();
50
51 cx.spawn(async move |cx| {
52 let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
53
54 let is_first_migration = existing_entries.is_empty();
55
56 let mut to_migrate = store.read_with(cx, |_store, cx| {
57 ThreadStore::global(cx)
58 .read(cx)
59 .entries()
60 .filter_map(|entry| {
61 if existing_entries.contains(&entry.id.0) || entry.folder_paths.is_empty() {
62 return None;
63 }
64
65 Some(ThreadMetadata {
66 session_id: entry.id,
67 agent_id: ZED_AGENT_ID.clone(),
68 title: entry.title,
69 updated_at: entry.updated_at,
70 created_at: entry.created_at,
71 folder_paths: entry.folder_paths,
72 archived: true,
73 })
74 })
75 .collect::<Vec<_>>()
76 });
77
78 if to_migrate.is_empty() {
79 return anyhow::Ok(());
80 }
81
82 // On the first migration (no entries in DB yet), keep the 5 most
83 // recent threads per project unarchived.
84 if is_first_migration {
85 let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
86 for entry in &mut to_migrate {
87 per_project
88 .entry(entry.folder_paths.clone())
89 .or_default()
90 .push(entry);
91 }
92 for entries in per_project.values_mut() {
93 entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
94 for entry in entries.iter_mut().take(5) {
95 entry.archived = false;
96 }
97 }
98 }
99
100 log::info!("Migrating {} thread store entries", to_migrate.len());
101
102 // Manually save each entry to the database and call reload, otherwise
103 // we'll end up triggering lots of reloads after each save
104 for entry in to_migrate {
105 db.save(entry).await?;
106 }
107
108 log::info!("Finished migrating thread store entries");
109
110 let _ = store.update(cx, |store, cx| store.reload(cx));
111 anyhow::Ok(())
112 })
113 .detach_and_log_err(cx);
114}
115
116struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
117impl Global for GlobalThreadMetadataStore {}
118
119/// Lightweight metadata for any thread (native or ACP), enough to populate
120/// the sidebar list and route to the correct load path when clicked.
121#[derive(Debug, Clone, PartialEq)]
122pub struct ThreadMetadata {
123 pub session_id: acp::SessionId,
124 pub agent_id: AgentId,
125 pub title: SharedString,
126 pub updated_at: DateTime<Utc>,
127 pub created_at: Option<DateTime<Utc>>,
128 pub folder_paths: PathList,
129 pub archived: bool,
130}
131
132impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
133 fn from(meta: &ThreadMetadata) -> Self {
134 Self {
135 session_id: meta.session_id.clone(),
136 work_dirs: Some(meta.folder_paths.clone()),
137 title: Some(meta.title.clone()),
138 updated_at: Some(meta.updated_at),
139 created_at: meta.created_at,
140 meta: None,
141 }
142 }
143}
144
145pub struct ArchivedGitWorktree {
146 pub id: i64,
147 pub worktree_path: PathBuf,
148 pub main_repo_path: PathBuf,
149 pub branch_name: Option<String>,
150 pub commit_hash: String,
151 pub restored: bool,
152}
153
154/// The store holds all metadata needed to show threads in the sidebar/the archive.
155///
156/// Automatically listens to AcpThread events and updates metadata if it has changed.
157pub struct ThreadMetadataStore {
158 db: ThreadMetadataDb,
159 threads: HashMap<acp::SessionId, ThreadMetadata>,
160 threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
161 reload_task: Option<Shared<Task<()>>>,
162 session_subscriptions: HashMap<acp::SessionId, Subscription>,
163 pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
164 _db_operations_task: Task<()>,
165}
166
167#[derive(Debug, PartialEq)]
168enum DbOperation {
169 Upsert(ThreadMetadata),
170 Delete(acp::SessionId),
171}
172
173impl DbOperation {
174 fn id(&self) -> &acp::SessionId {
175 match self {
176 DbOperation::Upsert(thread) => &thread.session_id,
177 DbOperation::Delete(session_id) => session_id,
178 }
179 }
180}
181
182impl ThreadMetadataStore {
183 #[cfg(not(any(test, feature = "test-support")))]
184 pub fn init_global(cx: &mut App) {
185 if cx.has_global::<Self>() {
186 return;
187 }
188
189 let db = ThreadMetadataDb::global(cx);
190 let thread_store = cx.new(|cx| Self::new(db, cx));
191 cx.set_global(GlobalThreadMetadataStore(thread_store));
192 }
193
194 #[cfg(any(test, feature = "test-support"))]
195 pub fn init_global(cx: &mut App) {
196 let thread = std::thread::current();
197 let test_name = thread.name().unwrap_or("unknown_test");
198 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
199 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
200 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
201 cx.set_global(GlobalThreadMetadataStore(thread_store));
202 }
203
204 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
205 cx.try_global::<GlobalThreadMetadataStore>()
206 .map(|store| store.0.clone())
207 }
208
209 pub fn global(cx: &App) -> Entity<Self> {
210 cx.global::<GlobalThreadMetadataStore>().0.clone()
211 }
212
213 pub fn is_empty(&self) -> bool {
214 self.threads.is_empty()
215 }
216
217 /// Returns all thread IDs.
218 pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
219 self.threads.keys().cloned()
220 }
221
222 /// Returns the metadata for a specific thread, if it exists.
223 pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
224 self.threads.get(session_id)
225 }
226
227 /// Returns all threads.
228 pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
229 self.threads.values()
230 }
231
232 /// Returns all archived threads.
233 pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
234 self.entries().filter(|t| t.archived)
235 }
236
237 /// Returns all threads for the given path list, excluding archived threads.
238 pub fn entries_for_path(
239 &self,
240 path_list: &PathList,
241 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
242 self.threads_by_paths
243 .get(path_list)
244 .into_iter()
245 .flatten()
246 .filter_map(|s| self.threads.get(s))
247 .filter(|s| !s.archived)
248 }
249
250 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
251 let db = self.db.clone();
252 self.reload_task.take();
253
254 let list_task = cx
255 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
256
257 let reload_task = cx
258 .spawn(async move |this, cx| {
259 let Some(rows) = list_task.await.log_err() else {
260 return;
261 };
262
263 this.update(cx, |this, cx| {
264 this.threads.clear();
265 this.threads_by_paths.clear();
266
267 for row in rows {
268 this.threads_by_paths
269 .entry(row.folder_paths.clone())
270 .or_default()
271 .insert(row.session_id.clone());
272 this.threads.insert(row.session_id.clone(), row);
273 }
274
275 cx.notify();
276 })
277 .ok();
278 })
279 .shared();
280 self.reload_task = Some(reload_task.clone());
281 reload_task
282 }
283
284 pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
285 if !cx.has_flag::<AgentV2FeatureFlag>() {
286 return;
287 }
288
289 for metadata in metadata {
290 self.save_internal(metadata);
291 }
292 cx.notify();
293 }
294
295 #[cfg(any(test, feature = "test-support"))]
296 pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
297 self.save(metadata, cx)
298 }
299
300 fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
301 if !cx.has_flag::<AgentV2FeatureFlag>() {
302 return;
303 }
304
305 self.save_internal(metadata);
306 cx.notify();
307 }
308
309 fn save_internal(&mut self, metadata: ThreadMetadata) {
310 // If the folder paths have changed, we need to clear the old entry
311 if let Some(thread) = self.threads.get(&metadata.session_id)
312 && thread.folder_paths != metadata.folder_paths
313 && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
314 {
315 session_ids.remove(&metadata.session_id);
316 }
317
318 self.threads
319 .insert(metadata.session_id.clone(), metadata.clone());
320
321 self.threads_by_paths
322 .entry(metadata.folder_paths.clone())
323 .or_default()
324 .insert(metadata.session_id.clone());
325
326 self.pending_thread_ops_tx
327 .try_send(DbOperation::Upsert(metadata))
328 .log_err();
329 }
330
331 pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
332 self.update_archived(session_id, true, cx);
333 }
334
335 pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
336 self.update_archived(session_id, false, cx);
337 }
338
339 fn update_archived(
340 &mut self,
341 session_id: &acp::SessionId,
342 archived: bool,
343 cx: &mut Context<Self>,
344 ) {
345 if !cx.has_flag::<AgentV2FeatureFlag>() {
346 return;
347 }
348
349 if let Some(thread) = self.threads.get(session_id) {
350 self.save_internal(ThreadMetadata {
351 archived,
352 ..thread.clone()
353 });
354 cx.notify();
355 }
356 }
357
358 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
359 if !cx.has_flag::<AgentV2FeatureFlag>() {
360 return;
361 }
362
363 if let Some(thread) = self.threads.get(&session_id)
364 && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
365 {
366 session_ids.remove(&session_id);
367 }
368 self.threads.remove(&session_id);
369 self.pending_thread_ops_tx
370 .try_send(DbOperation::Delete(session_id))
371 .log_err();
372 cx.notify();
373 }
374
375 pub fn create_archived_worktree(
376 &self,
377 worktree_path: String,
378 main_repo_path: String,
379 branch_name: Option<String>,
380 commit_hash: String,
381 cx: &mut Context<Self>,
382 ) -> Task<anyhow::Result<i64>> {
383 let db = self.db.clone();
384 cx.background_spawn(async move {
385 db.create_archived_worktree(
386 &worktree_path,
387 &main_repo_path,
388 branch_name.as_deref(),
389 &commit_hash,
390 )
391 .await
392 })
393 }
394
395 pub fn get_archived_worktree_by_path(
396 &self,
397 worktree_path: String,
398 cx: &mut Context<Self>,
399 ) -> Task<anyhow::Result<Option<ArchivedGitWorktree>>> {
400 let db = self.db.clone();
401 cx.background_spawn(async move { db.get_archived_worktree_by_path(&worktree_path).await })
402 }
403
404 pub fn delete_archived_worktree(
405 &self,
406 id: i64,
407 cx: &mut Context<Self>,
408 ) -> Task<anyhow::Result<()>> {
409 let db = self.db.clone();
410 cx.background_spawn(async move { db.delete_archived_worktree(id).await })
411 }
412
413 pub fn update_archived_worktree_restored(
414 &self,
415 id: i64,
416 worktree_path: String,
417 branch_name: Option<String>,
418 cx: &mut Context<Self>,
419 ) -> Task<anyhow::Result<()>> {
420 let db = self.db.clone();
421 cx.background_spawn(async move {
422 db.update_archived_worktree_restored(id, &worktree_path, branch_name.as_deref())
423 .await
424 })
425 }
426
427 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
428 let weak_store = cx.weak_entity();
429
430 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
431 // Don't track subagent threads in the sidebar.
432 if thread.parent_session_id().is_some() {
433 return;
434 }
435
436 let thread_entity = cx.entity();
437
438 cx.on_release({
439 let weak_store = weak_store.clone();
440 move |thread, cx| {
441 weak_store
442 .update(cx, |store, cx| {
443 let session_id = thread.session_id().clone();
444 store.session_subscriptions.remove(&session_id);
445 if thread.entries().is_empty() {
446 // Empty threads can be unloaded without ever being
447 // durably persisted by the underlying agent.
448 store.delete(session_id, cx);
449 }
450 })
451 .ok();
452 }
453 })
454 .detach();
455
456 weak_store
457 .update(cx, |this, cx| {
458 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
459 this.session_subscriptions
460 .insert(thread.session_id().clone(), subscription);
461 })
462 .ok();
463 })
464 .detach();
465
466 let (tx, rx) = smol::channel::unbounded();
467 let _db_operations_task = cx.background_spawn({
468 let db = db.clone();
469 async move {
470 while let Ok(first_update) = rx.recv().await {
471 let mut updates = vec![first_update];
472 while let Ok(update) = rx.try_recv() {
473 updates.push(update);
474 }
475 let updates = Self::dedup_db_operations(updates);
476 for operation in updates {
477 match operation {
478 DbOperation::Upsert(metadata) => {
479 db.save(metadata).await.log_err();
480 }
481 DbOperation::Delete(session_id) => {
482 db.delete(session_id).await.log_err();
483 }
484 }
485 }
486 }
487 }
488 });
489
490 let mut this = Self {
491 db,
492 threads: HashMap::default(),
493 threads_by_paths: HashMap::default(),
494 reload_task: None,
495 session_subscriptions: HashMap::default(),
496 pending_thread_ops_tx: tx,
497 _db_operations_task,
498 };
499 let _ = this.reload(cx);
500 this
501 }
502
503 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
504 let mut ops = HashMap::default();
505 for operation in operations.into_iter().rev() {
506 if ops.contains_key(operation.id()) {
507 continue;
508 }
509 ops.insert(operation.id().clone(), operation);
510 }
511 ops.into_values().collect()
512 }
513
514 fn handle_thread_event(
515 &mut self,
516 thread: Entity<acp_thread::AcpThread>,
517 event: &AcpThreadEvent,
518 cx: &mut Context<Self>,
519 ) {
520 // Don't track subagent threads in the sidebar.
521 if thread.read(cx).parent_session_id().is_some() {
522 return;
523 }
524
525 match event {
526 AcpThreadEvent::NewEntry
527 | AcpThreadEvent::TitleUpdated
528 | AcpThreadEvent::EntryUpdated(_)
529 | AcpThreadEvent::EntriesRemoved(_)
530 | AcpThreadEvent::ToolAuthorizationRequested(_)
531 | AcpThreadEvent::ToolAuthorizationReceived(_)
532 | AcpThreadEvent::Retry(_)
533 | AcpThreadEvent::Stopped(_)
534 | AcpThreadEvent::Error
535 | AcpThreadEvent::LoadError(_)
536 | AcpThreadEvent::Refusal
537 | AcpThreadEvent::WorkingDirectoriesUpdated => {
538 let thread_ref = thread.read(cx);
539 let existing_thread = self.threads.get(thread_ref.session_id());
540 let session_id = thread_ref.session_id().clone();
541 let title = thread_ref
542 .title()
543 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
544
545 let updated_at = Utc::now();
546
547 let created_at = existing_thread
548 .and_then(|t| t.created_at)
549 .unwrap_or_else(|| updated_at);
550
551 let agent_id = thread_ref.connection().agent_id();
552
553 let folder_paths = {
554 let project = thread_ref.project().read(cx);
555 let paths: Vec<Arc<Path>> = project
556 .visible_worktrees(cx)
557 .map(|worktree| worktree.read(cx).abs_path())
558 .collect();
559 PathList::new(&paths)
560 };
561
562 let archived = existing_thread.map(|t| t.archived).unwrap_or(false);
563
564 let metadata = ThreadMetadata {
565 session_id,
566 agent_id,
567 title,
568 created_at: Some(created_at),
569 updated_at,
570 folder_paths,
571 archived,
572 };
573
574 self.save(metadata, cx);
575 }
576 AcpThreadEvent::TokenUsageUpdated
577 | AcpThreadEvent::SubagentSpawned(_)
578 | AcpThreadEvent::PromptCapabilitiesUpdated
579 | AcpThreadEvent::AvailableCommandsUpdated(_)
580 | AcpThreadEvent::ModeUpdated(_)
581 | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
582 }
583 }
584}
585
586impl Global for ThreadMetadataStore {}
587
588struct ThreadMetadataDb(ThreadSafeConnection);
589
590impl Domain for ThreadMetadataDb {
591 const NAME: &str = stringify!(ThreadMetadataDb);
592
593 const MIGRATIONS: &[&str] = &[
594 sql!(
595 CREATE TABLE IF NOT EXISTS sidebar_threads(
596 session_id TEXT PRIMARY KEY,
597 agent_id TEXT,
598 title TEXT NOT NULL,
599 updated_at TEXT NOT NULL,
600 created_at TEXT,
601 folder_paths TEXT,
602 folder_paths_order TEXT
603 ) STRICT;
604 ),
605 sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
606 sql!(
607 CREATE TABLE IF NOT EXISTS archived_git_worktrees(
608 id INTEGER PRIMARY KEY,
609 worktree_path TEXT NOT NULL,
610 main_repo_path TEXT NOT NULL,
611 branch_name TEXT,
612 commit_hash TEXT NOT NULL,
613 restored INTEGER NOT NULL DEFAULT 0
614 ) STRICT;
615
616 CREATE UNIQUE INDEX IF NOT EXISTS idx_archived_worktrees_path
617 ON archived_git_worktrees(worktree_path);
618 ),
619 ];
620}
621
622db::static_connection!(ThreadMetadataDb, []);
623
624impl ThreadMetadataDb {
625 pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
626 self.select::<Arc<str>>(
627 "SELECT session_id FROM sidebar_threads \
628 ORDER BY updated_at DESC",
629 )?()
630 }
631
632 /// List all sidebar thread metadata, ordered by updated_at descending.
633 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
634 self.select::<ThreadMetadata>(
635 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived \
636 FROM sidebar_threads \
637 ORDER BY updated_at DESC"
638 )?()
639 }
640
641 /// Upsert metadata for a thread.
642 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
643 let id = row.session_id.0.clone();
644 let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
645 None
646 } else {
647 Some(row.agent_id.to_string())
648 };
649 let title = row.title.to_string();
650 let updated_at = row.updated_at.to_rfc3339();
651 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
652 let serialized = row.folder_paths.serialize();
653 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
654 (None, None)
655 } else {
656 (Some(serialized.paths), Some(serialized.order))
657 };
658 let archived = row.archived;
659
660 self.write(move |conn| {
661 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived) \
662 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
663 ON CONFLICT(session_id) DO UPDATE SET \
664 agent_id = excluded.agent_id, \
665 title = excluded.title, \
666 updated_at = excluded.updated_at, \
667 created_at = excluded.created_at, \
668 folder_paths = excluded.folder_paths, \
669 folder_paths_order = excluded.folder_paths_order, \
670 archived = excluded.archived";
671 let mut stmt = Statement::prepare(conn, sql)?;
672 let mut i = stmt.bind(&id, 1)?;
673 i = stmt.bind(&agent_id, i)?;
674 i = stmt.bind(&title, i)?;
675 i = stmt.bind(&updated_at, i)?;
676 i = stmt.bind(&created_at, i)?;
677 i = stmt.bind(&folder_paths, i)?;
678 i = stmt.bind(&folder_paths_order, i)?;
679 stmt.bind(&archived, i)?;
680 stmt.exec()
681 })
682 .await
683 }
684
685 /// Delete metadata for a single thread.
686 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
687 let id = session_id.0.clone();
688 self.write(move |conn| {
689 let mut stmt =
690 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
691 stmt.bind(&id, 1)?;
692 stmt.exec()
693 })
694 .await
695 }
696
697 pub async fn create_archived_worktree(
698 &self,
699 worktree_path: &str,
700 main_repo_path: &str,
701 branch_name: Option<&str>,
702 commit_hash: &str,
703 ) -> anyhow::Result<i64> {
704 let worktree_path = worktree_path.to_string();
705 let main_repo_path = main_repo_path.to_string();
706 let branch_name = branch_name.map(|s| s.to_string());
707 let commit_hash = commit_hash.to_string();
708 self.write(move |conn| {
709 let mut stmt = Statement::prepare(
710 conn,
711 "INSERT INTO archived_git_worktrees(\
712 worktree_path, main_repo_path, branch_name, commit_hash\
713 ) VALUES (?, ?, ?, ?)",
714 )?;
715 let mut i = stmt.bind(&worktree_path, 1)?;
716 i = stmt.bind(&main_repo_path, i)?;
717 i = stmt.bind(&branch_name, i)?;
718 stmt.bind(&commit_hash, i)?;
719 stmt.exec()?;
720
721 let mut id_stmt = Statement::prepare(conn, "SELECT last_insert_rowid()")?;
722 let id = id_stmt
723 .maybe_row::<i64>()?
724 .ok_or_else(|| anyhow::anyhow!("No row ID returned after INSERT"))?;
725
726 Ok(id)
727 })
728 .await
729 }
730
731 pub async fn get_archived_worktree_by_path(
732 &self,
733 worktree_path: &str,
734 ) -> anyhow::Result<Option<ArchivedGitWorktree>> {
735 let worktree_path = worktree_path.to_string();
736 self.select_row_bound::<String, ArchivedGitWorktree>(
737 "SELECT id, worktree_path, main_repo_path, branch_name, commit_hash, restored \
738 FROM archived_git_worktrees WHERE worktree_path = ?",
739 )?(worktree_path)
740 }
741
742 pub async fn delete_archived_worktree(&self, id: i64) -> anyhow::Result<()> {
743 self.write(move |conn| {
744 let mut stmt =
745 Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?;
746 stmt.bind(&id, 1)?;
747 stmt.exec()
748 })
749 .await
750 }
751
752 pub async fn update_archived_worktree_restored(
753 &self,
754 id: i64,
755 worktree_path: &str,
756 branch_name: Option<&str>,
757 ) -> anyhow::Result<()> {
758 let worktree_path = worktree_path.to_string();
759 let branch_name = branch_name.map(|s| s.to_string());
760 self.write(move |conn| {
761 let mut stmt = Statement::prepare(
762 conn,
763 "UPDATE archived_git_worktrees \
764 SET restored = 1, worktree_path = ?, branch_name = ? \
765 WHERE id = ?",
766 )?;
767 let mut i = stmt.bind(&worktree_path, 1)?;
768 i = stmt.bind(&branch_name, i)?;
769 stmt.bind(&id, i)?;
770 stmt.exec()
771 })
772 .await
773 }
774}
775
776impl Column for ThreadMetadata {
777 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
778 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
779 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
780 let (title, next): (String, i32) = Column::column(statement, next)?;
781 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
782 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
783 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
784 let (folder_paths_order_str, next): (Option<String>, i32) =
785 Column::column(statement, next)?;
786 let (archived, next): (bool, i32) = Column::column(statement, next)?;
787
788 let agent_id = agent_id
789 .map(|id| AgentId::new(id))
790 .unwrap_or(ZED_AGENT_ID.clone());
791
792 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
793 let created_at = created_at_str
794 .as_deref()
795 .map(DateTime::parse_from_rfc3339)
796 .transpose()?
797 .map(|dt| dt.with_timezone(&Utc));
798
799 let folder_paths = folder_paths_str
800 .map(|paths| {
801 PathList::deserialize(&util::path_list::SerializedPathList {
802 paths,
803 order: folder_paths_order_str.unwrap_or_default(),
804 })
805 })
806 .unwrap_or_default();
807
808 Ok((
809 ThreadMetadata {
810 session_id: acp::SessionId::new(id),
811 agent_id,
812 title: title.into(),
813 updated_at,
814 created_at,
815 folder_paths,
816 archived,
817 },
818 next,
819 ))
820 }
821}
822
823impl Column for ArchivedGitWorktree {
824 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
825 let (id, next): (i64, i32) = Column::column(statement, start_index)?;
826 let (worktree_path_str, next): (String, i32) = Column::column(statement, next)?;
827 let (main_repo_path_str, next): (String, i32) = Column::column(statement, next)?;
828 let (branch_name, next): (Option<String>, i32) = Column::column(statement, next)?;
829 let (commit_hash, next): (String, i32) = Column::column(statement, next)?;
830 let (restored_int, next): (i64, i32) = Column::column(statement, next)?;
831 Ok((
832 ArchivedGitWorktree {
833 id,
834 worktree_path: PathBuf::from(worktree_path_str),
835 main_repo_path: PathBuf::from(main_repo_path_str),
836 branch_name,
837 commit_hash,
838 restored: restored_int != 0,
839 },
840 next,
841 ))
842 }
843}
844
845#[cfg(test)]
846mod tests {
847 use super::*;
848 use acp_thread::{AgentConnection, StubAgentConnection};
849 use action_log::ActionLog;
850 use agent::DbThread;
851 use agent_client_protocol as acp;
852 use feature_flags::FeatureFlagAppExt;
853 use gpui::TestAppContext;
854 use project::FakeFs;
855 use project::Project;
856 use std::path::Path;
857 use std::rc::Rc;
858
859 fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
860 DbThread {
861 title: title.to_string().into(),
862 messages: Vec::new(),
863 updated_at,
864 detailed_summary: None,
865 initial_project_snapshot: None,
866 cumulative_token_usage: Default::default(),
867 request_token_usage: Default::default(),
868 model: None,
869 profile: None,
870 imported: false,
871 subagent_context: None,
872 speed: None,
873 thinking_enabled: false,
874 thinking_effort: None,
875 draft_prompt: None,
876 ui_scroll_position: None,
877 }
878 }
879
880 fn make_metadata(
881 session_id: &str,
882 title: &str,
883 updated_at: DateTime<Utc>,
884 folder_paths: PathList,
885 ) -> ThreadMetadata {
886 ThreadMetadata {
887 archived: false,
888 session_id: acp::SessionId::new(session_id),
889 agent_id: agent::ZED_AGENT_ID.clone(),
890 title: title.to_string().into(),
891 updated_at,
892 created_at: Some(updated_at),
893 folder_paths,
894 }
895 }
896
897 fn init_test(cx: &mut TestAppContext) {
898 cx.update(|cx| {
899 let settings_store = settings::SettingsStore::test(cx);
900 cx.set_global(settings_store);
901 cx.update_flags(true, vec!["agent-v2".to_string()]);
902 ThreadMetadataStore::init_global(cx);
903 ThreadStore::init_global(cx);
904 });
905 cx.run_until_parked();
906 }
907
908 #[gpui::test]
909 async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
910 let first_paths = PathList::new(&[Path::new("/project-a")]);
911 let second_paths = PathList::new(&[Path::new("/project-b")]);
912 let now = Utc::now();
913 let older = now - chrono::Duration::seconds(1);
914
915 let thread = std::thread::current();
916 let test_name = thread.name().unwrap_or("unknown_test");
917 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
918 let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
919 &db_name,
920 )));
921
922 db.save(make_metadata(
923 "session-1",
924 "First Thread",
925 now,
926 first_paths.clone(),
927 ))
928 .await
929 .unwrap();
930 db.save(make_metadata(
931 "session-2",
932 "Second Thread",
933 older,
934 second_paths.clone(),
935 ))
936 .await
937 .unwrap();
938
939 cx.update(|cx| {
940 let settings_store = settings::SettingsStore::test(cx);
941 cx.set_global(settings_store);
942 cx.update_flags(true, vec!["agent-v2".to_string()]);
943 ThreadMetadataStore::init_global(cx);
944 });
945
946 cx.run_until_parked();
947
948 cx.update(|cx| {
949 let store = ThreadMetadataStore::global(cx);
950 let store = store.read(cx);
951
952 let entry_ids = store
953 .entry_ids()
954 .map(|session_id| session_id.0.to_string())
955 .collect::<Vec<_>>();
956 assert_eq!(entry_ids.len(), 2);
957 assert!(entry_ids.contains(&"session-1".to_string()));
958 assert!(entry_ids.contains(&"session-2".to_string()));
959
960 let first_path_entries = store
961 .entries_for_path(&first_paths)
962 .map(|entry| entry.session_id.0.to_string())
963 .collect::<Vec<_>>();
964 assert_eq!(first_path_entries, vec!["session-1"]);
965
966 let second_path_entries = store
967 .entries_for_path(&second_paths)
968 .map(|entry| entry.session_id.0.to_string())
969 .collect::<Vec<_>>();
970 assert_eq!(second_path_entries, vec!["session-2"]);
971 });
972 }
973
974 #[gpui::test]
975 async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
976 init_test(cx);
977
978 let first_paths = PathList::new(&[Path::new("/project-a")]);
979 let second_paths = PathList::new(&[Path::new("/project-b")]);
980 let initial_time = Utc::now();
981 let updated_time = initial_time + chrono::Duration::seconds(1);
982
983 let initial_metadata = make_metadata(
984 "session-1",
985 "First Thread",
986 initial_time,
987 first_paths.clone(),
988 );
989
990 let second_metadata = make_metadata(
991 "session-2",
992 "Second Thread",
993 initial_time,
994 second_paths.clone(),
995 );
996
997 cx.update(|cx| {
998 let store = ThreadMetadataStore::global(cx);
999 store.update(cx, |store, cx| {
1000 store.save(initial_metadata, cx);
1001 store.save(second_metadata, cx);
1002 });
1003 });
1004
1005 cx.run_until_parked();
1006
1007 cx.update(|cx| {
1008 let store = ThreadMetadataStore::global(cx);
1009 let store = store.read(cx);
1010
1011 let first_path_entries = store
1012 .entries_for_path(&first_paths)
1013 .map(|entry| entry.session_id.0.to_string())
1014 .collect::<Vec<_>>();
1015 assert_eq!(first_path_entries, vec!["session-1"]);
1016
1017 let second_path_entries = store
1018 .entries_for_path(&second_paths)
1019 .map(|entry| entry.session_id.0.to_string())
1020 .collect::<Vec<_>>();
1021 assert_eq!(second_path_entries, vec!["session-2"]);
1022 });
1023
1024 let moved_metadata = make_metadata(
1025 "session-1",
1026 "First Thread",
1027 updated_time,
1028 second_paths.clone(),
1029 );
1030
1031 cx.update(|cx| {
1032 let store = ThreadMetadataStore::global(cx);
1033 store.update(cx, |store, cx| {
1034 store.save(moved_metadata, cx);
1035 });
1036 });
1037
1038 cx.run_until_parked();
1039
1040 cx.update(|cx| {
1041 let store = ThreadMetadataStore::global(cx);
1042 let store = store.read(cx);
1043
1044 let entry_ids = store
1045 .entry_ids()
1046 .map(|session_id| session_id.0.to_string())
1047 .collect::<Vec<_>>();
1048 assert_eq!(entry_ids.len(), 2);
1049 assert!(entry_ids.contains(&"session-1".to_string()));
1050 assert!(entry_ids.contains(&"session-2".to_string()));
1051
1052 let first_path_entries = store
1053 .entries_for_path(&first_paths)
1054 .map(|entry| entry.session_id.0.to_string())
1055 .collect::<Vec<_>>();
1056 assert!(first_path_entries.is_empty());
1057
1058 let second_path_entries = store
1059 .entries_for_path(&second_paths)
1060 .map(|entry| entry.session_id.0.to_string())
1061 .collect::<Vec<_>>();
1062 assert_eq!(second_path_entries.len(), 2);
1063 assert!(second_path_entries.contains(&"session-1".to_string()));
1064 assert!(second_path_entries.contains(&"session-2".to_string()));
1065 });
1066
1067 cx.update(|cx| {
1068 let store = ThreadMetadataStore::global(cx);
1069 store.update(cx, |store, cx| {
1070 store.delete(acp::SessionId::new("session-2"), cx);
1071 });
1072 });
1073
1074 cx.run_until_parked();
1075
1076 cx.update(|cx| {
1077 let store = ThreadMetadataStore::global(cx);
1078 let store = store.read(cx);
1079
1080 let entry_ids = store
1081 .entry_ids()
1082 .map(|session_id| session_id.0.to_string())
1083 .collect::<Vec<_>>();
1084 assert_eq!(entry_ids, vec!["session-1"]);
1085
1086 let second_path_entries = store
1087 .entries_for_path(&second_paths)
1088 .map(|entry| entry.session_id.0.to_string())
1089 .collect::<Vec<_>>();
1090 assert_eq!(second_path_entries, vec!["session-1"]);
1091 });
1092 }
1093
1094 #[gpui::test]
1095 async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1096 init_test(cx);
1097
1098 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1099 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1100 let now = Utc::now();
1101
1102 let existing_metadata = ThreadMetadata {
1103 session_id: acp::SessionId::new("a-session-0"),
1104 agent_id: agent::ZED_AGENT_ID.clone(),
1105 title: "Existing Metadata".into(),
1106 updated_at: now - chrono::Duration::seconds(10),
1107 created_at: Some(now - chrono::Duration::seconds(10)),
1108 folder_paths: project_a_paths.clone(),
1109 archived: false,
1110 };
1111
1112 cx.update(|cx| {
1113 let store = ThreadMetadataStore::global(cx);
1114 store.update(cx, |store, cx| {
1115 store.save(existing_metadata, cx);
1116 });
1117 });
1118 cx.run_until_parked();
1119
1120 let threads_to_save = vec![
1121 (
1122 "a-session-0",
1123 "Thread A0 From Native Store",
1124 project_a_paths.clone(),
1125 now,
1126 ),
1127 (
1128 "a-session-1",
1129 "Thread A1",
1130 project_a_paths.clone(),
1131 now + chrono::Duration::seconds(1),
1132 ),
1133 (
1134 "b-session-0",
1135 "Thread B0",
1136 project_b_paths.clone(),
1137 now + chrono::Duration::seconds(2),
1138 ),
1139 (
1140 "projectless",
1141 "Projectless",
1142 PathList::default(),
1143 now + chrono::Duration::seconds(3),
1144 ),
1145 ];
1146
1147 for (session_id, title, paths, updated_at) in &threads_to_save {
1148 let save_task = cx.update(|cx| {
1149 let thread_store = ThreadStore::global(cx);
1150 let session_id = session_id.to_string();
1151 let title = title.to_string();
1152 let paths = paths.clone();
1153 thread_store.update(cx, |store, cx| {
1154 store.save_thread(
1155 acp::SessionId::new(session_id),
1156 make_db_thread(&title, *updated_at),
1157 paths,
1158 cx,
1159 )
1160 })
1161 });
1162 save_task.await.unwrap();
1163 cx.run_until_parked();
1164 }
1165
1166 cx.update(|cx| migrate_thread_metadata(cx));
1167 cx.run_until_parked();
1168
1169 let list = cx.update(|cx| {
1170 let store = ThreadMetadataStore::global(cx);
1171 store.read(cx).entries().cloned().collect::<Vec<_>>()
1172 });
1173
1174 assert_eq!(list.len(), 3);
1175 assert!(
1176 list.iter()
1177 .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1178 );
1179
1180 let existing_metadata = list
1181 .iter()
1182 .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1183 .unwrap();
1184 assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1185 assert!(!existing_metadata.archived);
1186
1187 let migrated_session_ids = list
1188 .iter()
1189 .map(|metadata| metadata.session_id.0.as_ref())
1190 .collect::<Vec<_>>();
1191 assert!(migrated_session_ids.contains(&"a-session-1"));
1192 assert!(migrated_session_ids.contains(&"b-session-0"));
1193 assert!(!migrated_session_ids.contains(&"projectless"));
1194
1195 let migrated_entries = list
1196 .iter()
1197 .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1198 .collect::<Vec<_>>();
1199 assert!(
1200 migrated_entries
1201 .iter()
1202 .all(|metadata| !metadata.folder_paths.is_empty())
1203 );
1204 assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1205 }
1206
1207 #[gpui::test]
1208 async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1209 cx: &mut TestAppContext,
1210 ) {
1211 init_test(cx);
1212
1213 let project_paths = PathList::new(&[Path::new("/project-a")]);
1214 let existing_updated_at = Utc::now();
1215
1216 let existing_metadata = ThreadMetadata {
1217 session_id: acp::SessionId::new("existing-session"),
1218 agent_id: agent::ZED_AGENT_ID.clone(),
1219 title: "Existing Metadata".into(),
1220 updated_at: existing_updated_at,
1221 created_at: Some(existing_updated_at),
1222 folder_paths: project_paths.clone(),
1223 archived: false,
1224 };
1225
1226 cx.update(|cx| {
1227 let store = ThreadMetadataStore::global(cx);
1228 store.update(cx, |store, cx| {
1229 store.save(existing_metadata, cx);
1230 });
1231 });
1232 cx.run_until_parked();
1233
1234 let save_task = cx.update(|cx| {
1235 let thread_store = ThreadStore::global(cx);
1236 thread_store.update(cx, |store, cx| {
1237 store.save_thread(
1238 acp::SessionId::new("existing-session"),
1239 make_db_thread(
1240 "Updated Native Thread Title",
1241 existing_updated_at + chrono::Duration::seconds(1),
1242 ),
1243 project_paths.clone(),
1244 cx,
1245 )
1246 })
1247 });
1248 save_task.await.unwrap();
1249 cx.run_until_parked();
1250
1251 cx.update(|cx| migrate_thread_metadata(cx));
1252 cx.run_until_parked();
1253
1254 let list = cx.update(|cx| {
1255 let store = ThreadMetadataStore::global(cx);
1256 store.read(cx).entries().cloned().collect::<Vec<_>>()
1257 });
1258
1259 assert_eq!(list.len(), 1);
1260 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1261 }
1262
1263 #[gpui::test]
1264 async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1265 cx: &mut TestAppContext,
1266 ) {
1267 init_test(cx);
1268
1269 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1270 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1271 let now = Utc::now();
1272
1273 // Create 7 threads for project A and 3 for project B
1274 let mut threads_to_save = Vec::new();
1275 for i in 0..7 {
1276 threads_to_save.push((
1277 format!("a-session-{i}"),
1278 format!("Thread A{i}"),
1279 project_a_paths.clone(),
1280 now + chrono::Duration::seconds(i as i64),
1281 ));
1282 }
1283 for i in 0..3 {
1284 threads_to_save.push((
1285 format!("b-session-{i}"),
1286 format!("Thread B{i}"),
1287 project_b_paths.clone(),
1288 now + chrono::Duration::seconds(i as i64),
1289 ));
1290 }
1291
1292 for (session_id, title, paths, updated_at) in &threads_to_save {
1293 let save_task = cx.update(|cx| {
1294 let thread_store = ThreadStore::global(cx);
1295 let session_id = session_id.to_string();
1296 let title = title.to_string();
1297 let paths = paths.clone();
1298 thread_store.update(cx, |store, cx| {
1299 store.save_thread(
1300 acp::SessionId::new(session_id),
1301 make_db_thread(&title, *updated_at),
1302 paths,
1303 cx,
1304 )
1305 })
1306 });
1307 save_task.await.unwrap();
1308 cx.run_until_parked();
1309 }
1310
1311 cx.update(|cx| migrate_thread_metadata(cx));
1312 cx.run_until_parked();
1313
1314 let list = cx.update(|cx| {
1315 let store = ThreadMetadataStore::global(cx);
1316 store.read(cx).entries().cloned().collect::<Vec<_>>()
1317 });
1318
1319 assert_eq!(list.len(), 10);
1320
1321 // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1322 let mut project_a_entries: Vec<_> = list
1323 .iter()
1324 .filter(|m| m.folder_paths == project_a_paths)
1325 .collect();
1326 assert_eq!(project_a_entries.len(), 7);
1327 project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1328
1329 for entry in &project_a_entries[..5] {
1330 assert!(
1331 !entry.archived,
1332 "Expected {} to be unarchived (top 5 most recent)",
1333 entry.session_id.0
1334 );
1335 }
1336 for entry in &project_a_entries[5..] {
1337 assert!(
1338 entry.archived,
1339 "Expected {} to be archived (older than top 5)",
1340 entry.session_id.0
1341 );
1342 }
1343
1344 // Project B: all 3 should be unarchived (under the limit)
1345 let project_b_entries: Vec<_> = list
1346 .iter()
1347 .filter(|m| m.folder_paths == project_b_paths)
1348 .collect();
1349 assert_eq!(project_b_entries.len(), 3);
1350 assert!(project_b_entries.iter().all(|m| !m.archived));
1351 }
1352
1353 #[gpui::test]
1354 async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1355 init_test(cx);
1356
1357 let fs = FakeFs::new(cx.executor());
1358 let project = Project::test(fs, None::<&Path>, cx).await;
1359 let connection = Rc::new(StubAgentConnection::new());
1360
1361 let thread = cx
1362 .update(|cx| {
1363 connection
1364 .clone()
1365 .new_session(project.clone(), PathList::default(), cx)
1366 })
1367 .await
1368 .unwrap();
1369 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1370
1371 cx.update(|cx| {
1372 thread.update(cx, |thread, cx| {
1373 thread.set_title("Draft Thread".into(), cx).detach();
1374 });
1375 });
1376 cx.run_until_parked();
1377
1378 let metadata_ids = cx.update(|cx| {
1379 ThreadMetadataStore::global(cx)
1380 .read(cx)
1381 .entry_ids()
1382 .collect::<Vec<_>>()
1383 });
1384 assert_eq!(metadata_ids, vec![session_id]);
1385
1386 drop(thread);
1387 cx.update(|_| {});
1388 cx.run_until_parked();
1389 cx.run_until_parked();
1390
1391 let metadata_ids = cx.update(|cx| {
1392 ThreadMetadataStore::global(cx)
1393 .read(cx)
1394 .entry_ids()
1395 .collect::<Vec<_>>()
1396 });
1397 assert!(
1398 metadata_ids.is_empty(),
1399 "expected empty draft thread metadata to be deleted on release"
1400 );
1401 }
1402
1403 #[gpui::test]
1404 async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1405 init_test(cx);
1406
1407 let fs = FakeFs::new(cx.executor());
1408 let project = Project::test(fs, None::<&Path>, cx).await;
1409 let connection = Rc::new(StubAgentConnection::new());
1410
1411 let thread = cx
1412 .update(|cx| {
1413 connection
1414 .clone()
1415 .new_session(project.clone(), PathList::default(), cx)
1416 })
1417 .await
1418 .unwrap();
1419 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1420
1421 cx.update(|cx| {
1422 thread.update(cx, |thread, cx| {
1423 thread.push_user_content_block(None, "Hello".into(), cx);
1424 });
1425 });
1426 cx.run_until_parked();
1427
1428 let metadata_ids = cx.update(|cx| {
1429 ThreadMetadataStore::global(cx)
1430 .read(cx)
1431 .entry_ids()
1432 .collect::<Vec<_>>()
1433 });
1434 assert_eq!(metadata_ids, vec![session_id.clone()]);
1435
1436 drop(thread);
1437 cx.update(|_| {});
1438 cx.run_until_parked();
1439
1440 let metadata_ids = cx.update(|cx| {
1441 ThreadMetadataStore::global(cx)
1442 .read(cx)
1443 .entry_ids()
1444 .collect::<Vec<_>>()
1445 });
1446 assert_eq!(metadata_ids, vec![session_id]);
1447 }
1448
1449 #[gpui::test]
1450 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1451 init_test(cx);
1452
1453 let fs = FakeFs::new(cx.executor());
1454 let project = Project::test(fs, None::<&Path>, cx).await;
1455 let connection = Rc::new(StubAgentConnection::new());
1456
1457 // Create a regular (non-subagent) AcpThread.
1458 let regular_thread = cx
1459 .update(|cx| {
1460 connection
1461 .clone()
1462 .new_session(project.clone(), PathList::default(), cx)
1463 })
1464 .await
1465 .unwrap();
1466
1467 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1468
1469 // Set a title on the regular thread to trigger a save via handle_thread_update.
1470 cx.update(|cx| {
1471 regular_thread.update(cx, |thread, cx| {
1472 thread.set_title("Regular Thread".into(), cx).detach();
1473 });
1474 });
1475 cx.run_until_parked();
1476
1477 // Create a subagent AcpThread
1478 let subagent_session_id = acp::SessionId::new("subagent-session");
1479 let subagent_thread = cx.update(|cx| {
1480 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1481 cx.new(|cx| {
1482 acp_thread::AcpThread::new(
1483 Some(regular_session_id.clone()),
1484 Some("Subagent Thread".into()),
1485 None,
1486 connection.clone(),
1487 project.clone(),
1488 action_log,
1489 subagent_session_id.clone(),
1490 watch::Receiver::constant(acp::PromptCapabilities::new()),
1491 cx,
1492 )
1493 })
1494 });
1495
1496 // Set a title on the subagent thread to trigger handle_thread_update.
1497 cx.update(|cx| {
1498 subagent_thread.update(cx, |thread, cx| {
1499 thread
1500 .set_title("Subagent Thread Title".into(), cx)
1501 .detach();
1502 });
1503 });
1504 cx.run_until_parked();
1505
1506 // List all metadata from the store cache.
1507 let list = cx.update(|cx| {
1508 let store = ThreadMetadataStore::global(cx);
1509 store.read(cx).entries().cloned().collect::<Vec<_>>()
1510 });
1511
1512 // The subagent thread should NOT appear in the sidebar metadata.
1513 // Only the regular thread should be listed.
1514 assert_eq!(
1515 list.len(),
1516 1,
1517 "Expected only the regular thread in sidebar metadata, \
1518 but found {} entries (subagent threads are leaking into the sidebar)",
1519 list.len(),
1520 );
1521 assert_eq!(list[0].session_id, regular_session_id);
1522 assert_eq!(list[0].title.as_ref(), "Regular Thread");
1523 }
1524
1525 #[test]
1526 fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1527 let now = Utc::now();
1528
1529 let operations = vec![
1530 DbOperation::Upsert(make_metadata(
1531 "session-1",
1532 "First Thread",
1533 now,
1534 PathList::default(),
1535 )),
1536 DbOperation::Delete(acp::SessionId::new("session-1")),
1537 ];
1538
1539 let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1540
1541 assert_eq!(deduped.len(), 1);
1542 assert_eq!(
1543 deduped[0],
1544 DbOperation::Delete(acp::SessionId::new("session-1"))
1545 );
1546 }
1547
1548 #[test]
1549 fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1550 let now = Utc::now();
1551 let later = now + chrono::Duration::seconds(1);
1552
1553 let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1554 let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1555
1556 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1557 DbOperation::Upsert(old_metadata),
1558 DbOperation::Upsert(new_metadata.clone()),
1559 ]);
1560
1561 assert_eq!(deduped.len(), 1);
1562 assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1563 }
1564
1565 #[test]
1566 fn test_dedup_db_operations_preserves_distinct_sessions() {
1567 let now = Utc::now();
1568
1569 let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1570 let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1571 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1572 DbOperation::Upsert(metadata1.clone()),
1573 DbOperation::Upsert(metadata2.clone()),
1574 ]);
1575
1576 assert_eq!(deduped.len(), 2);
1577 assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1578 assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1579 }
1580
1581 #[gpui::test]
1582 async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1583 init_test(cx);
1584
1585 let paths = PathList::new(&[Path::new("/project-a")]);
1586 let now = Utc::now();
1587 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1588
1589 cx.update(|cx| {
1590 let store = ThreadMetadataStore::global(cx);
1591 store.update(cx, |store, cx| {
1592 store.save(metadata, cx);
1593 });
1594 });
1595
1596 cx.run_until_parked();
1597
1598 cx.update(|cx| {
1599 let store = ThreadMetadataStore::global(cx);
1600 let store = store.read(cx);
1601
1602 let path_entries = store
1603 .entries_for_path(&paths)
1604 .map(|e| e.session_id.0.to_string())
1605 .collect::<Vec<_>>();
1606 assert_eq!(path_entries, vec!["session-1"]);
1607
1608 let archived = store
1609 .archived_entries()
1610 .map(|e| e.session_id.0.to_string())
1611 .collect::<Vec<_>>();
1612 assert!(archived.is_empty());
1613 });
1614
1615 cx.update(|cx| {
1616 let store = ThreadMetadataStore::global(cx);
1617 store.update(cx, |store, cx| {
1618 store.archive(&acp::SessionId::new("session-1"), cx);
1619 });
1620 });
1621
1622 cx.run_until_parked();
1623
1624 cx.update(|cx| {
1625 let store = ThreadMetadataStore::global(cx);
1626 let store = store.read(cx);
1627
1628 let path_entries = store
1629 .entries_for_path(&paths)
1630 .map(|e| e.session_id.0.to_string())
1631 .collect::<Vec<_>>();
1632 assert!(path_entries.is_empty());
1633
1634 let archived = store.archived_entries().collect::<Vec<_>>();
1635 assert_eq!(archived.len(), 1);
1636 assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1637 assert!(archived[0].archived);
1638 });
1639
1640 cx.update(|cx| {
1641 let store = ThreadMetadataStore::global(cx);
1642 store.update(cx, |store, cx| {
1643 store.unarchive(&acp::SessionId::new("session-1"), cx);
1644 });
1645 });
1646
1647 cx.run_until_parked();
1648
1649 cx.update(|cx| {
1650 let store = ThreadMetadataStore::global(cx);
1651 let store = store.read(cx);
1652
1653 let path_entries = store
1654 .entries_for_path(&paths)
1655 .map(|e| e.session_id.0.to_string())
1656 .collect::<Vec<_>>();
1657 assert_eq!(path_entries, vec!["session-1"]);
1658
1659 let archived = store
1660 .archived_entries()
1661 .map(|e| e.session_id.0.to_string())
1662 .collect::<Vec<_>>();
1663 assert!(archived.is_empty());
1664 });
1665 }
1666
1667 #[gpui::test]
1668 async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1669 init_test(cx);
1670
1671 let paths = PathList::new(&[Path::new("/project-a")]);
1672 let now = Utc::now();
1673
1674 let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1675 let metadata2 = make_metadata(
1676 "session-2",
1677 "Archived Thread",
1678 now - chrono::Duration::seconds(1),
1679 paths.clone(),
1680 );
1681
1682 cx.update(|cx| {
1683 let store = ThreadMetadataStore::global(cx);
1684 store.update(cx, |store, cx| {
1685 store.save(metadata1, cx);
1686 store.save(metadata2, cx);
1687 });
1688 });
1689
1690 cx.run_until_parked();
1691
1692 cx.update(|cx| {
1693 let store = ThreadMetadataStore::global(cx);
1694 store.update(cx, |store, cx| {
1695 store.archive(&acp::SessionId::new("session-2"), cx);
1696 });
1697 });
1698
1699 cx.run_until_parked();
1700
1701 cx.update(|cx| {
1702 let store = ThreadMetadataStore::global(cx);
1703 let store = store.read(cx);
1704
1705 let path_entries = store
1706 .entries_for_path(&paths)
1707 .map(|e| e.session_id.0.to_string())
1708 .collect::<Vec<_>>();
1709 assert_eq!(path_entries, vec!["session-1"]);
1710
1711 let all_entries = store
1712 .entries()
1713 .map(|e| e.session_id.0.to_string())
1714 .collect::<Vec<_>>();
1715 assert_eq!(all_entries.len(), 2);
1716 assert!(all_entries.contains(&"session-1".to_string()));
1717 assert!(all_entries.contains(&"session-2".to_string()));
1718
1719 let archived = store
1720 .archived_entries()
1721 .map(|e| e.session_id.0.to_string())
1722 .collect::<Vec<_>>();
1723 assert_eq!(archived, vec!["session-2"]);
1724 });
1725 }
1726
1727 #[gpui::test]
1728 async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1729 init_test(cx);
1730
1731 let paths = PathList::new(&[Path::new("/project-a")]);
1732 let now = Utc::now();
1733
1734 let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1735 let m2 = make_metadata(
1736 "session-2",
1737 "Thread Two",
1738 now - chrono::Duration::seconds(1),
1739 paths.clone(),
1740 );
1741 let m3 = make_metadata(
1742 "session-3",
1743 "Thread Three",
1744 now - chrono::Duration::seconds(2),
1745 paths,
1746 );
1747
1748 cx.update(|cx| {
1749 let store = ThreadMetadataStore::global(cx);
1750 store.update(cx, |store, cx| {
1751 store.save_all(vec![m1, m2, m3], cx);
1752 });
1753 });
1754
1755 cx.run_until_parked();
1756
1757 cx.update(|cx| {
1758 let store = ThreadMetadataStore::global(cx);
1759 let store = store.read(cx);
1760
1761 let all_entries = store
1762 .entries()
1763 .map(|e| e.session_id.0.to_string())
1764 .collect::<Vec<_>>();
1765 assert_eq!(all_entries.len(), 3);
1766 assert!(all_entries.contains(&"session-1".to_string()));
1767 assert!(all_entries.contains(&"session-2".to_string()));
1768 assert!(all_entries.contains(&"session-3".to_string()));
1769
1770 let entry_ids = store.entry_ids().collect::<Vec<_>>();
1771 assert_eq!(entry_ids.len(), 3);
1772 });
1773 }
1774
1775 #[gpui::test]
1776 async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1777 init_test(cx);
1778
1779 let paths = PathList::new(&[Path::new("/project-a")]);
1780 let now = Utc::now();
1781 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1782
1783 cx.update(|cx| {
1784 let store = ThreadMetadataStore::global(cx);
1785 store.update(cx, |store, cx| {
1786 store.save(metadata, cx);
1787 });
1788 });
1789
1790 cx.run_until_parked();
1791
1792 cx.update(|cx| {
1793 let store = ThreadMetadataStore::global(cx);
1794 store.update(cx, |store, cx| {
1795 store.archive(&acp::SessionId::new("session-1"), cx);
1796 });
1797 });
1798
1799 cx.run_until_parked();
1800
1801 cx.update(|cx| {
1802 let store = ThreadMetadataStore::global(cx);
1803 store.update(cx, |store, cx| {
1804 let _ = store.reload(cx);
1805 });
1806 });
1807
1808 cx.run_until_parked();
1809
1810 cx.update(|cx| {
1811 let store = ThreadMetadataStore::global(cx);
1812 let store = store.read(cx);
1813
1814 let thread = store
1815 .entries()
1816 .find(|e| e.session_id.0.as_ref() == "session-1")
1817 .expect("thread should exist after reload");
1818 assert!(thread.archived);
1819
1820 let path_entries = store
1821 .entries_for_path(&paths)
1822 .map(|e| e.session_id.0.to_string())
1823 .collect::<Vec<_>>();
1824 assert!(path_entries.is_empty());
1825
1826 let archived = store
1827 .archived_entries()
1828 .map(|e| e.session_id.0.to_string())
1829 .collect::<Vec<_>>();
1830 assert_eq!(archived, vec!["session-1"]);
1831 });
1832 }
1833
1834 #[gpui::test]
1835 async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1836 init_test(cx);
1837
1838 cx.run_until_parked();
1839
1840 cx.update(|cx| {
1841 let store = ThreadMetadataStore::global(cx);
1842 store.update(cx, |store, cx| {
1843 store.archive(&acp::SessionId::new("nonexistent"), cx);
1844 });
1845 });
1846
1847 cx.run_until_parked();
1848
1849 cx.update(|cx| {
1850 let store = ThreadMetadataStore::global(cx);
1851 let store = store.read(cx);
1852
1853 assert!(store.is_empty());
1854 assert_eq!(store.entries().count(), 0);
1855 assert_eq!(store.archived_entries().count(), 0);
1856 });
1857 }
1858
1859 #[gpui::test]
1860 async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1861 init_test(cx);
1862
1863 let paths = PathList::new(&[Path::new("/project-a")]);
1864 let now = Utc::now();
1865 let metadata = make_metadata("session-1", "Thread 1", now, paths);
1866 let session_id = metadata.session_id.clone();
1867
1868 cx.update(|cx| {
1869 let store = ThreadMetadataStore::global(cx);
1870 store.update(cx, |store, cx| {
1871 store.save(metadata.clone(), cx);
1872 store.archive(&session_id, cx);
1873 });
1874 });
1875
1876 cx.run_until_parked();
1877
1878 cx.update(|cx| {
1879 let store = ThreadMetadataStore::global(cx);
1880 let store = store.read(cx);
1881
1882 let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1883 pretty_assertions::assert_eq!(
1884 entries,
1885 vec![ThreadMetadata {
1886 archived: true,
1887 ..metadata
1888 }]
1889 );
1890 });
1891 }
1892}