1use std::{path::Path, sync::Arc};
2
3use agent::{ThreadStore, ZED_AGENT_ID};
4use agent_client_protocol as acp;
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use collections::HashMap;
8use db::{
9 sqlez::{
10 bindable::Column, domain::Domain, statement::Statement,
11 thread_safe_connection::ThreadSafeConnection,
12 },
13 sqlez_macros::sql,
14};
15use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
16use gpui::{AppContext as _, Entity, Global, Subscription, Task};
17use project::AgentId;
18use ui::{App, Context, SharedString};
19use workspace::PathList;
20
21pub fn init(cx: &mut App) {
22 ThreadMetadataStore::init_global(cx);
23
24 if cx.has_flag::<AgentV2FeatureFlag>() {
25 migrate_thread_metadata(cx);
26 }
27 cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
28 if has_flag {
29 migrate_thread_metadata(cx);
30 }
31 })
32 .detach();
33}
34
35/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
36///
37/// TODO: Remove this after N weeks of shipping the sidebar
38fn migrate_thread_metadata(cx: &mut App) {
39 ThreadMetadataStore::global(cx).update(cx, |store, cx| {
40 let list = store.list(cx);
41 cx.spawn(async move |this, cx| {
42 let Ok(list) = list.await else {
43 return;
44 };
45 if list.is_empty() {
46 this.update(cx, |this, cx| {
47 let metadata = ThreadStore::global(cx)
48 .read(cx)
49 .entries()
50 .map(|entry| ThreadMetadata {
51 session_id: entry.id,
52 agent_id: None,
53 title: entry.title,
54 updated_at: entry.updated_at,
55 created_at: entry.created_at,
56 folder_paths: entry.folder_paths,
57 })
58 .collect::<Vec<_>>();
59 for entry in metadata {
60 this.save(entry, cx).detach_and_log_err(cx);
61 }
62 })
63 .ok();
64 }
65 })
66 .detach();
67 });
68}
69
70struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
71impl Global for GlobalThreadMetadataStore {}
72
73/// Lightweight metadata for any thread (native or ACP), enough to populate
74/// the sidebar list and route to the correct load path when clicked.
75#[derive(Debug, Clone)]
76pub struct ThreadMetadata {
77 pub session_id: acp::SessionId,
78 /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
79 pub agent_id: Option<AgentId>,
80 pub title: SharedString,
81 pub updated_at: DateTime<Utc>,
82 pub created_at: Option<DateTime<Utc>>,
83 pub folder_paths: PathList,
84}
85
86pub struct ThreadMetadataStore {
87 db: ThreadMetadataDb,
88 session_subscriptions: HashMap<acp::SessionId, Subscription>,
89}
90
91impl ThreadMetadataStore {
92 #[cfg(not(any(test, feature = "test-support")))]
93 pub fn init_global(cx: &mut App) {
94 if cx.has_global::<Self>() {
95 return;
96 }
97
98 let db = THREAD_METADATA_DB.clone();
99 let thread_store = cx.new(|cx| Self::new(db, cx));
100 cx.set_global(GlobalThreadMetadataStore(thread_store));
101 }
102
103 #[cfg(any(test, feature = "test-support"))]
104 pub fn init_global(cx: &mut App) {
105 let thread = std::thread::current();
106 let test_name = thread.name().unwrap_or("unknown_test");
107 let db_name = format!("THREAD_METADATA_DB_{}", test_name);
108 let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
109 let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
110 cx.set_global(GlobalThreadMetadataStore(thread_store));
111 }
112
113 pub fn try_global(cx: &App) -> Option<Entity<Self>> {
114 cx.try_global::<GlobalThreadMetadataStore>()
115 .map(|store| store.0.clone())
116 }
117
118 pub fn global(cx: &App) -> Entity<Self> {
119 cx.global::<GlobalThreadMetadataStore>().0.clone()
120 }
121
122 pub fn list(&self, cx: &App) -> Task<Result<Vec<ThreadMetadata>>> {
123 let db = self.db.clone();
124 cx.background_spawn(async move {
125 let s = db.list()?;
126 Ok(s)
127 })
128 }
129
130 pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
131 if !cx.has_flag::<AgentV2FeatureFlag>() {
132 return Task::ready(Ok(()));
133 }
134
135 let db = self.db.clone();
136 cx.spawn(async move |this, cx| {
137 db.save(metadata).await?;
138 this.update(cx, |_this, cx| cx.notify())
139 })
140 }
141
142 pub fn delete(
143 &mut self,
144 session_id: acp::SessionId,
145 cx: &mut Context<Self>,
146 ) -> Task<Result<()>> {
147 if !cx.has_flag::<AgentV2FeatureFlag>() {
148 return Task::ready(Ok(()));
149 }
150
151 let db = self.db.clone();
152 cx.spawn(async move |this, cx| {
153 db.delete(session_id).await?;
154 this.update(cx, |_this, cx| cx.notify())
155 })
156 }
157
158 fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
159 let weak_store = cx.weak_entity();
160
161 cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
162 let thread_entity = cx.entity();
163
164 cx.on_release({
165 let weak_store = weak_store.clone();
166 move |thread, cx| {
167 weak_store
168 .update(cx, |store, _cx| {
169 store.session_subscriptions.remove(thread.session_id());
170 })
171 .ok();
172 }
173 })
174 .detach();
175
176 weak_store
177 .update(cx, |this, cx| {
178 let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
179 this.session_subscriptions
180 .insert(thread.session_id().clone(), subscription);
181 })
182 .ok();
183 })
184 .detach();
185
186 Self {
187 db,
188 session_subscriptions: HashMap::default(),
189 }
190 }
191
192 fn handle_thread_update(
193 &mut self,
194 thread: Entity<acp_thread::AcpThread>,
195 event: &acp_thread::AcpThreadEvent,
196 cx: &mut Context<Self>,
197 ) {
198 match event {
199 acp_thread::AcpThreadEvent::NewEntry
200 | acp_thread::AcpThreadEvent::EntryUpdated(_)
201 | acp_thread::AcpThreadEvent::TitleUpdated => {
202 let metadata = Self::metadata_for_acp_thread(thread.read(cx), cx);
203 self.save(metadata, cx).detach_and_log_err(cx);
204 }
205 _ => {}
206 }
207 }
208
209 fn metadata_for_acp_thread(thread: &acp_thread::AcpThread, cx: &App) -> ThreadMetadata {
210 let session_id = thread.session_id().clone();
211 let title = thread.title();
212 let updated_at = Utc::now();
213
214 let agent_id = thread.connection().agent_id();
215
216 let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
217 None
218 } else {
219 Some(agent_id)
220 };
221
222 let folder_paths = {
223 let project = thread.project().read(cx);
224 let paths: Vec<Arc<Path>> = project
225 .visible_worktrees(cx)
226 .map(|worktree| worktree.read(cx).abs_path())
227 .collect();
228 PathList::new(&paths)
229 };
230
231 ThreadMetadata {
232 session_id,
233 agent_id,
234 title,
235 created_at: Some(updated_at), // handled by db `ON CONFLICT`
236 updated_at,
237 folder_paths,
238 }
239 }
240}
241
242impl Global for ThreadMetadataStore {}
243
244#[derive(Clone)]
245struct ThreadMetadataDb(ThreadSafeConnection);
246
247impl Domain for ThreadMetadataDb {
248 const NAME: &str = stringify!(ThreadMetadataDb);
249
250 const MIGRATIONS: &[&str] = &[sql!(
251 CREATE TABLE IF NOT EXISTS sidebar_threads(
252 session_id TEXT PRIMARY KEY,
253 agent_id TEXT,
254 title TEXT NOT NULL,
255 updated_at TEXT NOT NULL,
256 created_at TEXT,
257 folder_paths TEXT,
258 folder_paths_order TEXT
259 ) STRICT;
260 )];
261}
262
263db::static_connection!(THREAD_METADATA_DB, ThreadMetadataDb, []);
264
265impl ThreadMetadataDb {
266 /// List all sidebar thread metadata, ordered by updated_at descending.
267 pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
268 self.select::<ThreadMetadata>(
269 "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
270 FROM sidebar_threads \
271 ORDER BY updated_at DESC"
272 )?()
273 }
274
275 /// Upsert metadata for a thread.
276 pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
277 let id = row.session_id.0.clone();
278 let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
279 let title = row.title.to_string();
280 let updated_at = row.updated_at.to_rfc3339();
281 let created_at = row.created_at.map(|dt| dt.to_rfc3339());
282 let serialized = row.folder_paths.serialize();
283 let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
284 (None, None)
285 } else {
286 (Some(serialized.paths), Some(serialized.order))
287 };
288
289 self.write(move |conn| {
290 let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
291 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
292 ON CONFLICT(session_id) DO UPDATE SET \
293 agent_id = excluded.agent_id, \
294 title = excluded.title, \
295 updated_at = excluded.updated_at, \
296 folder_paths = excluded.folder_paths, \
297 folder_paths_order = excluded.folder_paths_order";
298 let mut stmt = Statement::prepare(conn, sql)?;
299 let mut i = stmt.bind(&id, 1)?;
300 i = stmt.bind(&agent_id, i)?;
301 i = stmt.bind(&title, i)?;
302 i = stmt.bind(&updated_at, i)?;
303 i = stmt.bind(&created_at, i)?;
304 i = stmt.bind(&folder_paths, i)?;
305 stmt.bind(&folder_paths_order, i)?;
306 stmt.exec()
307 })
308 .await
309 }
310
311 /// Delete metadata for a single thread.
312 pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
313 let id = session_id.0.clone();
314 self.write(move |conn| {
315 let mut stmt =
316 Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
317 stmt.bind(&id, 1)?;
318 stmt.exec()
319 })
320 .await
321 }
322}
323
324impl Column for ThreadMetadata {
325 fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
326 let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
327 let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
328 let (title, next): (String, i32) = Column::column(statement, next)?;
329 let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
330 let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
331 let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
332 let (folder_paths_order_str, next): (Option<String>, i32) =
333 Column::column(statement, next)?;
334
335 let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
336 let created_at = created_at_str
337 .as_deref()
338 .map(DateTime::parse_from_rfc3339)
339 .transpose()?
340 .map(|dt| dt.with_timezone(&Utc));
341
342 let folder_paths = folder_paths_str
343 .map(|paths| {
344 PathList::deserialize(&util::path_list::SerializedPathList {
345 paths,
346 order: folder_paths_order_str.unwrap_or_default(),
347 })
348 })
349 .unwrap_or_default();
350
351 Ok((
352 ThreadMetadata {
353 session_id: acp::SessionId::new(id),
354 agent_id: agent_id.map(|id| AgentId::new(id)),
355 title: title.into(),
356 updated_at,
357 created_at,
358 folder_paths,
359 },
360 next,
361 ))
362 }
363}
364
#[cfg(test)]
mod tests {
    use super::*;
    use agent::DbThread;
    use gpui::TestAppContext;

