thread_metadata_store.rs

  1use std::{path::Path, sync::Arc};
  2
  3use acp_thread::AgentSessionInfo;
  4use agent::{ThreadStore, ZED_AGENT_ID};
  5use agent_client_protocol as acp;
  6use anyhow::Result;
  7use chrono::{DateTime, Utc};
  8use collections::HashMap;
  9use db::{
 10    sqlez::{
 11        bindable::Column, domain::Domain, statement::Statement,
 12        thread_safe_connection::ThreadSafeConnection,
 13    },
 14    sqlez_macros::sql,
 15};
 16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
 17use gpui::{AppContext as _, Entity, Global, Subscription, Task};
 18use project::AgentId;
 19use ui::{App, Context, SharedString};
 20use workspace::PathList;
 21
 22pub fn init(cx: &mut App) {
 23    SidebarThreadMetadataStore::init_global(cx);
 24
 25    if cx.has_flag::<AgentV2FeatureFlag>() {
 26        migrate_thread_metadata(cx);
 27    }
 28    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
 29        if has_flag {
 30            migrate_thread_metadata(cx);
 31        }
 32    })
 33    .detach();
 34}
 35
 36/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
 37///
 38/// TODO: Remove this after N weeks of shipping the sidebar
 39fn migrate_thread_metadata(cx: &mut App) {
 40    SidebarThreadMetadataStore::global(cx).update(cx, |store, cx| {
 41        let list = store.list(cx);
 42        cx.spawn(async move |this, cx| {
 43            let Ok(list) = list.await else {
 44                return;
 45            };
 46            if list.is_empty() {
 47                this.update(cx, |this, cx| {
 48                    let metadata = ThreadStore::global(cx)
 49                        .read(cx)
 50                        .entries()
 51                        .map(|entry| ThreadMetadata {
 52                            session_id: entry.id,
 53                            agent_id: None,
 54                            title: entry.title,
 55                            updated_at: entry.updated_at,
 56                            created_at: entry.created_at,
 57                            folder_paths: entry.folder_paths,
 58                        })
 59                        .collect::<Vec<_>>();
 60                    for entry in metadata {
 61                        this.save(entry, cx).detach_and_log_err(cx);
 62                    }
 63                })
 64                .ok();
 65            }
 66        })
 67        .detach();
 68    });
 69}
 70
