1use std::{path::Path, sync::Arc};
2
3use acp_thread::AgentSessionInfo;
4use agent::{ThreadStore, ZED_AGENT_ID};
5use agent_client_protocol as acp;
6use anyhow::Result;
7use chrono::{DateTime, Utc};
8use collections::HashMap;
9use db::{
10 sqlez::{
11 bindable::Column, domain::Domain, statement::Statement,
12 thread_safe_connection::ThreadSafeConnection,
13 },
14 sqlez_macros::sql,
15};
16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
17use gpui::{AppContext as _, Entity, Global, Subscription, Task};
18use project::AgentId;
19use ui::{App, Context, SharedString};
20use workspace::PathList;
21
22pub fn init(cx: &mut App) {
23 SidebarThreadMetadataStore::init_global(cx);
24
25 if cx.has_flag::<AgentV2FeatureFlag>() {
26 migrate_thread_metadata(cx);
27 }
28 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
29 if has_flag {
30 migrate_thread_metadata(cx);
31 }
32 })
33 .detach();
34}
35
36/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
37///
38/// TODO: Remove this after N weeks of shipping the sidebar
39fn migrate_thread_metadata(cx: &mut App) {
40 SidebarThreadMetadataStore::global(cx).update(cx, |store, cx| {
41 let list = store.list(cx);
42 cx.spawn(async move |this, cx| {
43 let Ok(list) = list.await else {
44 return;
45 };
46 if list.is_empty() {
47 this.update(cx, |this, cx| {
48 let metadata = ThreadStore::global(cx)
49 .read(cx)
50 .entries()
51 .map(|entry| ThreadMetadata {
52 session_id: entry.id,
53 agent_id: None,
54 title: entry.title,
55 updated_at: entry.updated_at,
56 created_at: entry.created_at,
57 folder_paths: entry.folder_paths,
58 })
59 .collect::<Vec<_>>();
60 for entry in metadata {
61 this.save(entry, cx).detach_and_log_err(cx);
62 }
63 })
64 .ok();
65 }
66 })
67 .detach();
68 });
69}
70
71struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
72impl Global for GlobalThreadMetadataStore {}
73
74/// Lightweight metadata for any thread (native or ACP), enough to populate
75/// the sidebar list and route to the correct load path when clicked.
76#[derive(Debug, Clone)]
77pub struct ThreadMetadata {
78 pub session_id: acp::SessionId,
79 /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
80 pub agent_id: Option<AgentId>,
81 pub title: SharedString,
82 pub updated_at: DateTime<Utc>,
83 pub created_at: Option<DateTime<Utc>>,
84 pub folder_paths: PathList,
85}
86
87impl ThreadMetadata {
88 pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
89 let session_id = session.session_id.clone();
90 let title = session.title.clone().unwrap_or_default();
91 let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
92 let created_at = session.created_at.unwrap_or(updated_at);
93 let folder_paths = session.work_dirs.clone().unwrap_or_default();
94 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
95 None
96 } else {
97 Some(agent_id)
98 };
99 Self {
100 session_id,
101 agent_id,
102 title,
103 updated_at,
104 created_at: Some(created_at),
105 folder_paths,
106 }
107 }
108
109 pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
110 let thread_ref = thread.read(cx);
111 let session_id = thread_ref.session_id().clone();
112 let title = thread_ref.title();
113 let updated_at = Utc::now();
114
115 let agent_id = thread_ref.connection().agent_id();
116
117 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
118 None
119 } else {
120 Some(agent_id)
121 };
122
123 let folder_paths = {
124 let project = thread_ref.project().read(cx);
125 let paths: Vec<Arc<Path>> = project
126 .visible_worktrees(cx)
127 .map(|worktree| worktree.read(cx).abs_path())
128 .collect();
129 PathList::new(&paths)
130 };
131
132 Self {
133 session_id,
134 agent_id,
135 title,
136 created_at: Some(updated_at), // handled by db `ON CONFLICT`
137 updated_at,
138 folder_paths,
139 }
140 }
141}
142
143/// The store holds all metadata needed to show threads in the sidebar.
144/// Effectively, all threads stored in here are "non-archived".
145///
146/// Automatically listens to AcpThread events and updates metadata if it has changed.
147pub struct SidebarThreadMetadataStore {
148 db: ThreadMetadataDb,
149 session_subscriptions: HashMap<acp::SessionId, Subscription>,
150}
151
152impl SidebarThreadMetadataStore {
153 #[cfg(not(any(test, feature = "test-support")))]
154 pub fn init_global(cx: &mut App) {
155 if cx.has_global::<Self>() {
156 return;
157 }
158
159 let db = ThreadMetadataDb::global(cx);
160 let thread_store = cx.new(|cx| Self::new(db, cx));
161 cx.set_global(GlobalThreadMetadataStore(thread_store));
162 }
163
164 #[cfg(any(test, feature = "test-support"))]
165 pub fn init_global(cx: &mut App) {
166 let thread = std::thread::current();
167 let test_name = thread.name().unwrap_or("unknown_test");
168 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
169 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
170 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
171 cx.set_global(GlobalThreadMetadataStore(thread_store));
172 }
173
174 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
175 cx.try_global::<GlobalThreadMetadataStore>()
176 .map(|store| store.0.clone())
177 }
178
179 pub fn global(cx: &App) -> Entity<Self> {
180 cx.global::<GlobalThreadMetadataStore>().0.clone()
181 }
182
183 pub fn list_ids(&self, cx: &App) -> Task<Result<Vec<acp::SessionId>>> {
184 let db = self.db.clone();
185 cx.background_spawn(async move {
186 let s = db.list_ids()?;
187 Ok(s)
188 })
189 }
190
191 pub fn list_sidebar_ids(&self, cx: &App) -> Task<Result<Vec<acp::SessionId>>> {
192 let db = self.db.clone();
193 cx.background_spawn(async move {
194 let s = db.list_sidebar_ids()?;
195 Ok(s)
196 })
197 }
198
199 pub fn list(&self, cx: &App) -> Task<Result<Vec<ThreadMetadata>>> {
200 let db = self.db.clone();
201 cx.background_spawn(async move {
202 let s = db.list()?;
203 Ok(s)
204 })
205 }
206
207 pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
208 if !cx.has_flag::<AgentV2FeatureFlag>() {
209 return Task::ready(Ok(()));
210 }
211
212 let db = self.db.clone();
213 cx.spawn(async move |this, cx| {
214 db.save(metadata).await?;
215 this.update(cx, |_this, cx| cx.notify())
216 })
217 }
218
219 pub fn delete(
220 &mut self,
221 session_id: acp::SessionId,
222 cx: &mut Context<Self>,
223 ) -> Task<Result<()>> {
224 if !cx.has_flag::<AgentV2FeatureFlag>() {
225 return Task::ready(Ok(()));
226 }
227
228 let db = self.db.clone();
229 cx.spawn(async move |this, cx| {
230 db.delete(session_id).await?;
231 this.update(cx, |_this, cx| cx.notify())
232 })
233 }
234
235 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
236 let weak_store = cx.weak_entity();
237
238 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
239 // Don't track subagent threads in the sidebar.
240 if thread.parent_session_id().is_some() {
241 return;
242 }
243
244 let thread_entity = cx.entity();
245
246 cx.on_release({
247 let weak_store = weak_store.clone();
248 move |thread, cx| {
249 weak_store
250 .update(cx, |store, _cx| {
251 store.session_subscriptions.remove(thread.session_id());
252 })
253 .ok();
254 }
255 })
256 .detach();
257
258 weak_store
259 .update(cx, |this, cx| {
260 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
261 this.session_subscriptions
262 .insert(thread.session_id().clone(), subscription);
263 })
264 .ok();
265 })
266 .detach();
267
268 Self {
269 db,
270 session_subscriptions: HashMap::default(),
271 }
272 }
273
274 fn handle_thread_update(
275 &mut self,
276 thread: Entity<acp_thread::AcpThread>,
277 event: &acp_thread::AcpThreadEvent,
278 cx: &mut Context<Self>,
279 ) {
280 // Don't track subagent threads in the sidebar.
281 if thread.read(cx).parent_session_id().is_some() {
282 return;
283 }
284
285 match event {
286 acp_thread::AcpThreadEvent::NewEntry
287 | acp_thread::AcpThreadEvent::EntryUpdated(_)
288 | acp_thread::AcpThreadEvent::TitleUpdated => {
289 let metadata = ThreadMetadata::from_thread(&thread, cx);
290 self.save(metadata, cx).detach_and_log_err(cx);
291 }
292 _ => {}
293 }
294 }
295}
296
297impl Global for SidebarThreadMetadataStore {}
298
299struct ThreadMetadataDb(ThreadSafeConnection);
300
301impl Domain for ThreadMetadataDb {
302 const NAME: &str = stringify!(ThreadMetadataDb);
303
304 const MIGRATIONS: &[&str] = &[sql!(
305 CREATE TABLE IF NOT EXISTS sidebar_threads(
306 session_id TEXT PRIMARY KEY,
307 agent_id TEXT,
308 title TEXT NOT NULL,
309 updated_at TEXT NOT NULL,
310 created_at TEXT,
311 folder_paths TEXT,
312 folder_paths_order TEXT
313 ) STRICT;
314 )];
315}
316
317db::static_connection!(ThreadMetadataDb, []);
318
319impl ThreadMetadataDb {
320 /// List all sidebar thread session IDs.
321 pub fn list_ids(&self) -> anyhow::Result<Vec<acp::SessionId>> {
322 self.select::<Arc<str>>("SELECT session_id FROM sidebar_threads")?()
323 .map(|ids| ids.into_iter().map(|id| acp::SessionId::new(id)).collect())
324 }
325
326 /// List session IDs of threads that belong to a real project workspace
327 /// (i.e. have non-empty folder_paths). These are the threads shown in
328 /// the sidebar, as opposed to threads created in empty workspaces.
329 pub fn list_sidebar_ids(&self) -> anyhow::Result<Vec<acp::SessionId>> {
330 self.select::<Arc<str>>(
331 "SELECT session_id FROM sidebar_threads WHERE folder_paths IS NOT NULL AND folder_paths != ''",
332 )?()
333 .map(|ids| ids.into_iter().map(|id| acp::SessionId::new(id)).collect())
334 }
335
336 /// List all sidebar thread metadata, ordered by updated_at descending.
337 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
338 self.select::<ThreadMetadata>(
339 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
340 FROM sidebar_threads \
341 ORDER BY updated_at DESC"
342 )?()
343 }
344
345 /// Upsert metadata for a thread.
346 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
347 let id = row.session_id.0.clone();
348 let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
349 let title = row.title.to_string();
350 let updated_at = row.updated_at.to_rfc3339();
351 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
352 let serialized = row.folder_paths.serialize();
353 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
354 (None, None)
355 } else {
356 (Some(serialized.paths), Some(serialized.order))
357 };
358
359 self.write(move |conn| {
360 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
361 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
362 ON CONFLICT(session_id) DO UPDATE SET \
363 agent_id = excluded.agent_id, \
364 title = excluded.title, \
365 updated_at = excluded.updated_at, \
366 folder_paths = excluded.folder_paths, \
367 folder_paths_order = excluded.folder_paths_order";
368 let mut stmt = Statement::prepare(conn, sql)?;
369 let mut i = stmt.bind(&id, 1)?;
370 i = stmt.bind(&agent_id, i)?;
371 i = stmt.bind(&title, i)?;
372 i = stmt.bind(&updated_at, i)?;
373 i = stmt.bind(&created_at, i)?;
374 i = stmt.bind(&folder_paths, i)?;
375 stmt.bind(&folder_paths_order, i)?;
376 stmt.exec()
377 })
378 .await
379 }
380
381 /// Delete metadata for a single thread.
382 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
383 let id = session_id.0.clone();
384 self.write(move |conn| {
385 let mut stmt =
386 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
387 stmt.bind(&id, 1)?;
388 stmt.exec()
389 })
390 .await
391 }
392}
393
394impl Column for ThreadMetadata {
395 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
396 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
397 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
398 let (title, next): (String, i32) = Column::column(statement, next)?;
399 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
400 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
401 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
402 let (folder_paths_order_str, next): (Option<String>, i32) =
403 Column::column(statement, next)?;
404
405 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
406 let created_at = created_at_str
407 .as_deref()
408 .map(DateTime::parse_from_rfc3339)
409 .transpose()?
410 .map(|dt| dt.with_timezone(&Utc));
411
412 let folder_paths = folder_paths_str
413 .map(|paths| {
414 PathList::deserialize(&util::path_list::SerializedPathList {
415 paths,
416 order: folder_paths_order_str.unwrap_or_default(),
417 })
418 })
419 .unwrap_or_default();
420
421 Ok((
422 ThreadMetadata {
423 session_id: acp::SessionId::new(id),
424 agent_id: agent_id.map(|id| AgentId::new(id)),
425 title: title.into(),
426 updated_at,
427 created_at,
428 folder_paths,
429 },
430 next,
431 ))
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use super::*;
438 use acp_thread::{AgentConnection, StubAgentConnection};
439 use action_log::ActionLog;
440 use agent::DbThread;
441 use agent_client_protocol as acp;
442 use feature_flags::FeatureFlagAppExt;
443 use gpui::TestAppContext;
444 use project::FakeFs;
445 use project::Project;
446 use std::path::Path;
447 use std::rc::Rc;
448 use util::path_list::PathList;
449
450 fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
451 DbThread {
452 title: title.to_string().into(),
453 messages: Vec::new(),
454 updated_at,
455 detailed_summary: None,
456 initial_project_snapshot: None,
457 cumulative_token_usage: Default::default(),
458 request_token_usage: Default::default(),
459 model: None,
460 profile: None,
461 imported: false,
462 subagent_context: None,
463 speed: None,
464 thinking_enabled: false,
465 thinking_effort: None,
466 draft_prompt: None,
467 ui_scroll_position: None,
468 }
469 }
470
471 #[gpui::test]
472 async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
473 cx.update(|cx| {
474 ThreadStore::init_global(cx);
475 SidebarThreadMetadataStore::init_global(cx);
476 });
477
478 // Verify the list is empty before migration
479 let metadata_list = cx.update(|cx| {
480 let store = SidebarThreadMetadataStore::global(cx);
481 store.read(cx).list(cx)
482 });
483
484 let list = metadata_list.await.unwrap();
485 assert_eq!(list.len(), 0);
486
487 let now = Utc::now();
488
489 // Populate the native ThreadStore via save_thread
490 let save1 = cx.update(|cx| {
491 let thread_store = ThreadStore::global(cx);
492 thread_store.update(cx, |store, cx| {
493 store.save_thread(
494 acp::SessionId::new("session-1"),
495 make_db_thread("Thread 1", now),
496 PathList::default(),
497 cx,
498 )
499 })
500 });
501 save1.await.unwrap();
502 cx.run_until_parked();
503
504 let save2 = cx.update(|cx| {
505 let thread_store = ThreadStore::global(cx);
506 thread_store.update(cx, |store, cx| {
507 store.save_thread(
508 acp::SessionId::new("session-2"),
509 make_db_thread("Thread 2", now),
510 PathList::default(),
511 cx,
512 )
513 })
514 });
515 save2.await.unwrap();
516 cx.run_until_parked();
517
518 // Run migration
519 cx.update(|cx| {
520 migrate_thread_metadata(cx);
521 });
522
523 cx.run_until_parked();
524
525 // Verify the metadata was migrated
526 let metadata_list = cx.update(|cx| {
527 let store = SidebarThreadMetadataStore::global(cx);
528 store.read(cx).list(cx)
529 });
530
531 let list = metadata_list.await.unwrap();
532 assert_eq!(list.len(), 2);
533
534 let metadata1 = list
535 .iter()
536 .find(|m| m.session_id.0.as_ref() == "session-1")
537 .expect("session-1 should be in migrated metadata");
538 assert_eq!(metadata1.title.as_ref(), "Thread 1");
539 assert!(metadata1.agent_id.is_none());
540
541 let metadata2 = list
542 .iter()
543 .find(|m| m.session_id.0.as_ref() == "session-2")
544 .expect("session-2 should be in migrated metadata");
545 assert_eq!(metadata2.title.as_ref(), "Thread 2");
546 assert!(metadata2.agent_id.is_none());
547 }
548
549 #[gpui::test]
550 async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
551 cx.update(|cx| {
552 ThreadStore::init_global(cx);
553 SidebarThreadMetadataStore::init_global(cx);
554 });
555
556 // Pre-populate the metadata store with existing data
557 let existing_metadata = ThreadMetadata {
558 session_id: acp::SessionId::new("existing-session"),
559 agent_id: None,
560 title: "Existing Thread".into(),
561 updated_at: Utc::now(),
562 created_at: Some(Utc::now()),
563 folder_paths: PathList::default(),
564 };
565
566 cx.update(|cx| {
567 let store = SidebarThreadMetadataStore::global(cx);
568 store.update(cx, |store, cx| {
569 store.save(existing_metadata, cx).detach();
570 });
571 });
572
573 cx.run_until_parked();
574
575 // Add an entry to native thread store that should NOT be migrated
576 let save_task = cx.update(|cx| {
577 let thread_store = ThreadStore::global(cx);
578 thread_store.update(cx, |store, cx| {
579 store.save_thread(
580 acp::SessionId::new("native-session"),
581 make_db_thread("Native Thread", Utc::now()),
582 PathList::default(),
583 cx,
584 )
585 })
586 });
587 save_task.await.unwrap();
588 cx.run_until_parked();
589
590 // Run migration - should skip because metadata store is not empty
591 cx.update(|cx| {
592 migrate_thread_metadata(cx);
593 });
594
595 cx.run_until_parked();
596
597 // Verify only the existing metadata is present (migration was skipped)
598 let metadata_list = cx.update(|cx| {
599 let store = SidebarThreadMetadataStore::global(cx);
600 store.read(cx).list(cx)
601 });
602
603 let list = metadata_list.await.unwrap();
604 assert_eq!(list.len(), 1);
605 assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
606 }
607
608 #[gpui::test]
609 async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
610 cx.update(|cx| {
611 let settings_store = settings::SettingsStore::test(cx);
612 cx.set_global(settings_store);
613 cx.update_flags(true, vec!["agent-v2".to_string()]);
614 ThreadStore::init_global(cx);
615 SidebarThreadMetadataStore::init_global(cx);
616 });
617
618 let fs = FakeFs::new(cx.executor());
619 let project = Project::test(fs, None::<&Path>, cx).await;
620 let connection = Rc::new(StubAgentConnection::new());
621
622 // Create a regular (non-subagent) AcpThread.
623 let regular_thread = cx
624 .update(|cx| {
625 connection
626 .clone()
627 .new_session(project.clone(), PathList::default(), cx)
628 })
629 .await
630 .unwrap();
631
632 let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
633
634 // Set a title on the regular thread to trigger a save via handle_thread_update.
635 cx.update(|cx| {
636 regular_thread.update(cx, |thread, cx| {
637 thread.set_title("Regular Thread".into(), cx).detach();
638 });
639 });
640 cx.run_until_parked();
641
642 // Create a subagent AcpThread
643 let subagent_session_id = acp::SessionId::new("subagent-session");
644 let subagent_thread = cx.update(|cx| {
645 let action_log = cx.new(|_| ActionLog::new(project.clone()));
646 cx.new(|cx| {
647 acp_thread::AcpThread::new(
648 Some(regular_session_id.clone()),
649 "Subagent Thread",
650 None,
651 connection.clone(),
652 project.clone(),
653 action_log,
654 subagent_session_id.clone(),
655 watch::Receiver::constant(acp::PromptCapabilities::new()),
656 cx,
657 )
658 })
659 });
660
661 // Set a title on the subagent thread to trigger handle_thread_update.
662 cx.update(|cx| {
663 subagent_thread.update(cx, |thread, cx| {
664 thread
665 .set_title("Subagent Thread Title".into(), cx)
666 .detach();
667 });
668 });
669 cx.run_until_parked();
670
671 // List all metadata from the store.
672 let metadata_list = cx.update(|cx| {
673 let store = SidebarThreadMetadataStore::global(cx);
674 store.read(cx).list(cx)
675 });
676
677 let list = metadata_list.await.unwrap();
678
679 // The subagent thread should NOT appear in the sidebar metadata.
680 // Only the regular thread should be listed.
681 assert_eq!(
682 list.len(),
683 1,
684 "Expected only the regular thread in sidebar metadata, \
685 but found {} entries (subagent threads are leaking into the sidebar)",
686 list.len(),
687 );
688 assert_eq!(list[0].session_id, regular_session_id);
689 assert_eq!(list[0].title.as_ref(), "Regular Thread");
690 }
691}