    /// Builds a `DbThread` with the given title and timestamp and every other
    /// field empty or defaulted.
    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
        DbThread {
            title: title.to_string().into(),
            messages: Vec::new(),
            updated_at,
            detailed_summary: None,
            initial_project_snapshot: None,
            cumulative_token_usage: Default::default(),
            request_token_usage: Default::default(),
            model: None,
            profile: None,
            imported: false,
            subagent_context: None,
            speed: None,
            thinking_enabled: false,
            thinking_effort: None,
            draft_prompt: None,
            ui_scroll_position: None,
        }
    }

    /// Registers the native thread store and the metadata store as globals.
    fn init_stores(cx: &mut TestAppContext) {
        cx.update(|cx| {
            ThreadStore::init_global(cx);
            ThreadMetadataStore::init_global(cx);
        });
    }

    /// Reads every row currently held by the metadata store.
    async fn list_metadata(cx: &mut TestAppContext) -> Vec<ThreadMetadata> {
        let list_task = cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.read(cx).list(cx)
        });
        list_task.await.unwrap()
    }

    /// Saves a thread into the native `ThreadStore` and waits for the app to
    /// settle.
    async fn save_native_thread(
        session_id: &'static str,
        title: &str,
        updated_at: DateTime<Utc>,
        cx: &mut TestAppContext,
    ) {
        let save_task = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new(session_id),
                    make_db_thread(title, updated_at),
                    PathList::default(),
                    cx,
                )
            })
        });
        save_task.await.unwrap();
        cx.run_until_parked();
    }

    /// Runs the migration and waits for all of its tasks to finish.
    fn run_migration(cx: &mut TestAppContext) {
        cx.update(|cx| {
            migrate_thread_metadata(cx);
        });
        cx.run_until_parked();
    }

    #[gpui::test]
    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
        init_stores(cx);

        // Nothing has been written yet, so the metadata store starts empty.
        let initial = list_metadata(cx).await;
        assert_eq!(initial.len(), 0);

        // Populate the native store with two threads.
        let now = Utc::now();
        save_native_thread("session-1", "Thread 1", now, cx).await;
        save_native_thread("session-2", "Thread 2", now, cx).await;

        run_migration(cx);

        // Both native threads must now be present, marked as native.
        let migrated = list_metadata(cx).await;
        assert_eq!(migrated.len(), 2);

        for (session_id, title) in [("session-1", "Thread 1"), ("session-2", "Thread 2")] {
            let metadata = migrated
                .iter()
                .find(|m| m.session_id.0.as_ref() == session_id)
                .unwrap_or_else(|| panic!("{session_id} should be in migrated metadata"));
            assert_eq!(metadata.title.as_ref(), title);
            assert!(metadata.agent_id.is_none());
        }
    }

    #[gpui::test]
    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
        init_stores(cx);

        // Seed the metadata store so that it is no longer empty.
        let existing_metadata = ThreadMetadata {
            session_id: acp::SessionId::new("existing-session"),
            agent_id: None,
            title: "Existing Thread".into(),
            updated_at: Utc::now(),
            created_at: Some(Utc::now()),
            folder_paths: PathList::default(),
        };
        cx.update(|cx| {
            let store = ThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(existing_metadata, cx).detach();
            });
        });
        cx.run_until_parked();

        // This native thread must NOT be picked up by the migration.
        save_native_thread("native-session", "Native Thread", Utc::now(), cx).await;

        // The metadata store is not empty, so the migration should be skipped.
        run_migration(cx);

        // Only the pre-existing row may be present.
        let remaining = list_metadata(cx).await;
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].session_id.0.as_ref(), "existing-session");
    }
}