1use std::{
2 path::{Path, PathBuf},
3 sync::Arc,
4};
5
6use acp_thread::AcpThreadEvent;
7use agent::{ThreadStore, ZED_AGENT_ID};
8use agent_client_protocol as acp;
9use anyhow::Context as _;
10use chrono::{DateTime, Utc};
11use collections::{HashMap, HashSet};
12use db::{
13 kvp::KeyValueStore,
14 sqlez::{
15 bindable::Column, domain::Domain, statement::Statement,
16 thread_safe_connection::ThreadSafeConnection,
17 },
18 sqlez_macros::sql,
19};
20use fs::Fs;
21use futures::{FutureExt, future::Shared};
22use gpui::{AppContext as _, Entity, Global, Subscription, Task};
23use project::AgentId;
24use remote::RemoteConnectionOptions;
25use ui::{App, Context, SharedString};
26use util::ResultExt as _;
27use workspace::{PathList, SerializedWorkspaceLocation, WorkspaceDb};
28
29use crate::DEFAULT_THREAD_TITLE;
30
31const THREAD_REMOTE_CONNECTION_MIGRATION_KEY: &str = "thread-metadata-remote-connection-backfill";
32
33pub fn init(cx: &mut App) {
34 ThreadMetadataStore::init_global(cx);
35 let migration_task = migrate_thread_metadata(cx);
36 migrate_thread_remote_connections(cx, migration_task);
37}
38
/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
/// We skip migrating threads that do not have a project.
///
/// TODO: Remove this after N weeks of shipping the sidebar
fn migrate_thread_metadata(cx: &mut App) -> Task<anyhow::Result<()>> {
    let store = ThreadMetadataStore::global(cx);
    let db = store.read(cx).db.clone();

    cx.spawn(async move |cx| {
        // IDs already present in the metadata DB. Anything listed here was
        // migrated previously (or created natively) and must not be rewritten.
        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();

        // No rows at all means this is the very first run of the migration.
        let is_first_migration = existing_entries.is_empty();

        let mut to_migrate = store.read_with(cx, |_store, cx| {
            ThreadStore::global(cx)
                .read(cx)
                .entries()
                .filter_map(|entry| {
                    if existing_entries.contains(&entry.id.0) {
                        return None;
                    }

                    // Migrated threads default to archived; the first-run
                    // pass below selectively unarchives the most recent ones.
                    Some(ThreadMetadata {
                        session_id: entry.id,
                        agent_id: ZED_AGENT_ID.clone(),
                        title: entry.title,
                        updated_at: entry.updated_at,
                        created_at: entry.created_at,
                        folder_paths: entry.folder_paths,
                        main_worktree_paths: PathList::default(),
                        remote_connection: None,
                        archived: true,
                    })
                })
                .collect::<Vec<_>>()
        });

        if to_migrate.is_empty() {
            return anyhow::Ok(());
        }

        // On the first migration (no entries in DB yet), keep the 5 most
        // recent threads per project unarchived.
        if is_first_migration {
            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
            for entry in &mut to_migrate {
                // Threads without a project stay archived unconditionally.
                if entry.folder_paths.is_empty() {
                    continue;
                }
                per_project
                    .entry(entry.folder_paths.clone())
                    .or_default()
                    .push(entry);
            }
            for entries in per_project.values_mut() {
                // Most recently updated first, then unarchive the top 5.
                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
                for entry in entries.iter_mut().take(5) {
                    entry.archived = false;
                }
            }
        }

        log::info!("Migrating {} thread store entries", to_migrate.len());

        // Manually save each entry to the database and call reload, otherwise
        // we'll end up triggering lots of reloads after each save
        for entry in to_migrate {
            db.save(entry).await?;
        }

        log::info!("Finished migrating thread store entries");

        // Single reload at the end picks up everything written above.
        let _ = store.update(cx, |store, cx| store.reload(cx));
        anyhow::Ok(())
    })
}
115
116fn migrate_thread_remote_connections(cx: &mut App, migration_task: Task<anyhow::Result<()>>) {
117 let store = ThreadMetadataStore::global(cx);
118 let db = store.read(cx).db.clone();
119 let kvp = KeyValueStore::global(cx);
120 let workspace_db = WorkspaceDb::global(cx);
121 let fs = <dyn Fs>::global(cx);
122
123 cx.spawn(async move |cx| -> anyhow::Result<()> {
124 migration_task.await?;
125
126 if kvp
127 .read_kvp(THREAD_REMOTE_CONNECTION_MIGRATION_KEY)?
128 .is_some()
129 {
130 return Ok(());
131 }
132
133 let recent_workspaces = workspace_db.recent_workspaces_on_disk(fs.as_ref()).await?;
134
135 let mut local_path_lists = HashSet::<PathList>::default();
136 let mut remote_path_lists = HashMap::<PathList, RemoteConnectionOptions>::default();
137
138 recent_workspaces
139 .iter()
140 .filter(|(_, location, path_list, _)| {
141 !path_list.is_empty() && matches!(location, &SerializedWorkspaceLocation::Local)
142 })
143 .for_each(|(_, _, path_list, _)| {
144 local_path_lists.insert(path_list.clone());
145 });
146
147 for (_, location, path_list, _) in recent_workspaces {
148 match location {
149 SerializedWorkspaceLocation::Remote(remote_connection)
150 if !local_path_lists.contains(&path_list) =>
151 {
152 remote_path_lists
153 .entry(path_list)
154 .or_insert(remote_connection);
155 }
156 _ => {}
157 }
158 }
159
160 let mut reloaded = false;
161 for metadata in db.list()? {
162 if metadata.remote_connection.is_some() {
163 continue;
164 }
165
166 if let Some(remote_connection) = remote_path_lists
167 .get(&metadata.folder_paths)
168 .or_else(|| remote_path_lists.get(&metadata.main_worktree_paths))
169 {
170 db.save(ThreadMetadata {
171 remote_connection: Some(remote_connection.clone()),
172 ..metadata
173 })
174 .await?;
175 reloaded = true;
176 }
177 }
178
179 let reloaded_task = reloaded
180 .then_some(store.update(cx, |store, cx| store.reload(cx)))
181 .unwrap_or(Task::ready(()).shared());
182
183 kvp.write_kvp(
184 THREAD_REMOTE_CONNECTION_MIGRATION_KEY.to_string(),
185 "1".to_string(),
186 )
187 .await?;
188 reloaded_task.await;
189
190 Ok(())
191 })
192 .detach_and_log_err(cx);
193}
194
/// Newtype wrapper registering the [`ThreadMetadataStore`] entity as a gpui
/// global, so it can be retrieved anywhere via `ThreadMetadataStore::global`.
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
197
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Unique session identifier; primary key in the metadata database.
    pub session_id: acp::SessionId,
    /// Agent that owns the thread (e.g. the built-in Zed agent).
    pub agent_id: AgentId,
    /// Display title shown in the sidebar.
    pub title: SharedString,
    /// Last time the thread produced an update worth persisting.
    pub updated_at: DateTime<Utc>,
    /// `None` for rows migrated from stores that never recorded creation time.
    pub created_at: Option<DateTime<Utc>>,
    /// Absolute paths of the visible worktree folders the thread was opened in.
    pub folder_paths: PathList,
    /// Paths of the main worktree(s) when the thread lives in a linked git
    /// worktree; empty when not applicable.
    pub main_worktree_paths: PathList,
    /// Set when the thread belongs to a remote project (from the project
    /// group key's host) — presumably `None` for local projects.
    pub remote_connection: Option<RemoteConnectionOptions>,
    /// Archived threads are hidden from the sidebar's per-project lists.
    pub archived: bool,
}
212
213impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
214 fn from(meta: &ThreadMetadata) -> Self {
215 Self {
216 session_id: meta.session_id.clone(),
217 work_dirs: Some(meta.folder_paths.clone()),
218 title: Some(meta.title.clone()),
219 updated_at: Some(meta.updated_at),
220 created_at: meta.created_at,
221 meta: None,
222 }
223 }
224}
225
/// Record of a git worktree that was archived (deleted from disk) when its
/// last thread was archived. Mirrors a row of the `archived_git_worktrees`
/// table.
pub struct ArchivedGitWorktree {
    /// Auto-incrementing primary key.
    pub id: i64,
    /// Absolute path to the directory of the worktree before it was deleted.
    /// Used when restoring, to put the recreated worktree back where it was.
    /// If the path already exists on disk, the worktree is assumed to be
    /// already restored and is used as-is.
    pub worktree_path: PathBuf,
    /// Absolute path of the main repository ("main worktree") that owned this worktree.
    /// Used when restoring, to reattach the recreated worktree to the correct main repo.
    /// If the main repo isn't found on disk, unarchiving fails because we only store
    /// commit hashes, and without the actual git repo being available, we can't restore
    /// the files.
    pub main_repo_path: PathBuf,
    /// Branch that was checked out in the worktree at archive time. `None` if
    /// the worktree was in detached HEAD state, which isn't supported in Zed, but
    /// could happen if the user made a detached one outside of Zed.
    /// On restore, we try to switch to this branch. If that fails (e.g. it's
    /// checked out elsewhere), we auto-generate a new one.
    pub branch_name: Option<String>,
    /// SHA of the WIP commit that captures files that were staged (but not yet
    /// committed) at the time of archiving. This commit can be empty if the
    /// user had no staged files at the time. It sits directly on top of whatever
    /// the user's last actual commit was.
    pub staged_commit_hash: String,
    /// SHA of the WIP commit that captures files that were unstaged (including
    /// untracked) at the time of archiving. This commit can be empty if the user
    /// had no unstaged files at the time. It sits on top of `staged_commit_hash`.
    /// After doing `git reset` past both of these commits, we're back in the state
    /// we had before archiving, including what was staged, what was unstaged, and
    /// what was committed.
    pub unstaged_commit_hash: String,
    /// SHA of the commit that HEAD pointed at before we created the two WIP
    /// commits during archival. After resetting past the WIP commits during
    /// restore, HEAD should land back on this commit. It also serves as a
    /// pre-restore sanity check (abort if this commit no longer exists in the
    /// repo) and as a fallback target if the WIP resets fail.
    pub original_commit_hash: String,
}
267
/// The store holds all metadata needed to show threads in the sidebar/the archive.
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct ThreadMetadataStore {
    /// Persistent SQLite-backed store for the metadata.
    db: ThreadMetadataDb,
    /// In-memory cache of all metadata rows, keyed by session id.
    threads: HashMap<acp::SessionId, ThreadMetadata>,
    /// Index: `folder_paths` -> session ids, for per-project sidebar queries.
    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// Index: `main_worktree_paths` -> session ids, for linked-worktree queries.
    threads_by_main_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// In-flight reload of the cache from the DB, shared so multiple callers
    /// can await the same load.
    reload_task: Option<Shared<Task<()>>>,
    /// Keeps per-thread event subscriptions alive; entries are removed when
    /// the corresponding thread entity is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Queue of pending DB writes, drained and deduplicated in the background
    /// by `_db_operations_task`.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Running archive jobs per session; dropping the stored `Sender` cancels
    /// the corresponding background task.
    in_flight_archives: HashMap<acp::SessionId, (Task<()>, smol::channel::Sender<()>)>,
    /// Background task applying queued `DbOperation`s to the database.
    _db_operations_task: Task<()>,
}
282
/// A pending write to the metadata database, queued so that bursts of
/// successive updates can be batched and deduplicated before hitting SQLite.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Insert or update the metadata row for a session.
    Upsert(ThreadMetadata),
    /// Remove the metadata row for a session.
    Delete(acp::SessionId),
}
288
289impl DbOperation {
290 fn id(&self) -> &acp::SessionId {
291 match self {
292 DbOperation::Upsert(thread) => &thread.session_id,
293 DbOperation::Delete(session_id) => session_id,
294 }
295 }
296}
297
298impl ThreadMetadataStore {
299 #[cfg(not(any(test, feature = "test-support")))]
300 pub fn init_global(cx: &mut App) {
301 if cx.has_global::<Self>() {
302 return;
303 }
304
305 let db = ThreadMetadataDb::global(cx);
306 let thread_store = cx.new(|cx| Self::new(db, cx));
307 cx.set_global(GlobalThreadMetadataStore(thread_store));
308 }
309
310 #[cfg(any(test, feature = "test-support"))]
311 pub fn init_global(cx: &mut App) {
312 let thread = std::thread::current();
313 let test_name = thread.name().unwrap_or("unknown_test");
314 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
315 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
316 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
317 cx.set_global(GlobalThreadMetadataStore(thread_store));
318 }
319
320 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
321 cx.try_global::<GlobalThreadMetadataStore>()
322 .map(|store| store.0.clone())
323 }
324
325 pub fn global(cx: &App) -> Entity<Self> {
326 cx.global::<GlobalThreadMetadataStore>().0.clone()
327 }
328
329 pub fn is_empty(&self) -> bool {
330 self.threads.is_empty()
331 }
332
333 /// Returns all thread IDs.
334 pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
335 self.threads.keys().cloned()
336 }
337
338 /// Returns the metadata for a specific thread, if it exists.
339 pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
340 self.threads.get(session_id)
341 }
342
343 /// Returns all threads.
344 pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
345 self.threads.values()
346 }
347
348 /// Returns all archived threads.
349 pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
350 self.entries().filter(|t| t.archived)
351 }
352
353 /// Returns all threads for the given path list, excluding archived threads.
354 pub fn entries_for_path(
355 &self,
356 path_list: &PathList,
357 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
358 self.threads_by_paths
359 .get(path_list)
360 .into_iter()
361 .flatten()
362 .filter_map(|s| self.threads.get(s))
363 .filter(|s| !s.archived)
364 }
365
366 /// Returns threads whose `main_worktree_paths` matches the given path list,
367 /// excluding archived threads. This finds threads that were opened in a
368 /// linked worktree but are associated with the given main worktree.
369 pub fn entries_for_main_worktree_path(
370 &self,
371 path_list: &PathList,
372 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
373 self.threads_by_main_paths
374 .get(path_list)
375 .into_iter()
376 .flatten()
377 .filter_map(|s| self.threads.get(s))
378 .filter(|s| !s.archived)
379 }
380
381 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
382 let db = self.db.clone();
383 self.reload_task.take();
384
385 let list_task = cx
386 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
387
388 let reload_task = cx
389 .spawn(async move |this, cx| {
390 let Some(rows) = list_task.await.log_err() else {
391 return;
392 };
393
394 this.update(cx, |this, cx| {
395 this.threads.clear();
396 this.threads_by_paths.clear();
397 this.threads_by_main_paths.clear();
398
399 for row in rows {
400 this.threads_by_paths
401 .entry(row.folder_paths.clone())
402 .or_default()
403 .insert(row.session_id.clone());
404 if !row.main_worktree_paths.is_empty() {
405 this.threads_by_main_paths
406 .entry(row.main_worktree_paths.clone())
407 .or_default()
408 .insert(row.session_id.clone());
409 }
410 this.threads.insert(row.session_id.clone(), row);
411 }
412
413 cx.notify();
414 })
415 .ok();
416 })
417 .shared();
418 self.reload_task = Some(reload_task.clone());
419 reload_task
420 }
421
422 pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
423 for metadata in metadata {
424 self.save_internal(metadata);
425 }
426 cx.notify();
427 }
428
429 #[cfg(any(test, feature = "test-support"))]
430 pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
431 self.save(metadata, cx)
432 }
433
434 fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
435 self.save_internal(metadata);
436 cx.notify();
437 }
438
439 fn save_internal(&mut self, metadata: ThreadMetadata) {
440 if let Some(thread) = self.threads.get(&metadata.session_id) {
441 if thread.folder_paths != metadata.folder_paths {
442 if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
443 session_ids.remove(&metadata.session_id);
444 }
445 }
446 if thread.main_worktree_paths != metadata.main_worktree_paths
447 && !thread.main_worktree_paths.is_empty()
448 {
449 if let Some(session_ids) = self
450 .threads_by_main_paths
451 .get_mut(&thread.main_worktree_paths)
452 {
453 session_ids.remove(&metadata.session_id);
454 }
455 }
456 }
457
458 self.threads
459 .insert(metadata.session_id.clone(), metadata.clone());
460
461 self.threads_by_paths
462 .entry(metadata.folder_paths.clone())
463 .or_default()
464 .insert(metadata.session_id.clone());
465
466 if !metadata.main_worktree_paths.is_empty() {
467 self.threads_by_main_paths
468 .entry(metadata.main_worktree_paths.clone())
469 .or_default()
470 .insert(metadata.session_id.clone());
471 }
472
473 self.pending_thread_ops_tx
474 .try_send(DbOperation::Upsert(metadata))
475 .log_err();
476 }
477
478 pub fn update_working_directories(
479 &mut self,
480 session_id: &acp::SessionId,
481 work_dirs: PathList,
482 cx: &mut Context<Self>,
483 ) {
484 if let Some(thread) = self.threads.get(session_id) {
485 self.save_internal(ThreadMetadata {
486 folder_paths: work_dirs,
487 ..thread.clone()
488 });
489 cx.notify();
490 }
491 }
492
493 pub fn archive(
494 &mut self,
495 session_id: &acp::SessionId,
496 archive_job: Option<(Task<()>, smol::channel::Sender<()>)>,
497 cx: &mut Context<Self>,
498 ) {
499 self.update_archived(session_id, true, cx);
500
501 if let Some(job) = archive_job {
502 self.in_flight_archives.insert(session_id.clone(), job);
503 }
504 }
505
506 pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
507 self.update_archived(session_id, false, cx);
508 // Dropping the Sender triggers cancellation in the background task.
509 self.in_flight_archives.remove(session_id);
510 }
511
512 pub fn cleanup_completed_archive(&mut self, session_id: &acp::SessionId) {
513 self.in_flight_archives.remove(session_id);
514 }
515
516 /// Updates a thread's `folder_paths` after an archived worktree has been
517 /// restored to disk. The restored worktree may land at a different path
518 /// than it had before archival, so each `(old_path, new_path)` pair in
519 /// `path_replacements` is applied to the thread's stored folder paths.
520 pub fn update_restored_worktree_paths(
521 &mut self,
522 session_id: &acp::SessionId,
523 path_replacements: &[(PathBuf, PathBuf)],
524 cx: &mut Context<Self>,
525 ) {
526 if let Some(thread) = self.threads.get(session_id).cloned() {
527 let mut paths: Vec<PathBuf> = thread.folder_paths.paths().to_vec();
528 for (old_path, new_path) in path_replacements {
529 if let Some(pos) = paths.iter().position(|p| p == old_path) {
530 paths[pos] = new_path.clone();
531 }
532 }
533 let new_folder_paths = PathList::new(&paths);
534 self.save_internal(ThreadMetadata {
535 folder_paths: new_folder_paths,
536 ..thread
537 });
538 cx.notify();
539 }
540 }
541
542 pub fn complete_worktree_restore(
543 &mut self,
544 session_id: &acp::SessionId,
545 path_replacements: &[(PathBuf, PathBuf)],
546 cx: &mut Context<Self>,
547 ) {
548 if let Some(thread) = self.threads.get(session_id).cloned() {
549 let mut paths: Vec<PathBuf> = thread.folder_paths.paths().to_vec();
550 for (old_path, new_path) in path_replacements {
551 for path in &mut paths {
552 if path == old_path {
553 *path = new_path.clone();
554 }
555 }
556 }
557 let new_folder_paths = PathList::new(&paths);
558 self.save_internal(ThreadMetadata {
559 folder_paths: new_folder_paths,
560 ..thread
561 });
562 cx.notify();
563 }
564 }
565
566 pub fn create_archived_worktree(
567 &self,
568 worktree_path: String,
569 main_repo_path: String,
570 branch_name: Option<String>,
571 staged_commit_hash: String,
572 unstaged_commit_hash: String,
573 original_commit_hash: String,
574 cx: &App,
575 ) -> Task<anyhow::Result<i64>> {
576 let db = self.db.clone();
577 cx.background_spawn(async move {
578 db.create_archived_worktree(
579 worktree_path,
580 main_repo_path,
581 branch_name,
582 staged_commit_hash,
583 unstaged_commit_hash,
584 original_commit_hash,
585 )
586 .await
587 })
588 }
589
590 pub fn link_thread_to_archived_worktree(
591 &self,
592 session_id: String,
593 archived_worktree_id: i64,
594 cx: &App,
595 ) -> Task<anyhow::Result<()>> {
596 let db = self.db.clone();
597 cx.background_spawn(async move {
598 db.link_thread_to_archived_worktree(session_id, archived_worktree_id)
599 .await
600 })
601 }
602
603 pub fn get_archived_worktrees_for_thread(
604 &self,
605 session_id: String,
606 cx: &App,
607 ) -> Task<anyhow::Result<Vec<ArchivedGitWorktree>>> {
608 let db = self.db.clone();
609 cx.background_spawn(async move { db.get_archived_worktrees_for_thread(session_id).await })
610 }
611
612 pub fn delete_archived_worktree(&self, id: i64, cx: &App) -> Task<anyhow::Result<()>> {
613 let db = self.db.clone();
614 cx.background_spawn(async move { db.delete_archived_worktree(id).await })
615 }
616
617 pub fn unlink_thread_from_all_archived_worktrees(
618 &self,
619 session_id: String,
620 cx: &App,
621 ) -> Task<anyhow::Result<()>> {
622 let db = self.db.clone();
623 cx.background_spawn(async move {
624 db.unlink_thread_from_all_archived_worktrees(session_id)
625 .await
626 })
627 }
628
629 pub fn is_archived_worktree_referenced(
630 &self,
631 archived_worktree_id: i64,
632 cx: &App,
633 ) -> Task<anyhow::Result<bool>> {
634 let db = self.db.clone();
635 cx.background_spawn(async move {
636 db.is_archived_worktree_referenced(archived_worktree_id)
637 .await
638 })
639 }
640
641 fn update_archived(
642 &mut self,
643 session_id: &acp::SessionId,
644 archived: bool,
645 cx: &mut Context<Self>,
646 ) {
647 if let Some(thread) = self.threads.get(session_id) {
648 self.save_internal(ThreadMetadata {
649 archived,
650 ..thread.clone()
651 });
652 cx.notify();
653 }
654 }
655
656 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
657 if let Some(thread) = self.threads.get(&session_id) {
658 if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
659 session_ids.remove(&session_id);
660 }
661 if !thread.main_worktree_paths.is_empty() {
662 if let Some(session_ids) = self
663 .threads_by_main_paths
664 .get_mut(&thread.main_worktree_paths)
665 {
666 session_ids.remove(&session_id);
667 }
668 }
669 }
670 self.threads.remove(&session_id);
671 self.pending_thread_ops_tx
672 .try_send(DbOperation::Delete(session_id))
673 .log_err();
674 cx.notify();
675 }
676
677 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
678 let weak_store = cx.weak_entity();
679
680 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
681 // Don't track subagent threads in the sidebar.
682 if thread.parent_session_id().is_some() {
683 return;
684 }
685
686 let thread_entity = cx.entity();
687
688 cx.on_release({
689 let weak_store = weak_store.clone();
690 move |thread, cx| {
691 weak_store
692 .update(cx, |store, _cx| {
693 let session_id = thread.session_id().clone();
694 store.session_subscriptions.remove(&session_id);
695 })
696 .ok();
697 }
698 })
699 .detach();
700
701 weak_store
702 .update(cx, |this, cx| {
703 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
704 this.session_subscriptions
705 .insert(thread.session_id().clone(), subscription);
706 })
707 .ok();
708 })
709 .detach();
710
711 let (tx, rx) = smol::channel::unbounded();
712 let _db_operations_task = cx.background_spawn({
713 let db = db.clone();
714 async move {
715 while let Ok(first_update) = rx.recv().await {
716 let mut updates = vec![first_update];
717 while let Ok(update) = rx.try_recv() {
718 updates.push(update);
719 }
720 let updates = Self::dedup_db_operations(updates);
721 for operation in updates {
722 match operation {
723 DbOperation::Upsert(metadata) => {
724 db.save(metadata).await.log_err();
725 }
726 DbOperation::Delete(session_id) => {
727 db.delete(session_id).await.log_err();
728 }
729 }
730 }
731 }
732 }
733 });
734
735 let mut this = Self {
736 db,
737 threads: HashMap::default(),
738 threads_by_paths: HashMap::default(),
739 threads_by_main_paths: HashMap::default(),
740 reload_task: None,
741 session_subscriptions: HashMap::default(),
742 pending_thread_ops_tx: tx,
743 in_flight_archives: HashMap::default(),
744 _db_operations_task,
745 };
746 let _ = this.reload(cx);
747 this
748 }
749
750 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
751 let mut ops = HashMap::default();
752 for operation in operations.into_iter().rev() {
753 if ops.contains_key(operation.id()) {
754 continue;
755 }
756 ops.insert(operation.id().clone(), operation);
757 }
758 ops.into_values().collect()
759 }
760
761 fn handle_thread_event(
762 &mut self,
763 thread: Entity<acp_thread::AcpThread>,
764 event: &AcpThreadEvent,
765 cx: &mut Context<Self>,
766 ) {
767 // Don't track subagent threads in the sidebar.
768 if thread.read(cx).parent_session_id().is_some() {
769 return;
770 }
771
772 match event {
773 AcpThreadEvent::NewEntry
774 | AcpThreadEvent::TitleUpdated
775 | AcpThreadEvent::EntryUpdated(_)
776 | AcpThreadEvent::EntriesRemoved(_)
777 | AcpThreadEvent::ToolAuthorizationRequested(_)
778 | AcpThreadEvent::ToolAuthorizationReceived(_)
779 | AcpThreadEvent::Retry(_)
780 | AcpThreadEvent::Stopped(_)
781 | AcpThreadEvent::Error
782 | AcpThreadEvent::LoadError(_)
783 | AcpThreadEvent::Refusal
784 | AcpThreadEvent::WorkingDirectoriesUpdated => {
785 let thread_ref = thread.read(cx);
786 if thread_ref.entries().is_empty() {
787 return;
788 }
789
790 let existing_thread = self.threads.get(thread_ref.session_id());
791 let session_id = thread_ref.session_id().clone();
792 let title = thread_ref
793 .title()
794 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
795
796 let updated_at = Utc::now();
797
798 let created_at = existing_thread
799 .and_then(|t| t.created_at)
800 .unwrap_or_else(|| updated_at);
801
802 let agent_id = thread_ref.connection().agent_id();
803
804 let project = thread_ref.project().read(cx);
805 let folder_paths = {
806 let paths: Vec<Arc<Path>> = project
807 .visible_worktrees(cx)
808 .map(|worktree| worktree.read(cx).abs_path())
809 .collect();
810 PathList::new(&paths)
811 };
812
813 let project_group_key = project.project_group_key(cx);
814 let main_worktree_paths = project_group_key.path_list().clone();
815 let remote_connection = project_group_key.host();
816
817 // Threads without a folder path (e.g. started in an empty
818 // window) are archived by default so they don't get lost,
819 // because they won't show up in the sidebar. Users can reload
820 // them from the archive.
821 let archived = existing_thread
822 .map(|t| t.archived)
823 .unwrap_or(folder_paths.is_empty());
824
825 let metadata = ThreadMetadata {
826 session_id,
827 agent_id,
828 title,
829 created_at: Some(created_at),
830 updated_at,
831 folder_paths,
832 main_worktree_paths,
833 remote_connection,
834 archived,
835 };
836
837 self.save(metadata, cx);
838 }
839 AcpThreadEvent::TokenUsageUpdated
840 | AcpThreadEvent::SubagentSpawned(_)
841 | AcpThreadEvent::PromptCapabilitiesUpdated
842 | AcpThreadEvent::AvailableCommandsUpdated(_)
843 | AcpThreadEvent::ModeUpdated(_)
844 | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
845 }
846 }
847}
848
// NOTE(review): `ThreadMetadataStore` itself is never registered as a global
// in this file — only the `GlobalThreadMetadataStore` wrapper is. This impl
// appears to exist only so `cx.has_global::<Self>()` in `init_global`
// compiles; verify whether it (and that guard) are still intended.
impl Global for ThreadMetadataStore {}
850
/// SQLite-backed persistence for sidebar thread metadata and archived git
/// worktrees, wrapping a thread-safe connection.
struct ThreadMetadataDb(ThreadSafeConnection);
852
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    // Migrations are applied in order and must be treated as append-only:
    // never edit an existing entry, only add new ones at the end.
    const MIGRATIONS: &[&str] = &[
        // Initial sidebar metadata table.
        sql!(
            CREATE TABLE IF NOT EXISTS sidebar_threads(
                session_id TEXT PRIMARY KEY,
                agent_id TEXT,
                title TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                created_at TEXT,
                folder_paths TEXT,
                folder_paths_order TEXT
            ) STRICT;
        ),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths TEXT),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths_order TEXT),
        // Archived git worktrees plus the many-to-many link to threads.
        sql!(
            CREATE TABLE IF NOT EXISTS archived_git_worktrees(
                id INTEGER PRIMARY KEY,
                worktree_path TEXT NOT NULL,
                main_repo_path TEXT NOT NULL,
                branch_name TEXT,
                staged_commit_hash TEXT,
                unstaged_commit_hash TEXT,
                original_commit_hash TEXT
            ) STRICT;

            CREATE TABLE IF NOT EXISTS thread_archived_worktrees(
                session_id TEXT NOT NULL,
                archived_worktree_id INTEGER NOT NULL REFERENCES archived_git_worktrees(id),
                PRIMARY KEY (session_id, archived_worktree_id)
            ) STRICT;
        ),
        // JSON-serialized RemoteConnectionOptions for remote projects.
        sql!(ALTER TABLE sidebar_threads ADD COLUMN remote_connection TEXT),
    ];
}
891
// Registers `ThreadMetadataDb` as a lazily-opened global connection with no
// parent domains.
db::static_connection!(ThreadMetadataDb, []);
893
894impl ThreadMetadataDb {
895 pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
896 self.select::<Arc<str>>(
897 "SELECT session_id FROM sidebar_threads \
898 ORDER BY updated_at DESC",
899 )?()
900 }
901
902 /// List all sidebar thread metadata, ordered by updated_at descending.
903 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
904 self.select::<ThreadMetadata>(
905 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order, remote_connection \
906 FROM sidebar_threads \
907 ORDER BY updated_at DESC"
908 )?()
909 }
910
911 /// Upsert metadata for a thread.
912 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
913 let id = row.session_id.0.clone();
914 let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
915 None
916 } else {
917 Some(row.agent_id.to_string())
918 };
919 let title = row.title.to_string();
920 let updated_at = row.updated_at.to_rfc3339();
921 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
922 let serialized = row.folder_paths.serialize();
923 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
924 (None, None)
925 } else {
926 (Some(serialized.paths), Some(serialized.order))
927 };
928 let main_serialized = row.main_worktree_paths.serialize();
929 let (main_worktree_paths, main_worktree_paths_order) = if row.main_worktree_paths.is_empty()
930 {
931 (None, None)
932 } else {
933 (Some(main_serialized.paths), Some(main_serialized.order))
934 };
935 let remote_connection = row
936 .remote_connection
937 .as_ref()
938 .map(serde_json::to_string)
939 .transpose()
940 .context("serialize thread metadata remote connection")?;
941 let archived = row.archived;
942
943 self.write(move |conn| {
944 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order, remote_connection) \
945 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) \
946 ON CONFLICT(session_id) DO UPDATE SET \
947 agent_id = excluded.agent_id, \
948 title = excluded.title, \
949 updated_at = excluded.updated_at, \
950 created_at = excluded.created_at, \
951 folder_paths = excluded.folder_paths, \
952 folder_paths_order = excluded.folder_paths_order, \
953 archived = excluded.archived, \
954 main_worktree_paths = excluded.main_worktree_paths, \
955 main_worktree_paths_order = excluded.main_worktree_paths_order, \
956 remote_connection = excluded.remote_connection";
957 let mut stmt = Statement::prepare(conn, sql)?;
958 let mut i = stmt.bind(&id, 1)?;
959 i = stmt.bind(&agent_id, i)?;
960 i = stmt.bind(&title, i)?;
961 i = stmt.bind(&updated_at, i)?;
962 i = stmt.bind(&created_at, i)?;
963 i = stmt.bind(&folder_paths, i)?;
964 i = stmt.bind(&folder_paths_order, i)?;
965 i = stmt.bind(&archived, i)?;
966 i = stmt.bind(&main_worktree_paths, i)?;
967 i = stmt.bind(&main_worktree_paths_order, i)?;
968 stmt.bind(&remote_connection, i)?;
969 stmt.exec()
970 })
971 .await
972 }
973
974 /// Delete metadata for a single thread.
975 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
976 let id = session_id.0.clone();
977 self.write(move |conn| {
978 let mut stmt =
979 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
980 stmt.bind(&id, 1)?;
981 stmt.exec()
982 })
983 .await
984 }
985
986 pub async fn create_archived_worktree(
987 &self,
988 worktree_path: String,
989 main_repo_path: String,
990 branch_name: Option<String>,
991 staged_commit_hash: String,
992 unstaged_commit_hash: String,
993 original_commit_hash: String,
994 ) -> anyhow::Result<i64> {
995 self.write(move |conn| {
996 let mut stmt = Statement::prepare(
997 conn,
998 "INSERT INTO archived_git_worktrees(worktree_path, main_repo_path, branch_name, staged_commit_hash, unstaged_commit_hash, original_commit_hash) \
999 VALUES (?1, ?2, ?3, ?4, ?5, ?6) \
1000 RETURNING id",
1001 )?;
1002 let mut i = stmt.bind(&worktree_path, 1)?;
1003 i = stmt.bind(&main_repo_path, i)?;
1004 i = stmt.bind(&branch_name, i)?;
1005 i = stmt.bind(&staged_commit_hash, i)?;
1006 i = stmt.bind(&unstaged_commit_hash, i)?;
1007 stmt.bind(&original_commit_hash, i)?;
1008 stmt.maybe_row::<i64>()?.context("expected RETURNING id")
1009 })
1010 .await
1011 }
1012
1013 pub async fn link_thread_to_archived_worktree(
1014 &self,
1015 session_id: String,
1016 archived_worktree_id: i64,
1017 ) -> anyhow::Result<()> {
1018 self.write(move |conn| {
1019 let mut stmt = Statement::prepare(
1020 conn,
1021 "INSERT INTO thread_archived_worktrees(session_id, archived_worktree_id) \
1022 VALUES (?1, ?2)",
1023 )?;
1024 let i = stmt.bind(&session_id, 1)?;
1025 stmt.bind(&archived_worktree_id, i)?;
1026 stmt.exec()
1027 })
1028 .await
1029 }
1030
    /// Fetch every archived git worktree linked to the given thread session.
    ///
    /// NOTE(review): unlike the write methods above, this executes the query
    /// directly rather than going through `self.write(...).await` — confirm
    /// that running the read on the calling context is intentional.
    pub async fn get_archived_worktrees_for_thread(
        &self,
        session_id: String,
    ) -> anyhow::Result<Vec<ArchivedGitWorktree>> {
        // `select_bound` returns a query closure; invoke it immediately with
        // the session id bound to ?1. Column order must match the
        // `Column` impl for `ArchivedGitWorktree` below.
        self.select_bound::<String, ArchivedGitWorktree>(
            "SELECT a.id, a.worktree_path, a.main_repo_path, a.branch_name, a.staged_commit_hash, a.unstaged_commit_hash, a.original_commit_hash \
             FROM archived_git_worktrees a \
             JOIN thread_archived_worktrees t ON a.id = t.archived_worktree_id \
             WHERE t.session_id = ?1",
        )?(session_id)
    }
