1use std::{
2 path::{Path, PathBuf},
3 sync::Arc,
4};
5
6use acp_thread::AcpThreadEvent;
7use agent::{ThreadStore, ZED_AGENT_ID};
8use agent_client_protocol as acp;
9use anyhow::Context as _;
10use chrono::{DateTime, Utc};
11use collections::{HashMap, HashSet};
12use db::{
13 sqlez::{
14 bindable::Column, domain::Domain, statement::Statement,
15 thread_safe_connection::ThreadSafeConnection,
16 },
17 sqlez_macros::sql,
18};
19use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
20use futures::{FutureExt as _, future::Shared};
21use gpui::{AppContext as _, Entity, Global, Subscription, Task};
22use project::AgentId;
23use ui::{App, Context, SharedString};
24use util::ResultExt as _;
25use workspace::PathList;
26
27use crate::DEFAULT_THREAD_TITLE;
28
29pub fn init(cx: &mut App) {
30 ThreadMetadataStore::init_global(cx);
31
32 if cx.has_flag::<AgentV2FeatureFlag>() {
33 migrate_thread_metadata(cx);
34 }
35 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
36 if has_flag {
37 migrate_thread_metadata(cx);
38 }
39 })
40 .detach();
41}
42
43/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
44/// We skip migrating threads that do not have a project.
45///
46/// TODO: Remove this after N weeks of shipping the sidebar
47fn migrate_thread_metadata(cx: &mut App) {
48 let store = ThreadMetadataStore::global(cx);
49 let db = store.read(cx).db.clone();
50
51 cx.spawn(async move |cx| {
52 let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
53
54 let is_first_migration = existing_entries.is_empty();
55
56 let mut to_migrate = store.read_with(cx, |_store, cx| {
57 ThreadStore::global(cx)
58 .read(cx)
59 .entries()
60 .filter_map(|entry| {
61 if existing_entries.contains(&entry.id.0) {
62 return None;
63 }
64
65 Some(ThreadMetadata {
66 session_id: entry.id,
67 agent_id: ZED_AGENT_ID.clone(),
68 title: entry.title,
69 updated_at: entry.updated_at,
70 created_at: entry.created_at,
71 folder_paths: entry.folder_paths,
72 archived: true,
73 })
74 })
75 .collect::<Vec<_>>()
76 });
77
78 if to_migrate.is_empty() {
79 return anyhow::Ok(());
80 }
81
82 // On the first migration (no entries in DB yet), keep the 5 most
83 // recent threads per project unarchived.
84 if is_first_migration {
85 let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
86 for entry in &mut to_migrate {
87 if entry.folder_paths.is_empty() {
88 continue;
89 }
90 per_project
91 .entry(entry.folder_paths.clone())
92 .or_default()
93 .push(entry);
94 }
95 for entries in per_project.values_mut() {
96 entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
97 for entry in entries.iter_mut().take(5) {
98 entry.archived = false;
99 }
100 }
101 }
102
103 log::info!("Migrating {} thread store entries", to_migrate.len());
104
105 // Manually save each entry to the database and call reload, otherwise
106 // we'll end up triggering lots of reloads after each save
107 for entry in to_migrate {
108 db.save(entry).await?;
109 }
110
111 log::info!("Finished migrating thread store entries");
112
113 let _ = store.update(cx, |store, cx| store.reload(cx));
114 anyhow::Ok(())
115 })
116 .detach_and_log_err(cx);
117}
118
/// Wrapper under which the [`ThreadMetadataStore`] entity is registered as an
/// app global (see `init_global`, `try_global` and `global`).
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
121
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Identifier of the session; used as the primary key in the database.
    pub session_id: acp::SessionId,
    /// Agent that owns the thread. Persisted as NULL when it equals
    /// `ZED_AGENT_ID`.
    pub agent_id: AgentId,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Time of the last recorded update; listings are ordered by it.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths the thread belongs to; may be empty.
    pub folder_paths: PathList,
    /// Archived threads are excluded from `entries_for_path`.
    pub archived: bool,
}
134
135impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
136 fn from(meta: &ThreadMetadata) -> Self {
137 Self {
138 session_id: meta.session_id.clone(),
139 work_dirs: Some(meta.folder_paths.clone()),
140 title: Some(meta.title.clone()),
141 updated_at: Some(meta.updated_at),
142 created_at: meta.created_at,
143 meta: None,
144 }
145 }
146}
147
/// Record of a git worktree that was archived (deleted from disk) when its last thread was archived.
/// Lives in this module because it shares the same SQLite database as thread metadata.
#[derive(Debug, Clone, PartialEq)]
pub struct ArchivedGitWorktree {
    /// Row id in the `archived_git_worktrees` table.
    pub id: i64,
    /// Path of the archived worktree.
    pub worktree_path: PathBuf,
    /// Path of the main repository recorded alongside the worktree.
    pub main_repo_path: PathBuf,
    /// Branch name recorded for the worktree, if any.
    pub branch_name: Option<String>,
    /// Commit hash recorded for the worktree.
    pub commit_hash: String,
    /// Whether the worktree has been marked as restored.
    pub restored: bool,
}
158
/// The store holds all metadata needed to show threads in the sidebar/the archive.
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct ThreadMetadataStore {
    /// Database handle; read during reloads and cloned into background tasks.
    db: ThreadMetadataDb,
    /// In-memory cache of all known threads, keyed by session id.
    threads: HashMap<acp::SessionId, ThreadMetadata>,
    /// Index from a folder path list to the sessions stored under it.
    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// Handle of the most recently started reload, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed threads, removed when a thread is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Queue of pending database writes, drained by `_db_operations_task`.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Background task that applies queued operations to the database.
    _db_operations_task: Task<()>,
}
171
/// A write queued for the metadata database.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Insert the row, or overwrite the row with the same session id.
    Upsert(ThreadMetadata),
    /// Remove the row with this session id.
    Delete(acp::SessionId),
}
177
178impl DbOperation {
179 fn id(&self) -> &acp::SessionId {
180 match self {
181 DbOperation::Upsert(thread) => &thread.session_id,
182 DbOperation::Delete(session_id) => session_id,
183 }
184 }
185}
186
187impl ThreadMetadataStore {
188 #[cfg(not(any(test, feature = "test-support")))]
189 pub fn init_global(cx: &mut App) {
190 if cx.has_global::<Self>() {
191 return;
192 }
193
194 let db = ThreadMetadataDb::global(cx);
195 let thread_store = cx.new(|cx| Self::new(db, cx));
196 cx.set_global(GlobalThreadMetadataStore(thread_store));
197 }
198
/// Test variant: opens a test database named after the current thread
/// (falling back to `unknown_test`) and registers a store backed by it.
#[cfg(any(test, feature = "test-support"))]
pub fn init_global(cx: &mut App) {
    let current_thread = std::thread::current();
    let db_name = format!(
        "THREAD_METADATA_DB_{}",
        current_thread.name().unwrap_or("unknown_test")
    );
    let connection = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
    let store = cx.new(|cx| Self::new(ThreadMetadataDb(connection), cx));
    cx.set_global(GlobalThreadMetadataStore(store));
}
208
209 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
210 cx.try_global::<GlobalThreadMetadataStore>()
211 .map(|store| store.0.clone())
212 }
213
214 pub fn global(cx: &App) -> Entity<Self> {
215 cx.global::<GlobalThreadMetadataStore>().0.clone()
216 }
217
/// Returns `true` when the in-memory cache holds no threads.
pub fn is_empty(&self) -> bool {
    self.threads.is_empty()
}
221
/// Returns the session ids of all cached threads (archived ones included),
/// cloned, in no particular order.
pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
    self.threads.keys().cloned()
}
226
/// Returns the cached metadata for a specific thread, if it exists.
pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
    self.threads.get(session_id)
}
231
/// Returns all cached threads, archived or not, in no particular order.
pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
    self.threads.values()
}
236
237 /// Returns all archived threads.
238 pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
239 self.entries().filter(|t| t.archived)
240 }
241
242 /// Returns all threads for the given path list, excluding archived threads.
243 pub fn entries_for_path(
244 &self,
245 path_list: &PathList,
246 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
247 self.threads_by_paths
248 .get(path_list)
249 .into_iter()
250 .flatten()
251 .filter_map(|s| self.threads.get(s))
252 .filter(|s| !s.archived)
253 }
254
255 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
256 let db = self.db.clone();
257 self.reload_task.take();
258
259 let list_task = cx
260 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
261
262 let reload_task = cx
263 .spawn(async move |this, cx| {
264 let Some(rows) = list_task.await.log_err() else {
265 return;
266 };
267
268 this.update(cx, |this, cx| {
269 this.threads.clear();
270 this.threads_by_paths.clear();
271
272 for row in rows {
273 this.threads_by_paths
274 .entry(row.folder_paths.clone())
275 .or_default()
276 .insert(row.session_id.clone());
277 this.threads.insert(row.session_id.clone(), row);
278 }
279
280 cx.notify();
281 })
282 .ok();
283 })
284 .shared();
285 self.reload_task = Some(reload_task.clone());
286 reload_task
287 }
288
289 pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
290 if !cx.has_flag::<AgentV2FeatureFlag>() {
291 return;
292 }
293
294 for metadata in metadata {
295 self.save_internal(metadata);
296 }
297 cx.notify();
298 }
299
/// Test-only entry point that forwards to the private `save`.
#[cfg(any(test, feature = "test-support"))]
pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
    self.save(metadata, cx);
}
304
305 fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
306 if !cx.has_flag::<AgentV2FeatureFlag>() {
307 return;
308 }
309
310 self.save_internal(metadata);
311 cx.notify();
312 }
313
314 fn save_internal(&mut self, metadata: ThreadMetadata) {
315 // If the folder paths have changed, we need to clear the old entry
316 if let Some(thread) = self.threads.get(&metadata.session_id)
317 && thread.folder_paths != metadata.folder_paths
318 && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
319 {
320 session_ids.remove(&metadata.session_id);
321 }
322
323 self.threads
324 .insert(metadata.session_id.clone(), metadata.clone());
325
326 self.threads_by_paths
327 .entry(metadata.folder_paths.clone())
328 .or_default()
329 .insert(metadata.session_id.clone());
330
331 self.pending_thread_ops_tx
332 .try_send(DbOperation::Upsert(metadata))
333 .log_err();
334 }
335
336 pub fn update_working_directories(
337 &mut self,
338 session_id: &acp::SessionId,
339 work_dirs: PathList,
340 cx: &mut Context<Self>,
341 ) {
342 if !cx.has_flag::<AgentV2FeatureFlag>() {
343 return;
344 }
345
346 if let Some(thread) = self.threads.get(session_id) {
347 self.save_internal(ThreadMetadata {
348 folder_paths: work_dirs,
349 ..thread.clone()
350 });
351 cx.notify();
352 }
353 }
354
/// Marks the thread as archived. See `update_archived` for the conditions
/// under which this is a no-op.
pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
    self.update_archived(session_id, true, cx);
}
358
/// Clears the archived flag of the thread. See `update_archived` for the
/// conditions under which this is a no-op.
pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
    self.update_archived(session_id, false, cx);
}
362
363 fn update_archived(
364 &mut self,
365 session_id: &acp::SessionId,
366 archived: bool,
367 cx: &mut Context<Self>,
368 ) {
369 if !cx.has_flag::<AgentV2FeatureFlag>() {
370 return;
371 }
372
373 if let Some(thread) = self.threads.get(session_id) {
374 self.save_internal(ThreadMetadata {
375 archived,
376 ..thread.clone()
377 });
378 cx.notify();
379 }
380 }
381
382 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
383 if !cx.has_flag::<AgentV2FeatureFlag>() {
384 return;
385 }
386
387 if let Some(thread) = self.threads.get(&session_id)
388 && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
389 {
390 session_ids.remove(&session_id);
391 }
392 self.threads.remove(&session_id);
393 self.pending_thread_ops_tx
394 .try_send(DbOperation::Delete(session_id))
395 .log_err();
396 cx.notify();
397 }
398
399 pub fn create_archived_worktree(
400 &self,
401 worktree_path: String,
402 main_repo_path: String,
403 branch_name: Option<String>,
404 commit_hash: String,
405 cx: &mut Context<Self>,
406 ) -> Task<anyhow::Result<i64>> {
407 let db = self.db.clone();
408 cx.background_spawn(async move {
409 db.create_archived_worktree(
410 &worktree_path,
411 &main_repo_path,
412 branch_name.as_deref(),
413 &commit_hash,
414 )
415 .await
416 })
417 }
418
419 pub fn get_archived_worktree_by_path(
420 &self,
421 worktree_path: String,
422 cx: &mut Context<Self>,
423 ) -> Task<anyhow::Result<Option<ArchivedGitWorktree>>> {
424 let db = self.db.clone();
425 cx.background_spawn(async move { db.get_archived_worktree_by_path(&worktree_path).await })
426 }
427
428 pub fn delete_archived_worktree(
429 &self,
430 id: i64,
431 cx: &mut Context<Self>,
432 ) -> Task<anyhow::Result<()>> {
433 let db = self.db.clone();
434 cx.background_spawn(async move { db.delete_archived_worktree(id).await })
435 }
436
437 pub fn update_archived_worktree_restored(
438 &self,
439 id: i64,
440 worktree_path: String,
441 branch_name: Option<String>,
442 cx: &mut Context<Self>,
443 ) -> Task<anyhow::Result<()>> {
444 let db = self.db.clone();
445 cx.background_spawn(async move {
446 db.update_archived_worktree_restored(id, &worktree_path, branch_name.as_deref())
447 .await
448 })
449 }
450
/// Builds the store around `db`.
///
/// Sets up three things before returning:
/// 1. an observer that subscribes to every newly created top-level
///    `AcpThread` and cleans up when that thread is released,
/// 2. a background task that drains the queue of pending database
///    operations,
/// 3. an initial reload of the cache from the database.
fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
    let weak_store = cx.weak_entity();

    cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
        // Don't track subagent threads in the sidebar.
        if thread.parent_session_id().is_some() {
            return;
        }

        let thread_entity = cx.entity();

        // When the thread is released, drop its subscription and, if it never
        // got any entries, remove its metadata as well.
        cx.on_release({
            let weak_store = weak_store.clone();
            move |thread, cx| {
                weak_store
                    .update(cx, |store, cx| {
                        let session_id = thread.session_id().clone();
                        store.session_subscriptions.remove(&session_id);
                        if thread.entries().is_empty() {
                            // Empty threads can be unloaded without ever being
                            // durably persisted by the underlying agent.
                            store.delete(session_id, cx);
                        }
                    })
                    .ok();
            }
        })
        .detach();

        // Route the thread's events to `handle_thread_event`, keeping the
        // subscription keyed by session id.
        weak_store
            .update(cx, |this, cx| {
                let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
                this.session_subscriptions
                    .insert(thread.session_id().clone(), subscription);
            })
            .ok();
    })
    .detach();

    let (tx, rx) = smol::channel::unbounded();
    let _db_operations_task = cx.background_spawn({
        let db = db.clone();
        async move {
            // Wait for one operation, then grab everything else that is
            // already queued so the batch can be deduplicated per session.
            while let Ok(first_update) = rx.recv().await {
                let mut updates = vec![first_update];
                while let Ok(update) = rx.try_recv() {
                    updates.push(update);
                }
                let updates = Self::dedup_db_operations(updates);
                for operation in updates {
                    // Failures are logged; the remaining operations still run.
                    match operation {
                        DbOperation::Upsert(metadata) => {
                            db.save(metadata).await.log_err();
                        }
                        DbOperation::Delete(session_id) => {
                            db.delete(session_id).await.log_err();
                        }
                    }
                }
            }
        }
    });

    let mut this = Self {
        db,
        threads: HashMap::default(),
        threads_by_paths: HashMap::default(),
        reload_task: None,
        session_subscriptions: HashMap::default(),
        pending_thread_ops_tx: tx,
        _db_operations_task,
    };
    // The returned handle is not needed here; `reload` also stores it.
    let _ = this.reload(cx);
    this
}
526
527 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
528 let mut ops = HashMap::default();
529 for operation in operations.into_iter().rev() {
530 if ops.contains_key(operation.id()) {
531 continue;
532 }
533 ops.insert(operation.id().clone(), operation);
534 }
535 ops.into_values().collect()
536 }
537
538 fn handle_thread_event(
539 &mut self,
540 thread: Entity<acp_thread::AcpThread>,
541 event: &AcpThreadEvent,
542 cx: &mut Context<Self>,
543 ) {
544 // Don't track subagent threads in the sidebar.
545 if thread.read(cx).parent_session_id().is_some() {
546 return;
547 }
548
549 match event {
550 AcpThreadEvent::NewEntry
551 | AcpThreadEvent::TitleUpdated
552 | AcpThreadEvent::EntryUpdated(_)
553 | AcpThreadEvent::EntriesRemoved(_)
554 | AcpThreadEvent::ToolAuthorizationRequested(_)
555 | AcpThreadEvent::ToolAuthorizationReceived(_)
556 | AcpThreadEvent::Retry(_)
557 | AcpThreadEvent::Stopped(_)
558 | AcpThreadEvent::Error
559 | AcpThreadEvent::LoadError(_)
560 | AcpThreadEvent::Refusal
561 | AcpThreadEvent::WorkingDirectoriesUpdated => {
562 let thread_ref = thread.read(cx);
563 let existing_thread = self.threads.get(thread_ref.session_id());
564 let session_id = thread_ref.session_id().clone();
565 let title = thread_ref
566 .title()
567 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
568
569 let updated_at = Utc::now();
570
571 let created_at = existing_thread
572 .and_then(|t| t.created_at)
573 .unwrap_or_else(|| updated_at);
574
575 let agent_id = thread_ref.connection().agent_id();
576
577 let folder_paths = {
578 let project = thread_ref.project().read(cx);
579 let paths: Vec<Arc<Path>> = project
580 .visible_worktrees(cx)
581 .map(|worktree| worktree.read(cx).abs_path())
582 .collect();
583 PathList::new(&paths)
584 };
585
586 // Threads without a folder path (e.g. started in an empty
587 // window) are archived by default so they don't get lost,
588 // because they won't show up in the sidebar. Users can reload
589 // them from the archive.
590 let archived = existing_thread
591 .map(|t| t.archived)
592 .unwrap_or(folder_paths.is_empty());
593
594 let metadata = ThreadMetadata {
595 session_id,
596 agent_id,
597 title,
598 created_at: Some(created_at),
599 updated_at,
600 folder_paths,
601 archived,
602 };
603
604 self.save(metadata, cx);
605 }
606 AcpThreadEvent::TokenUsageUpdated
607 | AcpThreadEvent::SubagentSpawned(_)
608 | AcpThreadEvent::PromptCapabilitiesUpdated
609 | AcpThreadEvent::AvailableCommandsUpdated(_)
610 | AcpThreadEvent::ModeUpdated(_)
611 | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
612 }
613 }
614}
615
// NOTE(review): this file registers the store through
// `GlobalThreadMetadataStore`; no visible code calls `set_global` with
// `ThreadMetadataStore` itself — confirm whether this impl is still needed.
impl Global for ThreadMetadataStore {}
617
/// Database handle for the sidebar thread metadata and archived git
/// worktree tables, wrapping a [`ThreadSafeConnection`].
struct ThreadMetadataDb(ThreadSafeConnection);
619
/// Schema definition for [`ThreadMetadataDb`].
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    const MIGRATIONS: &[&str] = &[
        // Base table holding one row per thread shown in the sidebar.
        sql!(
            CREATE TABLE IF NOT EXISTS sidebar_threads(
                session_id TEXT PRIMARY KEY,
                agent_id TEXT,
                title TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                created_at TEXT,
                folder_paths TEXT,
                folder_paths_order TEXT
            ) STRICT;
        ),
        // Adds the archived flag; it defaults to 0 (not archived).
        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
        // Table of archived git worktrees, with at most one row per worktree path.
        sql!(
            CREATE TABLE IF NOT EXISTS archived_git_worktrees(
                id INTEGER PRIMARY KEY,
                worktree_path TEXT NOT NULL,
                main_repo_path TEXT NOT NULL,
                branch_name TEXT,
                commit_hash TEXT NOT NULL,
                restored INTEGER NOT NULL DEFAULT 0
            ) STRICT;

            CREATE UNIQUE INDEX IF NOT EXISTS idx_archived_worktrees_path
            ON archived_git_worktrees(worktree_path);
        ),
    ];
}
651
// NOTE(review): presumably this macro provides `ThreadMetadataDb::global`,
// which `init_global` calls — confirm against the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
653
654impl ThreadMetadataDb {
655 pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
656 self.select::<Arc<str>>(
657 "SELECT session_id FROM sidebar_threads \
658 ORDER BY updated_at DESC",
659 )?()
660 }
661
662 /// List all sidebar thread metadata, ordered by updated_at descending.
663 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
664 self.select::<ThreadMetadata>(
665 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived \
666 FROM sidebar_threads \
667 ORDER BY updated_at DESC"
668 )?()
669 }
670
671 /// Upsert metadata for a thread.
672 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
673 let id = row.session_id.0.clone();
674 let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
675 None
676 } else {
677 Some(row.agent_id.to_string())
678 };
679 let title = row.title.to_string();
680 let updated_at = row.updated_at.to_rfc3339();
681 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
682 let serialized = row.folder_paths.serialize();
683 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
684 (None, None)
685 } else {
686 (Some(serialized.paths), Some(serialized.order))
687 };
688 let archived = row.archived;
689
690 self.write(move |conn| {
691 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived) \
692 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
693 ON CONFLICT(session_id) DO UPDATE SET \
694 agent_id = excluded.agent_id, \
695 title = excluded.title, \
696 updated_at = excluded.updated_at, \
697 created_at = excluded.created_at, \
698 folder_paths = excluded.folder_paths, \
699 folder_paths_order = excluded.folder_paths_order, \
700 archived = excluded.archived";
701 let mut stmt = Statement::prepare(conn, sql)?;
702 let mut i = stmt.bind(&id, 1)?;
703 i = stmt.bind(&agent_id, i)?;
704 i = stmt.bind(&title, i)?;
705 i = stmt.bind(&updated_at, i)?;
706 i = stmt.bind(&created_at, i)?;
707 i = stmt.bind(&folder_paths, i)?;
708 i = stmt.bind(&folder_paths_order, i)?;
709 stmt.bind(&archived, i)?;
710 stmt.exec()
711 })
712 .await
713 }
714
715 /// Delete metadata for a single thread.
716 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
717 let id = session_id.0.clone();
718 self.write(move |conn| {
719 let mut stmt =
720 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
721 stmt.bind(&id, 1)?;
722 stmt.exec()
723 })
724 .await
725 }
726
727 pub async fn create_archived_worktree(
728 &self,
729 worktree_path: &str,
730 main_repo_path: &str,
731 branch_name: Option<&str>,
732 commit_hash: &str,
733 ) -> anyhow::Result<i64> {
734 let worktree_path = worktree_path.to_string();
735 let main_repo_path = main_repo_path.to_string();
736 let branch_name = branch_name.map(|s| s.to_string());
737 let commit_hash = commit_hash.to_string();
738 self.write(move |conn| {
739 let mut stmt = Statement::prepare(
740 conn,
741 "INSERT OR REPLACE INTO archived_git_worktrees(\
742 worktree_path, main_repo_path, branch_name, commit_hash\
743 ) VALUES (?, ?, ?, ?)",
744 )?;
745 let mut i = stmt.bind(&worktree_path, 1)?;
746 i = stmt.bind(&main_repo_path, i)?;
747 i = stmt.bind(&branch_name, i)?;
748 stmt.bind(&commit_hash, i)?;
749 stmt.exec()?;
750
751 let mut id_stmt = Statement::prepare(conn, "SELECT last_insert_rowid()")?;
752 let id = id_stmt
753 .maybe_row::<i64>()?
754 .ok_or_else(|| anyhow::anyhow!("No row ID returned after INSERT"))?;
755
756 Ok(id)
757 })
758 .await
759 }
760
761 pub async fn get_archived_worktree_by_path(
762 &self,
763 worktree_path: &str,
764 ) -> anyhow::Result<Option<ArchivedGitWorktree>> {
765 let worktree_path = worktree_path.to_string();
766 self.select_row_bound::<String, ArchivedGitWorktree>(
767 "SELECT id, worktree_path, main_repo_path, branch_name, commit_hash, restored \
768 FROM archived_git_worktrees WHERE worktree_path = ?",
769 )?(worktree_path)
770 }
771
772 pub async fn delete_archived_worktree(&self, id: i64) -> anyhow::Result<()> {
773 self.write(move |conn| {
774 let mut stmt =
775 Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?;
776 stmt.bind(&id, 1)?;
777 stmt.exec()
778 })
779 .await
780 }
781
782 pub async fn update_archived_worktree_restored(
783 &self,
784 id: i64,
785 worktree_path: &str,
786 branch_name: Option<&str>,
787 ) -> anyhow::Result<()> {
788 let worktree_path = worktree_path.to_string();
789 let branch_name = branch_name.map(|s| s.to_string());
790 self.write(move |conn| {
791 let mut stmt = Statement::prepare(
792 conn,
793 "UPDATE archived_git_worktrees \
794 SET restored = 1, worktree_path = ?, branch_name = ? \
795 WHERE id = ?",
796 )?;
797 let mut i = stmt.bind(&worktree_path, 1)?;
798 i = stmt.bind(&branch_name, i)?;
799 stmt.bind(&id, i)?;
800 stmt.exec()
801 })
802 .await
803 }
804}
805
806impl Column for ThreadMetadata {
807 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
808 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
809 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
810 let (title, next): (String, i32) = Column::column(statement, next)?;
811 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
812 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
813 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
814 let (folder_paths_order_str, next): (Option<String>, i32) =
815 Column::column(statement, next)?;
816 let (archived, next): (bool, i32) = Column::column(statement, next)?;
817
818 let agent_id = agent_id
819 .map(|id| AgentId::new(id))
820 .unwrap_or(ZED_AGENT_ID.clone());
821
822 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
823 let created_at = created_at_str
824 .as_deref()
825 .map(DateTime::parse_from_rfc3339)
826 .transpose()?
827 .map(|dt| dt.with_timezone(&Utc));
828
829 let folder_paths = folder_paths_str
830 .map(|paths| {
831 PathList::deserialize(&util::path_list::SerializedPathList {
832 paths,
833 order: folder_paths_order_str.unwrap_or_default(),
834 })
835 })
836 .unwrap_or_default();
837
838 Ok((
839 ThreadMetadata {
840 session_id: acp::SessionId::new(id),
841 agent_id,
842 title: title.into(),
843 updated_at,
844 created_at,
845 folder_paths,
846 archived,
847 },
848 next,
849 ))
850 }
851}
852
853impl Column for ArchivedGitWorktree {
854 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
855 let (id, next): (i64, i32) = Column::column(statement, start_index)?;
856 let (worktree_path_str, next): (String, i32) = Column::column(statement, next)?;
857 let (main_repo_path_str, next): (String, i32) = Column::column(statement, next)?;
858 let (branch_name, next): (Option<String>, i32) = Column::column(statement, next)?;
859 let (commit_hash, next): (String, i32) = Column::column(statement, next)?;
860 let (restored_int, next): (i64, i32) = Column::column(statement, next)?;
861 Ok((
862 ArchivedGitWorktree {
863 id,
864 worktree_path: PathBuf::from(worktree_path_str),
865 main_repo_path: PathBuf::from(main_repo_path_str),
866 branch_name,
867 commit_hash,
868 restored: restored_int != 0,
869 },
870 next,
871 ))
872 }
873}
874
875#[cfg(test)]
876mod tests {
877 use super::*;
878 use acp_thread::{AgentConnection, StubAgentConnection};
879 use action_log::ActionLog;
880 use agent::DbThread;
881 use agent_client_protocol as acp;
882 use feature_flags::FeatureFlagAppExt;
883 use gpui::TestAppContext;
884 use project::FakeFs;
885 use project::Project;
886 use std::path::Path;
887 use std::rc::Rc;
888
/// Builds a `DbThread` with the given title and update time; every other
/// field is empty, `None`, `false` or its default.
fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
    DbThread {
        title: title.to_string().into(),
        messages: Vec::new(),
        updated_at,
        detailed_summary: None,
        initial_project_snapshot: None,
        cumulative_token_usage: Default::default(),
        request_token_usage: Default::default(),
        model: None,
        profile: None,
        imported: false,
        subagent_context: None,
        speed: None,
        thinking_enabled: false,
        thinking_effort: None,
        draft_prompt: None,
        ui_scroll_position: None,
    }
}
909
910 fn make_metadata(
911 session_id: &str,
912 title: &str,
913 updated_at: DateTime<Utc>,
914 folder_paths: PathList,
915 ) -> ThreadMetadata {
916 ThreadMetadata {
917 archived: false,
918 session_id: acp::SessionId::new(session_id),
919 agent_id: agent::ZED_AGENT_ID.clone(),
920 title: title.to_string().into(),
921 updated_at,
922 created_at: Some(updated_at),
923 folder_paths,
924 }
925 }
926
/// Common test setup: installs a test settings store, enables the
/// `agent-v2` flag, initializes both global stores and lets pending work
/// (such as the initial reload) run.
fn init_test(cx: &mut TestAppContext) {
    cx.update(|cx| {
        let settings_store = settings::SettingsStore::test(cx);
        cx.set_global(settings_store);
        cx.update_flags(true, vec!["agent-v2".to_string()]);
        ThreadMetadataStore::init_global(cx);
        ThreadStore::init_global(cx);
    });
    cx.run_until_parked();
}
937
/// Rows written to the database before the store exists must show up in the
/// store's cache and path index once it has been initialized.
#[gpui::test]
async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
    let first_paths = PathList::new(&[Path::new("/project-a")]);
    let second_paths = PathList::new(&[Path::new("/project-b")]);
    let now = Utc::now();
    let older = now - chrono::Duration::seconds(1);

    // Open the database under the same name the test `init_global` uses, so
    // the store created below reads the rows written here.
    let thread = std::thread::current();
    let test_name = thread.name().unwrap_or("unknown_test");
    let db_name = format!("THREAD_METADATA_DB_{}", test_name);
    let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
        &db_name,
    )));

    db.save(make_metadata(
        "session-1",
        "First Thread",
        now,
        first_paths.clone(),
    ))
    .await
    .unwrap();
    db.save(make_metadata(
        "session-2",
        "Second Thread",
        older,
        second_paths.clone(),
    ))
    .await
    .unwrap();

    cx.update(|cx| {
        let settings_store = settings::SettingsStore::test(cx);
        cx.set_global(settings_store);
        cx.update_flags(true, vec!["agent-v2".to_string()]);
        ThreadMetadataStore::init_global(cx);
    });

    // Let the initial reload finish.
    cx.run_until_parked();

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let entry_ids = store
            .entry_ids()
            .map(|session_id| session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(entry_ids.len(), 2);
        assert!(entry_ids.contains(&"session-1".to_string()));
        assert!(entry_ids.contains(&"session-2".to_string()));

        let first_path_entries = store
            .entries_for_path(&first_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(first_path_entries, vec!["session-1"]);

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-2"]);
    });
}
1003
/// Exercises the in-memory cache: saving two threads, moving one of them to
/// another path list, and deleting the other.
#[gpui::test]
async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
    init_test(cx);

    let first_paths = PathList::new(&[Path::new("/project-a")]);
    let second_paths = PathList::new(&[Path::new("/project-b")]);
    let initial_time = Utc::now();
    let updated_time = initial_time + chrono::Duration::seconds(1);

    let initial_metadata = make_metadata(
        "session-1",
        "First Thread",
        initial_time,
        first_paths.clone(),
    );

    let second_metadata = make_metadata(
        "session-2",
        "Second Thread",
        initial_time,
        second_paths.clone(),
    );

    // Step 1: save one thread per project.
    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.save(initial_metadata, cx);
            store.save(second_metadata, cx);
        });
    });

    cx.run_until_parked();

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let first_path_entries = store
            .entries_for_path(&first_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(first_path_entries, vec!["session-1"]);

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-2"]);
    });

    // Step 2: re-save session-1 under the second project's paths.
    let moved_metadata = make_metadata(
        "session-1",
        "First Thread",
        updated_time,
        second_paths.clone(),
    );

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.save(moved_metadata, cx);
        });
    });

    cx.run_until_parked();

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let entry_ids = store
            .entry_ids()
            .map(|session_id| session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(entry_ids.len(), 2);
        assert!(entry_ids.contains(&"session-1".to_string()));
        assert!(entry_ids.contains(&"session-2".to_string()));

        // The old bucket must no longer list the moved thread.
        let first_path_entries = store
            .entries_for_path(&first_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert!(first_path_entries.is_empty());

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries.len(), 2);
        assert!(second_path_entries.contains(&"session-1".to_string()));
        assert!(second_path_entries.contains(&"session-2".to_string()));
    });

    // Step 3: delete session-2.
    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.delete(acp::SessionId::new("session-2"), cx);
        });
    });

    cx.run_until_parked();

    cx.update(|cx| {
        let store = ThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let entry_ids = store
            .entry_ids()
            .map(|session_id| session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(entry_ids, vec!["session-1"]);

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-1"]);
    });
}
1123
1124 #[gpui::test]
1125 async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1126 init_test(cx);
1127
1128 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1129 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1130 let now = Utc::now();
1131
1132 let existing_metadata = ThreadMetadata {
1133 session_id: acp::SessionId::new("a-session-0"),
1134 agent_id: agent::ZED_AGENT_ID.clone(),
1135 title: "Existing Metadata".into(),
1136 updated_at: now - chrono::Duration::seconds(10),
1137 created_at: Some(now - chrono::Duration::seconds(10)),
1138 folder_paths: project_a_paths.clone(),
1139 archived: false,
1140 };
1141
1142 cx.update(|cx| {
1143 let store = ThreadMetadataStore::global(cx);
1144 store.update(cx, |store, cx| {
1145 store.save(existing_metadata, cx);
1146 });
1147 });
1148 cx.run_until_parked();
1149
1150 let threads_to_save = vec![
1151 (
1152 "a-session-0",
1153 "Thread A0 From Native Store",
1154 project_a_paths.clone(),
1155 now,
1156 ),
1157 (
1158 "a-session-1",
1159 "Thread A1",
1160 project_a_paths.clone(),
1161 now + chrono::Duration::seconds(1),
1162 ),
1163 (
1164 "b-session-0",
1165 "Thread B0",
1166 project_b_paths.clone(),
1167 now + chrono::Duration::seconds(2),
1168 ),
1169 (
1170 "projectless",
1171 "Projectless",
1172 PathList::default(),
1173 now + chrono::Duration::seconds(3),
1174 ),
1175 ];
1176
1177 for (session_id, title, paths, updated_at) in &threads_to_save {
1178 let save_task = cx.update(|cx| {
1179 let thread_store = ThreadStore::global(cx);
1180 let session_id = session_id.to_string();
1181 let title = title.to_string();
1182 let paths = paths.clone();
1183 thread_store.update(cx, |store, cx| {
1184 store.save_thread(
1185 acp::SessionId::new(session_id),
1186 make_db_thread(&title, *updated_at),
1187 paths,
1188 cx,
1189 )
1190 })
1191 });
1192 save_task.await.unwrap();
1193 cx.run_until_parked();
1194 }
1195
1196 cx.update(|cx| migrate_thread_metadata(cx));
1197 cx.run_until_parked();
1198
1199 let list = cx.update(|cx| {
1200 let store = ThreadMetadataStore::global(cx);
1201 store.read(cx).entries().cloned().collect::<Vec<_>>()
1202 });
1203
1204 assert_eq!(list.len(), 4);
1205 assert!(
1206 list.iter()
1207 .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1208 );
1209
1210 let existing_metadata = list
1211 .iter()
1212 .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1213 .unwrap();
1214 assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1215 assert!(!existing_metadata.archived);
1216
1217 let migrated_session_ids = list
1218 .iter()
1219 .map(|metadata| metadata.session_id.0.as_ref())
1220 .collect::<Vec<_>>();
1221 assert!(migrated_session_ids.contains(&"a-session-1"));
1222 assert!(migrated_session_ids.contains(&"b-session-0"));
1223 assert!(migrated_session_ids.contains(&"projectless"));
1224
1225 let migrated_entries = list
1226 .iter()
1227 .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1228 .collect::<Vec<_>>();
1229 assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1230 }
1231
1232 #[gpui::test]
1233 async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1234 cx: &mut TestAppContext,
1235 ) {
1236 init_test(cx);
1237
1238 let project_paths = PathList::new(&[Path::new("/project-a")]);
1239 let existing_updated_at = Utc::now();
1240
1241 let existing_metadata = ThreadMetadata {
1242 session_id: acp::SessionId::new("existing-session"),
1243 agent_id: agent::ZED_AGENT_ID.clone(),
1244 title: "Existing Metadata".into(),
1245 updated_at: existing_updated_at,
1246 created_at: Some(existing_updated_at),
1247 folder_paths: project_paths.clone(),
1248 archived: false,
1249 };
1250
1251 cx.update(|cx| {
1252 let store = ThreadMetadataStore::global(cx);
1253 store.update(cx, |store, cx| {
1254 store.save(existing_metadata, cx);
1255 });
1256 });
1257 cx.run_until_parked();
1258
1259 let save_task = cx.update(|cx| {
1260 let thread_store = ThreadStore::global(cx);
1261 thread_store.update(cx, |store, cx| {
1262 store.save_thread(
1263 acp::SessionId::new("existing-session"),
1264 make_db_thread(
1265 "Updated Native Thread Title",
1266 existing_updated_at + chrono::Duration::seconds(1),
1267 ),
1268 project_paths.clone(),
1269 cx,
1270 )
1271 })
1272 });
1273 save_task.await.unwrap();
1274 cx.run_until_parked();
1275
1276 cx.update(|cx| migrate_thread_metadata(cx));
1277 cx.run_until_parked();
1278
1279 let list = cx.update(|cx| {
1280 let store = ThreadMetadataStore::global(cx);
1281 store.read(cx).entries().cloned().collect::<Vec<_>>()
1282 });
1283
1284 assert_eq!(list.len(), 1);
1285 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1286 }
1287
1288 #[gpui::test]
1289 async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1290 cx: &mut TestAppContext,
1291 ) {
1292 init_test(cx);
1293
1294 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1295 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1296 let now = Utc::now();
1297
1298 // Create 7 threads for project A and 3 for project B
1299 let mut threads_to_save = Vec::new();
1300 for i in 0..7 {
1301 threads_to_save.push((
1302 format!("a-session-{i}"),
1303 format!("Thread A{i}"),
1304 project_a_paths.clone(),
1305 now + chrono::Duration::seconds(i as i64),
1306 ));
1307 }
1308 for i in 0..3 {
1309 threads_to_save.push((
1310 format!("b-session-{i}"),
1311 format!("Thread B{i}"),
1312 project_b_paths.clone(),
1313 now + chrono::Duration::seconds(i as i64),
1314 ));
1315 }
1316
1317 for (session_id, title, paths, updated_at) in &threads_to_save {
1318 let save_task = cx.update(|cx| {
1319 let thread_store = ThreadStore::global(cx);
1320 let session_id = session_id.to_string();
1321 let title = title.to_string();
1322 let paths = paths.clone();
1323 thread_store.update(cx, |store, cx| {
1324 store.save_thread(
1325 acp::SessionId::new(session_id),
1326 make_db_thread(&title, *updated_at),
1327 paths,
1328 cx,
1329 )
1330 })
1331 });
1332 save_task.await.unwrap();
1333 cx.run_until_parked();
1334 }
1335
1336 cx.update(|cx| migrate_thread_metadata(cx));
1337 cx.run_until_parked();
1338
1339 let list = cx.update(|cx| {
1340 let store = ThreadMetadataStore::global(cx);
1341 store.read(cx).entries().cloned().collect::<Vec<_>>()
1342 });
1343
1344 assert_eq!(list.len(), 10);
1345
1346 // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1347 let mut project_a_entries: Vec<_> = list
1348 .iter()
1349 .filter(|m| m.folder_paths == project_a_paths)
1350 .collect();
1351 assert_eq!(project_a_entries.len(), 7);
1352 project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1353
1354 for entry in &project_a_entries[..5] {
1355 assert!(
1356 !entry.archived,
1357 "Expected {} to be unarchived (top 5 most recent)",
1358 entry.session_id.0
1359 );
1360 }
1361 for entry in &project_a_entries[5..] {
1362 assert!(
1363 entry.archived,
1364 "Expected {} to be archived (older than top 5)",
1365 entry.session_id.0
1366 );
1367 }
1368
1369 // Project B: all 3 should be unarchived (under the limit)
1370 let project_b_entries: Vec<_> = list
1371 .iter()
1372 .filter(|m| m.folder_paths == project_b_paths)
1373 .collect();
1374 assert_eq!(project_b_entries.len(), 3);
1375 assert!(project_b_entries.iter().all(|m| !m.archived));
1376 }
1377
1378 #[gpui::test]
1379 async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1380 init_test(cx);
1381
1382 let fs = FakeFs::new(cx.executor());
1383 let project = Project::test(fs, None::<&Path>, cx).await;
1384 let connection = Rc::new(StubAgentConnection::new());
1385
1386 let thread = cx
1387 .update(|cx| {
1388 connection
1389 .clone()
1390 .new_session(project.clone(), PathList::default(), cx)
1391 })
1392 .await
1393 .unwrap();
1394 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1395
1396 cx.update(|cx| {
1397 thread.update(cx, |thread, cx| {
1398 thread.set_title("Draft Thread".into(), cx).detach();
1399 });
1400 });
1401 cx.run_until_parked();
1402
1403 let metadata_ids = cx.update(|cx| {
1404 ThreadMetadataStore::global(cx)
1405 .read(cx)
1406 .entry_ids()
1407 .collect::<Vec<_>>()
1408 });
1409 assert_eq!(metadata_ids, vec![session_id]);
1410
1411 drop(thread);
1412 cx.update(|_| {});
1413 cx.run_until_parked();
1414 cx.run_until_parked();
1415
1416 let metadata_ids = cx.update(|cx| {
1417 ThreadMetadataStore::global(cx)
1418 .read(cx)
1419 .entry_ids()
1420 .collect::<Vec<_>>()
1421 });
1422 assert!(
1423 metadata_ids.is_empty(),
1424 "expected empty draft thread metadata to be deleted on release"
1425 );
1426 }
1427
1428 #[gpui::test]
1429 async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1430 init_test(cx);
1431
1432 let fs = FakeFs::new(cx.executor());
1433 let project = Project::test(fs, None::<&Path>, cx).await;
1434 let connection = Rc::new(StubAgentConnection::new());
1435
1436 let thread = cx
1437 .update(|cx| {
1438 connection
1439 .clone()
1440 .new_session(project.clone(), PathList::default(), cx)
1441 })
1442 .await
1443 .unwrap();
1444 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1445
1446 cx.update(|cx| {
1447 thread.update(cx, |thread, cx| {
1448 thread.push_user_content_block(None, "Hello".into(), cx);
1449 });
1450 });
1451 cx.run_until_parked();
1452
1453 let metadata_ids = cx.update(|cx| {
1454 ThreadMetadataStore::global(cx)
1455 .read(cx)
1456 .entry_ids()
1457 .collect::<Vec<_>>()
1458 });
1459 assert_eq!(metadata_ids, vec![session_id.clone()]);
1460
1461 drop(thread);
1462 cx.update(|_| {});
1463 cx.run_until_parked();
1464
1465 let metadata_ids = cx.update(|cx| {
1466 ThreadMetadataStore::global(cx)
1467 .read(cx)
1468 .entry_ids()
1469 .collect::<Vec<_>>()
1470 });
1471 assert_eq!(metadata_ids, vec![session_id]);
1472 }
1473
1474 #[gpui::test]
1475 async fn test_threads_without_project_association_are_archived_by_default(
1476 cx: &mut TestAppContext,
1477 ) {
1478 init_test(cx);
1479
1480 let fs = FakeFs::new(cx.executor());
1481 let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
1482 let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
1483 let connection = Rc::new(StubAgentConnection::new());
1484
1485 let thread_without_worktree = cx
1486 .update(|cx| {
1487 connection.clone().new_session(
1488 project_without_worktree.clone(),
1489 PathList::default(),
1490 cx,
1491 )
1492 })
1493 .await
1494 .unwrap();
1495 let session_without_worktree =
1496 cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());
1497
1498 cx.update(|cx| {
1499 thread_without_worktree.update(cx, |thread, cx| {
1500 thread.set_title("No Project Thread".into(), cx).detach();
1501 });
1502 });
1503 cx.run_until_parked();
1504
1505 let thread_with_worktree = cx
1506 .update(|cx| {
1507 connection.clone().new_session(
1508 project_with_worktree.clone(),
1509 PathList::default(),
1510 cx,
1511 )
1512 })
1513 .await
1514 .unwrap();
1515 let session_with_worktree =
1516 cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());
1517
1518 cx.update(|cx| {
1519 thread_with_worktree.update(cx, |thread, cx| {
1520 thread.set_title("Project Thread".into(), cx).detach();
1521 });
1522 });
1523 cx.run_until_parked();
1524
1525 cx.update(|cx| {
1526 let store = ThreadMetadataStore::global(cx);
1527 let store = store.read(cx);
1528
1529 let without_worktree = store
1530 .entry(&session_without_worktree)
1531 .expect("missing metadata for thread without project association");
1532 assert!(without_worktree.folder_paths.is_empty());
1533 assert!(
1534 without_worktree.archived,
1535 "expected thread without project association to be archived"
1536 );
1537
1538 let with_worktree = store
1539 .entry(&session_with_worktree)
1540 .expect("missing metadata for thread with project association");
1541 assert_eq!(
1542 with_worktree.folder_paths,
1543 PathList::new(&[Path::new("/project-a")])
1544 );
1545 assert!(
1546 !with_worktree.archived,
1547 "expected thread with project association to remain unarchived"
1548 );
1549 });
1550 }
1551
1552 #[gpui::test]
1553 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1554 init_test(cx);
1555
1556 let fs = FakeFs::new(cx.executor());
1557 let project = Project::test(fs, None::<&Path>, cx).await;
1558 let connection = Rc::new(StubAgentConnection::new());
1559
1560 // Create a regular (non-subagent) AcpThread.
1561 let regular_thread = cx
1562 .update(|cx| {
1563 connection
1564 .clone()
1565 .new_session(project.clone(), PathList::default(), cx)
1566 })
1567 .await
1568 .unwrap();
1569
1570 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1571
1572 // Set a title on the regular thread to trigger a save via handle_thread_update.
1573 cx.update(|cx| {
1574 regular_thread.update(cx, |thread, cx| {
1575 thread.set_title("Regular Thread".into(), cx).detach();
1576 });
1577 });
1578 cx.run_until_parked();
1579
1580 // Create a subagent AcpThread
1581 let subagent_session_id = acp::SessionId::new("subagent-session");
1582 let subagent_thread = cx.update(|cx| {
1583 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1584 cx.new(|cx| {
1585 acp_thread::AcpThread::new(
1586 Some(regular_session_id.clone()),
1587 Some("Subagent Thread".into()),
1588 None,
1589 connection.clone(),
1590 project.clone(),
1591 action_log,
1592 subagent_session_id.clone(),
1593 watch::Receiver::constant(acp::PromptCapabilities::new()),
1594 cx,
1595 )
1596 })
1597 });
1598
1599 // Set a title on the subagent thread to trigger handle_thread_update.
1600 cx.update(|cx| {
1601 subagent_thread.update(cx, |thread, cx| {
1602 thread
1603 .set_title("Subagent Thread Title".into(), cx)
1604 .detach();
1605 });
1606 });
1607 cx.run_until_parked();
1608
1609 // List all metadata from the store cache.
1610 let list = cx.update(|cx| {
1611 let store = ThreadMetadataStore::global(cx);
1612 store.read(cx).entries().cloned().collect::<Vec<_>>()
1613 });
1614
1615 // The subagent thread should NOT appear in the sidebar metadata.
1616 // Only the regular thread should be listed.
1617 assert_eq!(
1618 list.len(),
1619 1,
1620 "Expected only the regular thread in sidebar metadata, \
1621 but found {} entries (subagent threads are leaking into the sidebar)",
1622 list.len(),
1623 );
1624 assert_eq!(list[0].session_id, regular_session_id);
1625 assert_eq!(list[0].title.as_ref(), "Regular Thread");
1626 }
1627
/// An upsert followed by a delete for the same session must collapse to just the delete.
#[test]
fn test_dedup_db_operations_keeps_latest_operation_for_session() {
    let upsert = DbOperation::Upsert(make_metadata(
        "session-1",
        "First Thread",
        Utc::now(),
        PathList::default(),
    ));
    let delete = DbOperation::Delete(acp::SessionId::new("session-1"));

    let deduped = ThreadMetadataStore::dedup_db_operations(vec![upsert, delete]);

    let expected = DbOperation::Delete(acp::SessionId::new("session-1"));
    assert_eq!(deduped.len(), 1);
    assert_eq!(deduped[0], expected);
}
1650
/// Two upserts for the same session must collapse to the one that was queued last.
#[test]
fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
    let first_time = Utc::now();
    let second_time = first_time + chrono::Duration::seconds(1);

    let stale = make_metadata("session-1", "Old Title", first_time, PathList::default());
    let fresh = make_metadata("session-1", "New Title", second_time, PathList::default());

    let queued = vec![
        DbOperation::Upsert(stale),
        DbOperation::Upsert(fresh.clone()),
    ];
    let deduped = ThreadMetadataStore::dedup_db_operations(queued);

    assert_eq!(deduped.len(), 1);
    assert_eq!(deduped[0], DbOperation::Upsert(fresh));
}
1667
/// Upserts for different sessions must both survive deduplication.
#[test]
fn test_dedup_db_operations_preserves_distinct_sessions() {
    let timestamp = Utc::now();

    let first = make_metadata("session-1", "First Thread", timestamp, PathList::default());
    let second = make_metadata("session-2", "Second Thread", timestamp, PathList::default());

    let queued = vec![
        DbOperation::Upsert(first.clone()),
        DbOperation::Upsert(second.clone()),
    ];
    let deduped = ThreadMetadataStore::dedup_db_operations(queued);

    assert_eq!(deduped.len(), 2);
    for metadata in [first, second] {
        assert!(deduped.contains(&DbOperation::Upsert(metadata)));
    }
}
1683
1684 #[gpui::test]
1685 async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1686 init_test(cx);
1687
1688 let paths = PathList::new(&[Path::new("/project-a")]);
1689 let now = Utc::now();
1690 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1691
1692 cx.update(|cx| {
1693 let store = ThreadMetadataStore::global(cx);
1694 store.update(cx, |store, cx| {
1695 store.save(metadata, cx);
1696 });
1697 });
1698
1699 cx.run_until_parked();
1700
1701 cx.update(|cx| {
1702 let store = ThreadMetadataStore::global(cx);
1703 let store = store.read(cx);
1704
1705 let path_entries = store
1706 .entries_for_path(&paths)
1707 .map(|e| e.session_id.0.to_string())
1708 .collect::<Vec<_>>();
1709 assert_eq!(path_entries, vec!["session-1"]);
1710
1711 let archived = store
1712 .archived_entries()
1713 .map(|e| e.session_id.0.to_string())
1714 .collect::<Vec<_>>();
1715 assert!(archived.is_empty());
1716 });
1717
1718 cx.update(|cx| {
1719 let store = ThreadMetadataStore::global(cx);
1720 store.update(cx, |store, cx| {
1721 store.archive(&acp::SessionId::new("session-1"), cx);
1722 });
1723 });
1724
1725 cx.run_until_parked();
1726
1727 cx.update(|cx| {
1728 let store = ThreadMetadataStore::global(cx);
1729 let store = store.read(cx);
1730
1731 let path_entries = store
1732 .entries_for_path(&paths)
1733 .map(|e| e.session_id.0.to_string())
1734 .collect::<Vec<_>>();
1735 assert!(path_entries.is_empty());
1736
1737 let archived = store.archived_entries().collect::<Vec<_>>();
1738 assert_eq!(archived.len(), 1);
1739 assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1740 assert!(archived[0].archived);
1741 });
1742
1743 cx.update(|cx| {
1744 let store = ThreadMetadataStore::global(cx);
1745 store.update(cx, |store, cx| {
1746 store.unarchive(&acp::SessionId::new("session-1"), cx);
1747 });
1748 });
1749
1750 cx.run_until_parked();
1751
1752 cx.update(|cx| {
1753 let store = ThreadMetadataStore::global(cx);
1754 let store = store.read(cx);
1755
1756 let path_entries = store
1757 .entries_for_path(&paths)
1758 .map(|e| e.session_id.0.to_string())
1759 .collect::<Vec<_>>();
1760 assert_eq!(path_entries, vec!["session-1"]);
1761
1762 let archived = store
1763 .archived_entries()
1764 .map(|e| e.session_id.0.to_string())
1765 .collect::<Vec<_>>();
1766 assert!(archived.is_empty());
1767 });
1768 }
1769
1770 #[gpui::test]
1771 async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1772 init_test(cx);
1773
1774 let paths = PathList::new(&[Path::new("/project-a")]);
1775 let now = Utc::now();
1776
1777 let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1778 let metadata2 = make_metadata(
1779 "session-2",
1780 "Archived Thread",
1781 now - chrono::Duration::seconds(1),
1782 paths.clone(),
1783 );
1784
1785 cx.update(|cx| {
1786 let store = ThreadMetadataStore::global(cx);
1787 store.update(cx, |store, cx| {
1788 store.save(metadata1, cx);
1789 store.save(metadata2, cx);
1790 });
1791 });
1792
1793 cx.run_until_parked();
1794
1795 cx.update(|cx| {
1796 let store = ThreadMetadataStore::global(cx);
1797 store.update(cx, |store, cx| {
1798 store.archive(&acp::SessionId::new("session-2"), cx);
1799 });
1800 });
1801
1802 cx.run_until_parked();
1803
1804 cx.update(|cx| {
1805 let store = ThreadMetadataStore::global(cx);
1806 let store = store.read(cx);
1807
1808 let path_entries = store
1809 .entries_for_path(&paths)
1810 .map(|e| e.session_id.0.to_string())
1811 .collect::<Vec<_>>();
1812 assert_eq!(path_entries, vec!["session-1"]);
1813
1814 let all_entries = store
1815 .entries()
1816 .map(|e| e.session_id.0.to_string())
1817 .collect::<Vec<_>>();
1818 assert_eq!(all_entries.len(), 2);
1819 assert!(all_entries.contains(&"session-1".to_string()));
1820 assert!(all_entries.contains(&"session-2".to_string()));
1821
1822 let archived = store
1823 .archived_entries()
1824 .map(|e| e.session_id.0.to_string())
1825 .collect::<Vec<_>>();
1826 assert_eq!(archived, vec!["session-2"]);
1827 });
1828 }
1829
1830 #[gpui::test]
1831 async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1832 init_test(cx);
1833
1834 let paths = PathList::new(&[Path::new("/project-a")]);
1835 let now = Utc::now();
1836
1837 let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1838 let m2 = make_metadata(
1839 "session-2",
1840 "Thread Two",
1841 now - chrono::Duration::seconds(1),
1842 paths.clone(),
1843 );
1844 let m3 = make_metadata(
1845 "session-3",
1846 "Thread Three",
1847 now - chrono::Duration::seconds(2),
1848 paths,
1849 );
1850
1851 cx.update(|cx| {
1852 let store = ThreadMetadataStore::global(cx);
1853 store.update(cx, |store, cx| {
1854 store.save_all(vec![m1, m2, m3], cx);
1855 });
1856 });
1857
1858 cx.run_until_parked();
1859
1860 cx.update(|cx| {
1861 let store = ThreadMetadataStore::global(cx);
1862 let store = store.read(cx);
1863
1864 let all_entries = store
1865 .entries()
1866 .map(|e| e.session_id.0.to_string())
1867 .collect::<Vec<_>>();
1868 assert_eq!(all_entries.len(), 3);
1869 assert!(all_entries.contains(&"session-1".to_string()));
1870 assert!(all_entries.contains(&"session-2".to_string()));
1871 assert!(all_entries.contains(&"session-3".to_string()));
1872
1873 let entry_ids = store.entry_ids().collect::<Vec<_>>();
1874 assert_eq!(entry_ids.len(), 3);
1875 });
1876 }
1877
1878 #[gpui::test]
1879 async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1880 init_test(cx);
1881
1882 let paths = PathList::new(&[Path::new("/project-a")]);
1883 let now = Utc::now();
1884 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1885
1886 cx.update(|cx| {
1887 let store = ThreadMetadataStore::global(cx);
1888 store.update(cx, |store, cx| {
1889 store.save(metadata, cx);
1890 });
1891 });
1892
1893 cx.run_until_parked();
1894
1895 cx.update(|cx| {
1896 let store = ThreadMetadataStore::global(cx);
1897 store.update(cx, |store, cx| {
1898 store.archive(&acp::SessionId::new("session-1"), cx);
1899 });
1900 });
1901
1902 cx.run_until_parked();
1903
1904 cx.update(|cx| {
1905 let store = ThreadMetadataStore::global(cx);
1906 store.update(cx, |store, cx| {
1907 let _ = store.reload(cx);
1908 });
1909 });
1910
1911 cx.run_until_parked();
1912
1913 cx.update(|cx| {
1914 let store = ThreadMetadataStore::global(cx);
1915 let store = store.read(cx);
1916
1917 let thread = store
1918 .entries()
1919 .find(|e| e.session_id.0.as_ref() == "session-1")
1920 .expect("thread should exist after reload");
1921 assert!(thread.archived);
1922
1923 let path_entries = store
1924 .entries_for_path(&paths)
1925 .map(|e| e.session_id.0.to_string())
1926 .collect::<Vec<_>>();
1927 assert!(path_entries.is_empty());
1928
1929 let archived = store
1930 .archived_entries()
1931 .map(|e| e.session_id.0.to_string())
1932 .collect::<Vec<_>>();
1933 assert_eq!(archived, vec!["session-1"]);
1934 });
1935 }
1936
1937 #[gpui::test]
1938 async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1939 init_test(cx);
1940
1941 cx.run_until_parked();
1942
1943 cx.update(|cx| {
1944 let store = ThreadMetadataStore::global(cx);
1945 store.update(cx, |store, cx| {
1946 store.archive(&acp::SessionId::new("nonexistent"), cx);
1947 });
1948 });
1949
1950 cx.run_until_parked();
1951
1952 cx.update(|cx| {
1953 let store = ThreadMetadataStore::global(cx);
1954 let store = store.read(cx);
1955
1956 assert!(store.is_empty());
1957 assert_eq!(store.entries().count(), 0);
1958 assert_eq!(store.archived_entries().count(), 0);
1959 });
1960 }
1961
1962 #[gpui::test]
1963 async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1964 init_test(cx);
1965
1966 let paths = PathList::new(&[Path::new("/project-a")]);
1967 let now = Utc::now();
1968 let metadata = make_metadata("session-1", "Thread 1", now, paths);
1969 let session_id = metadata.session_id.clone();
1970
1971 cx.update(|cx| {
1972 let store = ThreadMetadataStore::global(cx);
1973 store.update(cx, |store, cx| {
1974 store.save(metadata.clone(), cx);
1975 store.archive(&session_id, cx);
1976 });
1977 });
1978
1979 cx.run_until_parked();
1980
1981 cx.update(|cx| {
1982 let store = ThreadMetadataStore::global(cx);
1983 let store = store.read(cx);
1984
1985 let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1986 pretty_assertions::assert_eq!(
1987 entries,
1988 vec![ThreadMetadata {
1989 archived: true,
1990 ..metadata
1991 }]
1992 );
1993 });
1994 }
1995}