1use std::{path::Path, sync::Arc};
2
3use agent::{ThreadStore, ZED_AGENT_ID};
4use agent_client_protocol as acp;
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use collections::HashMap;
8use db::{
9 sqlez::{
10 bindable::Column, domain::Domain, statement::Statement,
11 thread_safe_connection::ThreadSafeConnection,
12 },
13 sqlez_macros::sql,
14};
15use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
16use gpui::{AppContext as _, Entity, Global, Subscription, Task};
17use project::AgentId;
18use ui::{App, Context, SharedString};
19use workspace::PathList;
20
21pub fn init(cx: &mut App) {
22 ThreadMetadataStore::init_global(cx);
23
24 if cx.has_flag::<AgentV2FeatureFlag>() {
25 migrate_thread_metadata(cx);
26 }
27 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
28 if has_flag {
29 migrate_thread_metadata(cx);
30 }
31 })
32 .detach();
33}
34
35/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
36///
37/// TODO: Remove this after N weeks of shipping the sidebar
38fn migrate_thread_metadata(cx: &mut App) {
39 ThreadMetadataStore::global(cx).update(cx, |store, cx| {
40 let list = store.list(cx);
41 cx.spawn(async move |this, cx| {
42 let Ok(list) = list.await else {
43 return;
44 };
45 if list.is_empty() {
46 this.update(cx, |this, cx| {
47 let metadata = ThreadStore::global(cx)
48 .read(cx)
49 .entries()
50 .map(|entry| ThreadMetadata {
51 session_id: entry.id,
52 agent_id: None,
53 title: entry.title,
54 updated_at: entry.updated_at,
55 created_at: entry.created_at,
56 folder_paths: entry.folder_paths,
57 })
58 .collect::<Vec<_>>();
59 for entry in metadata {
60 this.save(entry, cx).detach_and_log_err(cx);
61 }
62 })
63 .ok();
64 }
65 })
66 .detach();
67 });
68}
69
/// Newtype under which the [`ThreadMetadataStore`] entity handle is registered
/// as an app global (see `init_global`, `try_global` and `global`).
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
72
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone)]
pub struct ThreadMetadata {
    /// Session identifier; this is the primary key of the `sidebar_threads` table.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title of the thread.
    pub title: SharedString,
    /// Time of the most recent recorded change; `list` returns rows ordered by
    /// this value, newest first.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known. It is written only when the row is first
    /// inserted; later upserts leave the stored value untouched.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths associated with the thread. For ACP threads these are the
    /// absolute paths of the project's visible worktrees.
    pub folder_paths: PathList,
}
85
/// Persists [`ThreadMetadata`] rows and keeps them up to date by listening to
/// events emitted by ACP threads.
pub struct ThreadMetadataStore {
    /// Database handle used to list, save and delete rows.
    db: ThreadMetadataDb,
    /// Event subscriptions for tracked (non-subagent) ACP threads, keyed by
    /// session id. An entry is removed when its thread is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
}
90
91impl ThreadMetadataStore {
92 #[cfg(not(any(test, feature = "test-support")))]
93 pub fn init_global(cx: &mut App) {
94 if cx.has_global::<Self>() {
95 return;
96 }
97
98 let db = ThreadMetadataDb::global(cx);
99 let thread_store = cx.new(|cx| Self::new(db, cx));
100 cx.set_global(GlobalThreadMetadataStore(thread_store));
101 }
102
103 #[cfg(any(test, feature = "test-support"))]
104 pub fn init_global(cx: &mut App) {
105 let thread = std::thread::current();
106 let test_name = thread.name().unwrap_or("unknown_test");
107 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
108 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
109 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
110 cx.set_global(GlobalThreadMetadataStore(thread_store));
111 }
112
113 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
114 cx.try_global::<GlobalThreadMetadataStore>()
115 .map(|store| store.0.clone())
116 }
117
118 pub fn global(cx: &App) -> Entity<Self> {
119 cx.global::<GlobalThreadMetadataStore>().0.clone()
120 }
121
122 pub fn list(&self, cx: &App) -> Task<Result<Vec<ThreadMetadata>>> {
123 let db = self.db.clone();
124 cx.background_spawn(async move {
125 let s = db.list()?;
126 Ok(s)
127 })
128 }
129
130 pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
131 if !cx.has_flag::<AgentV2FeatureFlag>() {
132 return Task::ready(Ok(()));
133 }
134
135 let db = self.db.clone();
136 cx.spawn(async move |this, cx| {
137 db.save(metadata).await?;
138 this.update(cx, |_this, cx| cx.notify())
139 })
140 }
141
142 pub fn delete(
143 &mut self,
144 session_id: acp::SessionId,
145 cx: &mut Context<Self>,
146 ) -> Task<Result<()>> {
147 if !cx.has_flag::<AgentV2FeatureFlag>() {
148 return Task::ready(Ok(()));
149 }
150
151 let db = self.db.clone();
152 cx.spawn(async move |this, cx| {
153 db.delete(session_id).await?;
154 this.update(cx, |_this, cx| cx.notify())
155 })
156 }
157
158 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
159 let weak_store = cx.weak_entity();
160
161 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
162 // Don't track subagent threads in the sidebar.
163 if thread.parent_session_id().is_some() {
164 return;
165 }
166
167 let thread_entity = cx.entity();
168
169 cx.on_release({
170 let weak_store = weak_store.clone();
171 move |thread, cx| {
172 weak_store
173 .update(cx, |store, _cx| {
174 store.session_subscriptions.remove(thread.session_id());
175 })
176 .ok();
177 }
178 })
179 .detach();
180
181 weak_store
182 .update(cx, |this, cx| {
183 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
184 this.session_subscriptions
185 .insert(thread.session_id().clone(), subscription);
186 })
187 .ok();
188 })
189 .detach();
190
191 Self {
192 db,
193 session_subscriptions: HashMap::default(),
194 }
195 }
196
197 fn handle_thread_update(
198 &mut self,
199 thread: Entity<acp_thread::AcpThread>,
200 event: &acp_thread::AcpThreadEvent,
201 cx: &mut Context<Self>,
202 ) {
203 // Don't track subagent threads in the sidebar.
204 if thread.read(cx).parent_session_id().is_some() {
205 return;
206 }
207
208 match event {
209 acp_thread::AcpThreadEvent::NewEntry
210 | acp_thread::AcpThreadEvent::EntryUpdated(_)
211 | acp_thread::AcpThreadEvent::TitleUpdated => {
212 let metadata = Self::metadata_for_acp_thread(thread.read(cx), cx);
213 self.save(metadata, cx).detach_and_log_err(cx);
214 }
215 _ => {}
216 }
217 }
218
219 fn metadata_for_acp_thread(thread: &acp_thread::AcpThread, cx: &App) -> ThreadMetadata {
220 let session_id = thread.session_id().clone();
221 let title = thread.title();
222 let updated_at = Utc::now();
223
224 let agent_id = thread.connection().agent_id();
225
226 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
227 None
228 } else {
229 Some(agent_id)
230 };
231
232 let folder_paths = {
233 let project = thread.project().read(cx);
234 let paths: Vec<Arc<Path>> = project
235 .visible_worktrees(cx)
236 .map(|worktree| worktree.read(cx).abs_path())
237 .collect();
238 PathList::new(&paths)
239 };
240
241 ThreadMetadata {
242 session_id,
243 agent_id,
244 title,
245 created_at: Some(updated_at), // handled by db `ON CONFLICT`
246 updated_at,
247 folder_paths,
248 }
249 }
250}
251
// Marks the store type itself as a gpui `Global`.
// NOTE(review): the entity handle is registered through
// `GlobalThreadMetadataStore`, not through this type. Confirm whether this
// impl is still required before removing it.
impl Global for ThreadMetadataStore {}
253
/// Newtype around the connection that holds the `sidebar_threads` table.
struct ThreadMetadataDb(ThreadSafeConnection);
255
impl Domain for ThreadMetadataDb {
    // Domain name, taken from the type name.
    const NAME: &str = stringify!(ThreadMetadataDb);