1042
1043 pub async fn delete_archived_worktree(&self, id: i64) -> anyhow::Result<()> {
1044 self.write(move |conn| {
1045 let mut stmt = Statement::prepare(
1046 conn,
1047 "DELETE FROM thread_archived_worktrees WHERE archived_worktree_id = ?",
1048 )?;
1049 stmt.bind(&id, 1)?;
1050 stmt.exec()?;
1051
1052 let mut stmt =
1053 Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?;
1054 stmt.bind(&id, 1)?;
1055 stmt.exec()
1056 })
1057 .await
1058 }
1059
1060 pub async fn unlink_thread_from_all_archived_worktrees(
1061 &self,
1062 session_id: String,
1063 ) -> anyhow::Result<()> {
1064 self.write(move |conn| {
1065 let mut stmt = Statement::prepare(
1066 conn,
1067 "DELETE FROM thread_archived_worktrees WHERE session_id = ?",
1068 )?;
1069 stmt.bind(&session_id, 1)?;
1070 stmt.exec()
1071 })
1072 .await
1073 }
1074
1075 pub async fn is_archived_worktree_referenced(
1076 &self,
1077 archived_worktree_id: i64,
1078 ) -> anyhow::Result<bool> {
1079 self.select_row_bound::<i64, i64>(
1080 "SELECT COUNT(*) FROM thread_archived_worktrees WHERE archived_worktree_id = ?1",
1081 )?(archived_worktree_id)
1082 .map(|count| count.unwrap_or(0) > 0)
1083 }
1084}
1085
1086impl Column for ThreadMetadata {
1087 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
1088 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
1089 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
1090 let (title, next): (String, i32) = Column::column(statement, next)?;
1091 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
1092 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
1093 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
1094 let (folder_paths_order_str, next): (Option<String>, i32) =
1095 Column::column(statement, next)?;
1096 let (archived, next): (bool, i32) = Column::column(statement, next)?;
1097 let (main_worktree_paths_str, next): (Option<String>, i32) =
1098 Column::column(statement, next)?;
1099 let (main_worktree_paths_order_str, next): (Option<String>, i32) =
1100 Column::column(statement, next)?;
1101 let (remote_connection_json, next): (Option<String>, i32) =
1102 Column::column(statement, next)?;
1103
1104 let agent_id = agent_id
1105 .map(|id| AgentId::new(id))
1106 .unwrap_or(ZED_AGENT_ID.clone());
1107
1108 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
1109 let created_at = created_at_str
1110 .as_deref()
1111 .map(DateTime::parse_from_rfc3339)
1112 .transpose()?
1113 .map(|dt| dt.with_timezone(&Utc));
1114
1115 let folder_paths = folder_paths_str
1116 .map(|paths| {
1117 PathList::deserialize(&util::path_list::SerializedPathList {
1118 paths,
1119 order: folder_paths_order_str.unwrap_or_default(),
1120 })
1121 })
1122 .unwrap_or_default();
1123
1124 let main_worktree_paths = main_worktree_paths_str
1125 .map(|paths| {
1126 PathList::deserialize(&util::path_list::SerializedPathList {
1127 paths,
1128 order: main_worktree_paths_order_str.unwrap_or_default(),
1129 })
1130 })
1131 .unwrap_or_default();
1132
1133 let remote_connection = remote_connection_json
1134 .as_deref()
1135 .map(serde_json::from_str::<RemoteConnectionOptions>)
1136 .transpose()
1137 .context("deserialize thread metadata remote connection")?;
1138
1139 Ok((
1140 ThreadMetadata {
1141 session_id: acp::SessionId::new(id),
1142 agent_id,
1143 title: title.into(),
1144 updated_at,
1145 created_at,
1146 folder_paths,
1147 main_worktree_paths,
1148 remote_connection,
1149 archived,
1150 },
1151 next,
1152 ))
1153 }
1154}
1155
1156impl Column for ArchivedGitWorktree {
1157 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
1158 let (id, next): (i64, i32) = Column::column(statement, start_index)?;
1159 let (worktree_path_str, next): (String, i32) = Column::column(statement, next)?;
1160 let (main_repo_path_str, next): (String, i32) = Column::column(statement, next)?;
1161 let (branch_name, next): (Option<String>, i32) = Column::column(statement, next)?;
1162 let (staged_commit_hash, next): (String, i32) = Column::column(statement, next)?;
1163 let (unstaged_commit_hash, next): (String, i32) = Column::column(statement, next)?;
1164 let (original_commit_hash, next): (String, i32) = Column::column(statement, next)?;
1165
1166 Ok((
1167 ArchivedGitWorktree {
1168 id,
1169 worktree_path: PathBuf::from(worktree_path_str),
1170 main_repo_path: PathBuf::from(main_repo_path_str),
1171 branch_name,
1172 staged_commit_hash,
1173 unstaged_commit_hash,
1174 original_commit_hash,
1175 },
1176 next,
1177 ))
1178 }
1179}
1180
1181#[cfg(test)]
1182mod tests {
1183 use super::*;
1184 use acp_thread::{AgentConnection, StubAgentConnection};
1185 use action_log::ActionLog;
1186 use agent::DbThread;
1187 use agent_client_protocol as acp;
1188
1189 use gpui::TestAppContext;
1190 use project::FakeFs;
1191 use project::Project;
1192 use remote::WslConnectionOptions;
1193 use std::path::Path;
1194 use std::rc::Rc;
1195
    /// Build a minimal `DbThread` for tests: the given title and timestamp,
    /// with every other field left empty or defaulted.
    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
        DbThread {
            title: title.to_string().into(),
            messages: Vec::new(),
            updated_at,
            detailed_summary: None,
            initial_project_snapshot: None,
            cumulative_token_usage: Default::default(),
            request_token_usage: Default::default(),
            model: None,
            profile: None,
            imported: false,
            subagent_context: None,
            speed: None,
            thinking_enabled: false,
            thinking_effort: None,
            draft_prompt: None,
            ui_scroll_position: None,
        }
    }