/// Newtype wrapper under which the `SidebarThreadMetadataStore` entity is
/// registered as a gpui global (see `init_global`, `global`, `try_global`).
struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 73
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone)]
pub struct ThreadMetadata {
    /// Session identifier; used as the primary key of the `sidebar_threads` table.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Time of the last update; `ThreadMetadataDb::list` orders by this, newest first.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known. Stored in a nullable column.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths associated with the thread (visible worktree paths or the
    /// session's working directories, depending on how the metadata was built).
    pub folder_paths: PathList,
}
 86
 87impl ThreadMetadata {
 88    pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
 89        let session_id = session.session_id.clone();
 90        let title = session.title.clone().unwrap_or_default();
 91        let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
 92        let created_at = session.created_at.unwrap_or(updated_at);
 93        let folder_paths = session.work_dirs.clone().unwrap_or_default();
 94        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 95            None
 96        } else {
 97            Some(agent_id)
 98        };
 99        Self {
100            session_id,
101            agent_id,
102            title,
103            updated_at,
104            created_at: Some(created_at),
105            folder_paths,
106        }
107    }
108
109    pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
110        let thread_ref = thread.read(cx);
111        let session_id = thread_ref.session_id().clone();
112        let title = thread_ref.title();
113        let updated_at = Utc::now();
114
115        let agent_id = thread_ref.connection().agent_id();
116
117        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
118            None
119        } else {
120            Some(agent_id)
121        };
122
123        let folder_paths = {
124            let project = thread_ref.project().read(cx);
125            let paths: Vec<Arc<Path>> = project
126                .visible_worktrees(cx)
127                .map(|worktree| worktree.read(cx).abs_path())
128                .collect();
129            PathList::new(&paths)
130        };
131
132        Self {
133            session_id,
134            agent_id,
135            title,
136            created_at: Some(updated_at), // handled by db `ON CONFLICT`
137            updated_at,
138            folder_paths,
139        }
140    }
141}
142
/// The store holds all metadata needed to show threads in the sidebar.
/// Effectively, all threads stored in here are "non-archived".
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct SidebarThreadMetadataStore {
    /// Database handle used for all reads and writes of thread metadata.
    db: ThreadMetadataDb,
    /// One event subscription per observed (non-subagent) thread, keyed by its
    /// session id. Entries are removed when the thread entity is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
}
151
152impl SidebarThreadMetadataStore {
153    #[cfg(not(any(test, feature = "test-support")))]
154    pub fn init_global(cx: &mut App) {
155        if cx.has_global::<Self>() {
156            return;
157        }
158
159        let db = ThreadMetadataDb::global(cx);
160        let thread_store = cx.new(|cx| Self::new(db, cx));
161        cx.set_global(GlobalThreadMetadataStore(thread_store));
162    }
163
164    #[cfg(any(test, feature = "test-support"))]
165    pub fn init_global(cx: &mut App) {
166        let thread = std::thread::current();
167        let test_name = thread.name().unwrap_or("unknown_test");
168        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
169        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
170        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
171        cx.set_global(GlobalThreadMetadataStore(thread_store));
172    }
173
174    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
175        cx.try_global::<GlobalThreadMetadataStore>()
176            .map(|store| store.0.clone())
177    }
178
179    pub fn global(cx: &App) -> Entity<Self> {
180        cx.global::<GlobalThreadMetadataStore>().0.clone()
181    }
182
183    pub fn list_ids(&self, cx: &App) -> Task<Result<Vec<acp::SessionId>>> {
184        let db = self.db.clone();
185        cx.background_spawn(async move {
186            let s = db.list_ids()?;
187            Ok(s)
188        })
189    }
190
191    pub fn list_sidebar_ids(&self, cx: &App) -> Task<Result<Vec<acp::SessionId>>> {
192        let db = self.db.clone();
193        cx.background_spawn(async move {
194            let s = db.list_sidebar_ids()?;
195            Ok(s)
196        })
197    }
198
199    pub fn list(&self, cx: &App) -> Task<Result<Vec<ThreadMetadata>>> {
200        let db = self.db.clone();
201        cx.background_spawn(async move {
202            let s = db.list()?;
203            Ok(s)
204        })
205    }
206
207    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
208        if !cx.has_flag::<AgentV2FeatureFlag>() {
209            return Task::ready(Ok(()));
210        }
211
212        let db = self.db.clone();
213        cx.spawn(async move |this, cx| {
214            db.save(metadata).await?;
215            this.update(cx, |_this, cx| cx.notify())
216        })
217    }
218
219    pub fn delete(
220        &mut self,
221        session_id: acp::SessionId,
222        cx: &mut Context<Self>,
223    ) -> Task<Result<()>> {
224        if !cx.has_flag::<AgentV2FeatureFlag>() {
225            return Task::ready(Ok(()));
226        }
227
228        let db = self.db.clone();
229        cx.spawn(async move |this, cx| {
230            db.delete(session_id).await?;
231            this.update(cx, |_this, cx| cx.notify())
232        })
233    }
234
235    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
236        let weak_store = cx.weak_entity();
237
238        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
239            // Don't track subagent threads in the sidebar.
240            if thread.parent_session_id().is_some() {
241                return;
242            }
243
244            let thread_entity = cx.entity();
245
246            cx.on_release({
247                let weak_store = weak_store.clone();
248                move |thread, cx| {
249                    weak_store
250                        .update(cx, |store, _cx| {
251                            store.session_subscriptions.remove(thread.session_id());
252                        })
253                        .ok();
254                }
255            })
256            .detach();
257
258            weak_store
259                .update(cx, |this, cx| {
260                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
261                    this.session_subscriptions
262                        .insert(thread.session_id().clone(), subscription);
263                })
264                .ok();
265        })
266        .detach();
267
268        Self {
269            db,
270            session_subscriptions: HashMap::default(),
271        }
272    }
273
274    fn handle_thread_update(
275        &mut self,
276        thread: Entity<acp_thread::AcpThread>,
277        event: &acp_thread::AcpThreadEvent,
278        cx: &mut Context<Self>,
279    ) {
280        // Don't track subagent threads in the sidebar.
281        if thread.read(cx).parent_session_id().is_some() {
282            return;
283        }
284
285        match event {
286            acp_thread::AcpThreadEvent::NewEntry
287            | acp_thread::AcpThreadEvent::EntryUpdated(_)
288            | acp_thread::AcpThreadEvent::TitleUpdated => {
289                let metadata = ThreadMetadata::from_thread(&thread, cx);
290                self.save(metadata, cx).detach_and_log_err(cx);
291            }
292            _ => {}
293        }
294    }
295}
296
// The store entity is published through `GlobalThreadMetadataStore`; nothing
// in this file sets `SidebarThreadMetadataStore` itself as a global.
// NOTE(review): confirm whether this impl is still needed.
impl Global for SidebarThreadMetadataStore {}
298
/// Database handle for sidebar thread metadata, wrapping a
/// `ThreadSafeConnection`.
struct ThreadMetadataDb(ThreadSafeConnection);

