1use std::{path::Path, sync::Arc};
2
3use acp_thread::AgentSessionInfo;
4use agent::{ThreadStore, ZED_AGENT_ID};
5use agent_client_protocol as acp;
6use anyhow::Context as _;
7use chrono::{DateTime, Utc};
8use collections::HashMap;
9use db::{
10 sqlez::{
11 bindable::Column, domain::Domain, statement::Statement,
12 thread_safe_connection::ThreadSafeConnection,
13 },
14 sqlez_macros::sql,
15};
16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
17use futures::{FutureExt as _, future::Shared};
18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
19use project::AgentId;
20use ui::{App, Context, SharedString};
21use util::ResultExt as _;
22use workspace::PathList;
23
24use crate::DEFAULT_THREAD_TITLE;
25
26pub fn init(cx: &mut App) {
27 SidebarThreadMetadataStore::init_global(cx);
28
29 if cx.has_flag::<AgentV2FeatureFlag>() {
30 migrate_thread_metadata(cx);
31 }
32 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
33 if has_flag {
34 migrate_thread_metadata(cx);
35 }
36 })
37 .detach();
38}
39
40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
41/// We migrate the last 10 threads per project and skip threads that do not have a project.
42///
43/// TODO: Remove this after N weeks of shipping the sidebar
44fn migrate_thread_metadata(cx: &mut App) {
45 const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
46
47 let store = SidebarThreadMetadataStore::global(cx);
48 let db = store.read(cx).db.clone();
49
50 cx.spawn(async move |cx| {
51 if !db.is_empty()? {
52 return Ok::<(), anyhow::Error>(());
53 }
54
55 let metadata = store.read_with(cx, |_store, app| {
56 let mut migrated_threads_per_project = HashMap::default();
57
58 ThreadStore::global(app)
59 .read(app)
60 .entries()
61 .filter_map(|entry| {
62 if entry.folder_paths.is_empty() {
63 return None;
64 }
65
66 let migrated_thread_count = migrated_threads_per_project
67 .entry(entry.folder_paths.clone())
68 .or_insert(0);
69 if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
70 return None;
71 }
72 *migrated_thread_count += 1;
73
74 Some(ThreadMetadata {
75 session_id: entry.id,
76 agent_id: None,
77 title: entry.title,
78 updated_at: entry.updated_at,
79 created_at: entry.created_at,
80 folder_paths: entry.folder_paths,
81 })
82 })
83 .collect::<Vec<_>>()
84 });
85
86 log::info!("Migrating {} thread store entries", metadata.len());
87
88 // Manually save each entry to the database and call reload, otherwise
89 // we'll end up triggering lots of reloads after each save
90 for entry in metadata {
91 db.save(entry).await?;
92 }
93
94 log::info!("Finished migrating thread store entries");
95
96 let _ = store.update(cx, |store, cx| store.reload(cx));
97 Ok(())
98 })
99 .detach_and_log_err(cx);
100}
101
/// Wrapper under which the `SidebarThreadMetadataStore` entity is registered
/// as an app global (see `init_global`, `global` and `try_global`).
struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
104
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Session identifier; used as the primary key of the `sidebar_threads` table.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title of the thread.
    pub title: SharedString,
    /// Time of the last update; the database listing is ordered by this, newest first.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known. It is written when a row is first inserted and
    /// is not overwritten by later upserts.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths the thread is associated with; may be empty.
    pub folder_paths: PathList,
}
117
118impl ThreadMetadata {
119 pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
120 let session_id = session.session_id.clone();
121 let title = session.title.clone().unwrap_or_default();
122 let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
123 let created_at = session.created_at.unwrap_or(updated_at);
124 let folder_paths = session.work_dirs.clone().unwrap_or_default();
125 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
126 None
127 } else {
128 Some(agent_id)
129 };
130 Self {
131 session_id,
132 agent_id,
133 title,
134 updated_at,
135 created_at: Some(created_at),
136 folder_paths,
137 }
138 }
139
140 pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
141 let thread_ref = thread.read(cx);
142 let session_id = thread_ref.session_id().clone();
143 let title = thread_ref
144 .title()
145 .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
146 let updated_at = Utc::now();
147
148 let agent_id = thread_ref.connection().agent_id();
149
150 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
151 None
152 } else {
153 Some(agent_id)
154 };
155
156 let folder_paths = {
157 let project = thread_ref.project().read(cx);
158 let paths: Vec<Arc<Path>> = project
159 .visible_worktrees(cx)
160 .map(|worktree| worktree.read(cx).abs_path())
161 .collect();
162 PathList::new(&paths)
163 };
164
165 Self {
166 session_id,
167 agent_id,
168 title,
169 created_at: Some(updated_at), // handled by db `ON CONFLICT`
170 updated_at,
171 folder_paths,
172 }
173 }
174}
175
/// The store holds all metadata needed to show threads in the sidebar.
/// Effectively, all threads stored in here are "non-archived".
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct SidebarThreadMetadataStore {
    /// Database the metadata is persisted to.
    db: ThreadMetadataDb,
    /// In-memory copy of all rows, rebuilt by `reload` in the order the
    /// database listing returns them.
    threads: Vec<ThreadMetadata>,
    /// The same rows as `threads`, grouped by their folder paths.
    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
    /// The most recently started reload, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed threads, keyed by session id.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Sending half of the queue of pending database operations.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Task that drains the queue, applies the operations and reloads the cache.
    _db_operations_task: Task<()>,
}
189
/// A database write queued by the store and applied by its background task.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Upsert the given metadata row.
    Insert(ThreadMetadata),
    /// Delete the row belonging to the given session.
    Delete(acp::SessionId),
}
195
196impl DbOperation {
197 fn id(&self) -> &acp::SessionId {
198 match self {
199 DbOperation::Insert(thread) => &thread.session_id,
200 DbOperation::Delete(session_id) => session_id,
201 }
202 }
203}
204
205impl SidebarThreadMetadataStore {
206 #[cfg(not(any(test, feature = "test-support")))]
207 pub fn init_global(cx: &mut App) {
208 if cx.has_global::<Self>() {
209 return;
210 }
211
212 let db = ThreadMetadataDb::global(cx);
213 let thread_store = cx.new(|cx| Self::new(db, cx));
214 cx.set_global(GlobalThreadMetadataStore(thread_store));
215 }
216
217 #[cfg(any(test, feature = "test-support"))]
218 pub fn init_global(cx: &mut App) {
219 let thread = std::thread::current();
220 let test_name = thread.name().unwrap_or("unknown_test");
221 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
222 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
223 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
224 cx.set_global(GlobalThreadMetadataStore(thread_store));
225 }
226
227 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
228 cx.try_global::<GlobalThreadMetadataStore>()
229 .map(|store| store.0.clone())
230 }
231
232 pub fn global(cx: &App) -> Entity<Self> {
233 cx.global::<GlobalThreadMetadataStore>().0.clone()
234 }
235
236 pub fn is_empty(&self) -> bool {
237 self.threads.is_empty()
238 }
239
240 pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
241 self.threads.iter().cloned()
242 }
243
244 pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
245 self.threads.iter().map(|thread| thread.session_id.clone())
246 }
247
248 pub fn entries_for_path(
249 &self,
250 path_list: &PathList,
251 ) -> impl Iterator<Item = ThreadMetadata> + '_ {
252 self.threads_by_paths
253 .get(path_list)
254 .into_iter()
255 .flatten()
256 .cloned()
257 }
258
259 fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
260 let db = self.db.clone();
261 self.reload_task.take();
262
263 let list_task = cx
264 .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
265
266 let reload_task = cx
267 .spawn(async move |this, cx| {
268 let Some(rows) = list_task.await.log_err() else {
269 return;
270 };
271
272 this.update(cx, |this, cx| {
273 this.threads.clear();
274 this.threads_by_paths.clear();
275
276 for row in rows {
277 this.threads_by_paths
278 .entry(row.folder_paths.clone())
279 .or_default()
280 .push(row.clone());
281 this.threads.push(row);
282 }
283
284 cx.notify();
285 })
286 .ok();
287 })
288 .shared();
289 self.reload_task = Some(reload_task.clone());
290 reload_task
291 }
292
293 pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
294 if !cx.has_flag::<AgentV2FeatureFlag>() {
295 return;
296 }
297
298 self.pending_thread_ops_tx
299 .try_send(DbOperation::Insert(metadata))
300 .log_err();
301 }
302
303 pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
304 if !cx.has_flag::<AgentV2FeatureFlag>() {
305 return;
306 }
307
308 self.pending_thread_ops_tx
309 .try_send(DbOperation::Delete(session_id))
310 .log_err();
311 }
312
313 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
314 let weak_store = cx.weak_entity();
315
316 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
317 // Don't track subagent threads in the sidebar.
318 if thread.parent_session_id().is_some() {
319 return;
320 }
321
322 let thread_entity = cx.entity();
323
324 cx.on_release({
325 let weak_store = weak_store.clone();
326 move |thread, cx| {
327 weak_store
328 .update(cx, |store, cx| {
329 let session_id = thread.session_id().clone();
330 store.session_subscriptions.remove(&session_id);
331 if thread.entries().is_empty() {
332 // Empty threads can be unloaded without ever being
333 // durably persisted by the underlying agent.
334 store.delete(session_id, cx);
335 }
336 })
337 .ok();
338 }
339 })
340 .detach();
341
342 weak_store
343 .update(cx, |this, cx| {
344 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
345 this.session_subscriptions
346 .insert(thread.session_id().clone(), subscription);
347 })
348 .ok();
349 })
350 .detach();
351
352 let (tx, rx) = smol::channel::unbounded();
353 let _db_operations_task = cx.spawn({
354 let db = db.clone();
355 async move |this, cx| {
356 while let Ok(first_update) = rx.recv().await {
357 let mut updates = vec![first_update];
358 while let Ok(update) = rx.try_recv() {
359 updates.push(update);
360 }
361 let updates = Self::dedup_db_operations(updates);
362 for operation in updates {
363 match operation {
364 DbOperation::Insert(metadata) => {
365 db.save(metadata).await.log_err();
366 }
367 DbOperation::Delete(session_id) => {
368 db.delete(session_id).await.log_err();
369 }
370 }
371 }
372
373 this.update(cx, |this, cx| this.reload(cx)).ok();
374 }
375 }
376 });
377
378 let mut this = Self {
379 db,
380 threads: Vec::new(),
381 threads_by_paths: HashMap::default(),
382 reload_task: None,
383 session_subscriptions: HashMap::default(),
384 pending_thread_ops_tx: tx,
385 _db_operations_task,
386 };
387 let _ = this.reload(cx);
388 this
389 }
390
391 fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
392 let mut ops = HashMap::default();
393 for operation in operations.into_iter().rev() {
394 if ops.contains_key(operation.id()) {
395 continue;
396 }
397 ops.insert(operation.id().clone(), operation);
398 }
399 ops.into_values().collect()
400 }
401
402 fn handle_thread_update(
403 &mut self,
404 thread: Entity<acp_thread::AcpThread>,
405 event: &acp_thread::AcpThreadEvent,
406 cx: &mut Context<Self>,
407 ) {
408 // Don't track subagent threads in the sidebar.
409 if thread.read(cx).parent_session_id().is_some() {
410 return;
411 }
412
413 match event {
414 acp_thread::AcpThreadEvent::NewEntry
415 | acp_thread::AcpThreadEvent::TitleUpdated
416 | acp_thread::AcpThreadEvent::EntryUpdated(_)
417 | acp_thread::AcpThreadEvent::EntriesRemoved(_)
418 | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
419 | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
420 | acp_thread::AcpThreadEvent::Retry(_)
421 | acp_thread::AcpThreadEvent::Stopped(_)
422 | acp_thread::AcpThreadEvent::Error
423 | acp_thread::AcpThreadEvent::LoadError(_)
424 | acp_thread::AcpThreadEvent::Refusal => {
425 let metadata = ThreadMetadata::from_thread(&thread, cx);
426 self.save(metadata, cx);
427 }
428 _ => {}
429 }
430 }
431}
432
// NOTE(review): nothing visible in this file calls `set_global` with the store
// type itself; the entity is registered through `GlobalThreadMetadataStore`.
// Confirm whether this impl is still needed.
impl Global for SidebarThreadMetadataStore {}
434
/// Database handle for the `sidebar_threads` table, wrapping a thread-safe
/// connection.
struct ThreadMetadataDb(ThreadSafeConnection);
436
/// Schema definition for the sidebar thread metadata.
///
/// `sidebar_threads` holds one row per session, keyed by `session_id`.
/// `title` and `updated_at` are required; every other column is nullable.
/// Timestamps are stored as RFC 3339 text, and the folder paths are stored as
/// the two text parts of a serialized `PathList` (`folder_paths` and
/// `folder_paths_order`).
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}
452
// NOTE(review): presumably this generates the shared-connection plumbing for
// `ThreadMetadataDb`, such as the `ThreadMetadataDb::global` accessor used by
// `init_global` — confirm against the `db::static_connection!` definition.
db::static_connection!(ThreadMetadataDb, []);
454
455impl ThreadMetadataDb {
456 pub fn is_empty(&self) -> anyhow::Result<bool> {
457 self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
458 .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
459 }
460
461 /// List all sidebar thread metadata, ordered by updated_at descending.
462 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
463 self.select::<ThreadMetadata>(
464 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
465 FROM sidebar_threads \
466 ORDER BY updated_at DESC"
467 )?()
468 }
469
470 /// Upsert metadata for a thread.
471 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
472 let id = row.session_id.0.clone();
473 let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
474 let title = row.title.to_string();
475 let updated_at = row.updated_at.to_rfc3339();
476 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
477 let serialized = row.folder_paths.serialize();
478 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
479 (None, None)
480 } else {
481 (Some(serialized.paths), Some(serialized.order))
482 };
483
484 self.write(move |conn| {
485 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
486 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
487 ON CONFLICT(session_id) DO UPDATE SET \
488 agent_id = excluded.agent_id, \
489 title = excluded.title, \
490 updated_at = excluded.updated_at, \
491 folder_paths = excluded.folder_paths, \
492 folder_paths_order = excluded.folder_paths_order";
493 let mut stmt = Statement::prepare(conn, sql)?;
494 let mut i = stmt.bind(&id, 1)?;
495 i = stmt.bind(&agent_id, i)?;
496 i = stmt.bind(&title, i)?;
497 i = stmt.bind(&updated_at, i)?;
498 i = stmt.bind(&created_at, i)?;
499 i = stmt.bind(&folder_paths, i)?;
500 stmt.bind(&folder_paths_order, i)?;
501 stmt.exec()
502 })
503 .await
504 }
505
506 /// Delete metadata for a single thread.
507 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
508 let id = session_id.0.clone();
509 self.write(move |conn| {
510 let mut stmt =
511 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
512 stmt.bind(&id, 1)?;
513 stmt.exec()
514 })
515 .await
516 }
517}
518
519impl Column for ThreadMetadata {
520 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
521 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
522 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
523 let (title, next): (String, i32) = Column::column(statement, next)?;
524 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
525 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
526 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
527 let (folder_paths_order_str, next): (Option<String>, i32) =
528 Column::column(statement, next)?;
529
530 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
531 let created_at = created_at_str
532 .as_deref()
533 .map(DateTime::parse_from_rfc3339)
534 .transpose()?
535 .map(|dt| dt.with_timezone(&Utc));
536
537 let folder_paths = folder_paths_str
538 .map(|paths| {
539 PathList::deserialize(&util::path_list::SerializedPathList {
540 paths,
541 order: folder_paths_order_str.unwrap_or_default(),
542 })
543 })
544 .unwrap_or_default();
545
546 Ok((
547 ThreadMetadata {
548 session_id: acp::SessionId::new(id),
549 agent_id: agent_id.map(|id| AgentId::new(id)),
550 title: title.into(),
551 updated_at,
552 created_at,
553 folder_paths,
554 },
555 next,
556 ))
557 }
558}
559
560#[cfg(test)]
561mod tests {
562 use super::*;
563 use acp_thread::{AgentConnection, StubAgentConnection};
564 use action_log::ActionLog;
565 use agent::DbThread;
566 use agent_client_protocol as acp;
567 use feature_flags::FeatureFlagAppExt;
568 use gpui::TestAppContext;
569 use project::FakeFs;
570 use project::Project;
571 use std::path::Path;
572 use std::rc::Rc;
573
574 fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
575 DbThread {
576 title: title.to_string().into(),
577 messages: Vec::new(),
578 updated_at,
579 detailed_summary: None,
580 initial_project_snapshot: None,
581 cumulative_token_usage: Default::default(),
582 request_token_usage: Default::default(),
583 model: None,
584 profile: None,
585 imported: false,
586 subagent_context: None,
587 speed: None,
588 thinking_enabled: false,
589 thinking_effort: None,
590 draft_prompt: None,
591 ui_scroll_position: None,
592 }
593 }
594
595 fn make_metadata(
596 session_id: &str,
597 title: &str,
598 updated_at: DateTime<Utc>,
599 folder_paths: PathList,
600 ) -> ThreadMetadata {
601 ThreadMetadata {
602 session_id: acp::SessionId::new(session_id),
603 agent_id: None,
604 title: title.to_string().into(),
605 updated_at,
606 created_at: Some(updated_at),
607 folder_paths,
608 }
609 }
610
// Rows that already exist in the database must show up in the store's cache
// once the store has been initialized and its initial reload has completed.
#[gpui::test]
async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
    let first_paths = PathList::new(&[Path::new("/project-a")]);
    let second_paths = PathList::new(&[Path::new("/project-b")]);
    let now = Utc::now();
    let older = now - chrono::Duration::seconds(1);

    // Open the database under the same name the test `init_global` uses, so
    // the rows written here are the ones the store reads below.
    let thread = std::thread::current();
    let test_name = thread.name().unwrap_or("unknown_test");
    let db_name = format!("THREAD_METADATA_DB_{}", test_name);
    let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
        &db_name,
    )));

    db.save(make_metadata(
        "session-1",
        "First Thread",
        now,
        first_paths.clone(),
    ))
    .await
    .unwrap();
    db.save(make_metadata(
        "session-2",
        "Second Thread",
        older,
        second_paths.clone(),
    ))
    .await
    .unwrap();

    cx.update(|cx| {
        let settings_store = settings::SettingsStore::test(cx);
        cx.set_global(settings_store);
        cx.update_flags(true, vec!["agent-v2".to_string()]);
        SidebarThreadMetadataStore::init_global(cx);
    });

    // Let the initial reload finish.
    cx.run_until_parked();

    cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        let store = store.read(cx);

        // The listing is ordered by `updated_at` descending, so the newer
        // session comes first.
        let entry_ids = store
            .entry_ids()
            .map(|session_id| session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(entry_ids, vec!["session-1", "session-2"]);

        let first_path_entries = store
            .entries_for_path(&first_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(first_path_entries, vec!["session-1"]);

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-2"]);
    });
}
674
// Saving, re-saving with different folder paths, and deleting through the
// store must each be reflected in the cache after the queued operations ran.
#[gpui::test]
async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
    cx.update(|cx| {
        let settings_store = settings::SettingsStore::test(cx);
        cx.set_global(settings_store);
        cx.update_flags(true, vec!["agent-v2".to_string()]);
        SidebarThreadMetadataStore::init_global(cx);
    });

    let first_paths = PathList::new(&[Path::new("/project-a")]);
    let second_paths = PathList::new(&[Path::new("/project-b")]);
    let initial_time = Utc::now();
    let updated_time = initial_time + chrono::Duration::seconds(1);

    let initial_metadata = make_metadata(
        "session-1",
        "First Thread",
        initial_time,
        first_paths.clone(),
    );

    let second_metadata = make_metadata(
        "session-2",
        "Second Thread",
        initial_time,
        second_paths.clone(),
    );

    // Step 1: save two threads that belong to different projects.
    cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.save(initial_metadata, cx);
            store.save(second_metadata, cx);
        });
    });

    cx.run_until_parked();

    cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let first_path_entries = store
            .entries_for_path(&first_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(first_path_entries, vec!["session-1"]);

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-2"]);
    });

    // Step 2: save session-1 again with a newer timestamp and the second
    // project's paths; it must move from one path group to the other.
    let moved_metadata = make_metadata(
        "session-1",
        "First Thread",
        updated_time,
        second_paths.clone(),
    );

    cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.save(moved_metadata, cx);
        });
    });

    cx.run_until_parked();

    cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        let store = store.read(cx);

        // session-1 now has the newer `updated_at`, so it is listed first.
        let entry_ids = store
            .entry_ids()
            .map(|session_id| session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(entry_ids, vec!["session-1", "session-2"]);

        let first_path_entries = store
            .entries_for_path(&first_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert!(first_path_entries.is_empty());

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
    });

    // Step 3: delete session-2; only session-1 may remain.
    cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.delete(acp::SessionId::new("session-2"), cx);
        });
    });

    cx.run_until_parked();

    cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        let store = store.read(cx);

        let entry_ids = store
            .entry_ids()
            .map(|session_id| session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(entry_ids, vec!["session-1"]);

        let second_path_entries = store
            .entries_for_path(&second_paths)
            .map(|entry| entry.session_id.0.to_string())
            .collect::<Vec<_>>();
        assert_eq!(second_path_entries, vec!["session-1"]);
    });
}
795
// The migration must copy at most 10 threads per project from the native
// thread store, skip threads without folder paths, and leave `agent_id` unset.
#[gpui::test]
async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
    cx.update(|cx| {
        ThreadStore::init_global(cx);
        SidebarThreadMetadataStore::init_global(cx);
    });

    // Verify the cache is empty before migration
    let list = cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.read(cx).entries().collect::<Vec<_>>()
    });
    assert_eq!(list.len(), 0);

    let project_a_paths = PathList::new(&[Path::new("/project-a")]);
    let project_b_paths = PathList::new(&[Path::new("/project-b")]);
    let now = Utc::now();

    // Project A gets 12 threads, two more than the per-project limit. Higher
    // indices have later `updated_at` values.
    for index in 0..12 {
        let updated_at = now + chrono::Duration::seconds(index as i64);
        let session_id = format!("project-a-session-{index}");
        let title = format!("Project A Thread {index}");

        let save_task = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            let session_id = session_id.clone();
            let title = title.clone();
            let project_a_paths = project_a_paths.clone();
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new(session_id),
                    make_db_thread(&title, updated_at),
                    project_a_paths,
                    cx,
                )
            })
        });
        save_task.await.unwrap();
        cx.run_until_parked();
    }

    // Project B gets 3 threads, all newer than every project A thread.
    for index in 0..3 {
        let updated_at = now + chrono::Duration::seconds(100 + index as i64);
        let session_id = format!("project-b-session-{index}");
        let title = format!("Project B Thread {index}");

        let save_task = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            let session_id = session_id.clone();
            let title = title.clone();
            let project_b_paths = project_b_paths.clone();
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new(session_id),
                    make_db_thread(&title, updated_at),
                    project_b_paths,
                    cx,
                )
            })
        });
        save_task.await.unwrap();
        cx.run_until_parked();
    }

    // One thread without any folder paths; the migration must skip it.
    let save_projectless = cx.update(|cx| {
        let thread_store = ThreadStore::global(cx);
        thread_store.update(cx, |store, cx| {
            store.save_thread(
                acp::SessionId::new("projectless-session"),
                make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
                PathList::default(),
                cx,
            )
        })
    });
    save_projectless.await.unwrap();
    cx.run_until_parked();

    // Run migration
    cx.update(|cx| {
        migrate_thread_metadata(cx);
    });

    cx.run_until_parked();

    // Verify the metadata was migrated, limited to 10 per project, and
    // projectless threads were skipped.
    let list = cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.read(cx).entries().collect::<Vec<_>>()
    });
    // 10 from project A plus 3 from project B.
    assert_eq!(list.len(), 13);

    assert!(
        list.iter()
            .all(|metadata| !metadata.folder_paths.is_empty())
    );
    assert!(
        list.iter()
            .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
    );

    // The ten most recently updated project A threads are expected, newest
    // first; the two oldest (indices 0 and 1) are dropped.
    let project_a_entries = list
        .iter()
        .filter(|metadata| metadata.folder_paths == project_a_paths)
        .collect::<Vec<_>>();
    assert_eq!(project_a_entries.len(), 10);
    assert_eq!(
        project_a_entries
            .iter()
            .map(|metadata| metadata.session_id.0.as_ref())
            .collect::<Vec<_>>(),
        vec![
            "project-a-session-11",
            "project-a-session-10",
            "project-a-session-9",
            "project-a-session-8",
            "project-a-session-7",
            "project-a-session-6",
            "project-a-session-5",
            "project-a-session-4",
            "project-a-session-3",
            "project-a-session-2",
        ]
    );
    assert!(
        project_a_entries
            .iter()
            .all(|metadata| metadata.agent_id.is_none())
    );

    let project_b_entries = list
        .iter()
        .filter(|metadata| metadata.folder_paths == project_b_paths)
        .collect::<Vec<_>>();
    assert_eq!(project_b_entries.len(), 3);
    assert_eq!(
        project_b_entries
            .iter()
            .map(|metadata| metadata.session_id.0.as_ref())
            .collect::<Vec<_>>(),
        vec![
            "project-b-session-2",
            "project-b-session-1",
            "project-b-session-0",
        ]
    );
    assert!(
        project_b_entries
            .iter()
            .all(|metadata| metadata.agent_id.is_none())
    );
}
949
// The migration must do nothing when the metadata database already has rows.
// NOTE(review): this test relies on `store.save` taking effect without the
// "agent-v2" flag being set explicitly — confirm how `has_flag` resolves in
// test builds.
#[gpui::test]
async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
    cx.update(|cx| {
        ThreadStore::init_global(cx);
        SidebarThreadMetadataStore::init_global(cx);
    });

    // Pre-populate the metadata store with existing data
    let existing_metadata = ThreadMetadata {
        session_id: acp::SessionId::new("existing-session"),
        agent_id: None,
        title: "Existing Thread".into(),
        updated_at: Utc::now(),
        created_at: Some(Utc::now()),
        folder_paths: PathList::default(),
    };

    cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.update(cx, |store, cx| {
            store.save(existing_metadata, cx);
        });
    });

    cx.run_until_parked();

    // Add an entry to native thread store that should NOT be migrated
    let save_task = cx.update(|cx| {
        let thread_store = ThreadStore::global(cx);
        thread_store.update(cx, |store, cx| {
            store.save_thread(
                acp::SessionId::new("native-session"),
                make_db_thread("Native Thread", Utc::now()),
                PathList::default(),
                cx,
            )
        })
    });
    save_task.await.unwrap();
    cx.run_until_parked();

    // Run migration - should skip because metadata store is not empty
    cx.update(|cx| {
        migrate_thread_metadata(cx);
    });

    cx.run_until_parked();

    // Verify only the existing metadata is present (migration was skipped)
    let list = cx.update(|cx| {
        let store = SidebarThreadMetadataStore::global(cx);
        store.read(cx).entries().collect::<Vec<_>>()
    });
    assert_eq!(list.len(), 1);
    assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
}
1006
// A thread that never received any entries must have its metadata removed
// from the store once the thread entity is released.
#[gpui::test]
async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
    cx.update(|cx| {
        let settings_store = settings::SettingsStore::test(cx);
        cx.set_global(settings_store);
        cx.update_flags(true, vec!["agent-v2".to_string()]);
        ThreadStore::init_global(cx);
        SidebarThreadMetadataStore::init_global(cx);
    });

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, None::<&Path>, cx).await;
    let connection = Rc::new(StubAgentConnection::new());

    let thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::default(), cx)
        })
        .await
        .unwrap();
    let session_id = cx.read(|cx| thread.read(cx).session_id().clone());

    // Setting a title makes the store save metadata for the thread without
    // adding any entries to it.
    cx.update(|cx| {
        thread.update(cx, |thread, cx| {
            thread.set_title("Draft Thread".into(), cx).detach();
        });
    });
    cx.run_until_parked();

    let metadata_ids = cx.update(|cx| {
        SidebarThreadMetadataStore::global(cx)
            .read(cx)
            .entry_ids()
            .collect::<Vec<_>>()
    });
    assert_eq!(metadata_ids, vec![session_id]);

    // Release the thread: drop the last handle, then give the app a chance to
    // run release callbacks, the queued delete, and the reload that follows.
    drop(thread);
    cx.update(|_| {});
    cx.run_until_parked();
    cx.run_until_parked();

    let metadata_ids = cx.update(|cx| {
        SidebarThreadMetadataStore::global(cx)
            .read(cx)
            .entry_ids()
            .collect::<Vec<_>>()
    });
    assert!(
        metadata_ids.is_empty(),
        "expected empty draft thread metadata to be deleted on release"
    );
}
1062
// A thread that has at least one entry must keep its metadata in the store
// after the thread entity is released.
#[gpui::test]
async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
    cx.update(|cx| {
        let settings_store = settings::SettingsStore::test(cx);
        cx.set_global(settings_store);
        cx.update_flags(true, vec!["agent-v2".to_string()]);
        ThreadStore::init_global(cx);
        SidebarThreadMetadataStore::init_global(cx);
    });

    let fs = FakeFs::new(cx.executor());
    let project = Project::test(fs, None::<&Path>, cx).await;
    let connection = Rc::new(StubAgentConnection::new());

    let thread = cx
        .update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::default(), cx)
        })
        .await
        .unwrap();
    let session_id = cx.read(|cx| thread.read(cx).session_id().clone());

    // Pushing user content gives the thread an entry, so it is no longer empty.
    cx.update(|cx| {
        thread.update(cx, |thread, cx| {
            thread.push_user_content_block(None, "Hello".into(), cx);
        });
    });
    cx.run_until_parked();

    let metadata_ids = cx.update(|cx| {
        SidebarThreadMetadataStore::global(cx)
            .read(cx)
            .entry_ids()
            .collect::<Vec<_>>()
    });
    assert_eq!(metadata_ids, vec![session_id.clone()]);

    // Release the thread and let any resulting work run.
    drop(thread);
    cx.update(|_| {});
    cx.run_until_parked();

    let metadata_ids = cx.update(|cx| {
        SidebarThreadMetadataStore::global(cx)
            .read(cx)
            .entry_ids()
            .collect::<Vec<_>>()
    });
    assert_eq!(metadata_ids, vec![session_id]);
}
1114
1115 #[gpui::test]
1116 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1117 cx.update(|cx| {
1118 let settings_store = settings::SettingsStore::test(cx);
1119 cx.set_global(settings_store);
1120 cx.update_flags(true, vec!["agent-v2".to_string()]);
1121 ThreadStore::init_global(cx);
1122 SidebarThreadMetadataStore::init_global(cx);
1123 });
1124
1125 let fs = FakeFs::new(cx.executor());
1126 let project = Project::test(fs, None::<&Path>, cx).await;
1127 let connection = Rc::new(StubAgentConnection::new());
1128
1129 // Create a regular (non-subagent) AcpThread.
1130 let regular_thread = cx
1131 .update(|cx| {
1132 connection
1133 .clone()
1134 .new_session(project.clone(), PathList::default(), cx)
1135 })
1136 .await
1137 .unwrap();
1138
1139 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1140
1141 // Set a title on the regular thread to trigger a save via handle_thread_update.
1142 cx.update(|cx| {
1143 regular_thread.update(cx, |thread, cx| {
1144 thread.set_title("Regular Thread".into(), cx).detach();
1145 });
1146 });
1147 cx.run_until_parked();
1148
1149 // Create a subagent AcpThread
1150 let subagent_session_id = acp::SessionId::new("subagent-session");
1151 let subagent_thread = cx.update(|cx| {
1152 let action_log = cx.new(|_| ActionLog::new(project.clone()));
1153 cx.new(|cx| {
1154 acp_thread::AcpThread::new(
1155 Some(regular_session_id.clone()),
1156 Some("Subagent Thread".into()),
1157 None,
1158 connection.clone(),
1159 project.clone(),
1160 action_log,
1161 subagent_session_id.clone(),
1162 watch::Receiver::constant(acp::PromptCapabilities::new()),
1163 cx,
1164 )
1165 })
1166 });
1167
1168 // Set a title on the subagent thread to trigger handle_thread_update.
1169 cx.update(|cx| {
1170 subagent_thread.update(cx, |thread, cx| {
1171 thread
1172 .set_title("Subagent Thread Title".into(), cx)
1173 .detach();
1174 });
1175 });
1176 cx.run_until_parked();
1177
1178 // List all metadata from the store cache.
1179 let list = cx.update(|cx| {
1180 let store = SidebarThreadMetadataStore::global(cx);
1181 store.read(cx).entries().collect::<Vec<_>>()
1182 });
1183
1184 // The subagent thread should NOT appear in the sidebar metadata.
1185 // Only the regular thread should be listed.
1186 assert_eq!(
1187 list.len(),
1188 1,
1189 "Expected only the regular thread in sidebar metadata, \
1190 but found {} entries (subagent threads are leaking into the sidebar)",
1191 list.len(),
1192 );
1193 assert_eq!(list[0].session_id, regular_session_id);
1194 assert_eq!(list[0].title.as_ref(), "Regular Thread");
1195 }
1196
// An insert followed by a delete for the same session must collapse to the
// delete, i.e. the most recent operation for that session.
#[test]
fn test_dedup_db_operations_keeps_latest_operation_for_session() {
    let timestamp = Utc::now();

    let insert = DbOperation::Insert(make_metadata(
        "session-1",
        "First Thread",
        timestamp,
        PathList::default(),
    ));
    let delete = DbOperation::Delete(acp::SessionId::new("session-1"));

    let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![insert, delete]);

    assert_eq!(deduped.len(), 1);
    let expected = DbOperation::Delete(acp::SessionId::new("session-1"));
    assert_eq!(deduped[0], expected);
}
1219
// Two inserts for the same session must collapse to the later one.
#[test]
fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
    let earlier = Utc::now();
    let later = earlier + chrono::Duration::seconds(1);

    let stale = make_metadata("session-1", "Old Title", earlier, PathList::default());
    let fresh = make_metadata("session-1", "New Title", later, PathList::default());

    let operations = vec![
        DbOperation::Insert(stale),
        DbOperation::Insert(fresh.clone()),
    ];
    let deduped = SidebarThreadMetadataStore::dedup_db_operations(operations);

    assert_eq!(deduped.len(), 1);
    let expected = DbOperation::Insert(fresh);
    assert_eq!(deduped[0], expected);
}
1236
// Operations that target different sessions must both survive deduplication.
#[test]
fn test_dedup_db_operations_preserves_distinct_sessions() {
    let timestamp = Utc::now();

    let first = make_metadata("session-1", "First Thread", timestamp, PathList::default());
    let second = make_metadata("session-2", "Second Thread", timestamp, PathList::default());

    let operations = vec![
        DbOperation::Insert(first.clone()),
        DbOperation::Insert(second.clone()),
    ];
    let deduped = SidebarThreadMetadataStore::dedup_db_operations(operations);

    assert_eq!(deduped.len(), 2);
    // Membership is checked with `contains`, so the output order is not pinned.
    let expected_first = DbOperation::Insert(first);
    let expected_second = DbOperation::Insert(second);
    assert!(deduped.contains(&expected_first));
    assert!(deduped.contains(&expected_second));
}
1252}