1216
    /// Build an unarchived `ThreadMetadata` for tests, owned by the native
    /// Zed agent, with `created_at` equal to `updated_at` and no remote
    /// connection or main worktree paths.
    fn make_metadata(
        session_id: &str,
        title: &str,
        updated_at: DateTime<Utc>,
        folder_paths: PathList,
    ) -> ThreadMetadata {
        ThreadMetadata {
            archived: false,
            session_id: acp::SessionId::new(session_id),
            agent_id: agent::ZED_AGENT_ID.clone(),
            title: title.to_string().into(),
            updated_at,
            created_at: Some(updated_at),
            folder_paths,
            main_worktree_paths: PathList::default(),
            remote_connection: None,
        }
    }
1235
    /// Install the globals the store depends on (settings, fake fs, metadata
    /// store, thread store) and let any startup tasks settle.
    fn init_test(cx: &mut TestAppContext) {
        let fs = FakeFs::new(cx.executor());
        cx.update(|cx| {
            let settings_store = settings::SettingsStore::test(cx);
            cx.set_global(settings_store);
            <dyn Fs>::set_global(fs, cx);
            ThreadMetadataStore::init_global(cx);
            ThreadStore::init_global(cx);
        });
        // Drain pending tasks spawned during global initialization.
        cx.run_until_parked();
    }