/// Schema definition for the sidebar thread metadata database.
///
/// A single migration creates the `sidebar_threads` table, keyed by
/// `session_id`. `title` and `updated_at` are required; `agent_id`,
/// `created_at`, `folder_paths` and `folder_paths_order` are nullable.
/// Timestamps are stored as text (written as RFC 3339 by `save`).
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}

// NOTE(review): presumably this macro provides the `ThreadMetadataDb::global`
// accessor used by `SidebarThreadMetadataStore::init_global` — confirm against
// the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
318
319impl ThreadMetadataDb {
320    /// List all sidebar thread session IDs.
321    pub fn list_ids(&self) -> anyhow::Result<Vec<acp::SessionId>> {
322        self.select::<Arc<str>>("SELECT session_id FROM sidebar_threads")?()
323            .map(|ids| ids.into_iter().map(|id| acp::SessionId::new(id)).collect())
324    }
325
326    /// List session IDs of threads that belong to a real project workspace
327    /// (i.e. have non-empty folder_paths). These are the threads shown in
328    /// the sidebar, as opposed to threads created in empty workspaces.
329    pub fn list_sidebar_ids(&self) -> anyhow::Result<Vec<acp::SessionId>> {
330        self.select::<Arc<str>>(
331            "SELECT session_id FROM sidebar_threads WHERE folder_paths IS NOT NULL AND folder_paths != ''",
332        )?()
333        .map(|ids| ids.into_iter().map(|id| acp::SessionId::new(id)).collect())
334    }
335
336    /// List all sidebar thread metadata, ordered by updated_at descending.
337    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
338        self.select::<ThreadMetadata>(
339            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
340             FROM sidebar_threads \
341             ORDER BY updated_at DESC"
342        )?()
343    }
344
345    /// Upsert metadata for a thread.
346    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
347        let id = row.session_id.0.clone();
348        let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
349        let title = row.title.to_string();
350        let updated_at = row.updated_at.to_rfc3339();
351        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
352        let serialized = row.folder_paths.serialize();
353        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
354            (None, None)
355        } else {
356            (Some(serialized.paths), Some(serialized.order))
357        };
358
359        self.write(move |conn| {
360            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
361                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
362                       ON CONFLICT(session_id) DO UPDATE SET \
363                           agent_id = excluded.agent_id, \
364                           title = excluded.title, \
365                           updated_at = excluded.updated_at, \
366                           folder_paths = excluded.folder_paths, \
367                           folder_paths_order = excluded.folder_paths_order";
368            let mut stmt = Statement::prepare(conn, sql)?;
369            let mut i = stmt.bind(&id, 1)?;
370            i = stmt.bind(&agent_id, i)?;
371            i = stmt.bind(&title, i)?;
372            i = stmt.bind(&updated_at, i)?;
373            i = stmt.bind(&created_at, i)?;
374            i = stmt.bind(&folder_paths, i)?;
375            stmt.bind(&folder_paths_order, i)?;
376            stmt.exec()
377        })
378        .await
379    }
380
381    /// Delete metadata for a single thread.
382    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
383        let id = session_id.0.clone();
384        self.write(move |conn| {
385            let mut stmt =
386                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
387            stmt.bind(&id, 1)?;
388            stmt.exec()
389        })
390        .await
391    }
392}
393
394impl Column for ThreadMetadata {
395    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
396        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
397        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
398        let (title, next): (String, i32) = Column::column(statement, next)?;
399        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
400        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
401        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
402        let (folder_paths_order_str, next): (Option<String>, i32) =
403            Column::column(statement, next)?;
404
405        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
406        let created_at = created_at_str
407            .as_deref()
408            .map(DateTime::parse_from_rfc3339)
409            .transpose()?
410            .map(|dt| dt.with_timezone(&Utc));
411
412        let folder_paths = folder_paths_str
413            .map(|paths| {
414                PathList::deserialize(&util::path_list::SerializedPathList {
415                    paths,
416                    order: folder_paths_order_str.unwrap_or_default(),
417                })
418            })
419            .unwrap_or_default();
420
421        Ok((
422            ThreadMetadata {
423                session_id: acp::SessionId::new(id),
424                agent_id: agent_id.map(|id| AgentId::new(id)),
425                title: title.into(),
426                updated_at,
427                created_at,
428                folder_paths,
429            },
430            next,
431        ))
432    }
433}
434
/// Tests for the metadata migration and for keeping subagent threads out of
/// the sidebar metadata store.
#[cfg(test)]
mod tests {
    use super::*;
    use acp_thread::{AgentConnection, StubAgentConnection};
    use action_log::ActionLog;
    use agent::DbThread;
    use agent_client_protocol as acp;
    use feature_flags::FeatureFlagAppExt;
    use gpui::TestAppContext;
    use project::FakeFs;
    use project::Project;
    use std::path::Path;
    use std::rc::Rc;
    use util::path_list::PathList;

