1use std::{path::Path, sync::Arc};
2
3use acp_thread::AcpThreadEvent;
4use agent::{ThreadStore, ZED_AGENT_ID};
5use agent_client_protocol as acp;
6use anyhow::Context as _;
7use chrono::{DateTime, Utc};
8use collections::{HashMap, HashSet};
9use db::{
10 sqlez::{
11 bindable::Column, domain::Domain, statement::Statement,
12 thread_safe_connection::ThreadSafeConnection,
13 },
14 sqlez_macros::sql,
15};
16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
17use futures::{FutureExt as _, future::Shared};
18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
19use project::AgentId;
20use ui::{App, Context, SharedString};
21use util::ResultExt as _;
22use workspace::PathList;
23
24use crate::DEFAULT_THREAD_TITLE;
25
26pub fn init(cx: &mut App) {
27 ThreadMetadataStore::init_global(cx);
28
29 if cx.has_flag::<AgentV2FeatureFlag>() {
30 migrate_thread_metadata(cx);
31 }
32 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
33 if has_flag {
34 migrate_thread_metadata(cx);
35 }
36 })
37 .detach();
38}
39
40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
41/// We skip migrating threads that do not have a project.
42///
43/// TODO: Remove this after N weeks of shipping the sidebar
44fn migrate_thread_metadata(cx: &mut App) {
45 let store = ThreadMetadataStore::global(cx);
46 let db = store.read(cx).db.clone();
47
48 cx.spawn(async move |cx| {
49 let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
50
51 let is_first_migration = existing_entries.is_empty();
52
53 let mut to_migrate = store.read_with(cx, |_store, cx| {
54 ThreadStore::global(cx)
55 .read(cx)
56 .entries()
57 .filter_map(|entry| {
58 if existing_entries.contains(&entry.id.0) {
59 return None;
60 }
61
62 Some(ThreadMetadata {
63 session_id: entry.id,
64 agent_id: ZED_AGENT_ID.clone(),
65 title: entry.title,
66 updated_at: entry.updated_at,
67 created_at: entry.created_at,
68 folder_paths: entry.folder_paths,
69 archived: true,
70 })
71 })
72 .collect::<Vec<_>>()
73 });
74
75 if to_migrate.is_empty() {
76 return anyhow::Ok(());
77 }
78
79 // On the first migration (no entries in DB yet), keep the 5 most
80 // recent threads per project unarchived.
81 if is_first_migration {
82 let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
83 for entry in &mut to_migrate {
84 if entry.folder_paths.is_empty() {
85 continue;
86 }
87 per_project
88 .entry(entry.folder_paths.clone())
89 .or_default()
90 .push(entry);
91 }
92 for entries in per_project.values_mut() {
93 entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
94 for entry in entries.iter_mut().take(5) {
95 entry.archived = false;
96 }
97 }
98 }
99
100 log::info!("Migrating {} thread store entries", to_migrate.len());
101
102 // Manually save each entry to the database and call reload, otherwise
103 // we'll end up triggering lots of reloads after each save
104 for entry in to_migrate {
105 db.save(entry).await?;
106 }
107
108 log::info!("Finished migrating thread store entries");
109
110 let _ = store.update(cx, |store, cx| store.reload(cx));
111 anyhow::Ok(())
112 })
113 .detach_and_log_err(cx);
114}
115
116struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
117impl Global for GlobalThreadMetadataStore {}
118
119/// Lightweight metadata for any thread (native or ACP), enough to populate
120/// the sidebar list and route to the correct load path when clicked.
121#[derive(Debug, Clone, PartialEq)]
122pub struct ThreadMetadata {
123 pub session_id: acp::SessionId,
124 pub agent_id: AgentId,
125 pub title: SharedString,
126 pub updated_at: DateTime<Utc>,
127 pub created_at: Option<DateTime<Utc>>,
128 pub folder_paths: PathList,
129 pub archived: bool,
130}
131
132impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
133 fn from(meta: &ThreadMetadata) -> Self {
134 Self {
135 session_id: meta.session_id.clone(),
136 work_dirs: Some(meta.folder_paths.clone()),
137 title: Some(meta.title.clone()),
138 updated_at: Some(meta.updated_at),
139 created_at: meta.created_at,
140 meta: None,
141 }
142 }
143}
144
145/// The store holds all metadata needed to show threads in the sidebar/the archive.
146///
147/// Automatically listens to AcpThread events and updates metadata if it has changed.
148pub struct ThreadMetadataStore {
149 db: ThreadMetadataDb,
150 threads: HashMap<acp::SessionId, ThreadMetadata>,
151 threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
152 reload_task: Option<Shared<Task<()>>>,
153 session_subscriptions: HashMap<acp::SessionId, Subscription>,
154 pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
155 _db_operations_task: Task<()>,
156}
157
158#[derive(Debug, PartialEq)]
159enum DbOperation {
160 Upsert(ThreadMetadata),
161 Delete(acp::SessionId),
162}
163
164impl DbOperation {
165 fn id(&self) -> &acp::SessionId {
166 match self {
167 DbOperation::Upsert(thread) => &thread.session_id,
168 DbOperation::Delete(session_id) => session_id,
169 }
170 }
171}
172
173impl ThreadMetadataStore {
174 #[cfg(not(any(test, feature = "test-support")))]
175 pub fn init_global(cx: &mut App) {
176 if cx.has_global::<Self>() {
177 return;
178 }
179
180 let db = ThreadMetadataDb::global(cx);
181 let thread_store = cx.new(|cx| Self::new(db, cx));
182 cx.set_global(GlobalThreadMetadataStore(thread_store));
183 }
184
185 #[cfg(any(test, feature = "test-support"))]
186 pub fn init_global(cx: &mut App) {
187 let thread = std::thread::current();
188 let test_name = thread.name().unwrap_or("unknown_test");
189 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
190 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
191 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
192 cx.set_global(GlobalThreadMetadataStore(thread_store));
193 }
194
195 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
196 cx.try_global::<GlobalThreadMetadataStore>()
197 .map(|store| store.0.clone())
198 }
199
200 pub fn global(cx: &App) -> Entity<Self> {
201 cx.global::<GlobalThreadMetadataStore>().0.clone()
202 }
203
204 pub fn is_empty(&self) -> bool {
205 self.threads.is_empty()
206 }
207
208 /// Returns all thread IDs.
209 pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
210 self.threads.keys().cloned()
211 }
212
213 /// Returns the metadata for a specific thread, if it exists.
214 pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
215 self.threads.get(session_id)
216 }
217
218 /// Returns all threads.
219 pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
220 self.threads.values()
221 }
222
223 /// Returns all archived threads.
224 pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
225 self.entries().filter(|t| t.archived)
226 }
227
228 /// Returns all threads for the given path list, excluding archived threads.
229 pub fn entries_for_path(
230 &self,
231 path_list: &PathList,
232 ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
233 self.threads_by_paths
234 .get(path_list)
235 .into_iter()
236 .flatten()
237 .filter_map(|s| self.threads.get(s))
238 .filter(|s| !s.archived)
239 }
240
241 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
242 let db = self.db.clone();
243 self.reload_task.take();
244
245 let list_task = cx
246 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
247
248 let reload_task = cx
249 .spawn(async move |this, cx| {
250 let Some(rows) = list_task.await.log_err() else {
251 return;
252 };
253
254 this.update(cx, |this, cx| {
255 this.threads.clear();
256 this.threads_by_paths.clear();
257
258 for row in rows {
259 this.threads_by_paths
260 .entry(row.folder_paths.clone())
261 .or_default()
262 .insert(row.session_id.clone());
263 this.threads.insert(row.session_id.clone(), row);
264 }
265
266 cx.notify();
267 })
268 .ok();
269 })
270 .shared();
271 self.reload_task = Some(reload_task.clone());
272 reload_task
273 }
274
275 pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
276 if !cx.has_flag::<AgentV2FeatureFlag>() {
277 return;
278 }
279
280 for metadata in metadata {
281 self.save_internal(metadata);
282 }
283 cx.notify();
284 }
285
286 #[cfg(any(test, feature = "test-support"))]
287 pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
288 self.save(metadata, cx)
289 }
290
291 fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
292 if !cx.has_flag::<AgentV2FeatureFlag>() {
293 return;
294 }
295
296 self.save_internal(metadata);
297 cx.notify();
298 }
299
300 fn save_internal(&mut self, metadata: ThreadMetadata) {
301 // If the folder paths have changed, we need to clear the old entry
302 if let Some(thread) = self.threads.get(&metadata.session_id)
303 && thread.folder_paths != metadata.folder_paths
304 && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
305 {
306 session_ids.remove(&metadata.session_id);
307 }
308
309 self.threads
310 .insert(metadata.session_id.clone(), metadata.clone());
311
312 self.threads_by_paths
313 .entry(metadata.folder_paths.clone())
314 .or_default()
315 .insert(metadata.session_id.clone());
316
317 self.pending_thread_ops_tx
318 .try_send(DbOperation::Upsert(metadata))
319 .log_err();
320 }
321
322 pub fn update_working_directories(
323 &mut self,
324 session_id: &acp::SessionId,
325 work_dirs: PathList,
326 cx: &mut Context<Self>,
327 ) {
328 if !cx.has_flag::<AgentV2FeatureFlag>() {
329 return;
330 }
331
332 if let Some(thread) = self.threads.get(session_id) {
333 self.save_internal(ThreadMetadata {
334 folder_paths: work_dirs,
335 ..thread.clone()
336 });
337 cx.notify();
338 }
339 }
340
341 pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
342 self.update_archived(session_id, true, cx);
343 }
344
345 pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
346 self.update_archived(session_id, false, cx);
347 }
348
349 fn update_archived(
350 &mut self,
351 session_id: &acp::SessionId,
352 archived: bool,
353 cx: &mut Context<Self>,
354 ) {
355 if !cx.has_flag::<AgentV2FeatureFlag>() {
356 return;
357 }
358
359 if let Some(thread) = self.threads.get(session_id) {
360 self.save_internal(ThreadMetadata {
361 archived,
362 ..thread.clone()
363 });
364 cx.notify();
365 }
366 }
367
368 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
369 if !cx.has_flag::<AgentV2FeatureFlag>() {
370 return;
371 }
372
373 if let Some(thread) = self.threads.get(&session_id)
374 && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
375 {
376 session_ids.remove(&session_id);
377 }
378 self.threads.remove(&session_id);
379 self.pending_thread_ops_tx
380 .try_send(DbOperation::Delete(session_id))
381 .log_err();
382 cx.notify();
383 }
384
385 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
386 let weak_store = cx.weak_entity();
387
388 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
389 // Don't track subagent threads in the sidebar.
390 if thread.parent_session_id().is_some() {
391 return;
392 }
393
394 let thread_entity = cx.entity();
395
396 cx.on_release({
397 let weak_store = weak_store.clone();
398 move |thread, cx| {
399 weak_store
400 .update(cx, |store, cx| {
401 let session_id = thread.session_id().clone();
402 store.session_subscriptions.remove(&session_id);
403 if thread.entries().is_empty() {
404 // Empty threads can be unloaded without ever being
405 // durably persisted by the underlying agent.
406 store.delete(session_id, cx);
407 }
408 })
409 .ok();
410 }
411 })
412 .detach();
413
414 weak_store
415 .update(cx, |this, cx| {
416 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
417 this.session_subscriptions
418 .insert(thread.session_id().clone(), subscription);
419 })
420 .ok();
421 })
422 .detach();
423
424 let (tx, rx) = smol::channel::unbounded();
425 let _db_operations_task = cx.background_spawn({
426 let db = db.clone();
427 async move {
428 while let Ok(first_update) = rx.recv().await {
429 let mut updates = vec![first_update];
430 while let Ok(update) = rx.try_recv() {
431 updates.push(update);
432 }
433 let updates = Self::dedup_db_operations(updates);
434 for operation in updates {
435 match operation {
436 DbOperation::Upsert(metadata) => {
437 db.save(metadata).await.log_err();
438 }
439 DbOperation::Delete(session_id) => {
440 db.delete(session_id).await.log_err();
441 }
442 }
443 }
444 }
445 }
446 });
447
448 let mut this = Self {
449 db,
450 threads: HashMap::default(),
451 threads_by_paths: HashMap::default(),
452 reload_task: None,
453 session_subscriptions: HashMap::default(),
454 pending_thread_ops_tx: tx,
455 _db_operations_task,
456 };
457 let _ = this.reload(cx);
458 this
459 }
460
461 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
462 let mut ops = HashMap::default();
463 for operation in operations.into_iter().rev() {
464 if ops.contains_key(operation.id()) {
465 continue;
466 }
467 ops.insert(operation.id().clone(), operation);
468 }
469 ops.into_values().collect()
470 }
471
472 fn handle_thread_event(
473 &mut self,
474 thread: Entity<acp_thread::AcpThread>,
475 event: &AcpThreadEvent,
476 cx: &mut Context<Self>,
477 ) {
478 // Don't track subagent threads in the sidebar.
479 if thread.read(cx).parent_session_id().is_some() {
480 return;
481 }
482
483 match event {
484 AcpThreadEvent::NewEntry
485 | AcpThreadEvent::TitleUpdated
486 | AcpThreadEvent::EntryUpdated(_)
487 | AcpThreadEvent::EntriesRemoved(_)
488 | AcpThreadEvent::ToolAuthorizationRequested(_)
489 | AcpThreadEvent::ToolAuthorizationReceived(_)
490 | AcpThreadEvent::Retry(_)
491 | AcpThreadEvent::Stopped(_)
492 | AcpThreadEvent::Error
493 | AcpThreadEvent::LoadError(_)
494 | AcpThreadEvent::Refusal
495 | AcpThreadEvent::WorkingDirectoriesUpdated => {
496 let thread_ref = thread.read(cx);
497 let existing_thread = self.threads.get(thread_ref.session_id());
498 let session_id = thread_ref.session_id().clone();
499 let title = thread_ref
500 .title()
501 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
502
503 let updated_at = Utc::now();
504
505 let created_at = existing_thread
506 .and_then(|t| t.created_at)
507 .unwrap_or_else(|| updated_at);
508
509 let agent_id = thread_ref.connection().agent_id();
510
511 let folder_paths = {
512 let project = thread_ref.project().read(cx);
513 let paths: Vec<Arc<Path>> = project
514 .visible_worktrees(cx)
515 .map(|worktree| worktree.read(cx).abs_path())
516 .collect();
517 PathList::new(&paths)
518 };
519
520 // Threads without a folder path (e.g. started in an empty
521 // window) are archived by default so they don't get lost,
522 // because they won't show up in the sidebar. Users can reload
523 // them from the archive.
524 let archived = existing_thread
525 .map(|t| t.archived)
526 .unwrap_or(folder_paths.is_empty());
527
528 let metadata = ThreadMetadata {
529 session_id,
530 agent_id,
531 title,
532 created_at: Some(created_at),
533 updated_at,
534 folder_paths,
535 archived,
536 };
537
538 self.save(metadata, cx);
539 }
540 AcpThreadEvent::TokenUsageUpdated
541 | AcpThreadEvent::SubagentSpawned(_)
542 | AcpThreadEvent::PromptCapabilitiesUpdated
543 | AcpThreadEvent::AvailableCommandsUpdated(_)
544 | AcpThreadEvent::ModeUpdated(_)
545 | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
546 }
547 }
548}
549
550impl Global for ThreadMetadataStore {}
551
552struct ThreadMetadataDb(ThreadSafeConnection);
553
554impl Domain for ThreadMetadataDb {
555 const NAME: &str = stringify!(ThreadMetadataDb);
556
557 const MIGRATIONS: &[&str] = &[
558 sql!(
559 CREATE TABLE IF NOT EXISTS sidebar_threads(
560 session_id TEXT PRIMARY KEY,
561 agent_id TEXT,
562 title TEXT NOT NULL,
563 updated_at TEXT NOT NULL,
564 created_at TEXT,
565 folder_paths TEXT,
566 folder_paths_order TEXT
567 ) STRICT;
568 ),
569 sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
570 ];
571}
572
573db::static_connection!(ThreadMetadataDb, []);
574
575impl ThreadMetadataDb {
576 pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
577 self.select::<Arc<str>>(
578 "SELECT session_id FROM sidebar_threads \
579 ORDER BY updated_at DESC",
580 )?()
581 }
582
583 /// List all sidebar thread metadata, ordered by updated_at descending.
584 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
585 self.select::<ThreadMetadata>(
586 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived \
587 FROM sidebar_threads \
588 ORDER BY updated_at DESC"
589 )?()
590 }
591
592 /// Upsert metadata for a thread.
593 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
594 let id = row.session_id.0.clone();
595 let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
596 None
597 } else {
598 Some(row.agent_id.to_string())
599 };
600 let title = row.title.to_string();
601 let updated_at = row.updated_at.to_rfc3339();
602 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
603 let serialized = row.folder_paths.serialize();
604 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
605 (None, None)
606 } else {
607 (Some(serialized.paths), Some(serialized.order))
608 };
609 let archived = row.archived;
610
611 self.write(move |conn| {
612 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived) \
613 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
614 ON CONFLICT(session_id) DO UPDATE SET \
615 agent_id = excluded.agent_id, \
616 title = excluded.title, \
617 updated_at = excluded.updated_at, \
618 created_at = excluded.created_at, \
619 folder_paths = excluded.folder_paths, \
620 folder_paths_order = excluded.folder_paths_order, \
621 archived = excluded.archived";
622 let mut stmt = Statement::prepare(conn, sql)?;
623 let mut i = stmt.bind(&id, 1)?;
624 i = stmt.bind(&agent_id, i)?;
625 i = stmt.bind(&title, i)?;
626 i = stmt.bind(&updated_at, i)?;
627 i = stmt.bind(&created_at, i)?;
628 i = stmt.bind(&folder_paths, i)?;
629 i = stmt.bind(&folder_paths_order, i)?;
630 stmt.bind(&archived, i)?;
631 stmt.exec()
632 })
633 .await
634 }
635
636 /// Delete metadata for a single thread.
637 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
638 let id = session_id.0.clone();
639 self.write(move |conn| {
640 let mut stmt =
641 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
642 stmt.bind(&id, 1)?;
643 stmt.exec()
644 })
645 .await
646 }
647}
648
649impl Column for ThreadMetadata {
650 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
651 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
652 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
653 let (title, next): (String, i32) = Column::column(statement, next)?;
654 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
655 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
656 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
657 let (folder_paths_order_str, next): (Option<String>, i32) =
658 Column::column(statement, next)?;
659 let (archived, next): (bool, i32) = Column::column(statement, next)?;
660
661 let agent_id = agent_id
662 .map(|id| AgentId::new(id))
663 .unwrap_or(ZED_AGENT_ID.clone());
664
665 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
666 let created_at = created_at_str
667 .as_deref()
668 .map(DateTime::parse_from_rfc3339)
669 .transpose()?
670 .map(|dt| dt.with_timezone(&Utc));
671
672 let folder_paths = folder_paths_str
673 .map(|paths| {
674 PathList::deserialize(&util::path_list::SerializedPathList {
675 paths,
676 order: folder_paths_order_str.unwrap_or_default(),
677 })
678 })
679 .unwrap_or_default();
680
681 Ok((
682 ThreadMetadata {
683 session_id: acp::SessionId::new(id),
684 agent_id,
685 title: title.into(),
686 updated_at,
687 created_at,
688 folder_paths,
689 archived,
690 },
691 next,
692 ))
693 }
694}
695
696#[cfg(test)]
697mod tests {
698 use super::*;
699 use acp_thread::{AgentConnection, StubAgentConnection};
700 use action_log::ActionLog;
701 use agent::DbThread;
702 use agent_client_protocol as acp;
703 use feature_flags::FeatureFlagAppExt;
704 use gpui::TestAppContext;
705 use project::FakeFs;
706 use project::Project;
707 use std::path::Path;
708 use std::rc::Rc;
709
710 fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
711 DbThread {
712 title: title.to_string().into(),
713 messages: Vec::new(),
714 updated_at,
715 detailed_summary: None,
716 initial_project_snapshot: None,
717 cumulative_token_usage: Default::default(),
718 request_token_usage: Default::default(),
719 model: None,
720 profile: None,
721 imported: false,
722 subagent_context: None,
723 speed: None,
724 thinking_enabled: false,
725 thinking_effort: None,
726 draft_prompt: None,
727 ui_scroll_position: None,
728 }
729 }
730
731 fn make_metadata(
732 session_id: &str,
733 title: &str,
734 updated_at: DateTime<Utc>,
735 folder_paths: PathList,
736 ) -> ThreadMetadata {
737 ThreadMetadata {
738 archived: false,
739 session_id: acp::SessionId::new(session_id),
740 agent_id: agent::ZED_AGENT_ID.clone(),
741 title: title.to_string().into(),
742 updated_at,
743 created_at: Some(updated_at),
744 folder_paths,
745 }
746 }
747
748 fn init_test(cx: &mut TestAppContext) {
749 cx.update(|cx| {
750 let settings_store = settings::SettingsStore::test(cx);
751 cx.set_global(settings_store);
752 cx.update_flags(true, vec!["agent-v2".to_string()]);
753 ThreadMetadataStore::init_global(cx);
754 ThreadStore::init_global(cx);
755 });
756 cx.run_until_parked();
757 }
758
759 #[gpui::test]
760 async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
761 let first_paths = PathList::new(&[Path::new("/project-a")]);
762 let second_paths = PathList::new(&[Path::new("/project-b")]);
763 let now = Utc::now();
764 let older = now - chrono::Duration::seconds(1);
765
766 let thread = std::thread::current();
767 let test_name = thread.name().unwrap_or("unknown_test");
768 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
769 let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
770 &db_name,
771 )));
772
773 db.save(make_metadata(
774 "session-1",
775 "First Thread",
776 now,
777 first_paths.clone(),
778 ))
779 .await
780 .unwrap();
781 db.save(make_metadata(
782 "session-2",
783 "Second Thread",
784 older,
785 second_paths.clone(),
786 ))
787 .await
788 .unwrap();
789
790 cx.update(|cx| {
791 let settings_store = settings::SettingsStore::test(cx);
792 cx.set_global(settings_store);
793 cx.update_flags(true, vec!["agent-v2".to_string()]);
794 ThreadMetadataStore::init_global(cx);
795 });
796
797 cx.run_until_parked();
798
799 cx.update(|cx| {
800 let store = ThreadMetadataStore::global(cx);
801 let store = store.read(cx);
802
803 let entry_ids = store
804 .entry_ids()
805 .map(|session_id| session_id.0.to_string())
806 .collect::<Vec<_>>();
807 assert_eq!(entry_ids.len(), 2);
808 assert!(entry_ids.contains(&"session-1".to_string()));
809 assert!(entry_ids.contains(&"session-2".to_string()));
810
811 let first_path_entries = store
812 .entries_for_path(&first_paths)
813 .map(|entry| entry.session_id.0.to_string())
814 .collect::<Vec<_>>();
815 assert_eq!(first_path_entries, vec!["session-1"]);
816
817 let second_path_entries = store
818 .entries_for_path(&second_paths)
819 .map(|entry| entry.session_id.0.to_string())
820 .collect::<Vec<_>>();
821 assert_eq!(second_path_entries, vec!["session-2"]);
822 });
823 }
824
825 #[gpui::test]
826 async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
827 init_test(cx);
828
829 let first_paths = PathList::new(&[Path::new("/project-a")]);
830 let second_paths = PathList::new(&[Path::new("/project-b")]);
831 let initial_time = Utc::now();
832 let updated_time = initial_time + chrono::Duration::seconds(1);
833
834 let initial_metadata = make_metadata(
835 "session-1",
836 "First Thread",
837 initial_time,
838 first_paths.clone(),
839 );
840
841 let second_metadata = make_metadata(
842 "session-2",
843 "Second Thread",
844 initial_time,
845 second_paths.clone(),
846 );
847
848 cx.update(|cx| {
849 let store = ThreadMetadataStore::global(cx);
850 store.update(cx, |store, cx| {
851 store.save(initial_metadata, cx);
852 store.save(second_metadata, cx);
853 });
854 });
855
856 cx.run_until_parked();
857
858 cx.update(|cx| {
859 let store = ThreadMetadataStore::global(cx);
860 let store = store.read(cx);
861
862 let first_path_entries = store
863 .entries_for_path(&first_paths)
864 .map(|entry| entry.session_id.0.to_string())
865 .collect::<Vec<_>>();
866 assert_eq!(first_path_entries, vec!["session-1"]);
867
868 let second_path_entries = store
869 .entries_for_path(&second_paths)
870 .map(|entry| entry.session_id.0.to_string())
871 .collect::<Vec<_>>();
872 assert_eq!(second_path_entries, vec!["session-2"]);
873 });
874
875 let moved_metadata = make_metadata(
876 "session-1",
877 "First Thread",
878 updated_time,
879 second_paths.clone(),
880 );
881
882 cx.update(|cx| {
883 let store = ThreadMetadataStore::global(cx);
884 store.update(cx, |store, cx| {
885 store.save(moved_metadata, cx);
886 });
887 });
888
889 cx.run_until_parked();
890
891 cx.update(|cx| {
892 let store = ThreadMetadataStore::global(cx);
893 let store = store.read(cx);
894
895 let entry_ids = store
896 .entry_ids()
897 .map(|session_id| session_id.0.to_string())
898 .collect::<Vec<_>>();
899 assert_eq!(entry_ids.len(), 2);
900 assert!(entry_ids.contains(&"session-1".to_string()));
901 assert!(entry_ids.contains(&"session-2".to_string()));
902
903 let first_path_entries = store
904 .entries_for_path(&first_paths)
905 .map(|entry| entry.session_id.0.to_string())
906 .collect::<Vec<_>>();
907 assert!(first_path_entries.is_empty());
908
909 let second_path_entries = store
910 .entries_for_path(&second_paths)
911 .map(|entry| entry.session_id.0.to_string())
912 .collect::<Vec<_>>();
913 assert_eq!(second_path_entries.len(), 2);
914 assert!(second_path_entries.contains(&"session-1".to_string()));
915 assert!(second_path_entries.contains(&"session-2".to_string()));
916 });
917
918 cx.update(|cx| {
919 let store = ThreadMetadataStore::global(cx);
920 store.update(cx, |store, cx| {
921 store.delete(acp::SessionId::new("session-2"), cx);
922 });
923 });
924
925 cx.run_until_parked();
926
927 cx.update(|cx| {
928 let store = ThreadMetadataStore::global(cx);
929 let store = store.read(cx);
930
931 let entry_ids = store
932 .entry_ids()
933 .map(|session_id| session_id.0.to_string())
934 .collect::<Vec<_>>();
935 assert_eq!(entry_ids, vec!["session-1"]);
936
937 let second_path_entries = store
938 .entries_for_path(&second_paths)
939 .map(|entry| entry.session_id.0.to_string())
940 .collect::<Vec<_>>();
941 assert_eq!(second_path_entries, vec!["session-1"]);
942 });
943 }
944
945 #[gpui::test]
946 async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
947 init_test(cx);
948
949 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
950 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
951 let now = Utc::now();
952
953 let existing_metadata = ThreadMetadata {
954 session_id: acp::SessionId::new("a-session-0"),
955 agent_id: agent::ZED_AGENT_ID.clone(),
956 title: "Existing Metadata".into(),
957 updated_at: now - chrono::Duration::seconds(10),
958 created_at: Some(now - chrono::Duration::seconds(10)),
959 folder_paths: project_a_paths.clone(),
960 archived: false,
961 };
962
963 cx.update(|cx| {
964 let store = ThreadMetadataStore::global(cx);
965 store.update(cx, |store, cx| {
966 store.save(existing_metadata, cx);
967 });
968 });
969 cx.run_until_parked();
970
971 let threads_to_save = vec![
972 (
973 "a-session-0",
974 "Thread A0 From Native Store",
975 project_a_paths.clone(),
976 now,
977 ),
978 (
979 "a-session-1",
980 "Thread A1",
981 project_a_paths.clone(),
982 now + chrono::Duration::seconds(1),
983 ),
984 (
985 "b-session-0",
986 "Thread B0",
987 project_b_paths.clone(),
988 now + chrono::Duration::seconds(2),
989 ),
990 (
991 "projectless",
992 "Projectless",
993 PathList::default(),
994 now + chrono::Duration::seconds(3),
995 ),
996 ];
997
998 for (session_id, title, paths, updated_at) in &threads_to_save {
999 let save_task = cx.update(|cx| {
1000 let thread_store = ThreadStore::global(cx);
1001 let session_id = session_id.to_string();
1002 let title = title.to_string();
1003 let paths = paths.clone();
1004 thread_store.update(cx, |store, cx| {
1005 store.save_thread(
1006 acp::SessionId::new(session_id),
1007 make_db_thread(&title, *updated_at),
1008 paths,
1009 cx,
1010 )
1011 })
1012 });
1013 save_task.await.unwrap();
1014 cx.run_until_parked();
1015 }
1016
1017 cx.update(|cx| migrate_thread_metadata(cx));
1018 cx.run_until_parked();
1019
1020 let list = cx.update(|cx| {
1021 let store = ThreadMetadataStore::global(cx);
1022 store.read(cx).entries().cloned().collect::<Vec<_>>()
1023 });
1024
1025 assert_eq!(list.len(), 4);
1026 assert!(
1027 list.iter()
1028 .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1029 );
1030
1031 let existing_metadata = list
1032 .iter()
1033 .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1034 .unwrap();
1035 assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1036 assert!(!existing_metadata.archived);
1037
1038 let migrated_session_ids = list
1039 .iter()
1040 .map(|metadata| metadata.session_id.0.as_ref())
1041 .collect::<Vec<_>>();
1042 assert!(migrated_session_ids.contains(&"a-session-1"));
1043 assert!(migrated_session_ids.contains(&"b-session-0"));
1044 assert!(migrated_session_ids.contains(&"projectless"));
1045
1046 let migrated_entries = list
1047 .iter()
1048 .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1049 .collect::<Vec<_>>();
1050 assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1051 }
1052
1053 #[gpui::test]
1054 async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1055 cx: &mut TestAppContext,
1056 ) {
1057 init_test(cx);
1058
1059 let project_paths = PathList::new(&[Path::new("/project-a")]);
1060 let existing_updated_at = Utc::now();
1061
1062 let existing_metadata = ThreadMetadata {
1063 session_id: acp::SessionId::new("existing-session"),
1064 agent_id: agent::ZED_AGENT_ID.clone(),
1065 title: "Existing Metadata".into(),
1066 updated_at: existing_updated_at,
1067 created_at: Some(existing_updated_at),
1068 folder_paths: project_paths.clone(),
1069 archived: false,
1070 };
1071
1072 cx.update(|cx| {
1073 let store = ThreadMetadataStore::global(cx);
1074 store.update(cx, |store, cx| {
1075 store.save(existing_metadata, cx);
1076 });
1077 });
1078 cx.run_until_parked();
1079
1080 let save_task = cx.update(|cx| {
1081 let thread_store = ThreadStore::global(cx);
1082 thread_store.update(cx, |store, cx| {
1083 store.save_thread(
1084 acp::SessionId::new("existing-session"),
1085 make_db_thread(
1086 "Updated Native Thread Title",
1087 existing_updated_at + chrono::Duration::seconds(1),
1088 ),
1089 project_paths.clone(),
1090 cx,
1091 )
1092 })
1093 });
1094 save_task.await.unwrap();
1095 cx.run_until_parked();
1096
1097 cx.update(|cx| migrate_thread_metadata(cx));
1098 cx.run_until_parked();
1099
1100 let list = cx.update(|cx| {
1101 let store = ThreadMetadataStore::global(cx);
1102 store.read(cx).entries().cloned().collect::<Vec<_>>()
1103 });
1104
1105 assert_eq!(list.len(), 1);
1106 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1107 }
1108
1109 #[gpui::test]
1110 async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1111 cx: &mut TestAppContext,
1112 ) {
1113 init_test(cx);
1114
1115 let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1116 let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1117 let now = Utc::now();
1118
1119 // Create 7 threads for project A and 3 for project B
1120 let mut threads_to_save = Vec::new();
1121 for i in 0..7 {
1122 threads_to_save.push((
1123 format!("a-session-{i}"),
1124 format!("Thread A{i}"),
1125 project_a_paths.clone(),
1126 now + chrono::Duration::seconds(i as i64),
1127 ));
1128 }
1129 for i in 0..3 {
1130 threads_to_save.push((
1131 format!("b-session-{i}"),
1132 format!("Thread B{i}"),
1133 project_b_paths.clone(),
1134 now + chrono::Duration::seconds(i as i64),
1135 ));
1136 }
1137
1138 for (session_id, title, paths, updated_at) in &threads_to_save {
1139 let save_task = cx.update(|cx| {
1140 let thread_store = ThreadStore::global(cx);
1141 let session_id = session_id.to_string();
1142 let title = title.to_string();
1143 let paths = paths.clone();
1144 thread_store.update(cx, |store, cx| {
1145 store.save_thread(
1146 acp::SessionId::new(session_id),
1147 make_db_thread(&title, *updated_at),
1148 paths,
1149 cx,
1150 )
1151 })
1152 });
1153 save_task.await.unwrap();
1154 cx.run_until_parked();
1155 }
1156
1157 cx.update(|cx| migrate_thread_metadata(cx));
1158 cx.run_until_parked();
1159
1160 let list = cx.update(|cx| {
1161 let store = ThreadMetadataStore::global(cx);
1162 store.read(cx).entries().cloned().collect::<Vec<_>>()
1163 });
1164
1165 assert_eq!(list.len(), 10);
1166
1167 // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1168 let mut project_a_entries: Vec<_> = list
1169 .iter()
1170 .filter(|m| m.folder_paths == project_a_paths)
1171 .collect();
1172 assert_eq!(project_a_entries.len(), 7);
1173 project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1174
1175 for entry in &project_a_entries[..5] {
1176 assert!(
1177 !entry.archived,
1178 "Expected {} to be unarchived (top 5 most recent)",
1179 entry.session_id.0
1180 );
1181 }
1182 for entry in &project_a_entries[5..] {
1183 assert!(
1184 entry.archived,
1185 "Expected {} to be archived (older than top 5)",
1186 entry.session_id.0
1187 );
1188 }
1189
1190 // Project B: all 3 should be unarchived (under the limit)
1191 let project_b_entries: Vec<_> = list
1192 .iter()
1193 .filter(|m| m.folder_paths == project_b_paths)
1194 .collect();
1195 assert_eq!(project_b_entries.len(), 3);
1196 assert!(project_b_entries.iter().all(|m| !m.archived));
1197 }
1198
1199 #[gpui::test]
1200 async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1201 init_test(cx);
1202
1203 let fs = FakeFs::new(cx.executor());
1204 let project = Project::test(fs, None::<&Path>, cx).await;
1205 let connection = Rc::new(StubAgentConnection::new());
1206
1207 let thread = cx
1208 .update(|cx| {
1209 connection
1210 .clone()
1211 .new_session(project.clone(), PathList::default(), cx)
1212 })
1213 .await
1214 .unwrap();
1215 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1216
1217 cx.update(|cx| {
1218 thread.update(cx, |thread, cx| {
1219 thread.set_title("Draft Thread".into(), cx).detach();
1220 });
1221 });
1222 cx.run_until_parked();
1223
1224 let metadata_ids = cx.update(|cx| {
1225 ThreadMetadataStore::global(cx)
1226 .read(cx)
1227 .entry_ids()
1228 .collect::<Vec<_>>()
1229 });
1230 assert_eq!(metadata_ids, vec![session_id]);
1231
1232 drop(thread);
1233 cx.update(|_| {});
1234 cx.run_until_parked();
1235 cx.run_until_parked();
1236
1237 let metadata_ids = cx.update(|cx| {
1238 ThreadMetadataStore::global(cx)
1239 .read(cx)
1240 .entry_ids()
1241 .collect::<Vec<_>>()
1242 });
1243 assert!(
1244 metadata_ids.is_empty(),
1245 "expected empty draft thread metadata to be deleted on release"
1246 );
1247 }
1248
1249 #[gpui::test]
1250 async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1251 init_test(cx);
1252
1253 let fs = FakeFs::new(cx.executor());
1254 let project = Project::test(fs, None::<&Path>, cx).await;
1255 let connection = Rc::new(StubAgentConnection::new());
1256
1257 let thread = cx
1258 .update(|cx| {
1259 connection
1260 .clone()
1261 .new_session(project.clone(), PathList::default(), cx)
1262 })
1263 .await
1264 .unwrap();
1265 let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1266
1267 cx.update(|cx| {
1268 thread.update(cx, |thread, cx| {
1269 thread.push_user_content_block(None, "Hello".into(), cx);
1270 });
1271 });
1272 cx.run_until_parked();
1273
1274 let metadata_ids = cx.update(|cx| {
1275 ThreadMetadataStore::global(cx)
1276 .read(cx)
1277 .entry_ids()
1278 .collect::<Vec<_>>()
1279 });
1280 assert_eq!(metadata_ids, vec![session_id.clone()]);
1281
1282 drop(thread);
1283 cx.update(|_| {});
1284 cx.run_until_parked();
1285
1286 let metadata_ids = cx.update(|cx| {
1287 ThreadMetadataStore::global(cx)
1288 .read(cx)
1289 .entry_ids()
1290 .collect::<Vec<_>>()
1291 });
1292 assert_eq!(metadata_ids, vec![session_id]);
1293 }
1294
1295 #[gpui::test]
1296 async fn test_threads_without_project_association_are_archived_by_default(
1297 cx: &mut TestAppContext,
1298 ) {
1299 init_test(cx);
1300
1301 let fs = FakeFs::new(cx.executor());
1302 let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
1303 let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
1304 let connection = Rc::new(StubAgentConnection::new());
1305
1306 let thread_without_worktree = cx
1307 .update(|cx| {
1308 connection.clone().new_session(
1309 project_without_worktree.clone(),
1310 PathList::default(),
1311 cx,
1312 )
1313 })
1314 .await
1315 .unwrap();
1316 let session_without_worktree =
1317 cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());
1318
1319 cx.update(|cx| {
1320 thread_without_worktree.update(cx, |thread, cx| {
1321 thread.set_title("No Project Thread".into(), cx).detach();
1322 });
1323 });
1324 cx.run_until_parked();
1325
1326 let thread_with_worktree = cx
1327 .update(|cx| {
1328 connection.clone().new_session(
1329 project_with_worktree.clone(),
1330 PathList::default(),
1331 cx,
1332 )
1333 })
1334 .await
1335 .unwrap();
1336 let session_with_worktree =
1337 cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());
1338
1339 cx.update(|cx| {
1340 thread_with_worktree.update(cx, |thread, cx| {
1341 thread.set_title("Project Thread".into(), cx).detach();
1342 });
1343 });
1344 cx.run_until_parked();
1345
1346 cx.update(|cx| {
1347 let store = ThreadMetadataStore::global(cx);
1348 let store = store.read(cx);
1349
1350 let without_worktree = store
1351 .entry(&session_without_worktree)
1352 .expect("missing metadata for thread without project association");
1353 assert!(without_worktree.folder_paths.is_empty());
1354 assert!(
1355 without_worktree.archived,
1356 "expected thread without project association to be archived"
1357 );
1358
1359 let with_worktree = store
1360 .entry(&session_with_worktree)
1361 .expect("missing metadata for thread with project association");
1362 assert_eq!(
1363 with_worktree.folder_paths,
1364 PathList::new(&[Path::new("/project-a")])
1365 );
1366 assert!(
1367 !with_worktree.archived,
1368 "expected thread with project association to remain unarchived"
1369 );
1370 });
1371 }
1372
1373 #[gpui::test]
1374 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1375 init_test(cx);
1376
1377 let fs = FakeFs::new(cx.executor());
1378 let project = Project::test(fs, None::<&Path>, cx).await;
1379 let connection = Rc::new(StubAgentConnection::new());
1380
1381 // Create a regular (non-subagent) AcpThread.
1382 let regular_thread = cx
1383 .update(|cx| {
1384 connection
1385 .clone()
1386 .new_session(project.clone(), PathList::default(), cx)
1387 })
1388 .await
1389 .unwrap();
1390
1391 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1392
1393 // Set a title on the regular thread to trigger a save via handle_thread_update.
1394 cx.update(|cx| {
1395 regular_thread.update(cx, |thread, cx| {
1396 thread.set_title("Regular Thread".into(), cx).detach();
1397 });
1398 });
1399 cx.run_until_parked();
1400
1401 // Create a subagent AcpThread
1402 let subagent_session_id = acp::SessionId::new("subagent-session");
1403 let subagent_thread = cx.update(|cx| {
1404 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1405 cx.new(|cx| {
1406 acp_thread::AcpThread::new(
1407 Some(regular_session_id.clone()),
1408 Some("Subagent Thread".into()),
1409 None,
1410 connection.clone(),
1411 project.clone(),
1412 action_log,
1413 subagent_session_id.clone(),
1414 watch::Receiver::constant(acp::PromptCapabilities::new()),
1415 cx,
1416 )
1417 })
1418 });
1419
1420 // Set a title on the subagent thread to trigger handle_thread_update.
1421 cx.update(|cx| {
1422 subagent_thread.update(cx, |thread, cx| {
1423 thread
1424 .set_title("Subagent Thread Title".into(), cx)
1425 .detach();
1426 });
1427 });
1428 cx.run_until_parked();
1429
1430 // List all metadata from the store cache.
1431 let list = cx.update(|cx| {
1432 let store = ThreadMetadataStore::global(cx);
1433 store.read(cx).entries().cloned().collect::<Vec<_>>()
1434 });
1435
1436 // The subagent thread should NOT appear in the sidebar metadata.
1437 // Only the regular thread should be listed.
1438 assert_eq!(
1439 list.len(),
1440 1,
1441 "Expected only the regular thread in sidebar metadata, \
1442 but found {} entries (subagent threads are leaking into the sidebar)",
1443 list.len(),
1444 );
1445 assert_eq!(list[0].session_id, regular_session_id);
1446 assert_eq!(list[0].title.as_ref(), "Regular Thread");
1447 }
1448
1449 #[test]
1450 fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1451 let now = Utc::now();
1452
1453 let operations = vec![
1454 DbOperation::Upsert(make_metadata(
1455 "session-1",
1456 "First Thread",
1457 now,
1458 PathList::default(),
1459 )),
1460 DbOperation::Delete(acp::SessionId::new("session-1")),
1461 ];
1462
1463 let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1464
1465 assert_eq!(deduped.len(), 1);
1466 assert_eq!(
1467 deduped[0],
1468 DbOperation::Delete(acp::SessionId::new("session-1"))
1469 );
1470 }
1471
1472 #[test]
1473 fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1474 let now = Utc::now();
1475 let later = now + chrono::Duration::seconds(1);
1476
1477 let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1478 let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1479
1480 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1481 DbOperation::Upsert(old_metadata),
1482 DbOperation::Upsert(new_metadata.clone()),
1483 ]);
1484
1485 assert_eq!(deduped.len(), 1);
1486 assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1487 }
1488
1489 #[test]
1490 fn test_dedup_db_operations_preserves_distinct_sessions() {
1491 let now = Utc::now();
1492
1493 let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1494 let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1495 let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1496 DbOperation::Upsert(metadata1.clone()),
1497 DbOperation::Upsert(metadata2.clone()),
1498 ]);
1499
1500 assert_eq!(deduped.len(), 2);
1501 assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1502 assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1503 }
1504
1505 #[gpui::test]
1506 async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1507 init_test(cx);
1508
1509 let paths = PathList::new(&[Path::new("/project-a")]);
1510 let now = Utc::now();
1511 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1512
1513 cx.update(|cx| {
1514 let store = ThreadMetadataStore::global(cx);
1515 store.update(cx, |store, cx| {
1516 store.save(metadata, cx);
1517 });
1518 });
1519
1520 cx.run_until_parked();
1521
1522 cx.update(|cx| {
1523 let store = ThreadMetadataStore::global(cx);
1524 let store = store.read(cx);
1525
1526 let path_entries = store
1527 .entries_for_path(&paths)
1528 .map(|e| e.session_id.0.to_string())
1529 .collect::<Vec<_>>();
1530 assert_eq!(path_entries, vec!["session-1"]);
1531
1532 let archived = store
1533 .archived_entries()
1534 .map(|e| e.session_id.0.to_string())
1535 .collect::<Vec<_>>();
1536 assert!(archived.is_empty());
1537 });
1538
1539 cx.update(|cx| {
1540 let store = ThreadMetadataStore::global(cx);
1541 store.update(cx, |store, cx| {
1542 store.archive(&acp::SessionId::new("session-1"), cx);
1543 });
1544 });
1545
1546 cx.run_until_parked();
1547
1548 cx.update(|cx| {
1549 let store = ThreadMetadataStore::global(cx);
1550 let store = store.read(cx);
1551
1552 let path_entries = store
1553 .entries_for_path(&paths)
1554 .map(|e| e.session_id.0.to_string())
1555 .collect::<Vec<_>>();
1556 assert!(path_entries.is_empty());
1557
1558 let archived = store.archived_entries().collect::<Vec<_>>();
1559 assert_eq!(archived.len(), 1);
1560 assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1561 assert!(archived[0].archived);
1562 });
1563
1564 cx.update(|cx| {
1565 let store = ThreadMetadataStore::global(cx);
1566 store.update(cx, |store, cx| {
1567 store.unarchive(&acp::SessionId::new("session-1"), cx);
1568 });
1569 });
1570
1571 cx.run_until_parked();
1572
1573 cx.update(|cx| {
1574 let store = ThreadMetadataStore::global(cx);
1575 let store = store.read(cx);
1576
1577 let path_entries = store
1578 .entries_for_path(&paths)
1579 .map(|e| e.session_id.0.to_string())
1580 .collect::<Vec<_>>();
1581 assert_eq!(path_entries, vec!["session-1"]);
1582
1583 let archived = store
1584 .archived_entries()
1585 .map(|e| e.session_id.0.to_string())
1586 .collect::<Vec<_>>();
1587 assert!(archived.is_empty());
1588 });
1589 }
1590
1591 #[gpui::test]
1592 async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1593 init_test(cx);
1594
1595 let paths = PathList::new(&[Path::new("/project-a")]);
1596 let now = Utc::now();
1597
1598 let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1599 let metadata2 = make_metadata(
1600 "session-2",
1601 "Archived Thread",
1602 now - chrono::Duration::seconds(1),
1603 paths.clone(),
1604 );
1605
1606 cx.update(|cx| {
1607 let store = ThreadMetadataStore::global(cx);
1608 store.update(cx, |store, cx| {
1609 store.save(metadata1, cx);
1610 store.save(metadata2, cx);
1611 });
1612 });
1613
1614 cx.run_until_parked();
1615
1616 cx.update(|cx| {
1617 let store = ThreadMetadataStore::global(cx);
1618 store.update(cx, |store, cx| {
1619 store.archive(&acp::SessionId::new("session-2"), cx);
1620 });
1621 });
1622
1623 cx.run_until_parked();
1624
1625 cx.update(|cx| {
1626 let store = ThreadMetadataStore::global(cx);
1627 let store = store.read(cx);
1628
1629 let path_entries = store
1630 .entries_for_path(&paths)
1631 .map(|e| e.session_id.0.to_string())
1632 .collect::<Vec<_>>();
1633 assert_eq!(path_entries, vec!["session-1"]);
1634
1635 let all_entries = store
1636 .entries()
1637 .map(|e| e.session_id.0.to_string())
1638 .collect::<Vec<_>>();
1639 assert_eq!(all_entries.len(), 2);
1640 assert!(all_entries.contains(&"session-1".to_string()));
1641 assert!(all_entries.contains(&"session-2".to_string()));
1642
1643 let archived = store
1644 .archived_entries()
1645 .map(|e| e.session_id.0.to_string())
1646 .collect::<Vec<_>>();
1647 assert_eq!(archived, vec!["session-2"]);
1648 });
1649 }
1650
1651 #[gpui::test]
1652 async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1653 init_test(cx);
1654
1655 let paths = PathList::new(&[Path::new("/project-a")]);
1656 let now = Utc::now();
1657
1658 let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1659 let m2 = make_metadata(
1660 "session-2",
1661 "Thread Two",
1662 now - chrono::Duration::seconds(1),
1663 paths.clone(),
1664 );
1665 let m3 = make_metadata(
1666 "session-3",
1667 "Thread Three",
1668 now - chrono::Duration::seconds(2),
1669 paths,
1670 );
1671
1672 cx.update(|cx| {
1673 let store = ThreadMetadataStore::global(cx);
1674 store.update(cx, |store, cx| {
1675 store.save_all(vec![m1, m2, m3], cx);
1676 });
1677 });
1678
1679 cx.run_until_parked();
1680
1681 cx.update(|cx| {
1682 let store = ThreadMetadataStore::global(cx);
1683 let store = store.read(cx);
1684
1685 let all_entries = store
1686 .entries()
1687 .map(|e| e.session_id.0.to_string())
1688 .collect::<Vec<_>>();
1689 assert_eq!(all_entries.len(), 3);
1690 assert!(all_entries.contains(&"session-1".to_string()));
1691 assert!(all_entries.contains(&"session-2".to_string()));
1692 assert!(all_entries.contains(&"session-3".to_string()));
1693
1694 let entry_ids = store.entry_ids().collect::<Vec<_>>();
1695 assert_eq!(entry_ids.len(), 3);
1696 });
1697 }
1698
1699 #[gpui::test]
1700 async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1701 init_test(cx);
1702
1703 let paths = PathList::new(&[Path::new("/project-a")]);
1704 let now = Utc::now();
1705 let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1706
1707 cx.update(|cx| {
1708 let store = ThreadMetadataStore::global(cx);
1709 store.update(cx, |store, cx| {
1710 store.save(metadata, cx);
1711 });
1712 });
1713
1714 cx.run_until_parked();
1715
1716 cx.update(|cx| {
1717 let store = ThreadMetadataStore::global(cx);
1718 store.update(cx, |store, cx| {
1719 store.archive(&acp::SessionId::new("session-1"), cx);
1720 });
1721 });
1722
1723 cx.run_until_parked();
1724
1725 cx.update(|cx| {
1726 let store = ThreadMetadataStore::global(cx);
1727 store.update(cx, |store, cx| {
1728 let _ = store.reload(cx);
1729 });
1730 });
1731
1732 cx.run_until_parked();
1733
1734 cx.update(|cx| {
1735 let store = ThreadMetadataStore::global(cx);
1736 let store = store.read(cx);
1737
1738 let thread = store
1739 .entries()
1740 .find(|e| e.session_id.0.as_ref() == "session-1")
1741 .expect("thread should exist after reload");
1742 assert!(thread.archived);
1743
1744 let path_entries = store
1745 .entries_for_path(&paths)
1746 .map(|e| e.session_id.0.to_string())
1747 .collect::<Vec<_>>();
1748 assert!(path_entries.is_empty());
1749
1750 let archived = store
1751 .archived_entries()
1752 .map(|e| e.session_id.0.to_string())
1753 .collect::<Vec<_>>();
1754 assert_eq!(archived, vec!["session-1"]);
1755 });
1756 }
1757
1758 #[gpui::test]
1759 async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1760 init_test(cx);
1761
1762 cx.run_until_parked();
1763
1764 cx.update(|cx| {
1765 let store = ThreadMetadataStore::global(cx);
1766 store.update(cx, |store, cx| {
1767 store.archive(&acp::SessionId::new("nonexistent"), cx);
1768 });
1769 });
1770
1771 cx.run_until_parked();
1772
1773 cx.update(|cx| {
1774 let store = ThreadMetadataStore::global(cx);
1775 let store = store.read(cx);
1776
1777 assert!(store.is_empty());
1778 assert_eq!(store.entries().count(), 0);
1779 assert_eq!(store.archived_entries().count(), 0);
1780 });
1781 }
1782
1783 #[gpui::test]
1784 async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1785 init_test(cx);
1786
1787 let paths = PathList::new(&[Path::new("/project-a")]);
1788 let now = Utc::now();
1789 let metadata = make_metadata("session-1", "Thread 1", now, paths);
1790 let session_id = metadata.session_id.clone();
1791
1792 cx.update(|cx| {
1793 let store = ThreadMetadataStore::global(cx);
1794 store.update(cx, |store, cx| {
1795 store.save(metadata.clone(), cx);
1796 store.archive(&session_id, cx);
1797 });
1798 });
1799
1800 cx.run_until_parked();
1801
1802 cx.update(|cx| {
1803 let store = ThreadMetadataStore::global(cx);
1804 let store = store.read(cx);
1805
1806 let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1807 pretty_assertions::assert_eq!(
1808 entries,
1809 vec![ThreadMetadata {
1810 archived: true,
1811 ..metadata
1812 }]
1813 );
1814 });
1815 }
1816}