1247
1248 fn clear_thread_metadata_remote_connection_backfill(cx: &mut TestAppContext) {
1249 let kvp = cx.update(|cx| KeyValueStore::global(cx));
1250 smol::block_on(kvp.delete_kvp("thread-metadata-remote-connection-backfill".to_string()))
1251 .unwrap();
1252 }
1253
    /// Run both metadata migrations end to end: clear the backfill marker,
    /// migrate native-store threads, then backfill remote connections, and
    /// wait for the spawned tasks to finish.
    fn run_thread_metadata_migrations(cx: &mut TestAppContext) {
        clear_thread_metadata_remote_connection_backfill(cx);
        cx.update(|cx| {
            let migration_task = migrate_thread_metadata(cx);
            migrate_thread_remote_connections(cx, migration_task);
        });
        cx.run_until_parked();
    }
1262
    // Rows already present in the database should be loaded into the store's
    // in-memory cache when the global store is initialized.
    #[gpui::test]
    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
        let first_paths = PathList::new(&[Path::new("/project-a")]);
        let second_paths = PathList::new(&[Path::new("/project-b")]);
        let now = Utc::now();
        let older = now - chrono::Duration::seconds(1);

        // Open a per-test database (named after the running test thread) and
        // seed it directly, before the store global exists.
        let thread = std::thread::current();
        let test_name = thread.name().unwrap_or("unknown_test");
        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
            &db_name,
        )));

        db.save(make_metadata(
            "session-1",
            "First Thread",
            now,
            first_paths.clone(),
        ))
        .await
        .unwrap();
        db.save(make_metadata(
            "session-2",
            "Second Thread",
            older,
            second_paths.clone(),
        ))
        .await
        .unwrap();

        // Initializing the global store should read back the seeded rows.
        cx.update(|cx| {
            let settings_store = settings::SettingsStore::test(cx);
            cx.set_global(settings_store);
            ThreadMetadataStore::init_global(cx);
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            // Both seeded sessions are present in the cache.
            let entry_ids = store
                .entry_ids()
                .map(|session_id| session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(entry_ids.len(), 2);
            assert!(entry_ids.contains(&"session-1".to_string()));
            assert!(entry_ids.contains(&"session-2".to_string()));

            // Each entry is indexed under its own folder paths.
            let first_path_entries = store
                .entries_for_path(&first_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(first_path_entries, vec!["session-1"]);

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-2"]);
        });
    }
1327
    // The path index must follow saves: re-saving a session under new paths
    // moves it between path buckets, and deleting a session removes it.
    #[gpui::test]
    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
        init_test(cx);

        let first_paths = PathList::new(&[Path::new("/project-a")]);
        let second_paths = PathList::new(&[Path::new("/project-b")]);
        let initial_time = Utc::now();
        let updated_time = initial_time + chrono::Duration::seconds(1);

        let initial_metadata = make_metadata(
            "session-1",
            "First Thread",
            initial_time,
            first_paths.clone(),
        );

        let second_metadata = make_metadata(
            "session-2",
            "Second Thread",
            initial_time,
            second_paths.clone(),
        );

        // Save one thread under each project path.
        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(initial_metadata, cx);
                store.save(second_metadata, cx);
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let first_path_entries = store
                .entries_for_path(&first_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(first_path_entries, vec!["session-1"]);

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-2"]);
        });

        // Re-save session-1 under project B's paths; it should move buckets.
        let moved_metadata = make_metadata(
            "session-1",
            "First Thread",
            updated_time,
            second_paths.clone(),
        );

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(moved_metadata, cx);
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let entry_ids = store
                .entry_ids()
                .map(|session_id| session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(entry_ids.len(), 2);
            assert!(entry_ids.contains(&"session-1".to_string()));
            assert!(entry_ids.contains(&"session-2".to_string()));

            // session-1 left project A's bucket...
            let first_path_entries = store
                .entries_for_path(&first_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert!(first_path_entries.is_empty());

            // ...and now both sessions live under project B.
            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries.len(), 2);
            assert!(second_path_entries.contains(&"session-1".to_string()));
            assert!(second_path_entries.contains(&"session-2".to_string()));
        });

        // Deleting session-2 removes it from both the id list and path index.
        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.delete(acp::SessionId::new("session-2"), cx);
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let entry_ids = store
                .entry_ids()
                .map(|session_id| session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(entry_ids, vec!["session-1"]);

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-1"]);
        });
    }