    /// Builds a minimal `DbThread` with the given title and timestamp; every
    /// other field is empty, `None`, `false`, or its default.
    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
        DbThread {
            title: title.to_string().into(),
            messages: Vec::new(),
            updated_at,
            detailed_summary: None,
            initial_project_snapshot: None,
            cumulative_token_usage: Default::default(),
            request_token_usage: Default::default(),
            model: None,
            profile: None,
            imported: false,
            subagent_context: None,
            speed: None,
            thinking_enabled: false,
            thinking_effort: None,
            draft_prompt: None,
            ui_scroll_position: None,
        }
    }

    /// With an empty metadata store, migration copies every native thread over
    /// and records each one with no agent id.
    #[gpui::test]
    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
        cx.update(|cx| {
            ThreadStore::init_global(cx);
            SidebarThreadMetadataStore::init_global(cx);
        });

        // Verify the list is empty before migration
        let metadata_list = cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.read(cx).list(cx)
        });

        let list = metadata_list.await.unwrap();
        assert_eq!(list.len(), 0);

        let now = Utc::now();

        // Populate the native ThreadStore via save_thread
        let save1 = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new("session-1"),
                    make_db_thread("Thread 1", now),
                    PathList::default(),
                    cx,
                )
            })
        });
        save1.await.unwrap();
        cx.run_until_parked();

        let save2 = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new("session-2"),
                    make_db_thread("Thread 2", now),
                    PathList::default(),
                    cx,
                )
            })
        });
        save2.await.unwrap();
        cx.run_until_parked();

        // Run migration
        cx.update(|cx| {
            migrate_thread_metadata(cx);
        });

        // Let the spawned migration task and its detached saves finish.
        cx.run_until_parked();

        // Verify the metadata was migrated
        let metadata_list = cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.read(cx).list(cx)
        });

        let list = metadata_list.await.unwrap();
        assert_eq!(list.len(), 2);

        let metadata1 = list
            .iter()
            .find(|m| m.session_id.0.as_ref() == "session-1")
            .expect("session-1 should be in migrated metadata");
        assert_eq!(metadata1.title.as_ref(), "Thread 1");
        assert!(metadata1.agent_id.is_none());

        let metadata2 = list
            .iter()
            .find(|m| m.session_id.0.as_ref() == "session-2")
            .expect("session-2 should be in migrated metadata");
        assert_eq!(metadata2.title.as_ref(), "Thread 2");
        assert!(metadata2.agent_id.is_none());
    }

    /// When the metadata store already contains an entry, migration must not
    /// copy anything from the native thread store.
    #[gpui::test]
    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
        cx.update(|cx| {
            ThreadStore::init_global(cx);
            SidebarThreadMetadataStore::init_global(cx);
        });

        // Pre-populate the metadata store with existing data
        let existing_metadata = ThreadMetadata {
            session_id: acp::SessionId::new("existing-session"),
            agent_id: None,
            title: "Existing Thread".into(),
            updated_at: Utc::now(),
            created_at: Some(Utc::now()),
            folder_paths: PathList::default(),
        };

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(existing_metadata, cx).detach();
            });
        });

        // Let the detached save complete before continuing.
        cx.run_until_parked();

        // Add an entry to native thread store that should NOT be migrated
        let save_task = cx.update(|cx| {
            let thread_store = ThreadStore::global(cx);
            thread_store.update(cx, |store, cx| {
                store.save_thread(
                    acp::SessionId::new("native-session"),
                    make_db_thread("Native Thread", Utc::now()),
                    PathList::default(),
                    cx,
                )
            })
        });
        save_task.await.unwrap();
        cx.run_until_parked();

        // Run migration - should skip because metadata store is not empty
        cx.update(|cx| {
            migrate_thread_metadata(cx);
        });

        cx.run_until_parked();

        // Verify only the existing metadata is present (migration was skipped)
        let metadata_list = cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.read(cx).list(cx)
        });

        let list = metadata_list.await.unwrap();
        assert_eq!(list.len(), 1);
        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
    }

    /// A thread created with a parent session id (a subagent thread) must not
    /// produce a metadata row, while a regular thread does.
    #[gpui::test]
    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = settings::SettingsStore::test(cx);
            cx.set_global(settings_store);
            // NOTE(review): presumably turns on the "agent-v2" flag that
            // `save` checks via `AgentV2FeatureFlag` — confirm the flag name.
            cx.update_flags(true, vec!["agent-v2".to_string()]);
            ThreadStore::init_global(cx);
            SidebarThreadMetadataStore::init_global(cx);
        });

        let fs = FakeFs::new(cx.executor());
        let project = Project::test(fs, None::<&Path>, cx).await;
        let connection = Rc::new(StubAgentConnection::new());

        // Create a regular (non-subagent) AcpThread.
        let regular_thread = cx
            .update(|cx| {
                connection
                    .clone()
                    .new_session(project.clone(), PathList::default(), cx)
            })
            .await
            .unwrap();

        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());

        // Set a title on the regular thread to trigger a save via handle_thread_update.
        cx.update(|cx| {
            regular_thread.update(cx, |thread, cx| {
                thread.set_title("Regular Thread".into(), cx).detach();
            });
        });
        cx.run_until_parked();

        // Create a subagent AcpThread
        let subagent_session_id = acp::SessionId::new("subagent-session");
        let subagent_thread = cx.update(|cx| {
            let action_log = cx.new(|_| ActionLog::new(project.clone()));
            cx.new(|cx| {
                acp_thread::AcpThread::new(
                    Some(regular_session_id.clone()),
                    "Subagent Thread",
                    None,
                    connection.clone(),
                    project.clone(),
                    action_log,
                    subagent_session_id.clone(),
                    watch::Receiver::constant(acp::PromptCapabilities::new()),
                    cx,
                )
            })
        });

        // Set a title on the subagent thread to trigger handle_thread_update.
        cx.update(|cx| {
            subagent_thread.update(cx, |thread, cx| {
                thread
                    .set_title("Subagent Thread Title".into(), cx)
                    .detach();
            });
        });
        cx.run_until_parked();

        // List all metadata from the store.
        let metadata_list = cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.read(cx).list(cx)
        });

        let list = metadata_list.await.unwrap();

        // The subagent thread should NOT appear in the sidebar metadata.
        // Only the regular thread should be listed.
        assert_eq!(
            list.len(),
            1,
            "Expected only the regular thread in sidebar metadata, \
             but found {} entries (subagent threads are leaking into the sidebar)",
            list.len(),
        );
        assert_eq!(list[0].session_id, regular_session_id);
        assert_eq!(list[0].title.as_ref(), "Regular Thread");
    }
}