1use std::{
2 path::{Path, PathBuf},
3 sync::Arc,
4};
5
6use acp_thread::AcpThreadEvent;
7use agent::{ThreadStore, ZED_AGENT_ID};
8use agent_client_protocol as acp;
9use anyhow::Context as _;
10use chrono::{DateTime, Utc};
11use collections::{HashMap, HashSet};
12use db::{
13 sqlez::{
14 bindable::Column, domain::Domain, statement::Statement,
15 thread_safe_connection::ThreadSafeConnection,
16 },
17 sqlez_macros::sql,
18};
19use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
20use futures::{FutureExt as _, future::Shared};
21use gpui::{AppContext as _, Entity, Global, Subscription, Task};
22use project::AgentId;
23use ui::{App, Context, SharedString};
24use util::ResultExt as _;
25use workspace::PathList;
26
27use crate::DEFAULT_THREAD_TITLE;
28
29pub fn init(cx: &mut App) {
30 ThreadMetadataStore::init_global(cx);
31
32 if cx.has_flag::<AgentV2FeatureFlag>() {
33 migrate_thread_metadata(cx);
34 }
35 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
36 if has_flag {
37 migrate_thread_metadata(cx);
38 }
39 })
40 .detach();
41}
42
43/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
44/// We skip migrating threads that do not have a project.
45///
46/// TODO: Remove this after N weeks of shipping the sidebar
47fn migrate_thread_metadata(cx: &mut App) {
48 let store = ThreadMetadataStore::global(cx);
49 let db = store.read(cx).db.clone();
50
51 cx.spawn(async move |cx| {
52 let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
53
54 let is_first_migration = existing_entries.is_empty();
55
56 let mut to_migrate = store.read_with(cx, |_store, cx| {
57 ThreadStore::global(cx)
58 .read(cx)
59 .entries()
60 .filter_map(|entry| {
61 if existing_entries.contains(&entry.id.0) || entry.folder_paths.is_empty() {
62 return None;
63 }
64
65 Some(ThreadMetadata {
66 session_id: entry.id,
67 agent_id: ZED_AGENT_ID.clone(),
68 title: entry.title,
69 updated_at: entry.updated_at,
70 created_at: entry.created_at,
71 folder_paths: entry.folder_paths,
72 archived: true,
73 })
74 })
75 .collect::<Vec<_>>()
76 });
77
78 if to_migrate.is_empty() {
79 return anyhow::Ok(());
80 }
81
82 // On the first migration (no entries in DB yet), keep the 5 most
83 // recent threads per project unarchived.
84 if is_first_migration {
85 let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
86 for entry in &mut to_migrate {
87 per_project
88 .entry(entry.folder_paths.clone())
89 .or_default()
90 .push(entry);
91 }
92 for entries in per_project.values_mut() {
93 entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
94 for entry in entries.iter_mut().take(5) {
95 entry.archived = false;
96 }
97 }
98 }
99
100 log::info!("Migrating {} thread store entries", to_migrate.len());
101
102 // Manually save each entry to the database and call reload, otherwise
103 // we'll end up triggering lots of reloads after each save
104 for entry in to_migrate {
105 db.save(entry).await?;
106 }
107
108 log::info!("Finished migrating thread store entries");
109
110 let _ = store.update(cx, |store, cx| store.reload(cx));
111 anyhow::Ok(())
112 })
113 .detach_and_log_err(cx);
114}
115
/// Newtype around the shared [`ThreadMetadataStore`] entity; this is the value
/// that `init_global` registers as a gpui global and that `global` /
/// `try_global` read back.
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
118
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Identifier of the session; also the primary key of the database row.
    pub session_id: acp::SessionId,
    /// Agent that owns the thread. The built-in Zed agent is persisted as a
    /// NULL `agent_id` column and restored as `ZED_AGENT_ID` on load.
    pub agent_id: AgentId,
    /// Title of the thread.
    pub title: SharedString,
    /// Time of the last recorded update; persisted as an RFC 3339 string.
    pub updated_at: DateTime<Utc>,
    /// Creation time, when one is known; persisted as an RFC 3339 string.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths of the project the thread belongs to.
    pub folder_paths: PathList,
    /// Whether the thread is archived. Archived threads are excluded from
    /// `ThreadMetadataStore::entries_for_path`.
    pub archived: bool,
}
131
132impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
133 fn from(meta: &ThreadMetadata) -> Self {
134 Self {
135 session_id: meta.session_id.clone(),
136 work_dirs: Some(meta.folder_paths.clone()),
137 title: Some(meta.title.clone()),
138 updated_at: Some(meta.updated_at),
139 created_at: meta.created_at,
140 meta: None,
141 }
142 }
143}
144
/// Record of a git worktree that was archived (deleted from disk) when its last thread was archived.
/// Lives in this module because it shares the same SQLite database as thread metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ArchivedGitWorktree {
    /// Row id in the `archived_git_worktrees` table.
    pub id: i64,
    /// Path of the archived worktree.
    pub worktree_path: PathBuf,
    /// Path of the main repository the worktree belongs to.
    pub main_repo_path: PathBuf,
    /// Branch name recorded for the worktree, if any.
    pub branch_name: Option<String>,
    /// Commit hash recorded for the worktree.
    pub commit_hash: String,
    /// Whether the record has been marked as restored.
    pub restored: bool,
}
155
/// The store holds all metadata needed to show threads in the sidebar/the archive.
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct ThreadMetadataStore {
    /// Database handle used for reloads and the archived-worktree queries.
    db: ThreadMetadataDb,
    /// In-memory cache of all known threads, keyed by session id.
    threads: HashMap<acp::SessionId, ThreadMetadata>,
    /// Index from a project's path list to the sessions stored under it.
    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// The most recently started reload of the cache from the database.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed `AcpThread` entities, keyed by session id.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Sending side of the queue of pending database writes.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Background task that drains the write queue; presumably held here so
    /// that it lives as long as the store does.
    _db_operations_task: Task<()>,
}
168
/// A pending write queued for the background database task.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Insert the row, or overwrite the existing row with the same session id.
    Upsert(ThreadMetadata),
    /// Remove the row of the given session.
    Delete(acp::SessionId),
}
174
175impl DbOperation {
176 fn id(&self) -> &acp::SessionId {
177 match self {
178 DbOperation::Upsert(thread) => &thread.session_id,
179 DbOperation::Delete(session_id) => session_id,
180 }
181 }
182}
183
184impl ThreadMetadataStore {
185 #[cfg(not(any(test, feature = "test-support")))]
186 pub fn init_global(cx: &mut App) {
187 if cx.has_global::<Self>() {
188 return;
189 }
190
191 let db = ThreadMetadataDb::global(cx);
192 let thread_store = cx.new(|cx| Self::new(db, cx));
193 cx.set_global(GlobalThreadMetadataStore(thread_store));
194 }
195
196 #[cfg(any(test, feature = "test-support"))]
197 pub fn init_global(cx: &mut App) {
198 let thread = std::thread::current();
199 let test_name = thread.name().unwrap_or("unknown_test");
200 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
201 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
202 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
203 cx.set_global(GlobalThreadMetadataStore(thread_store));
204 }
205
206 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
207 cx.try_global::<GlobalThreadMetadataStore>()
208 .map(|store| store.0.clone())
209 }
210
211 pub fn global(cx: &App) -> Entity<Self> {
212 cx.global::<GlobalThreadMetadataStore>().0.clone()
213 }
214
215 pub fn is_empty(&self) -> bool {
216 self.threads.is_empty()
217 }
218
219 /// Returns all thread IDs.
220 pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
221 self.threads.keys().cloned()
222 }
223
224 /// Returns the metadata for a specific thread, if it exists.
225 pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
226 self.threads.get(session_id)
227 }
228
229 /// Returns all threads.
230 pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
231 self.threads.values()
232 }
233
234 /// Returns all archived threads.
235 pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
236 self.entries().filter(|t| t.archived)
237 }
238
239 /// Returns all threads for the given path list, excluding archived threads.
240 pub fn entries_for_path(
241 &self,
242 path_list: &PathList,
243 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
244 self.threads_by_paths
245 .get(path_list)
246 .into_iter()
247 .flatten()
248 .filter_map(|s| self.threads.get(s))
249 .filter(|s| !s.archived)
250 }
251
252 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
253 let db = self.db.clone();
254 self.reload_task.take();
255
256 let list_task = cx
257 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
258
259 let reload_task = cx
260 .spawn(async move |this, cx| {
261 let Some(rows) = list_task.await.log_err() else {
262 return;
263 };
264
265 this.update(cx, |this, cx| {
266 this.threads.clear();
267 this.threads_by_paths.clear();
268
269 for row in rows {
270 this.threads_by_paths
271 .entry(row.folder_paths.clone())
272 .or_default()
273 .insert(row.session_id.clone());
274 this.threads.insert(row.session_id.clone(), row);
275 }
276
277 cx.notify();
278 })
279 .ok();
280 })
281 .shared();
282 self.reload_task = Some(reload_task.clone());
283 reload_task
284 }
285
286 pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
287 if !cx.has_flag::<AgentV2FeatureFlag>() {
288 return;
289 }
290
291 for metadata in metadata {
292 self.save_internal(metadata);
293 }
294 cx.notify();
295 }
296
297 #[cfg(any(test, feature = "test-support"))]
298 pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
299 self.save(metadata, cx)
300 }
301
302 fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
303 if !cx.has_flag::<AgentV2FeatureFlag>() {
304 return;
305 }
306
307 self.save_internal(metadata);
308 cx.notify();
309 }
310
311 fn save_internal(&mut self, metadata: ThreadMetadata) {
312 // If the folder paths have changed, we need to clear the old entry
313 if let Some(thread) = self.threads.get(&metadata.session_id)
314 && thread.folder_paths != metadata.folder_paths
315 && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
316 {
317 session_ids.remove(&metadata.session_id);
318 }
319
320 self.threads
321 .insert(metadata.session_id.clone(), metadata.clone());
322
323 self.threads_by_paths
324 .entry(metadata.folder_paths.clone())
325 .or_default()
326 .insert(metadata.session_id.clone());
327
328 self.pending_thread_ops_tx
329 .try_send(DbOperation::Upsert(metadata))
330 .log_err();
331 }
332
333 pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
334 self.update_archived(session_id, true, cx);
335 }
336
337 pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
338 self.update_archived(session_id, false, cx);
339 }
340
341 fn update_archived(
342 &mut self,
343 session_id: &acp::SessionId,
344 archived: bool,
345 cx: &mut Context<Self>,
346 ) {
347 if !cx.has_flag::<AgentV2FeatureFlag>() {
348 return;
349 }
350
351 if let Some(thread) = self.threads.get(session_id) {
352 self.save_internal(ThreadMetadata {
353 archived,
354 ..thread.clone()
355 });
356 cx.notify();
357 }
358 }
359
360 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
361 if !cx.has_flag::<AgentV2FeatureFlag>() {
362 return;
363 }
364
365 if let Some(thread) = self.threads.get(&session_id)
366 && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
367 {
368 session_ids.remove(&session_id);
369 }
370 self.threads.remove(&session_id);
371 self.pending_thread_ops_tx
372 .try_send(DbOperation::Delete(session_id))
373 .log_err();
374 cx.notify();
375 }
376
377 pub fn create_archived_worktree(
378 &self,
379 worktree_path: String,
380 main_repo_path: String,
381 branch_name: Option<String>,
382 commit_hash: String,
383 cx: &mut Context<Self>,
384 ) -> Task<anyhow::Result<i64>> {
385 let db = self.db.clone();
386 cx.background_spawn(async move {
387 db.create_archived_worktree(
388 &worktree_path,
389 &main_repo_path,
390 branch_name.as_deref(),
391 &commit_hash,
392 )
393 .await
394 })
395 }
396
397 pub fn get_archived_worktree_by_path(
398 &self,
399 worktree_path: String,
400 cx: &mut Context<Self>,
401 ) -> Task<anyhow::Result<Option<ArchivedGitWorktree>>> {
402 let db = self.db.clone();
403 cx.background_spawn(async move { db.get_archived_worktree_by_path(&worktree_path).await })
404 }
405
406 pub fn delete_archived_worktree(
407 &self,
408 id: i64,
409 cx: &mut Context<Self>,
410 ) -> Task<anyhow::Result<()>> {
411 let db = self.db.clone();
412 cx.background_spawn(async move { db.delete_archived_worktree(id).await })
413 }
414
415 pub fn update_archived_worktree_restored(
416 &self,
417 id: i64,
418 worktree_path: String,
419 branch_name: Option<String>,
420 cx: &mut Context<Self>,
421 ) -> Task<anyhow::Result<()>> {
422 let db = self.db.clone();
423 cx.background_spawn(async move {
424 db.update_archived_worktree_restored(id, &worktree_path, branch_name.as_deref())
425 .await
426 })
427 }
428
429 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
430 let weak_store = cx.weak_entity();
431
432 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
433 // Don't track subagent threads in the sidebar.
434 if thread.parent_session_id().is_some() {
435 return;
436 }
437
438 let thread_entity = cx.entity();
439
440 cx.on_release({
441 let weak_store = weak_store.clone();
442 move |thread, cx| {
443 weak_store
444 .update(cx, |store, cx| {
445 let session_id = thread.session_id().clone();
446 store.session_subscriptions.remove(&session_id);
447 if thread.entries().is_empty() {
448 // Empty threads can be unloaded without ever being
449 // durably persisted by the underlying agent.
450 store.delete(session_id, cx);
451 }
452 })
453 .ok();
454 }
455 })
456 .detach();
457
458 weak_store
459 .update(cx, |this, cx| {
460 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
461 this.session_subscriptions
462 .insert(thread.session_id().clone(), subscription);
463 })
464 .ok();
465 })
466 .detach();
467
468 let (tx, rx) = smol::channel::unbounded();
469 let _db_operations_task = cx.background_spawn({
470 let db = db.clone();
471 async move {
472 while let Ok(first_update) = rx.recv().await {
473 let mut updates = vec![first_update];
474 while let Ok(update) = rx.try_recv() {
475 updates.push(update);
476 }
477 let updates = Self::dedup_db_operations(updates);
478 for operation in updates {
479 match operation {
480 DbOperation::Upsert(metadata) => {
481 db.save(metadata).await.log_err();
482 }
483 DbOperation::Delete(session_id) => {
484 db.delete(session_id).await.log_err();
485 }
486 }
487 }
488 }
489 }
490 });
491
492 let mut this = Self {
493 db,
494 threads: HashMap::default(),
495 threads_by_paths: HashMap::default(),
496 reload_task: None,
497 session_subscriptions: HashMap::default(),
498 pending_thread_ops_tx: tx,
499 _db_operations_task,
500 };
501 let _ = this.reload(cx);
502 this
503 }
504
505 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
506 let mut ops = HashMap::default();
507 for operation in operations.into_iter().rev() {
508 if ops.contains_key(operation.id()) {
509 continue;
510 }
511 ops.insert(operation.id().clone(), operation);
512 }
513 ops.into_values().collect()
514 }
515
516 fn handle_thread_event(
517 &mut self,
518 thread: Entity<acp_thread::AcpThread>,
519 event: &AcpThreadEvent,
520 cx: &mut Context<Self>,
521 ) {
522 // Don't track subagent threads in the sidebar.
523 if thread.read(cx).parent_session_id().is_some() {
524 return;
525 }
526
527 match event {
528 AcpThreadEvent::NewEntry
529 | AcpThreadEvent::TitleUpdated
530 | AcpThreadEvent::EntryUpdated(_)
531 | AcpThreadEvent::EntriesRemoved(_)
532 | AcpThreadEvent::ToolAuthorizationRequested(_)
533 | AcpThreadEvent::ToolAuthorizationReceived(_)
534 | AcpThreadEvent::Retry(_)
535 | AcpThreadEvent::Stopped(_)
536 | AcpThreadEvent::Error
537 | AcpThreadEvent::LoadError(_)
538 | AcpThreadEvent::Refusal
539 | AcpThreadEvent::WorkingDirectoriesUpdated => {
540 let thread_ref = thread.read(cx);
541 let existing_thread = self.threads.get(thread_ref.session_id());
542 let session_id = thread_ref.session_id().clone();
543 let title = thread_ref
544 .title()
545 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
546
547 let updated_at = Utc::now();
548
549 let created_at = existing_thread
550 .and_then(|t| t.created_at)
551 .unwrap_or_else(|| updated_at);
552
553 let agent_id = thread_ref.connection().agent_id();
554
555 let folder_paths = {
556 let project = thread_ref.project().read(cx);
557 let paths: Vec<Arc<Path>> = project
558 .visible_worktrees(cx)
559 .map(|worktree| worktree.read(cx).abs_path())
560 .collect();
561 PathList::new(&paths)
562 };
563
564 let archived = existing_thread.map(|t| t.archived).unwrap_or(false);
565
566 let metadata = ThreadMetadata {
567 session_id,
568 agent_id,
569 title,
570 created_at: Some(created_at),
571 updated_at,
572 folder_paths,
573 archived,
574 };
575
576 self.save(metadata, cx);
577 }
578 AcpThreadEvent::TokenUsageUpdated
579 | AcpThreadEvent::SubagentSpawned(_)
580 | AcpThreadEvent::PromptCapabilitiesUpdated
581 | AcpThreadEvent::AvailableCommandsUpdated(_)
582 | AcpThreadEvent::ModeUpdated(_)
583 | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
584 }
585 }
586}
587
// Allows the store type itself to be used as a gpui global.
// NOTE(review): the shared instance is published through
// `GlobalThreadMetadataStore`, not through this impl — confirm that this impl
// is still needed.
impl Global for ThreadMetadataStore {}
589
/// Database handle for the `sidebar_threads` and `archived_git_worktrees`
/// tables; a newtype around the underlying connection.
struct ThreadMetadataDb(ThreadSafeConnection);
591
/// Schema of the thread metadata database.
///
/// NOTE(review): the entries of `MIGRATIONS` are presumably applied in order
/// and must stay unchanged once shipped — add new entries at the end rather
/// than editing existing ones; confirm against the `db` crate.
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    const MIGRATIONS: &[&str] = &[
        // Base table with one row per thread.
        sql!(
            CREATE TABLE IF NOT EXISTS sidebar_threads(
                session_id TEXT PRIMARY KEY,
                agent_id TEXT,
                title TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                created_at TEXT,
                folder_paths TEXT,
                folder_paths_order TEXT
            ) STRICT;
        ),
        // Adds the archived flag; it defaults to 0 (not archived).
        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
        // Records of archived git worktrees, unique per worktree path.
        sql!(
            CREATE TABLE IF NOT EXISTS archived_git_worktrees(
                id INTEGER PRIMARY KEY,
                worktree_path TEXT NOT NULL,
                main_repo_path TEXT NOT NULL,
                branch_name TEXT,
                commit_hash TEXT NOT NULL,
                restored INTEGER NOT NULL DEFAULT 0
            ) STRICT;

            CREATE UNIQUE INDEX IF NOT EXISTS idx_archived_worktrees_path
                ON archived_git_worktrees(worktree_path);
        ),
    ];
}
623
// Generates the connection plumbing for `ThreadMetadataDb`.
// NOTE(review): presumably this is what provides `ThreadMetadataDb::global`
// used by `init_global`; the empty list looks like "no dependent domains" —
// confirm against the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
625
626impl ThreadMetadataDb {
627 pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
628 self.select::<Arc<str>>(
629 "SELECT session_id FROM sidebar_threads \
630 ORDER BY updated_at DESC",
631 )?()
632 }
633
634 /// List all sidebar thread metadata, ordered by updated_at descending.
635 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
636 self.select::<ThreadMetadata>(
637 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived \
638 FROM sidebar_threads \
639 ORDER BY updated_at DESC"
640 )?()
641 }
642
643 /// Upsert metadata for a thread.
644 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
645 let id = row.session_id.0.clone();
646 let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
647 None
648 } else {
649 Some(row.agent_id.to_string())
650 };
651 let title = row.title.to_string();
652 let updated_at = row.updated_at.to_rfc3339();
653 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
654 let serialized = row.folder_paths.serialize();
655 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
656 (None, None)
657 } else {
658 (Some(serialized.paths), Some(serialized.order))
659 };
660 let archived = row.archived;
661
662 self.write(move |conn| {
663 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived) \
664 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
665 ON CONFLICT(session_id) DO UPDATE SET \
666 agent_id = excluded.agent_id, \
667 title = excluded.title, \
668 updated_at = excluded.updated_at, \
669 created_at = excluded.created_at, \
670 folder_paths = excluded.folder_paths, \
671 folder_paths_order = excluded.folder_paths_order, \
672 archived = excluded.archived";
673 let mut stmt = Statement::prepare(conn, sql)?;
674 let mut i = stmt.bind(&id, 1)?;
675 i = stmt.bind(&agent_id, i)?;
676 i = stmt.bind(&title, i)?;
677 i = stmt.bind(&updated_at, i)?;
678 i = stmt.bind(&created_at, i)?;
679 i = stmt.bind(&folder_paths, i)?;
680 i = stmt.bind(&folder_paths_order, i)?;
681 stmt.bind(&archived, i)?;
682 stmt.exec()
683 })
684 .await
685 }
686
687 /// Delete metadata for a single thread.
688 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
689 let id = session_id.0.clone();
690 self.write(move |conn| {
691 let mut stmt =
692 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
693 stmt.bind(&id, 1)?;
694 stmt.exec()
695 })
696 .await
697 }
698
699 pub async fn create_archived_worktree(
700 &self,
701 worktree_path: &str,
702 main_repo_path: &str,
703 branch_name: Option<&str>,
704 commit_hash: &str,
705 ) -> anyhow::Result<i64> {
706 let worktree_path = worktree_path.to_string();
707 let main_repo_path = main_repo_path.to_string();
708 let branch_name = branch_name.map(|s| s.to_string());
709 let commit_hash = commit_hash.to_string();
710 self.write(move |conn| {
711 let mut stmt = Statement::prepare(
712 conn,
713 "INSERT OR REPLACE INTO archived_git_worktrees(\
714 worktree_path, main_repo_path, branch_name, commit_hash\
715 ) VALUES (?, ?, ?, ?)",
716 )?;
717 let mut i = stmt.bind(&worktree_path, 1)?;
718 i = stmt.bind(&main_repo_path, i)?;
719 i = stmt.bind(&branch_name, i)?;
720 stmt.bind(&commit_hash, i)?;
721 stmt.exec()?;
722
723 let mut id_stmt = Statement::prepare(conn, "SELECT last_insert_rowid()")?;
724 let id = id_stmt
725 .maybe_row::<i64>()?
726 .ok_or_else(|| anyhow::anyhow!("No row ID returned after INSERT"))?;
727
728 Ok(id)
729 })
730 .await
731 }
732
733 pub async fn get_archived_worktree_by_path(
734 &self,
735 worktree_path: &str,
736 ) -> anyhow::Result<Option<ArchivedGitWorktree>> {
737 let worktree_path = worktree_path.to_string();
738 self.select_row_bound::<String, ArchivedGitWorktree>(
739 "SELECT id, worktree_path, main_repo_path, branch_name, commit_hash, restored \
740 FROM archived_git_worktrees WHERE worktree_path = ?",
741 )?(worktree_path)
742 }
743
744 pub async fn delete_archived_worktree(&self, id: i64) -> anyhow::Result<()> {
745 self.write(move |conn| {
746 let mut stmt =
747 Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?;
748 stmt.bind(&id, 1)?;
749 stmt.exec()
750 })
751 .await
752 }
753
754 pub async fn update_archived_worktree_restored(
755 &self,
756 id: i64,
757 worktree_path: &str,
758 branch_name: Option<&str>,
759 ) -> anyhow::Result<()> {
760 let worktree_path = worktree_path.to_string();
761 let branch_name = branch_name.map(|s| s.to_string());
762 self.write(move |conn| {
763 let mut stmt = Statement::prepare(
764 conn,
765 "UPDATE archived_git_worktrees \
766 SET restored = 1, worktree_path = ?, branch_name = ? \
767 WHERE id = ?",
768 )?;
769 let mut i = stmt.bind(&worktree_path, 1)?;
770 i = stmt.bind(&branch_name, i)?;
771 stmt.bind(&id, i)?;
772 stmt.exec()
773 })
774 .await
775 }
776}
777
778impl Column for ThreadMetadata {
779 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
780 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
781 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
782 let (title, next): (String, i32) = Column::column(statement, next)?;
783 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
784 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
785 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
786 let (folder_paths_order_str, next): (Option<String>, i32) =
787 Column::column(statement, next)?;
788 let (archived, next): (bool, i32) = Column::column(statement, next)?;
789
790 let agent_id = agent_id
791 .map(|id| AgentId::new(id))
792 .unwrap_or(ZED_AGENT_ID.clone());
793
794 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
795 let created_at = created_at_str
796 .as_deref()
797 .map(DateTime::parse_from_rfc3339)
798 .transpose()?
799 .map(|dt| dt.with_timezone(&Utc));
800
801 let folder_paths = folder_paths_str
802 .map(|paths| {
803 PathList::deserialize(&util::path_list::SerializedPathList {
804 paths,
805 order: folder_paths_order_str.unwrap_or_default(),
806 })
807 })
808 .unwrap_or_default();
809
810 Ok((
811 ThreadMetadata {
812 session_id: acp::SessionId::new(id),
813 agent_id,
814 title: title.into(),
815 updated_at,
816 created_at,
817 folder_paths,
818 archived,
819 },
820 next,
821 ))
822 }
823}
824
825impl Column for ArchivedGitWorktree {
826 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
827 let (id, next): (i64, i32) = Column::column(statement, start_index)?;
828 let (worktree_path_str, next): (String, i32) = Column::column(statement, next)?;
829 let (main_repo_path_str, next): (String, i32) = Column::column(statement, next)?;
830 let (branch_name, next): (Option<String>, i32) = Column::column(statement, next)?;
831 let (commit_hash, next): (String, i32) = Column::column(statement, next)?;
832 let (restored_int, next): (i64, i32) = Column::column(statement, next)?;
833 Ok((
834 ArchivedGitWorktree {
835 id,
836 worktree_path: PathBuf::from(worktree_path_str),
837 main_repo_path: PathBuf::from(main_repo_path_str),
838 branch_name,
839 commit_hash,
840 restored: restored_int != 0,
841 },
842 next,
843 ))
844 }
845}
846
847#[cfg(test)]
848mod tests {
849 use super::*;
850 use acp_thread::{AgentConnection, StubAgentConnection};
851 use action_log::ActionLog;
852 use agent::DbThread;
853 use agent_client_protocol as acp;
854 use feature_flags::FeatureFlagAppExt;
855 use gpui::TestAppContext;
856 use project::FakeFs;
857 use project::Project;
858 use std::path::Path;
859 use std::rc::Rc;
860
861 fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
862 DbThread {
863 title: title.to_string().into(),
864 messages: Vec::new(),
865 updated_at,
866 detailed_summary: None,
867 initial_project_snapshot: None,
868 cumulative_token_usage: Default::default(),
869 request_token_usage: Default::default(),
870 model: None,
871 profile: None,
872 imported: false,
873 subagent_context: None,
874 speed: None,
875 thinking_enabled: false,
876 thinking_effort: None,
877 draft_prompt: None,
878 ui_scroll_position: None,
879 }
880 }
881
882 fn make_metadata(
883 session_id: &str,
884 title: &str,
885 updated_at: DateTime<Utc>,
886 folder_paths: PathList,
887 ) -> ThreadMetadata {
888 ThreadMetadata {
889 archived: false,
890 session_id: acp::SessionId::new(session_id),
891 agent_id: agent::ZED_AGENT_ID.clone(),
892 title: title.to_string().into(),
893 updated_at,
894 created_at: Some(updated_at),
895 folder_paths,
896 }
897 }
898
899 fn init_test(cx: &mut TestAppContext) {
900 cx.update(|cx| {
901 let settings_store = settings::SettingsStore::test(cx);
902 cx.set_global(settings_store);
903 cx.update_flags(true, vec!["agent-v2".to_string()]);
904 ThreadMetadataStore::init_global(cx);
905 ThreadStore::init_global(cx);
906 });
907 cx.run_until_parked();
908 }
909
910 #[gpui::test]
911 async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
912 let first_paths = PathList::new(&[Path::new("/project-a")]);
913 let second_paths = PathList::new(&[Path::new("/project-b")]);
914 let now = Utc::now();
915 let older = now - chrono::Duration::seconds(1);
916
917 let thread = std::thread::current();
918 let test_name = thread.name().unwrap_or("unknown_test");
919 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
920 let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
921 &db_name,
922 )));
923
924 db.save(make_metadata(
925 "session-1",
926 "First Thread",
927 now,
928 first_paths.clone(),
929 ))
930 .await
931 .unwrap();
932 db.save(make_metadata(
933 "session-2",
934 "Second Thread",
935 older,
936 second_paths.clone(),
937 ))
938 .await
939 .unwrap();
940
941 cx.update(|cx| {
942 let settings_store = settings::SettingsStore::test(cx);
943 cx.set_global(settings_store);
944 cx.update_flags(true, vec!["agent-v2".to_string()]);
945 ThreadMetadataStore::init_global(cx);
946 });
947
948 cx.run_until_parked();
949
950 cx.update(|cx| {
951 let store = ThreadMetadataStore::global(cx);
952 let store = store.read(cx);
953
954 let entry_ids = store
955 .entry_ids()
956 .map(|session_id| session_id.0.to_string())
957 .collect::<Vec<_>>();
958 assert_eq!(entry_ids.len(), 2);
959 assert!(entry_ids.contains(&"session-1".to_string()));
960 assert!(entry_ids.contains(&"session-2".to_string()));
961
962 let first_path_entries = store
963 .entries_for_path(&first_paths)
964 .map(|entry| entry.session_id.0.to_string())
965 .collect::<Vec<_>>();
966 assert_eq!(first_path_entries, vec!["session-1"]);
967
968 let second_path_entries = store
969 .entries_for_path(&second_paths)
970 .map(|entry| entry.session_id.0.to_string())
971 .collect::<Vec<_>>();
972 assert_eq!(second_path_entries, vec!["session-2"]);
973 });
974 }
975
976 #[gpui::test]
977 async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
978 init_test(cx);
979
980 let first_paths = PathList::new(&[Path::new("/project-a")]);
981 let second_paths = PathList::new(&[Path::new("/project-b")]);
982 let initial_time = Utc::now();
983 let updated_time = initial_time + chrono::Duration::seconds(1);
984
985 let initial_metadata = make_metadata(
986 "session-1",
987 "First Thread",
988 initial_time,
989 first_paths.clone(),
990 );
991
992 let second_metadata = make_metadata(
993 "session-2",
994 "Second Thread",
995 initial_time,
996 second_paths.clone(),
997 );
998
999 cx.update(|cx| {
1000 let store = ThreadMetadataStore::global(cx);
1001 store.update(cx, |store, cx| {
1002 store.save(initial_metadata, cx);
1003 store.save(second_metadata, cx);
1004 });
1005 });
1006
1007 cx.run_until_parked();
1008
1009 cx.update(|cx| {
1010 let store = ThreadMetadataStore::global(cx);
1011 let store = store.read(cx);
1012
1013 let first_path_entries = store
1014 .entries_for_path(&first_paths)
1015 .map(|entry| entry.session_id.0.to_string())
1016 .collect::<Vec<_>>();
1017 assert_eq!(first_path_entries, vec!["session-1"]);
1018
1019 let second_path_entries = store
1020 .entries_for_path(&second_paths)
1021 .map(|entry| entry.session_id.0.to_string())
1022 .collect::<Vec<_>>();
1023 assert_eq!(second_path_entries, vec!["session-2"]);
1024 });
1025
1026 let moved_metadata = make_metadata(
1027 "session-1",
1028 "First Thread",
1029 updated_time,
1030 second_paths.clone(),
1031 );
1032
1033 cx.update(|cx| {
1034 let store = ThreadMetadataStore::global(cx);
1035 store.update(cx, |store, cx| {
1036 store.save(moved_metadata, cx);
1037 });
1038 });
1039
1040 cx.run_until_parked();
1041
1042 cx.update(|cx| {
1043 let store = ThreadMetadataStore::global(cx);
1044 let store = store.read(cx);
1045
1046 let entry_ids = store
1047 .entry_ids()
1048 .map(|session_id| session_id.0.to_string())
1049 .collect::<Vec<_>>();
1050 assert_eq!(entry_ids.len(), 2);
1051 assert!(entry_ids.contains(&"session-1".to_string()));
1052 assert!(entry_ids.contains(&"session-2".to_string()));
1053
1054 let first_path_entries = store
1055 .entries_for_path(&first_paths)
1056 .map(|entry| entry.session_id.0.to_string())
1057 .collect::<Vec<_>>();
1058 assert!(first_path_entries.is_empty());
1059
1060 let second_path_entries = store
1061 .entries_for_path(&second_paths)
1062 .map(|entry| entry.session_id.0.to_string())
1063 .collect::<Vec<_>>();
1064 assert_eq!(second_path_entries.len(), 2);
1065 assert!(second_path_entries.contains(&"session-1".to_string()));
1066 assert!(second_path_entries.contains(&"session-2".to_string()));
1067 });
1068
1069 cx.update(|cx| {
1070 let store = ThreadMetadataStore::global(cx);
1071 store.update(cx, |store, cx| {
1072 store.delete(acp::SessionId::new("session-2"), cx);
1073 });
1074 });
1075
1076 cx.run_until_parked();
1077
1078 cx.update(|cx| {
1079 let store = ThreadMetadataStore::global(cx);
1080 let store = store.read(cx);
1081
1082 let entry_ids = store
1083 .entry_ids()
1084 .map(|session_id| session_id.0.to_string())
1085 .collect::<Vec<_>>();
1086 assert_eq!(entry_ids, vec!["session-1"]);
1087
1088 let second_path_entries = store
1089 .entries_for_path(&second_paths)
1090 .map(|entry| entry.session_id.0.to_string())
1091 .collect::<Vec<_>>();
1092 assert_eq!(second_path_entries, vec!["session-1"]);
1093 });
1094 }
1095
1096 #[gpui::test]
1097 async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1098 init_test(cx);
1099
1100 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1101 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1102 let now = Utc::now();
1103
1104 let existing_metadata = ThreadMetadata {
1105 session_id: acp::SessionId::new("a-session-0"),
1106 agent_id: agent::ZED_AGENT_ID.clone(),
1107 title: "Existing Metadata".into(),
1108 updated_at: now - chrono::Duration::seconds(10),
1109 created_at: Some(now - chrono::Duration::seconds(10)),
1110 folder_paths: project_a_paths.clone(),
1111 archived: false,
1112 };
1113
1114 cx.update(|cx| {
1115 let store = ThreadMetadataStore::global(cx);
1116 store.update(cx, |store, cx| {
1117 store.save(existing_metadata, cx);
1118 });
1119 });
1120 cx.run_until_parked();
1121
1122 let threads_to_save = vec![
1123 (
1124 "a-session-0",
1125 "Thread A0 From Native Store",
1126 project_a_paths.clone(),
1127 now,
1128 ),
1129 (
1130 "a-session-1",
1131 "Thread A1",
1132 project_a_paths.clone(),
1133 now + chrono::Duration::seconds(1),
1134 ),
1135 (
1136 "b-session-0",
1137 "Thread B0",
1138 project_b_paths.clone(),
1139 now + chrono::Duration::seconds(2),
1140 ),
1141 (
1142 "projectless",
1143 "Projectless",
1144 PathList::default(),
1145 now + chrono::Duration::seconds(3),
1146 ),
1147 ];
1148
1149 for (session_id, title, paths, updated_at) in &threads_to_save {
1150 let save_task = cx.update(|cx| {
1151 let thread_store = ThreadStore::global(cx);
1152 let session_id = session_id.to_string();
1153 let title = title.to_string();
1154 let paths = paths.clone();
1155 thread_store.update(cx, |store, cx| {
1156 store.save_thread(
1157 acp::SessionId::new(session_id),
1158 make_db_thread(&title, *updated_at),
1159 paths,
1160 cx,
1161 )
1162 })
1163 });
1164 save_task.await.unwrap();
1165 cx.run_until_parked();
1166 }
1167
1168 cx.update(|cx| migrate_thread_metadata(cx));
1169 cx.run_until_parked();
1170
1171 let list = cx.update(|cx| {
1172 let store = ThreadMetadataStore::global(cx);
1173 store.read(cx).entries().cloned().collect::<Vec<_>>()
1174 });
1175
1176 assert_eq!(list.len(), 3);
1177 assert!(
1178 list.iter()
1179 .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1180 );
1181
1182 let existing_metadata = list
1183 .iter()
1184 .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1185 .unwrap();
1186 assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1187 assert!(!existing_metadata.archived);
1188
1189 let migrated_session_ids = list
1190 .iter()
1191 .map(|metadata| metadata.session_id.0.as_ref())
1192 .collect::<Vec<_>>();
1193 assert!(migrated_session_ids.contains(&"a-session-1"));
1194 assert!(migrated_session_ids.contains(&"b-session-0"));
1195 assert!(!migrated_session_ids.contains(&"projectless"));
1196
1197 let migrated_entries = list
1198 .iter()
1199 .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1200 .collect::<Vec<_>>();
1201 assert!(
1202 migrated_entries
1203 .iter()
1204 .all(|metadata| !metadata.folder_paths.is_empty())
1205 );
1206 assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1207 }
1208
1209 #[gpui::test]
1210 async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1211 cx: &mut TestAppContext,
1212 ) {
1213 init_test(cx);
1214
1215 let project_paths = PathList::new(&[Path::new("/project-a")]);
1216 let existing_updated_at = Utc::now();
1217
1218 let existing_metadata = ThreadMetadata {
1219 session_id: acp::SessionId::new("existing-session"),
1220 agent_id: agent::ZED_AGENT_ID.clone(),
1221 title: "Existing Metadata".into(),
1222 updated_at: existing_updated_at,
1223 created_at: Some(existing_updated_at),
1224 folder_paths: project_paths.clone(),
1225 archived: false,
1226 };
1227
1228 cx.update(|cx| {
1229 let store = ThreadMetadataStore::global(cx);
1230 store.update(cx, |store, cx| {
1231 store.save(existing_metadata, cx);
1232 });
1233 });
1234 cx.run_until_parked();
1235
1236 let save_task = cx.update(|cx| {
1237 let thread_store = ThreadStore::global(cx);
1238 thread_store.update(cx, |store, cx| {
1239 store.save_thread(
1240 acp::SessionId::new("existing-session"),
1241 make_db_thread(
1242 "Updated Native Thread Title",
1243 existing_updated_at + chrono::Duration::seconds(1),
1244 ),
1245 project_paths.clone(),
1246 cx,
1247 )
1248 })
1249 });
1250 save_task.await.unwrap();
1251 cx.run_until_parked();
1252
1253 cx.update(|cx| migrate_thread_metadata(cx));
1254 cx.run_until_parked();
1255
1256 let list = cx.update(|cx| {
1257 let store = ThreadMetadataStore::global(cx);
1258 store.read(cx).entries().cloned().collect::<Vec<_>>()
1259 });
1260
1261 assert_eq!(list.len(), 1);
1262 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1263 }
1264
1265 #[gpui::test]
1266 async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1267 cx: &mut TestAppContext,
1268 ) {
1269 init_test(cx);
1270
1271 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1272 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1273 let now = Utc::now();
1274
1275 // Create 7 threads for project A and 3 for project B
1276 let mut threads_to_save = Vec::new();
1277 for i in 0..7 {
1278 threads_to_save.push((
1279 format!("a-session-{i}"),
1280 format!("Thread A{i}"),
1281 project_a_paths.clone(),
1282 now + chrono::Duration::seconds(i as i64),
1283 ));
1284 }
1285 for i in 0..3 {
1286 threads_to_save.push((
1287 format!("b-session-{i}"),
1288 format!("Thread B{i}"),
1289 project_b_paths.clone(),
1290 now + chrono::Duration::seconds(i as i64),
1291 ));
1292 }
1293
1294 for (session_id, title, paths, updated_at) in &threads_to_save {
1295 let save_task = cx.update(|cx| {
1296 let thread_store = ThreadStore::global(cx);
1297 let session_id = session_id.to_string();
1298 let title = title.to_string();
1299 let paths = paths.clone();
1300 thread_store.update(cx, |store, cx| {
1301 store.save_thread(
1302 acp::SessionId::new(session_id),
1303 make_db_thread(&title, *updated_at),
1304 paths,
1305 cx,
1306 )
1307 })
1308 });
1309 save_task.await.unwrap();
1310 cx.run_until_parked();
1311 }
1312
1313 cx.update(|cx| migrate_thread_metadata(cx));
1314 cx.run_until_parked();
1315
1316 let list = cx.update(|cx| {
1317 let store = ThreadMetadataStore::global(cx);
1318 store.read(cx).entries().cloned().collect::<Vec<_>>()
1319 });
1320
1321 assert_eq!(list.len(), 10);
1322
1323 // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1324 let mut project_a_entries: Vec<_> = list
1325 .iter()
1326 .filter(|m| m.folder_paths == project_a_paths)
1327 .collect();
1328 assert_eq!(project_a_entries.len(), 7);
1329 project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1330
1331 for entry in &project_a_entries[..5] {
1332 assert!(
1333 !entry.archived,
1334 "Expected {} to be unarchived (top 5 most recent)",
1335 entry.session_id.0
1336 );
1337 }
1338 for entry in &project_a_entries[5..] {
1339 assert!(
1340 entry.archived,
1341 "Expected {} to be archived (older than top 5)",
1342 entry.session_id.0
1343 );
1344 }
1345
1346 // Project B: all 3 should be unarchived (under the limit)
1347 let project_b_entries: Vec<_> = list
1348 .iter()
1349 .filter(|m| m.folder_paths == project_b_paths)
1350 .collect();
1351 assert_eq!(project_b_entries.len(), 3);
1352 assert!(project_b_entries.iter().all(|m| !m.archived));
1353 }
1354
// A thread that only ever had its title set (no content pushed) gets a metadata
// entry while it is alive; this test expects that entry to be gone from the
// store after the thread handle is dropped.
#[gpui::test]
async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, None::<&Path>, cx).await;
    let connection = Rc::new(StubAgentConnection::new());

    // Open a session through the stub connection, using an empty path list.
    let thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::default(), cx)
        })
        .await
        .unwrap();
    let session_id = cx.read(|cx| thread.read(cx).session_id().clone());

    // Only the title is changed; nothing is pushed onto the thread.
    cx.update(|cx| {
        thread.update(cx, |thread, cx| {
            thread.set_title("Draft Thread".into(), cx).detach();
        });
    });
    cx.run_until_parked();

    // While the thread is alive, the store lists exactly this session.
    let metadata_ids = cx.update(|cx| {
        ThreadMetadataStore::global(cx)
            .read(cx)
            .entry_ids()
            .collect::<Vec<_>>()
    });
    assert_eq!(metadata_ids, vec![session_id]);

    // Drop this test's handle to the thread, then let all queued work run.
    // NOTE(review): the empty `update` presumably flushes pending effects so the
    // release is observed, and the second `run_until_parked` looks redundant —
    // confirm before changing either.
    drop(thread);
    cx.update(|_| {});
    cx.run_until_parked();
    cx.run_until_parked();

    let metadata_ids = cx.update(|cx| {
        ThreadMetadataStore::global(cx)
            .read(cx)
            .entry_ids()
            .collect::<Vec<_>>()
    });
    assert!(
        metadata_ids.is_empty(),
        "expected empty draft thread metadata to be deleted on release"
    );
}
1404
// Counterpart of the empty-draft case: once a user content block has been
// pushed onto the thread, this test expects its metadata entry to still be in
// the store after the thread handle is dropped.
#[gpui::test]
async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
    init_test(cx);

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, None::<&Path>, cx).await;
    let connection = Rc::new(StubAgentConnection::new());

    // Open a session through the stub connection, using an empty path list.
    let thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::default(), cx)
        })
        .await
        .unwrap();
    let session_id = cx.read(|cx| thread.read(cx).session_id().clone());

    // Give the thread actual content by pushing a user content block.
    cx.update(|cx| {
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(None, "Hello".into(), cx);
        });
    });
    cx.run_until_parked();

    // While the thread is alive, the store lists exactly this session.
    let metadata_ids = cx.update(|cx| {
        ThreadMetadataStore::global(cx)
            .read(cx)
            .entry_ids()
            .collect::<Vec<_>>()
    });
    assert_eq!(metadata_ids, vec![session_id.clone()]);

    // Drop this test's handle to the thread, then let all queued work run.
    // NOTE(review): the empty `update` presumably flushes pending effects so the
    // release is observed — confirm.
    drop(thread);
    cx.update(|_| {});
    cx.run_until_parked();

    // The entry must still be there after the drop.
    let metadata_ids = cx.update(|cx| {
        ThreadMetadataStore::global(cx)
            .read(cx)
            .entry_ids()
            .collect::<Vec<_>>()
    });
    assert_eq!(metadata_ids, vec![session_id]);
}
1450
1451 #[gpui::test]
1452 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1453 init_test(cx);
1454
1455 let fs = FakeFs::new(cx.executor());
1456 let project = Project::test(fs, None::<&Path>, cx).await;
1457 let connection = Rc::new(StubAgentConnection::new());
1458
1459 // Create a regular (non-subagent) AcpThread.
1460 let regular_thread = cx
1461 .update(|cx| {
1462 connection
1463 .clone()
1464 .new_session(project.clone(), PathList::default(), cx)
1465 })
1466 .await
1467 .unwrap();
1468
1469 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1470
1471 // Set a title on the regular thread to trigger a save via handle_thread_update.
1472 cx.update(|cx| {
1473 regular_thread.update(cx, |thread, cx| {
1474 thread.set_title("Regular Thread".into(), cx).detach();
1475 });
1476 });
1477 cx.run_until_parked();
1478
1479 // Create a subagent AcpThread
1480 let subagent_session_id = acp::SessionId::new("subagent-session");
1481 let subagent_thread = cx.update(|cx| {
1482 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1483 cx.new(|cx| {
1484 acp_thread::AcpThread::new(
1485 Some(regular_session_id.clone()),
1486 Some("Subagent Thread".into()),
1487 None,
1488 connection.clone(),
1489 project.clone(),
1490 action_log,
1491 subagent_session_id.clone(),
1492 watch::Receiver::constant(acp::PromptCapabilities::new()),
1493 cx,
1494 )
1495 })
1496 });
1497
1498 // Set a title on the subagent thread to trigger handle_thread_update.
1499 cx.update(|cx| {
1500 subagent_thread.update(cx, |thread, cx| {
1501 thread
1502 .set_title("Subagent Thread Title".into(), cx)
1503 .detach();
1504 });
1505 });
1506 cx.run_until_parked();
1507
1508 // List all metadata from the store cache.
1509 let list = cx.update(|cx| {
1510 let store = ThreadMetadataStore::global(cx);
1511 store.read(cx).entries().cloned().collect::<Vec<_>>()
1512 });
1513
1514 // The subagent thread should NOT appear in the sidebar metadata.
1515 // Only the regular thread should be listed.
1516 assert_eq!(
1517 list.len(),
1518 1,
1519 "Expected only the regular thread in sidebar metadata, \
1520 but found {} entries (subagent threads are leaking into the sidebar)",
1521 list.len(),
1522 );
1523 assert_eq!(list[0].session_id, regular_session_id);
1524 assert_eq!(list[0].title.as_ref(), "Regular Thread");
1525 }
1526
// An upsert followed by a delete for the same session must collapse into the
// delete alone.
#[test]
fn test_dedup_db_operations_keeps_latest_operation_for_session() {
    let session_id = acp::SessionId::new("session-1");

    let upsert = DbOperation::Upsert(make_metadata(
        "session-1",
        "First Thread",
        Utc::now(),
        PathList::default(),
    ));
    let delete = DbOperation::Delete(session_id.clone());

    let deduped = ThreadMetadataStore::dedup_db_operations(vec![upsert, delete]);

    assert_eq!(deduped.len(), 1);
    assert_eq!(deduped[0], DbOperation::Delete(session_id));
}
1549
// Two upserts for the same session must collapse into the later one.
#[test]
fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
    let first_time = Utc::now();
    let second_time = first_time + chrono::Duration::seconds(1);

    let stale = make_metadata("session-1", "Old Title", first_time, PathList::default());
    let fresh = make_metadata("session-1", "New Title", second_time, PathList::default());

    let operations = vec![
        DbOperation::Upsert(stale),
        DbOperation::Upsert(fresh.clone()),
    ];
    let deduped = ThreadMetadataStore::dedup_db_operations(operations);

    assert_eq!(deduped.len(), 1);
    assert_eq!(deduped[0], DbOperation::Upsert(fresh));
}
1566
// Operations that target different sessions must all survive deduplication.
#[test]
fn test_dedup_db_operations_preserves_distinct_sessions() {
    let now = Utc::now();

    let first = make_metadata("session-1", "First Thread", now, PathList::default());
    let second = make_metadata("session-2", "Second Thread", now, PathList::default());

    let operations = vec![
        DbOperation::Upsert(first.clone()),
        DbOperation::Upsert(second.clone()),
    ];
    let deduped = ThreadMetadataStore::dedup_db_operations(operations);

    assert_eq!(deduped.len(), 2);
    // The order of the result is not checked, only membership.
    for expected in [DbOperation::Upsert(first), DbOperation::Upsert(second)] {
        assert!(deduped.contains(&expected));
    }
}
1582
1583 #[gpui::test]
1584 async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1585 init_test(cx);
1586
1587 let paths = PathList::new(&[Path::new("/project-a")]);
1588 let now = Utc::now();
1589 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1590
1591 cx.update(|cx| {
1592 let store = ThreadMetadataStore::global(cx);
1593 store.update(cx, |store, cx| {
1594 store.save(metadata, cx);
1595 });
1596 });
1597
1598 cx.run_until_parked();
1599
1600 cx.update(|cx| {
1601 let store = ThreadMetadataStore::global(cx);
1602 let store = store.read(cx);
1603
1604 let path_entries = store
1605 .entries_for_path(&paths)
1606 .map(|e| e.session_id.0.to_string())
1607 .collect::<Vec<_>>();
1608 assert_eq!(path_entries, vec!["session-1"]);
1609
1610 let archived = store
1611 .archived_entries()
1612 .map(|e| e.session_id.0.to_string())
1613 .collect::<Vec<_>>();
1614 assert!(archived.is_empty());
1615 });
1616
1617 cx.update(|cx| {
1618 let store = ThreadMetadataStore::global(cx);
1619 store.update(cx, |store, cx| {
1620 store.archive(&acp::SessionId::new("session-1"), cx);
1621 });
1622 });
1623
1624 cx.run_until_parked();
1625
1626 cx.update(|cx| {
1627 let store = ThreadMetadataStore::global(cx);
1628 let store = store.read(cx);
1629
1630 let path_entries = store
1631 .entries_for_path(&paths)
1632 .map(|e| e.session_id.0.to_string())
1633 .collect::<Vec<_>>();
1634 assert!(path_entries.is_empty());
1635
1636 let archived = store.archived_entries().collect::<Vec<_>>();
1637 assert_eq!(archived.len(), 1);
1638 assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1639 assert!(archived[0].archived);
1640 });
1641
1642 cx.update(|cx| {
1643 let store = ThreadMetadataStore::global(cx);
1644 store.update(cx, |store, cx| {
1645 store.unarchive(&acp::SessionId::new("session-1"), cx);
1646 });
1647 });
1648
1649 cx.run_until_parked();
1650
1651 cx.update(|cx| {
1652 let store = ThreadMetadataStore::global(cx);
1653 let store = store.read(cx);
1654
1655 let path_entries = store
1656 .entries_for_path(&paths)
1657 .map(|e| e.session_id.0.to_string())
1658 .collect::<Vec<_>>();
1659 assert_eq!(path_entries, vec!["session-1"]);
1660
1661 let archived = store
1662 .archived_entries()
1663 .map(|e| e.session_id.0.to_string())
1664 .collect::<Vec<_>>();
1665 assert!(archived.is_empty());
1666 });
1667 }
1668
1669 #[gpui::test]
1670 async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1671 init_test(cx);
1672
1673 let paths = PathList::new(&[Path::new("/project-a")]);
1674 let now = Utc::now();
1675
1676 let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1677 let metadata2 = make_metadata(
1678 "session-2",
1679 "Archived Thread",
1680 now - chrono::Duration::seconds(1),
1681 paths.clone(),
1682 );
1683
1684 cx.update(|cx| {
1685 let store = ThreadMetadataStore::global(cx);
1686 store.update(cx, |store, cx| {
1687 store.save(metadata1, cx);
1688 store.save(metadata2, cx);
1689 });
1690 });
1691
1692 cx.run_until_parked();
1693
1694 cx.update(|cx| {
1695 let store = ThreadMetadataStore::global(cx);
1696 store.update(cx, |store, cx| {
1697 store.archive(&acp::SessionId::new("session-2"), cx);
1698 });
1699 });
1700
1701 cx.run_until_parked();
1702
1703 cx.update(|cx| {
1704 let store = ThreadMetadataStore::global(cx);
1705 let store = store.read(cx);
1706
1707 let path_entries = store
1708 .entries_for_path(&paths)
1709 .map(|e| e.session_id.0.to_string())
1710 .collect::<Vec<_>>();
1711 assert_eq!(path_entries, vec!["session-1"]);
1712
1713 let all_entries = store
1714 .entries()
1715 .map(|e| e.session_id.0.to_string())
1716 .collect::<Vec<_>>();
1717 assert_eq!(all_entries.len(), 2);
1718 assert!(all_entries.contains(&"session-1".to_string()));
1719 assert!(all_entries.contains(&"session-2".to_string()));
1720
1721 let archived = store
1722 .archived_entries()
1723 .map(|e| e.session_id.0.to_string())
1724 .collect::<Vec<_>>();
1725 assert_eq!(archived, vec!["session-2"]);
1726 });
1727 }
1728
1729 #[gpui::test]
1730 async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1731 init_test(cx);
1732
1733 let paths = PathList::new(&[Path::new("/project-a")]);
1734 let now = Utc::now();
1735
1736 let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1737 let m2 = make_metadata(
1738 "session-2",
1739 "Thread Two",
1740 now - chrono::Duration::seconds(1),
1741 paths.clone(),
1742 );
1743 let m3 = make_metadata(
1744 "session-3",
1745 "Thread Three",
1746 now - chrono::Duration::seconds(2),
1747 paths,
1748 );
1749
1750 cx.update(|cx| {
1751 let store = ThreadMetadataStore::global(cx);
1752 store.update(cx, |store, cx| {
1753 store.save_all(vec![m1, m2, m3], cx);
1754 });
1755 });
1756
1757 cx.run_until_parked();
1758
1759 cx.update(|cx| {
1760 let store = ThreadMetadataStore::global(cx);
1761 let store = store.read(cx);
1762
1763 let all_entries = store
1764 .entries()
1765 .map(|e| e.session_id.0.to_string())
1766 .collect::<Vec<_>>();
1767 assert_eq!(all_entries.len(), 3);
1768 assert!(all_entries.contains(&"session-1".to_string()));
1769 assert!(all_entries.contains(&"session-2".to_string()));
1770 assert!(all_entries.contains(&"session-3".to_string()));
1771
1772 let entry_ids = store.entry_ids().collect::<Vec<_>>();
1773 assert_eq!(entry_ids.len(), 3);
1774 });
1775 }
1776
1777 #[gpui::test]
1778 async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1779 init_test(cx);
1780
1781 let paths = PathList::new(&[Path::new("/project-a")]);
1782 let now = Utc::now();
1783 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1784
1785 cx.update(|cx| {
1786 let store = ThreadMetadataStore::global(cx);
1787 store.update(cx, |store, cx| {
1788 store.save(metadata, cx);
1789 });
1790 });
1791
1792 cx.run_until_parked();
1793
1794 cx.update(|cx| {
1795 let store = ThreadMetadataStore::global(cx);
1796 store.update(cx, |store, cx| {
1797 store.archive(&acp::SessionId::new("session-1"), cx);
1798 });
1799 });
1800
1801 cx.run_until_parked();
1802
1803 cx.update(|cx| {
1804 let store = ThreadMetadataStore::global(cx);
1805 store.update(cx, |store, cx| {
1806 let _ = store.reload(cx);
1807 });
1808 });
1809
1810 cx.run_until_parked();
1811
1812 cx.update(|cx| {
1813 let store = ThreadMetadataStore::global(cx);
1814 let store = store.read(cx);
1815
1816 let thread = store
1817 .entries()
1818 .find(|e| e.session_id.0.as_ref() == "session-1")
1819 .expect("thread should exist after reload");
1820 assert!(thread.archived);
1821
1822 let path_entries = store
1823 .entries_for_path(&paths)
1824 .map(|e| e.session_id.0.to_string())
1825 .collect::<Vec<_>>();
1826 assert!(path_entries.is_empty());
1827
1828 let archived = store
1829 .archived_entries()
1830 .map(|e| e.session_id.0.to_string())
1831 .collect::<Vec<_>>();
1832 assert_eq!(archived, vec!["session-1"]);
1833 });
1834 }
1835
1836 #[gpui::test]
1837 async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1838 init_test(cx);
1839
1840 cx.run_until_parked();
1841
1842 cx.update(|cx| {
1843 let store = ThreadMetadataStore::global(cx);
1844 store.update(cx, |store, cx| {
1845 store.archive(&acp::SessionId::new("nonexistent"), cx);
1846 });
1847 });
1848
1849 cx.run_until_parked();
1850
1851 cx.update(|cx| {
1852 let store = ThreadMetadataStore::global(cx);
1853 let store = store.read(cx);
1854
1855 assert!(store.is_empty());
1856 assert_eq!(store.entries().count(), 0);
1857 assert_eq!(store.archived_entries().count(), 0);
1858 });
1859 }
1860
1861 #[gpui::test]
1862 async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1863 init_test(cx);
1864
1865 let paths = PathList::new(&[Path::new("/project-a")]);
1866 let now = Utc::now();
1867 let metadata = make_metadata("session-1", "Thread 1", now, paths);
1868 let session_id = metadata.session_id.clone();
1869
1870 cx.update(|cx| {
1871 let store = ThreadMetadataStore::global(cx);
1872 store.update(cx, |store, cx| {
1873 store.save(metadata.clone(), cx);
1874 store.archive(&session_id, cx);
1875 });
1876 });
1877
1878 cx.run_until_parked();
1879
1880 cx.update(|cx| {
1881 let store = ThreadMetadataStore::global(cx);
1882 let store = store.read(cx);
1883
1884 let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1885 pretty_assertions::assert_eq!(
1886 entries,
1887 vec![ThreadMetadata {
1888 archived: true,
1889 ..metadata
1890 }]
1891 );
1892 });
1893 }
1894}