1447
    // The native-store migration must only create metadata for sessions that
    // do not already have a row, leave existing rows untouched, and mark all
    // newly migrated threads as archived.
    #[gpui::test]
    async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
        init_test(cx);

        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
        let now = Utc::now();

        // Pre-seed a metadata row for a-session-0 so the migration must skip it.
        let existing_metadata = ThreadMetadata {
            session_id: acp::SessionId::new("a-session-0"),
            agent_id: agent::ZED_AGENT_ID.clone(),
            title: "Existing Metadata".into(),
            updated_at: now - chrono::Duration::seconds(10),
            created_at: Some(now - chrono::Duration::seconds(10)),
            folder_paths: project_a_paths.clone(),
            main_worktree_paths: PathList::default(),
            remote_connection: None,
            archived: false,
        };

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(existing_metadata, cx);
            });
        });
        cx.run_until_parked();

        // Native-store threads: one colliding with the existing row, two with
        // projects, and one without any project paths.
        let threads_to_save = vec![
            (
                "a-session-0",
                "Thread A0 From Native Store",
                project_a_paths.clone(),
                now,
            ),
            (
                "a-session-1",
                "Thread A1",
                project_a_paths.clone(),
                now + chrono::Duration::seconds(1),
            ),
            (
                "b-session-0",
                "Thread B0",
                project_b_paths.clone(),
                now + chrono::Duration::seconds(2),
            ),
            (
                "projectless",
                "Projectless",
                PathList::default(),
                now + chrono::Duration::seconds(3),
            ),
        ];

        for (session_id, title, paths, updated_at) in &threads_to_save {
            let save_task = cx.update(|cx| {
                let thread_store = ThreadStore::global(cx);
                let session_id = session_id.to_string();
                let title = title.to_string();
                let paths = paths.clone();
                thread_store.update(cx, |store, cx| {
                    store.save_thread(
                        acp::SessionId::new(session_id),
                        make_db_thread(&title, *updated_at),
                        paths,
                        cx,
                    )
                })
            });
            save_task.await.unwrap();
            cx.run_until_parked();
        }

        run_thread_metadata_migrations(cx);

        let list = cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.read(cx).entries().cloned().collect::<Vec<_>>()
        });

        // 1 pre-existing row + 3 migrated (the colliding one was skipped).
        assert_eq!(list.len(), 4);
        assert!(
            list.iter()
                .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
        );

        // The pre-existing row kept its original title and archived state.
        let existing_metadata = list
            .iter()
            .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
            .unwrap();
        assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
        assert!(!existing_metadata.archived);

        let migrated_session_ids = list
            .iter()
            .map(|metadata| metadata.session_id.0.as_ref())
            .collect::<Vec<_>>();
        assert!(migrated_session_ids.contains(&"a-session-1"));
        assert!(migrated_session_ids.contains(&"b-session-0"));
        assert!(migrated_session_ids.contains(&"projectless"));

        // All migrated entries arrive archived.
        let migrated_entries = list
            .iter()
            .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
            .collect::<Vec<_>>();
        assert!(migrated_entries.iter().all(|metadata| metadata.archived));
    }
1556
    // If every native-store thread already has a metadata row, the migration
    // must not create duplicates or overwrite the existing row.
    #[gpui::test]
    async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
        cx: &mut TestAppContext,
    ) {
        init_test(cx);

        let project_paths = PathList::new(&[Path::new("/project-a")]);
        let existing_updated_at = Utc::now();

        let existing_metadata = ThreadMetadata {
            session_id: acp::SessionId::new("existing-session"),
            agent_id: agent::ZED_AGENT_ID.clone(),
            title: "Existing Metadata".into(),
            updated_at: existing_updated_at,
            created_at: Some(existing_updated_at),
            folder_paths: project_paths.clone(),
            main_worktree_paths: PathList::default(),
            remote_connection: None,
            archived: false,
        };

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(existing_metadata, cx);
            });
        });
        cx.run_until_parked();

        // Save the same session in the native store with a newer title; the
        // migration should still leave the metadata row alone.
        let save_task = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new("existing-session"),
                    make_db_thread(
                        "Updated Native Thread Title",
                        existing_updated_at + chrono::Duration::seconds(1),
                    ),
                    project_paths.clone(),
                    cx,
                )
            })
        });
        save_task.await.unwrap();
        cx.run_until_parked();

        run_thread_metadata_migrations(cx);

        let list = cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.read(cx).entries().cloned().collect::<Vec<_>>()
        });

        // Still exactly one entry: the original.
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
    }