    // Schema of the `sidebar_threads` table: one row per session, keyed by
    // `session_id`. Timestamps are TEXT columns (`save` writes them as RFC 3339
    // strings). `folder_paths` and `folder_paths_order` are NULL for threads
    // without folder paths.
    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}
271
// NOTE(review): presumably this macro supplies the `ThreadMetadataDb::global`
// accessor and the access to the connection's `select`/`write` methods used
// in this file. Confirm against `db::static_connection!`.
db::static_connection!(ThreadMetadataDb, []);
273
274impl ThreadMetadataDb {
275 /// List all sidebar thread metadata, ordered by updated_at descending.
276 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
277 self.select::<ThreadMetadata>(
278 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
279 FROM sidebar_threads \
280 ORDER BY updated_at DESC"
281 )?()
282 }
283
284 /// Upsert metadata for a thread.
285 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
286 let id = row.session_id.0.clone();
287 let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
288 let title = row.title.to_string();
289 let updated_at = row.updated_at.to_rfc3339();
290 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
291 let serialized = row.folder_paths.serialize();
292 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
293 (None, None)
294 } else {
295 (Some(serialized.paths), Some(serialized.order))
296 };
297
298 self.write(move |conn| {
299 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
300 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
301 ON CONFLICT(session_id) DO UPDATE SET \
302 agent_id = excluded.agent_id, \
303 title = excluded.title, \
304 updated_at = excluded.updated_at, \
305 folder_paths = excluded.folder_paths, \
306 folder_paths_order = excluded.folder_paths_order";
307 let mut stmt = Statement::prepare(conn, sql)?;
308 let mut i = stmt.bind(&id, 1)?;
309 i = stmt.bind(&agent_id, i)?;
310 i = stmt.bind(&title, i)?;
311 i = stmt.bind(&updated_at, i)?;
312 i = stmt.bind(&created_at, i)?;
313 i = stmt.bind(&folder_paths, i)?;
314 stmt.bind(&folder_paths_order, i)?;
315 stmt.exec()
316 })
317 .await
318 }
319
320 /// Delete metadata for a single thread.
321 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
322 let id = session_id.0.clone();
323 self.write(move |conn| {
324 let mut stmt =
325 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
326 stmt.bind(&id, 1)?;
327 stmt.exec()
328 })
329 .await
330 }
331}
332
333impl Column for ThreadMetadata {
334 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
335 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
336 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
337 let (title, next): (String, i32) = Column::column(statement, next)?;
338 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
339 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
340 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
341 let (folder_paths_order_str, next): (Option<String>, i32) =
342 Column::column(statement, next)?;
343
344 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
345 let created_at = created_at_str
346 .as_deref()
347 .map(DateTime::parse_from_rfc3339)
348 .transpose()?
349 .map(|dt| dt.with_timezone(&Utc));
350
351 let folder_paths = folder_paths_str
352 .map(|paths| {
353 PathList::deserialize(&util::path_list::SerializedPathList {
354 paths,
355 order: folder_paths_order_str.unwrap_or_default(),
356 })
357 })
358 .unwrap_or_default();
359
360 Ok((
361 ThreadMetadata {
362 session_id: acp::SessionId::new(id),
363 agent_id: agent_id.map(|id| AgentId::new(id)),
364 title: title.into(),
365 updated_at,
366 created_at,
367 folder_paths,
368 },
369 next,
370 ))
371 }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use acp_thread::{AgentConnection, StubAgentConnection};
    use action_log::ActionLog;
    use agent::DbThread;
    use agent_client_protocol as acp;
    use feature_flags::FeatureFlagAppExt;
    use gpui::TestAppContext;
    use project::FakeFs;
    use project::Project;
    use std::path::Path;
    use std::rc::Rc;
    use util::path_list::PathList;

    /// Builds a minimal native thread record with the given title and timestamp.
    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
        DbThread {
            title: title.to_string().into(),
            updated_at,
            messages: Vec::new(),
            cumulative_token_usage: Default::default(),
            request_token_usage: Default::default(),
            imported: false,
            thinking_enabled: false,
            detailed_summary: None,
            initial_project_snapshot: None,
            model: None,
            profile: None,
            subagent_context: None,
            speed: None,
            thinking_effort: None,
            draft_prompt: None,
            ui_scroll_position: None,
        }
    }

    /// Registers the native thread store and the metadata store globals.
    fn init_stores(cx: &mut TestAppContext) {
        cx.update(|cx| {
            ThreadStore::init_global(cx);
            ThreadMetadataStore::init_global(cx);
        });
    }

    /// Reads every row currently held by the global metadata store.
    async fn list_metadata(cx: &mut TestAppContext) -> Vec<ThreadMetadata> {
        let pending = cx.update(|cx| ThreadMetadataStore::global(cx).read(cx).list(cx));
        pending.await.unwrap()
    }

    /// Saves one thread into the native `ThreadStore` and lets the executor settle.
    async fn save_native_thread(
        cx: &mut TestAppContext,
        session: &str,
        title: &str,
        updated_at: DateTime<Utc>,
    ) {
        let pending = cx.update(|cx| {
            ThreadStore::global(cx).update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new(session),
                    make_db_thread(title, updated_at),
                    PathList::default(),
                    cx,
                )
            })
        });
        pending.await.unwrap();
        cx.run_until_parked();
    }

    #[gpui::test]
    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
        init_stores(cx);

        // Nothing has been written before the migration.
        assert_eq!(list_metadata(cx).await.len(), 0);

        // Populate the native ThreadStore.
        let now = Utc::now();
        save_native_thread(cx, "session-1", "Thread 1", now).await;
        save_native_thread(cx, "session-2", "Thread 2", now).await;

        // Run the migration.
        cx.update(|cx| migrate_thread_metadata(cx));
        cx.run_until_parked();

        // Both native threads must now be present, without an agent id.
        let migrated = list_metadata(cx).await;
        assert_eq!(migrated.len(), 2);

        for (session, title) in [("session-1", "Thread 1"), ("session-2", "Thread 2")] {
            let row = migrated
                .iter()
                .find(|m| m.session_id.0.as_ref() == session)
                .unwrap_or_else(|| panic!("{session} should be in migrated metadata"));
            assert_eq!(row.title.as_ref(), title);
            assert!(row.agent_id.is_none());
        }
    }

    #[gpui::test]
    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
        init_stores(cx);

        // Seed the metadata store so it is not empty.
        let existing_metadata = ThreadMetadata {
            session_id: acp::SessionId::new("existing-session"),
            agent_id: None,
            title: "Existing Thread".into(),
            updated_at: Utc::now(),
            created_at: Some(Utc::now()),
            folder_paths: PathList::default(),
        };
        cx.update(|cx| {
            ThreadMetadataStore::global(cx).update(cx, |store, cx| {
                store.save(existing_metadata, cx).detach();
            });
        });
        cx.run_until_parked();

        // This native entry must NOT be migrated.
        save_native_thread(cx, "native-session", "Native Thread", Utc::now()).await;

        // The migration should be skipped because the metadata store has data.
        cx.update(|cx| migrate_thread_metadata(cx));
        cx.run_until_parked();

        // Only the seeded row remains.
        let rows = list_metadata(cx).await;
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].session_id.0.as_ref(), "existing-session");
    }

    #[gpui::test]
    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = settings::SettingsStore::test(cx);
            cx.set_global(settings_store);
            cx.update_flags(true, vec!["agent-v2".to_string()]);
            ThreadStore::init_global(cx);
            ThreadMetadataStore::init_global(cx);
        });

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, None::<&Path>, cx).await;
        let connection = Rc::new(StubAgentConnection::new());

        // A regular (top-level) thread, created through the connection.
        let new_session = cx.update(|cx| {
            connection
                .clone()
                .new_session(project.clone(), PathList::default(), cx)
        });
        let regular_thread = new_session.await.unwrap();
        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());

        // Changing its title triggers a save via handle_thread_update.
        cx.update(|cx| {
            regular_thread.update(cx, |thread, cx| {
                thread.set_title("Regular Thread".into(), cx).detach();
            });
        });
        cx.run_until_parked();

        // A subagent thread whose parent is the regular thread.
        let subagent_session_id = acp::SessionId::new("subagent-session");
        let subagent_thread = cx.update(|cx| {
            let action_log = cx.new(|_| ActionLog::new(project.clone()));
            cx.new(|cx| {
                acp_thread::AcpThread::new(
                    Some(regular_session_id.clone()),
                    "Subagent Thread",
                    None,
                    connection.clone(),
                    project.clone(),
                    action_log,
                    subagent_session_id.clone(),
                    watch::Receiver::constant(acp::PromptCapabilities::new()),
                    cx,
                )
            })
        });

        // Changing the subagent's title also reaches handle_thread_update.
        cx.update(|cx| {
            subagent_thread.update(cx, |thread, cx| {
                thread
                    .set_title("Subagent Thread Title".into(), cx)
                    .detach();
            });
        });
        cx.run_until_parked();

        // Only the regular thread may appear in the sidebar metadata.
        let rows = list_metadata(cx).await;
        assert_eq!(
            rows.len(),
            1,
            "Expected only the regular thread in sidebar metadata, \
             but found {} entries (subagent threads are leaking into the sidebar)",
            rows.len(),
        );
        assert_eq!(rows[0].session_id, regular_session_id);
        assert_eq!(rows[0].title.as_ref(), "Regular Thread");
    }
}