1613
    // The remote-connection backfill should look up a workspace whose paths
    // match the thread's folder paths and copy its remote connection options
    // onto the thread metadata.
    #[gpui::test]
    async fn test_migrate_thread_remote_connections_backfills_from_workspace_db(
        cx: &mut TestAppContext,
    ) {
        init_test(cx);

        let folder_paths = PathList::new(&[Path::new("/remote-project")]);
        let updated_at = Utc::now();
        let metadata = make_metadata(
            "remote-session",
            "Remote Thread",
            updated_at,
            folder_paths.clone(),
        );

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(metadata, cx);
            });
        });
        cx.run_until_parked();

        // Seed the workspace DB with a WSL remote connection and a workspace
        // pointing at the same folder paths as the thread.
        let workspace_db = cx.update(|cx| WorkspaceDb::global(cx));
        let workspace_id = workspace_db.next_id().await.unwrap();
        let serialized_paths = folder_paths.serialize();
        let remote_connection_id = 1_i64;
        workspace_db
            .write(move |conn| {
                let mut stmt = Statement::prepare(
                    conn,
                    "INSERT INTO remote_connections(id, kind, user, distro) VALUES (?1, ?2, ?3, ?4)",
                )?;
                let mut next_index = stmt.bind(&remote_connection_id, 1)?;
                next_index = stmt.bind(&"wsl", next_index)?;
                next_index = stmt.bind(&Some("anth".to_string()), next_index)?;
                stmt.bind(&Some("Ubuntu".to_string()), next_index)?;
                stmt.exec()?;

                let mut stmt = Statement::prepare(
                    conn,
                    "UPDATE workspaces SET paths = ?2, paths_order = ?3, remote_connection_id = ?4, timestamp = CURRENT_TIMESTAMP WHERE workspace_id = ?1",
                )?;
                let mut next_index = stmt.bind(&workspace_id, 1)?;
                next_index = stmt.bind(&serialized_paths.paths, next_index)?;
                next_index = stmt.bind(&serialized_paths.order, next_index)?;
                stmt.bind(&Some(remote_connection_id as i32), next_index)?;
                stmt.exec()
            })
            .await
            .unwrap();

        // Run only the remote-connection backfill (metadata migration is a
        // no-op task here).
        clear_thread_metadata_remote_connection_backfill(cx);
        cx.update(|cx| {
            migrate_thread_remote_connections(cx, Task::ready(Ok(())));
        });
        cx.run_until_parked();

        let metadata = cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store
                .read(cx)
                .entry(&acp::SessionId::new("remote-session"))
                .cloned()
                .expect("expected migrated metadata row")
        });

        assert_eq!(
            metadata.remote_connection,
            Some(RemoteConnectionOptions::Wsl(WslConnectionOptions {
                distro_name: "Ubuntu".to_string(),
                user: Some("anth".to_string()),
            }))
        );
    }
1689
    // On first migration, only the five most recently updated threads per
    // project stay unarchived; older ones are archived.
    #[gpui::test]
    async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
        cx: &mut TestAppContext,
    ) {
        init_test(cx);

        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
        let now = Utc::now();

        // Create 7 threads for project A and 3 for project B
        let mut threads_to_save = Vec::new();
        for i in 0..7 {
            threads_to_save.push((
                format!("a-session-{i}"),
                format!("Thread A{i}"),
                project_a_paths.clone(),
                now + chrono::Duration::seconds(i as i64),
            ));
        }
        for i in 0..3 {
            threads_to_save.push((
                format!("b-session-{i}"),
                format!("Thread B{i}"),
                project_b_paths.clone(),
                now + chrono::Duration::seconds(i as i64),
            ));
        }

        // Persist each into the native thread store so the migration sees them.
        for (session_id, title, paths, updated_at) in &threads_to_save {
            let save_task = cx.update(|cx| {
                let thread_store = ThreadStore::global(cx);
                let session_id = session_id.to_string();
                let title = title.to_string();
                let paths = paths.clone();
                thread_store.update(cx, |store, cx| {
                    store.save_thread(
                        acp::SessionId::new(session_id),
                        make_db_thread(&title, *updated_at),
                        paths,
                        cx,
                    )
                })
            });
            save_task.await.unwrap();
            cx.run_until_parked();
        }

        run_thread_metadata_migrations(cx);

        let list = cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.read(cx).entries().cloned().collect::<Vec<_>>()
        });

        assert_eq!(list.len(), 10);

        // Project A: 5 most recent should be unarchived, 2 oldest should be archived
        let mut project_a_entries: Vec<_> = list
            .iter()
            .filter(|m| m.folder_paths == project_a_paths)
            .collect();
        assert_eq!(project_a_entries.len(), 7);
        project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));

        for entry in &project_a_entries[..5] {
            assert!(
                !entry.archived,
                "Expected {} to be unarchived (top 5 most recent)",
                entry.session_id.0
            );
        }
        for entry in &project_a_entries[5..] {
            assert!(
                entry.archived,
                "Expected {} to be archived (older than top 5)",
                entry.session_id.0
            );
        }

        // Project B: all 3 should be unarchived (under the limit)
        let project_b_entries: Vec<_> = list
            .iter()
            .filter(|m| m.folder_paths == project_b_paths)
            .collect();
        assert_eq!(project_b_entries.len(), 3);
        assert!(project_b_entries.iter().all(|m| !m.archived));
    }
1778
    // A thread with no user content must not get a metadata row, even if its
    // title changes; the first user content block creates the row.
    #[gpui::test]
    async fn test_empty_thread_events_do_not_create_metadata(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, None::<&Path>, cx).await;
        let connection = Rc::new(StubAgentConnection::new());

        let thread = cx
            .update(|cx| {
                connection
                    .clone()
                    .new_session(project.clone(), PathList::default(), cx)
            })
            .await
            .unwrap();
        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());

        // Retitling an empty (draft) thread should not persist anything.
        cx.update(|cx| {
            thread.update(cx, |thread, cx| {
                thread.set_title("Draft Thread".into(), cx).detach();
            });
        });
        cx.run_until_parked();

        let metadata_ids = cx.update(|cx| {
            ThreadMetadataStore::global(cx)
                .read(cx)
                .entry_ids()
                .collect::<Vec<_>>()
        });
        assert!(
            metadata_ids.is_empty(),
            "expected empty draft thread title updates to be ignored"
        );

        // Adding user content makes the thread non-empty and creates the row.
        cx.update(|cx| {
            thread.update(cx, |thread, cx| {
                thread.push_user_content_block(None, "Hello".into(), cx);
            });
        });
        cx.run_until_parked();

        let metadata_ids = cx.update(|cx| {
            ThreadMetadataStore::global(cx)
                .read(cx)
                .entry_ids()
                .collect::<Vec<_>>()
        });
        assert_eq!(metadata_ids, vec![session_id]);
    }
1830
    // Dropping a thread entity that has user content must not delete its
    // persisted metadata row.
    #[gpui::test]
    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, None::<&Path>, cx).await;
        let connection = Rc::new(StubAgentConnection::new());

        let thread = cx
            .update(|cx| {
                connection
                    .clone()
                    .new_session(project.clone(), PathList::default(), cx)
            })
            .await
            .unwrap();
        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());

        // Make the thread non-empty so metadata is created.
        cx.update(|cx| {
            thread.update(cx, |thread, cx| {
                thread.push_user_content_block(None, "Hello".into(), cx);
            });
        });
        cx.run_until_parked();

        let metadata_ids = cx.update(|cx| {
            ThreadMetadataStore::global(cx)
                .read(cx)
                .entry_ids()
                .collect::<Vec<_>>()
        });
        assert_eq!(metadata_ids, vec![session_id.clone()]);

        // Release the entity and flush effects so release observers run.
        drop(thread);
        cx.update(|_| {});
        cx.run_until_parked();

        // The metadata row survives the release.
        let metadata_ids = cx.update(|cx| {
            ThreadMetadataStore::global(cx)
                .read(cx)
                .entry_ids()
                .collect::<Vec<_>>()
        });
        assert_eq!(metadata_ids, vec![session_id]);
    }
1876
    // Threads created in a project with no worktree should be saved with
    // empty folder paths and archived by default; threads with a worktree
    // keep its paths and stay unarchived.
    #[gpui::test]
    async fn test_threads_without_project_association_are_archived_by_default(
        cx: &mut TestAppContext,
    ) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
        let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
        let connection = Rc::new(StubAgentConnection::new());

        let thread_without_worktree = cx
            .update(|cx| {
                connection.clone().new_session(
                    project_without_worktree.clone(),
                    PathList::default(),
                    cx,
                )
            })
            .await
            .unwrap();
        let session_without_worktree =
            cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());

        // Add content + title to trigger a metadata save for each thread.
        cx.update(|cx| {
            thread_without_worktree.update(cx, |thread, cx| {
                thread.push_user_content_block(None, "content".into(), cx);
                thread.set_title("No Project Thread".into(), cx).detach();
            });
        });
        cx.run_until_parked();

        let thread_with_worktree = cx
            .update(|cx| {
                connection.clone().new_session(
                    project_with_worktree.clone(),
                    PathList::default(),
                    cx,
                )
            })
            .await
            .unwrap();
        let session_with_worktree =
            cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());

        cx.update(|cx| {
            thread_with_worktree.update(cx, |thread, cx| {
                thread.push_user_content_block(None, "content".into(), cx);
                thread.set_title("Project Thread".into(), cx).detach();
            });
        });
        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let without_worktree = store
                .entry(&session_without_worktree)
                .expect("missing metadata for thread without project association");
            assert!(without_worktree.folder_paths.is_empty());
            assert!(
                without_worktree.archived,
                "expected thread without project association to be archived"
            );

            let with_worktree = store
                .entry(&session_with_worktree)
                .expect("missing metadata for thread with project association");
            assert_eq!(
                with_worktree.folder_paths,
                PathList::new(&[Path::new("/project-a")])
            );
            assert!(
                !with_worktree.archived,
                "expected thread with project association to remain unarchived"
            );
        });
    }
1956
    // Subagent threads (those constructed with a parent session id) must not
    // be written into the sidebar metadata store.
    #[gpui::test]
    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
        init_test(cx);

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, None::<&Path>, cx).await;
        let connection = Rc::new(StubAgentConnection::new());

        // Create a regular (non-subagent) AcpThread.
        let regular_thread = cx
            .update(|cx| {
                connection
                    .clone()
                    .new_session(project.clone(), PathList::default(), cx)
            })
            .await
            .unwrap();

        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());

        // Set a title on the regular thread to trigger a save via handle_thread_update.
        cx.update(|cx| {
            regular_thread.update(cx, |thread, cx| {
                thread.push_user_content_block(None, "content".into(), cx);
                thread.set_title("Regular Thread".into(), cx).detach();
            });
        });
        cx.run_until_parked();

        // Create a subagent AcpThread
        let subagent_session_id = acp::SessionId::new("subagent-session");
        let subagent_thread = cx.update(|cx| {
            let action_log = cx.new(|_| ActionLog::new(project.clone()));
            cx.new(|cx| {
                acp_thread::AcpThread::new(
                    // Passing a parent session id marks this thread as a subagent.
                    Some(regular_session_id.clone()),
                    Some("Subagent Thread".into()),
                    None,
                    connection.clone(),
                    project.clone(),
                    action_log,
                    subagent_session_id.clone(),
                    watch::Receiver::constant(acp::PromptCapabilities::new()),
                    cx,
                )
            })
        });

        // Set a title on the subagent thread to trigger handle_thread_update.
        cx.update(|cx| {
            subagent_thread.update(cx, |thread, cx| {
                thread
                    .set_title("Subagent Thread Title".into(), cx)
                    .detach();
            });
        });
        cx.run_until_parked();

        // List all metadata from the store cache.
        let list = cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.read(cx).entries().cloned().collect::<Vec<_>>()
        });

        // The subagent thread should NOT appear in the sidebar metadata.
        // Only the regular thread should be listed.
        assert_eq!(
            list.len(),
            1,
            "Expected only the regular thread in sidebar metadata, \
             but found {} entries (subagent threads are leaking into the sidebar)",
            list.len(),
        );
        assert_eq!(list[0].session_id, regular_session_id);
        assert_eq!(list[0].title.as_ref(), "Regular Thread");
    }
2033
2034 #[test]
2035 fn test_dedup_db_operations_keeps_latest_operation_for_session() {
2036 let now = Utc::now();
2037
2038 let operations = vec![
2039 DbOperation::Upsert(make_metadata(
2040 "session-1",
2041 "First Thread",
2042 now,
2043 PathList::default(),
2044 )),
2045 DbOperation::Delete(acp::SessionId::new("session-1")),
2046 ];
2047
2048 let deduped = ThreadMetadataStore::dedup_db_operations(operations);
2049
2050 assert_eq!(deduped.len(), 1);
2051 assert_eq!(
2052 deduped[0],
2053 DbOperation::Delete(acp::SessionId::new("session-1"))
2054 );
2055 }
2056
2057 #[test]
2058 fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
2059 let now = Utc::now();
2060 let later = now + chrono::Duration::seconds(1);
2061
2062 let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
2063 let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
2064
2065 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
2066 DbOperation::Upsert(old_metadata),
2067 DbOperation::Upsert(new_metadata.clone()),
2068 ]);
2069
2070 assert_eq!(deduped.len(), 1);
2071 assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
2072 }
2073
2074 #[test]
2075 fn test_dedup_db_operations_preserves_distinct_sessions() {
2076 let now = Utc::now();
2077
2078 let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
2079 let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
2080 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
2081 DbOperation::Upsert(metadata1.clone()),
2082 DbOperation::Upsert(metadata2.clone()),
2083 ]);
2084
2085 assert_eq!(deduped.len(), 2);
2086 assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
2087 assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
2088 }
2089
    /// Round-trips one thread through archive and unarchive, checking that
    /// `entries_for_path` and `archived_entries` stay mutually exclusive at
    /// every step.
    #[gpui::test]
    async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
        init_test(cx);

        let paths = PathList::new(&[Path::new("/project-a")]);
        let now = Utc::now();
        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(metadata, cx);
            });
        });

        // Let background tasks (presumably the async DB write) settle.
        cx.run_until_parked();

        // Freshly saved: visible under its path, not in the archived list.
        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let path_entries = store
                .entries_for_path(&paths)
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(path_entries, vec!["session-1"]);

            let archived = store
                .archived_entries()
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert!(archived.is_empty());
        });

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.archive(&acp::SessionId::new("session-1"), None, cx);
            });
        });

        // Thread 1 should now be archived
        cx.run_until_parked();

        // Archived: gone from the path listing, present (with the flag set)
        // in the archived listing.
        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let path_entries = store
                .entries_for_path(&paths)
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert!(path_entries.is_empty());

            let archived = store.archived_entries().collect::<Vec<_>>();
            assert_eq!(archived.len(), 1);
            assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
            assert!(archived[0].archived);
        });

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.unarchive(&acp::SessionId::new("session-1"), cx);
            });
        });

        cx.run_until_parked();

        // Unarchived: back to the original state.
        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let path_entries = store
                .entries_for_path(&paths)
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(path_entries, vec!["session-1"]);

            let archived = store
                .archived_entries()
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert!(archived.is_empty());
        });
    }
2176
    /// With one active and one archived thread on the same path:
    /// `entries_for_path` must exclude the archived one, while `entries`
    /// still returns both and `archived_entries` returns only the archived.
    #[gpui::test]
    async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
        init_test(cx);

        let paths = PathList::new(&[Path::new("/project-a")]);
        let now = Utc::now();

        let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
        // Slightly older timestamp, so ordering (if any) is deterministic.
        let metadata2 = make_metadata(
            "session-2",
            "Archived Thread",
            now - chrono::Duration::seconds(1),
            paths.clone(),
        );

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(metadata1, cx);
                store.save(metadata2, cx);
            });
        });

        cx.run_until_parked();

        // Archive only the second thread.
        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.archive(&acp::SessionId::new("session-2"), None, cx);
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            // Path-scoped listing hides the archived thread.
            let path_entries = store
                .entries_for_path(&paths)
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(path_entries, vec!["session-1"]);

            // Unscoped listing still includes both.
            let all_entries = store
                .entries()
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(all_entries.len(), 2);
            assert!(all_entries.contains(&"session-1".to_string()));
            assert!(all_entries.contains(&"session-2".to_string()));

            let archived = store
                .archived_entries()
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(archived, vec!["session-2"]);
        });
    }
2236
    /// `save_all` with three threads must make all three visible via both
    /// `entries` and `entry_ids`.
    #[gpui::test]
    async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
        init_test(cx);

        let paths = PathList::new(&[Path::new("/project-a")]);
        let now = Utc::now();

        // Staggered timestamps so any recency ordering is deterministic.
        let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
        let m2 = make_metadata(
            "session-2",
            "Thread Two",
            now - chrono::Duration::seconds(1),
            paths.clone(),
        );
        let m3 = make_metadata(
            "session-3",
            "Thread Three",
            now - chrono::Duration::seconds(2),
            paths,
        );

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save_all(vec![m1, m2, m3], cx);
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let all_entries = store
                .entries()
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(all_entries.len(), 3);
            assert!(all_entries.contains(&"session-1".to_string()));
            assert!(all_entries.contains(&"session-2".to_string()));
            assert!(all_entries.contains(&"session-3".to_string()));

            // The id index should agree with the entry listing.
            let entry_ids = store.entry_ids().collect::<Vec<_>>();
            assert_eq!(entry_ids.len(), 3);
        });
    }
2284
    /// Archiving must be durable: after `reload` re-reads the store from the
    /// database, the thread is still flagged archived and still excluded from
    /// path-scoped listings.
    #[gpui::test]
    async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
        init_test(cx);

        let paths = PathList::new(&[Path::new("/project-a")]);
        let now = Utc::now();
        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(metadata, cx);
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.archive(&acp::SessionId::new("session-1"), None, cx);
            });
        });

        cx.run_until_parked();

        // Drop in-memory state and re-read from persistent storage.
        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                let _ = store.reload(cx);
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let thread = store
                .entries()
                .find(|e| e.session_id.0.as_ref() == "session-1")
                .expect("thread should exist after reload");
            assert!(thread.archived);

            // Archived threads stay hidden from path-scoped listings.
            let path_entries = store
                .entries_for_path(&paths)
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert!(path_entries.is_empty());

            let archived = store
                .archived_entries()
                .map(|e| e.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(archived, vec!["session-1"]);
        });
    }
2343
    /// Archiving an unknown session id must not panic and must not create
    /// any entry — the store stays empty.
    #[gpui::test]
    async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
        init_test(cx);

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.archive(&acp::SessionId::new("nonexistent"), None, cx);
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            // No phantom entries in any view of the store.
            assert!(store.is_empty());
            assert_eq!(store.entries().count(), 0);
            assert_eq!(store.archived_entries().count(), 0);
        });
    }
2368
    /// Issues `save` and `archive` back-to-back in a single update, without
    /// letting background tasks run in between — exercising that queued
    /// operations applied together still yield the archived entry.
    /// NOTE(review): this presumably targets the store's internal write
    /// queue / dedup path; the statement ordering here is the point of the
    /// test, do not reorder.
    #[gpui::test]
    async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
        init_test(cx);

        let paths = PathList::new(&[Path::new("/project-a")]);
        let now = Utc::now();
        let metadata = make_metadata("session-1", "Thread 1", now, paths);
        let session_id = metadata.session_id.clone();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                // No run_until_parked between these two calls — intentional.
                store.save(metadata.clone(), cx);
                store.archive(&session_id, None, cx);
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            let store = store.read(cx);

            // The single resulting entry is the saved metadata with the
            // archived flag flipped on.
            let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
            pretty_assertions::assert_eq!(
                entries,
                vec![ThreadMetadata {
                    archived: true,
                    ..metadata
                }]
            );
        });
    }
2402
    /// Creates an archived worktree record, links it to a thread, and checks
    /// that every stored field round-trips through retrieval.
    #[gpui::test]
    async fn test_create_and_retrieve_archived_worktree(cx: &mut TestAppContext) {
        init_test(cx);
        let store = cx.update(|cx| ThreadMetadataStore::global(cx));

        // Arguments: worktree path, main repo path, optional branch name,
        // staged / unstaged / original commit hashes (per the assertions below).
        let id = store
            .read_with(cx, |store, cx| {
                store.create_archived_worktree(
                    "/tmp/worktree".to_string(),
                    "/home/user/repo".to_string(),
                    Some("feature-branch".to_string()),
                    "staged_aaa".to_string(),
                    "unstaged_bbb".to_string(),
                    "original_000".to_string(),
                    cx,
                )
            })
            .await
            .unwrap();

        store
            .read_with(cx, |store, cx| {
                store.link_thread_to_archived_worktree("session-1".to_string(), id, cx)
            })
            .await
            .unwrap();

        let worktrees = store
            .read_with(cx, |store, cx| {
                store.get_archived_worktrees_for_thread("session-1".to_string(), cx)
            })
            .await
            .unwrap();

        // All fields must survive the round trip unchanged.
        assert_eq!(worktrees.len(), 1);
        let wt = &worktrees[0];
        assert_eq!(wt.id, id);
        assert_eq!(wt.worktree_path, PathBuf::from("/tmp/worktree"));
        assert_eq!(wt.main_repo_path, PathBuf::from("/home/user/repo"));
        assert_eq!(wt.branch_name.as_deref(), Some("feature-branch"));
        assert_eq!(wt.staged_commit_hash, "staged_aaa");
        assert_eq!(wt.unstaged_commit_hash, "unstaged_bbb");
        assert_eq!(wt.original_commit_hash, "original_000");
    }
2447
    /// Deleting an archived worktree must also remove it from the lookup of
    /// any thread it was linked to.
    #[gpui::test]
    async fn test_delete_archived_worktree(cx: &mut TestAppContext) {
        init_test(cx);
        let store = cx.update(|cx| ThreadMetadataStore::global(cx));

        let id = store
            .read_with(cx, |store, cx| {
                store.create_archived_worktree(
                    "/tmp/worktree".to_string(),
                    "/home/user/repo".to_string(),
                    Some("main".to_string()),
                    "deadbeef".to_string(),
                    "deadbeef".to_string(),
                    "original_000".to_string(),
                    cx,
                )
            })
            .await
            .unwrap();

        store
            .read_with(cx, |store, cx| {
                store.link_thread_to_archived_worktree("session-1".to_string(), id, cx)
            })
            .await
            .unwrap();

        store
            .read_with(cx, |store, cx| store.delete_archived_worktree(id, cx))
            .await
            .unwrap();

        // The link must not resurrect a deleted worktree.
        let worktrees = store
            .read_with(cx, |store, cx| {
                store.get_archived_worktrees_for_thread("session-1".to_string(), cx)
            })
            .await
            .unwrap();
        assert!(worktrees.is_empty());
    }
2488
    /// One archived worktree can be linked from multiple threads; both
    /// threads must resolve to the same worktree record.
    #[gpui::test]
    async fn test_link_multiple_threads_to_archived_worktree(cx: &mut TestAppContext) {
        init_test(cx);
        let store = cx.update(|cx| ThreadMetadataStore::global(cx));

        let id = store
            .read_with(cx, |store, cx| {
                store.create_archived_worktree(
                    "/tmp/worktree".to_string(),
                    "/home/user/repo".to_string(),
                    None,
                    "abc123".to_string(),
                    "abc123".to_string(),
                    "original_000".to_string(),
                    cx,
                )
            })
            .await
            .unwrap();

        // Link the same worktree from two different sessions.
        store
            .read_with(cx, |store, cx| {
                store.link_thread_to_archived_worktree("session-1".to_string(), id, cx)
            })
            .await
            .unwrap();

        store
            .read_with(cx, |store, cx| {
                store.link_thread_to_archived_worktree("session-2".to_string(), id, cx)
            })
            .await
            .unwrap();

        let wt1 = store
            .read_with(cx, |store, cx| {
                store.get_archived_worktrees_for_thread("session-1".to_string(), cx)
            })
            .await
            .unwrap();

        let wt2 = store
            .read_with(cx, |store, cx| {
                store.get_archived_worktrees_for_thread("session-2".to_string(), cx)
            })
            .await
            .unwrap();

        // Both sessions see exactly one worktree, and it is the same record.
        assert_eq!(wt1.len(), 1);
        assert_eq!(wt2.len(), 1);
        assert_eq!(wt1[0].id, wt2[0].id);
    }
2541
    /// `complete_worktree_restore` with several (old, new) path replacements:
    /// matched folder paths are rewritten, unmatched ones are left alone.
    #[gpui::test]
    async fn test_complete_worktree_restore_multiple_paths(cx: &mut TestAppContext) {
        init_test(cx);
        let store = cx.update(|cx| ThreadMetadataStore::global(cx));

        // Two worktree paths that will be replaced, plus one unrelated path.
        let original_paths = PathList::new(&[
            Path::new("/projects/worktree-a"),
            Path::new("/projects/worktree-b"),
            Path::new("/other/unrelated"),
        ]);
        let meta = make_metadata("session-multi", "Multi Thread", Utc::now(), original_paths);

        store.update(cx, |store, cx| {
            store.save_manually(meta, cx);
        });

        // (old path, restored path) pairs.
        let replacements = vec![
            (
                PathBuf::from("/projects/worktree-a"),
                PathBuf::from("/restored/worktree-a"),
            ),
            (
                PathBuf::from("/projects/worktree-b"),
                PathBuf::from("/restored/worktree-b"),
            ),
        ];

        store.update(cx, |store, cx| {
            store.complete_worktree_restore(
                &acp::SessionId::new("session-multi"),
                &replacements,
                cx,
            );
        });

        let entry = store.read_with(cx, |store, _cx| {
            store.entry(&acp::SessionId::new("session-multi")).cloned()
        });
        let entry = entry.unwrap();
        let paths = entry.folder_paths.paths();
        // Both worktrees rewritten; the unrelated path untouched.
        assert_eq!(paths.len(), 3);
        assert!(paths.contains(&PathBuf::from("/restored/worktree-a")));
        assert!(paths.contains(&PathBuf::from("/restored/worktree-b")));
        assert!(paths.contains(&PathBuf::from("/other/unrelated")));
    }
2587
    /// `complete_worktree_restore`: a replacement whose old path does not
    /// appear in the thread's folder paths is ignored — it must not inject
    /// its new path.
    #[gpui::test]
    async fn test_complete_worktree_restore_preserves_unmatched_paths(cx: &mut TestAppContext) {
        init_test(cx);
        let store = cx.update(|cx| ThreadMetadataStore::global(cx));

        let original_paths =
            PathList::new(&[Path::new("/projects/worktree-a"), Path::new("/other/path")]);
        let meta = make_metadata("session-partial", "Partial", Utc::now(), original_paths);

        store.update(cx, |store, cx| {
            store.save_manually(meta, cx);
        });

        let replacements = vec![
            (
                PathBuf::from("/projects/worktree-a"),
                PathBuf::from("/new/worktree-a"),
            ),
            // Old path not present in the thread — must be a no-op.
            (
                PathBuf::from("/nonexistent/path"),
                PathBuf::from("/should/not/appear"),
            ),
        ];

        store.update(cx, |store, cx| {
            store.complete_worktree_restore(
                &acp::SessionId::new("session-partial"),
                &replacements,
                cx,
            );
        });

        let entry = store.read_with(cx, |store, _cx| {
            store
                .entry(&acp::SessionId::new("session-partial"))
                .cloned()
        });
        let entry = entry.unwrap();
        let paths = entry.folder_paths.paths();
        assert_eq!(paths.len(), 2);
        assert!(paths.contains(&PathBuf::from("/new/worktree-a")));
        assert!(paths.contains(&PathBuf::from("/other/path")));
        assert!(!paths.contains(&PathBuf::from("/should/not/appear")));
    }
2632
    /// `update_restored_worktree_paths` with several replacements: mirrors
    /// the `complete_worktree_restore` test above for the path-update-only
    /// entry point — matched paths rewritten, unrelated paths preserved.
    #[gpui::test]
    async fn test_update_restored_worktree_paths_multiple(cx: &mut TestAppContext) {
        init_test(cx);
        let store = cx.update(|cx| ThreadMetadataStore::global(cx));

        let original_paths = PathList::new(&[
            Path::new("/projects/worktree-a"),
            Path::new("/projects/worktree-b"),
            Path::new("/other/unrelated"),
        ]);
        let meta = make_metadata("session-multi", "Multi Thread", Utc::now(), original_paths);

        store.update(cx, |store, cx| {
            store.save_manually(meta, cx);
        });

        // (old path, restored path) pairs.
        let replacements = vec![
            (
                PathBuf::from("/projects/worktree-a"),
                PathBuf::from("/restored/worktree-a"),
            ),
            (
                PathBuf::from("/projects/worktree-b"),
                PathBuf::from("/restored/worktree-b"),
            ),
        ];

        store.update(cx, |store, cx| {
            store.update_restored_worktree_paths(
                &acp::SessionId::new("session-multi"),
                &replacements,
                cx,
            );
        });

        let entry = store.read_with(cx, |store, _cx| {
            store.entry(&acp::SessionId::new("session-multi")).cloned()
        });
        let entry = entry.unwrap();
        let paths = entry.folder_paths.paths();
        assert_eq!(paths.len(), 3);
        assert!(paths.contains(&PathBuf::from("/restored/worktree-a")));
        assert!(paths.contains(&PathBuf::from("/restored/worktree-b")));
        assert!(paths.contains(&PathBuf::from("/other/unrelated")));
    }
2678
    /// `update_restored_worktree_paths`: a replacement whose old path is not
    /// in the thread's folder paths must be ignored, not injected.
    #[gpui::test]
    async fn test_update_restored_worktree_paths_preserves_unmatched(cx: &mut TestAppContext) {
        init_test(cx);
        let store = cx.update(|cx| ThreadMetadataStore::global(cx));

        let original_paths =
            PathList::new(&[Path::new("/projects/worktree-a"), Path::new("/other/path")]);
        let meta = make_metadata("session-partial", "Partial", Utc::now(), original_paths);

        store.update(cx, |store, cx| {
            store.save_manually(meta, cx);
        });

        let replacements = vec![
            (
                PathBuf::from("/projects/worktree-a"),
                PathBuf::from("/new/worktree-a"),
            ),
            // Old path not present in the thread — must be a no-op.
            (
                PathBuf::from("/nonexistent/path"),
                PathBuf::from("/should/not/appear"),
            ),
        ];

        store.update(cx, |store, cx| {
            store.update_restored_worktree_paths(
                &acp::SessionId::new("session-partial"),
                &replacements,
                cx,
            );
        });

        let entry = store.read_with(cx, |store, _cx| {
            store
                .entry(&acp::SessionId::new("session-partial"))
                .cloned()
        });
        let entry = entry.unwrap();
        let paths = entry.folder_paths.paths();
        assert_eq!(paths.len(), 2);
        assert!(paths.contains(&PathBuf::from("/new/worktree-a")));
        assert!(paths.contains(&PathBuf::from("/other/path")));
        assert!(!paths.contains(&PathBuf::from("/should/not/appear")));
    }
2723
    /// A single thread may be linked to several archived worktrees; lookup
    /// for that thread must return all of them.
    #[gpui::test]
    async fn test_multiple_archived_worktrees_per_thread(cx: &mut TestAppContext) {
        init_test(cx);
        let store = cx.update(|cx| ThreadMetadataStore::global(cx));

        let id1 = store
            .read_with(cx, |store, cx| {
                store.create_archived_worktree(
                    "/projects/worktree-a".to_string(),
                    "/home/user/repo".to_string(),
                    Some("branch-a".to_string()),
                    "staged_a".to_string(),
                    "unstaged_a".to_string(),
                    "original_000".to_string(),
                    cx,
                )
            })
            .await
            .unwrap();

        let id2 = store
            .read_with(cx, |store, cx| {
                store.create_archived_worktree(
                    "/projects/worktree-b".to_string(),
                    "/home/user/repo".to_string(),
                    Some("branch-b".to_string()),
                    "staged_b".to_string(),
                    "unstaged_b".to_string(),
                    "original_000".to_string(),
                    cx,
                )
            })
            .await
            .unwrap();

        // Link both worktrees to the same session.
        store
            .read_with(cx, |store, cx| {
                store.link_thread_to_archived_worktree("session-1".to_string(), id1, cx)
            })
            .await
            .unwrap();

        store
            .read_with(cx, |store, cx| {
                store.link_thread_to_archived_worktree("session-1".to_string(), id2, cx)
            })
            .await
            .unwrap();

        let worktrees = store
            .read_with(cx, |store, cx| {
                store.get_archived_worktrees_for_thread("session-1".to_string(), cx)
            })
            .await
            .unwrap();

        assert_eq!(worktrees.len(), 2);

        // Order is not asserted — just membership of both paths.
        let paths: Vec<&Path> = worktrees
            .iter()
            .map(|w| w.worktree_path.as_path())
            .collect();
        assert!(paths.contains(&Path::new("/projects/worktree-a")));
        assert!(paths.contains(&Path::new("/projects/worktree-b")));
    }
2789}