thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AgentSessionInfo;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::{Context as _, Result};
   7use chrono::{DateTime, Utc};
   8use collections::HashMap;
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    SidebarThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We migrate the last 10 threads per project and skip threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
  46
  47    let store = SidebarThreadMetadataStore::global(cx);
  48    let db = store.read(cx).db.clone();
  49
  50    cx.spawn(async move |cx| {
  51        if !db.is_empty()? {
  52            return Ok::<(), anyhow::Error>(());
  53        }
  54
  55        let metadata = store.read_with(cx, |_store, app| {
  56            let mut migrated_threads_per_project = HashMap::default();
  57
  58            ThreadStore::global(app)
  59                .read(app)
  60                .entries()
  61                .filter_map(|entry| {
  62                    if entry.folder_paths.is_empty() {
  63                        return None;
  64                    }
  65
  66                    let migrated_thread_count = migrated_threads_per_project
  67                        .entry(entry.folder_paths.clone())
  68                        .or_insert(0);
  69                    if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
  70                        return None;
  71                    }
  72                    *migrated_thread_count += 1;
  73
  74                    Some(ThreadMetadata {
  75                        session_id: entry.id,
  76                        agent_id: None,
  77                        title: entry.title,
  78                        updated_at: entry.updated_at,
  79                        created_at: entry.created_at,
  80                        folder_paths: entry.folder_paths,
  81                    })
  82                })
  83                .collect::<Vec<_>>()
  84        });
  85
  86        log::info!("Migrating {} thread store entries", metadata.len());
  87
  88        // Manually save each entry to the database and call reload, otherwise
  89        // we'll end up triggering lots of reloads after each save
  90        for entry in metadata {
  91            db.save(entry).await?;
  92        }
  93
  94        log::info!("Finished migrating thread store entries");
  95
  96        let _ = store.update(cx, |store, cx| store.reload(cx));
  97        Ok(())
  98    })
  99    .detach_and_log_err(cx);
 100}
 101
 102struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
 103impl Global for GlobalThreadMetadataStore {}
 104
 105/// Lightweight metadata for any thread (native or ACP), enough to populate
 106/// the sidebar list and route to the correct load path when clicked.
 107#[derive(Debug, Clone)]
 108pub struct ThreadMetadata {
 109    pub session_id: acp::SessionId,
 110    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
 111    pub agent_id: Option<AgentId>,
 112    pub title: SharedString,
 113    pub updated_at: DateTime<Utc>,
 114    pub created_at: Option<DateTime<Utc>>,
 115    pub folder_paths: PathList,
 116}
 117
 118impl ThreadMetadata {
 119    pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
 120        let session_id = session.session_id.clone();
 121        let title = session.title.clone().unwrap_or_default();
 122        let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
 123        let created_at = session.created_at.unwrap_or(updated_at);
 124        let folder_paths = session.work_dirs.clone().unwrap_or_default();
 125        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 126            None
 127        } else {
 128            Some(agent_id)
 129        };
 130        Self {
 131            session_id,
 132            agent_id,
 133            title,
 134            updated_at,
 135            created_at: Some(created_at),
 136            folder_paths,
 137        }
 138    }
 139
 140    pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
 141        let thread_ref = thread.read(cx);
 142        let session_id = thread_ref.session_id().clone();
 143        let title = thread_ref
 144            .title()
 145            .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 146        let updated_at = Utc::now();
 147
 148        let agent_id = thread_ref.connection().agent_id();
 149
 150        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 151            None
 152        } else {
 153            Some(agent_id)
 154        };
 155
 156        let folder_paths = {
 157            let project = thread_ref.project().read(cx);
 158            let paths: Vec<Arc<Path>> = project
 159                .visible_worktrees(cx)
 160                .map(|worktree| worktree.read(cx).abs_path())
 161                .collect();
 162            PathList::new(&paths)
 163        };
 164
 165        Self {
 166            session_id,
 167            agent_id,
 168            title,
 169            created_at: Some(updated_at), // handled by db `ON CONFLICT`
 170            updated_at,
 171            folder_paths,
 172        }
 173    }
 174}
 175
 176/// The store holds all metadata needed to show threads in the sidebar.
 177/// Effectively, all threads stored in here are "non-archived".
 178///
 179/// Automatically listens to AcpThread events and updates metadata if it has changed.
 180pub struct SidebarThreadMetadataStore {
 181    db: ThreadMetadataDb,
 182    threads: Vec<ThreadMetadata>,
 183    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
 184    reload_task: Option<Shared<Task<()>>>,
 185    session_subscriptions: HashMap<acp::SessionId, Subscription>,
 186}
 187
 188impl SidebarThreadMetadataStore {
 189    #[cfg(not(any(test, feature = "test-support")))]
 190    pub fn init_global(cx: &mut App) {
 191        if cx.has_global::<Self>() {
 192            return;
 193        }
 194
 195        let db = ThreadMetadataDb::global(cx);
 196        let thread_store = cx.new(|cx| Self::new(db, cx));
 197        cx.set_global(GlobalThreadMetadataStore(thread_store));
 198    }
 199
 200    #[cfg(any(test, feature = "test-support"))]
 201    pub fn init_global(cx: &mut App) {
 202        let thread = std::thread::current();
 203        let test_name = thread.name().unwrap_or("unknown_test");
 204        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 205        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 206        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 207        cx.set_global(GlobalThreadMetadataStore(thread_store));
 208    }
 209
 210    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 211        cx.try_global::<GlobalThreadMetadataStore>()
 212            .map(|store| store.0.clone())
 213    }
 214
 215    pub fn global(cx: &App) -> Entity<Self> {
 216        cx.global::<GlobalThreadMetadataStore>().0.clone()
 217    }
 218
 219    pub fn is_empty(&self) -> bool {
 220        self.threads.is_empty()
 221    }
 222
 223    pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
 224        self.threads.iter().cloned()
 225    }
 226
 227    pub fn all_entries(&self) -> &[ThreadMetadata] {
 228        &self.threads
 229    }
 230
 231    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 232        self.threads.iter().map(|thread| thread.session_id.clone())
 233    }
 234
 235    pub fn entries_for_path(
 236        &self,
 237        path_list: &PathList,
 238    ) -> impl Iterator<Item = ThreadMetadata> + '_ {
 239        self.threads_by_paths
 240            .get(path_list)
 241            .into_iter()
 242            .flatten()
 243            .cloned()
 244    }
 245
 246    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 247        let db = self.db.clone();
 248        self.reload_task.take();
 249
 250        let list_task = cx
 251            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 252
 253        let reload_task = cx
 254            .spawn(async move |this, cx| {
 255                let Some(rows) = list_task.await.log_err() else {
 256                    return;
 257                };
 258
 259                this.update(cx, |this, cx| {
 260                    this.threads.clear();
 261                    this.threads_by_paths.clear();
 262
 263                    for row in rows {
 264                        this.threads_by_paths
 265                            .entry(row.folder_paths.clone())
 266                            .or_default()
 267                            .push(row.clone());
 268                        this.threads.push(row);
 269                    }
 270
 271                    cx.notify();
 272                })
 273                .ok();
 274            })
 275            .shared();
 276        self.reload_task = Some(reload_task.clone());
 277        reload_task
 278    }
 279
 280    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
 281        if !cx.has_flag::<AgentV2FeatureFlag>() {
 282            return Task::ready(Ok(()));
 283        }
 284
 285        let db = self.db.clone();
 286        cx.spawn(async move |this, cx| {
 287            db.save(metadata).await?;
 288            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 289            reload_task.await;
 290            Ok(())
 291        })
 292    }
 293
 294    pub fn delete(
 295        &mut self,
 296        session_id: acp::SessionId,
 297        cx: &mut Context<Self>,
 298    ) -> Task<Result<()>> {
 299        if !cx.has_flag::<AgentV2FeatureFlag>() {
 300            return Task::ready(Ok(()));
 301        }
 302
 303        let db = self.db.clone();
 304        cx.spawn(async move |this, cx| {
 305            db.delete(session_id).await?;
 306            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 307            reload_task.await;
 308            Ok(())
 309        })
 310    }
 311
 312    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 313        let weak_store = cx.weak_entity();
 314
 315        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 316            // Don't track subagent threads in the sidebar.
 317            if thread.parent_session_id().is_some() {
 318                return;
 319            }
 320
 321            let thread_entity = cx.entity();
 322
 323            cx.on_release({
 324                let weak_store = weak_store.clone();
 325                move |thread, cx| {
 326                    weak_store
 327                        .update(cx, |store, _cx| {
 328                            store.session_subscriptions.remove(thread.session_id());
 329                        })
 330                        .ok();
 331                }
 332            })
 333            .detach();
 334
 335            weak_store
 336                .update(cx, |this, cx| {
 337                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
 338                    this.session_subscriptions
 339                        .insert(thread.session_id().clone(), subscription);
 340                })
 341                .ok();
 342        })
 343        .detach();
 344
 345        let mut this = Self {
 346            db,
 347            threads: Vec::new(),
 348            threads_by_paths: HashMap::default(),
 349            reload_task: None,
 350            session_subscriptions: HashMap::default(),
 351        };
 352        let _ = this.reload(cx);
 353        this
 354    }
 355
 356    fn handle_thread_update(
 357        &mut self,
 358        thread: Entity<acp_thread::AcpThread>,
 359        event: &acp_thread::AcpThreadEvent,
 360        cx: &mut Context<Self>,
 361    ) {
 362        // Don't track subagent threads in the sidebar.
 363        if thread.read(cx).parent_session_id().is_some() {
 364            return;
 365        }
 366
 367        match event {
 368            acp_thread::AcpThreadEvent::NewEntry
 369            | acp_thread::AcpThreadEvent::TitleUpdated
 370            | acp_thread::AcpThreadEvent::EntryUpdated(_)
 371            | acp_thread::AcpThreadEvent::EntriesRemoved(_)
 372            | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
 373            | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
 374            | acp_thread::AcpThreadEvent::Retry(_)
 375            | acp_thread::AcpThreadEvent::Stopped(_)
 376            | acp_thread::AcpThreadEvent::Error
 377            | acp_thread::AcpThreadEvent::LoadError(_)
 378            | acp_thread::AcpThreadEvent::Refusal => {
 379                let metadata = ThreadMetadata::from_thread(&thread, cx);
 380                self.save(metadata, cx).detach_and_log_err(cx);
 381            }
 382            _ => {}
 383        }
 384    }
 385}
 386
 387impl Global for SidebarThreadMetadataStore {}
 388
 389struct ThreadMetadataDb(ThreadSafeConnection);
 390
 391impl Domain for ThreadMetadataDb {
 392    const NAME: &str = stringify!(ThreadMetadataDb);
 393
 394    const MIGRATIONS: &[&str] = &[sql!(
 395        CREATE TABLE IF NOT EXISTS sidebar_threads(
 396            session_id TEXT PRIMARY KEY,
 397            agent_id TEXT,
 398            title TEXT NOT NULL,
 399            updated_at TEXT NOT NULL,
 400            created_at TEXT,
 401            folder_paths TEXT,
 402            folder_paths_order TEXT
 403        ) STRICT;
 404    )];
 405}
 406
 407db::static_connection!(ThreadMetadataDb, []);
 408
 409impl ThreadMetadataDb {
 410    pub fn is_empty(&self) -> anyhow::Result<bool> {
 411        self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
 412            .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
 413    }
 414
 415    /// List all sidebar thread metadata, ordered by updated_at descending.
 416    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 417        self.select::<ThreadMetadata>(
 418            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
 419             FROM sidebar_threads \
 420             ORDER BY updated_at DESC"
 421        )?()
 422    }
 423
 424    /// Upsert metadata for a thread.
 425    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 426        let id = row.session_id.0.clone();
 427        let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
 428        let title = row.title.to_string();
 429        let updated_at = row.updated_at.to_rfc3339();
 430        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 431        let serialized = row.folder_paths.serialize();
 432        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 433            (None, None)
 434        } else {
 435            (Some(serialized.paths), Some(serialized.order))
 436        };
 437
 438        self.write(move |conn| {
 439            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
 440                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
 441                       ON CONFLICT(session_id) DO UPDATE SET \
 442                           agent_id = excluded.agent_id, \
 443                           title = excluded.title, \
 444                           updated_at = excluded.updated_at, \
 445                           folder_paths = excluded.folder_paths, \
 446                           folder_paths_order = excluded.folder_paths_order";
 447            let mut stmt = Statement::prepare(conn, sql)?;
 448            let mut i = stmt.bind(&id, 1)?;
 449            i = stmt.bind(&agent_id, i)?;
 450            i = stmt.bind(&title, i)?;
 451            i = stmt.bind(&updated_at, i)?;
 452            i = stmt.bind(&created_at, i)?;
 453            i = stmt.bind(&folder_paths, i)?;
 454            stmt.bind(&folder_paths_order, i)?;
 455            stmt.exec()
 456        })
 457        .await
 458    }
 459
 460    /// Delete metadata for a single thread.
 461    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 462        let id = session_id.0.clone();
 463        self.write(move |conn| {
 464            let mut stmt =
 465                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 466            stmt.bind(&id, 1)?;
 467            stmt.exec()
 468        })
 469        .await
 470    }
 471}
 472
 473impl Column for ThreadMetadata {
 474    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 475        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 476        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 477        let (title, next): (String, i32) = Column::column(statement, next)?;
 478        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 479        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 480        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 481        let (folder_paths_order_str, next): (Option<String>, i32) =
 482            Column::column(statement, next)?;
 483
 484        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 485        let created_at = created_at_str
 486            .as_deref()
 487            .map(DateTime::parse_from_rfc3339)
 488            .transpose()?
 489            .map(|dt| dt.with_timezone(&Utc));
 490
 491        let folder_paths = folder_paths_str
 492            .map(|paths| {
 493                PathList::deserialize(&util::path_list::SerializedPathList {
 494                    paths,
 495                    order: folder_paths_order_str.unwrap_or_default(),
 496                })
 497            })
 498            .unwrap_or_default();
 499
 500        Ok((
 501            ThreadMetadata {
 502                session_id: acp::SessionId::new(id),
 503                agent_id: agent_id.map(|id| AgentId::new(id)),
 504                title: title.into(),
 505                updated_at,
 506                created_at,
 507                folder_paths,
 508            },
 509            next,
 510        ))
 511    }
 512}
 513
 514#[cfg(test)]
 515mod tests {
 516    use super::*;
 517    use acp_thread::{AgentConnection, StubAgentConnection};
 518    use action_log::ActionLog;
 519    use agent::DbThread;
 520    use agent_client_protocol as acp;
 521    use feature_flags::FeatureFlagAppExt;
 522    use gpui::TestAppContext;
 523    use project::FakeFs;
 524    use project::Project;
 525    use std::path::Path;
 526    use std::rc::Rc;
 527
 528    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 529        DbThread {
 530            title: title.to_string().into(),
 531            messages: Vec::new(),
 532            updated_at,
 533            detailed_summary: None,
 534            initial_project_snapshot: None,
 535            cumulative_token_usage: Default::default(),
 536            request_token_usage: Default::default(),
 537            model: None,
 538            profile: None,
 539            imported: false,
 540            subagent_context: None,
 541            speed: None,
 542            thinking_enabled: false,
 543            thinking_effort: None,
 544            draft_prompt: None,
 545            ui_scroll_position: None,
 546        }
 547    }
 548
 549    fn make_metadata(
 550        session_id: &str,
 551        title: &str,
 552        updated_at: DateTime<Utc>,
 553        folder_paths: PathList,
 554    ) -> ThreadMetadata {
 555        ThreadMetadata {
 556            session_id: acp::SessionId::new(session_id),
 557            agent_id: None,
 558            title: title.to_string().into(),
 559            updated_at,
 560            created_at: Some(updated_at),
 561            folder_paths,
 562        }
 563    }
 564
 565    #[gpui::test]
 566    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 567        let first_paths = PathList::new(&[Path::new("/project-a")]);
 568        let second_paths = PathList::new(&[Path::new("/project-b")]);
 569        let now = Utc::now();
 570        let older = now - chrono::Duration::seconds(1);
 571
 572        let thread = std::thread::current();
 573        let test_name = thread.name().unwrap_or("unknown_test");
 574        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 575        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 576            &db_name,
 577        )));
 578
 579        db.save(make_metadata(
 580            "session-1",
 581            "First Thread",
 582            now,
 583            first_paths.clone(),
 584        ))
 585        .await
 586        .unwrap();
 587        db.save(make_metadata(
 588            "session-2",
 589            "Second Thread",
 590            older,
 591            second_paths.clone(),
 592        ))
 593        .await
 594        .unwrap();
 595
 596        cx.update(|cx| {
 597            let settings_store = settings::SettingsStore::test(cx);
 598            cx.set_global(settings_store);
 599            cx.update_flags(true, vec!["agent-v2".to_string()]);
 600            SidebarThreadMetadataStore::init_global(cx);
 601        });
 602
 603        cx.run_until_parked();
 604
 605        cx.update(|cx| {
 606            let store = SidebarThreadMetadataStore::global(cx);
 607            let store = store.read(cx);
 608
 609            let entry_ids = store
 610                .entry_ids()
 611                .map(|session_id| session_id.0.to_string())
 612                .collect::<Vec<_>>();
 613            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 614
 615            let first_path_entries = store
 616                .entries_for_path(&first_paths)
 617                .map(|entry| entry.session_id.0.to_string())
 618                .collect::<Vec<_>>();
 619            assert_eq!(first_path_entries, vec!["session-1"]);
 620
 621            let second_path_entries = store
 622                .entries_for_path(&second_paths)
 623                .map(|entry| entry.session_id.0.to_string())
 624                .collect::<Vec<_>>();
 625            assert_eq!(second_path_entries, vec!["session-2"]);
 626        });
 627    }
 628
 629    #[gpui::test]
 630    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 631        cx.update(|cx| {
 632            let settings_store = settings::SettingsStore::test(cx);
 633            cx.set_global(settings_store);
 634            cx.update_flags(true, vec!["agent-v2".to_string()]);
 635            SidebarThreadMetadataStore::init_global(cx);
 636        });
 637
 638        let first_paths = PathList::new(&[Path::new("/project-a")]);
 639        let second_paths = PathList::new(&[Path::new("/project-b")]);
 640        let initial_time = Utc::now();
 641        let updated_time = initial_time + chrono::Duration::seconds(1);
 642
 643        let initial_metadata = make_metadata(
 644            "session-1",
 645            "First Thread",
 646            initial_time,
 647            first_paths.clone(),
 648        );
 649
 650        let second_metadata = make_metadata(
 651            "session-2",
 652            "Second Thread",
 653            initial_time,
 654            second_paths.clone(),
 655        );
 656
 657        cx.update(|cx| {
 658            let store = SidebarThreadMetadataStore::global(cx);
 659            store.update(cx, |store, cx| {
 660                store.save(initial_metadata, cx).detach();
 661                store.save(second_metadata, cx).detach();
 662            });
 663        });
 664
 665        cx.run_until_parked();
 666
 667        cx.update(|cx| {
 668            let store = SidebarThreadMetadataStore::global(cx);
 669            let store = store.read(cx);
 670
 671            let first_path_entries = store
 672                .entries_for_path(&first_paths)
 673                .map(|entry| entry.session_id.0.to_string())
 674                .collect::<Vec<_>>();
 675            assert_eq!(first_path_entries, vec!["session-1"]);
 676
 677            let second_path_entries = store
 678                .entries_for_path(&second_paths)
 679                .map(|entry| entry.session_id.0.to_string())
 680                .collect::<Vec<_>>();
 681            assert_eq!(second_path_entries, vec!["session-2"]);
 682        });
 683
 684        let moved_metadata = make_metadata(
 685            "session-1",
 686            "First Thread",
 687            updated_time,
 688            second_paths.clone(),
 689        );
 690
 691        cx.update(|cx| {
 692            let store = SidebarThreadMetadataStore::global(cx);
 693            store.update(cx, |store, cx| {
 694                store.save(moved_metadata, cx).detach();
 695            });
 696        });
 697
 698        cx.run_until_parked();
 699
 700        cx.update(|cx| {
 701            let store = SidebarThreadMetadataStore::global(cx);
 702            let store = store.read(cx);
 703
 704            let entry_ids = store
 705                .entry_ids()
 706                .map(|session_id| session_id.0.to_string())
 707                .collect::<Vec<_>>();
 708            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 709
 710            let first_path_entries = store
 711                .entries_for_path(&first_paths)
 712                .map(|entry| entry.session_id.0.to_string())
 713                .collect::<Vec<_>>();
 714            assert!(first_path_entries.is_empty());
 715
 716            let second_path_entries = store
 717                .entries_for_path(&second_paths)
 718                .map(|entry| entry.session_id.0.to_string())
 719                .collect::<Vec<_>>();
 720            assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
 721        });
 722
 723        cx.update(|cx| {
 724            let store = SidebarThreadMetadataStore::global(cx);
 725            store.update(cx, |store, cx| {
 726                store.delete(acp::SessionId::new("session-2"), cx).detach();
 727            });
 728        });
 729
 730        cx.run_until_parked();
 731
 732        cx.update(|cx| {
 733            let store = SidebarThreadMetadataStore::global(cx);
 734            let store = store.read(cx);
 735
 736            let entry_ids = store
 737                .entry_ids()
 738                .map(|session_id| session_id.0.to_string())
 739                .collect::<Vec<_>>();
 740            assert_eq!(entry_ids, vec!["session-1"]);
 741
 742            let second_path_entries = store
 743                .entries_for_path(&second_paths)
 744                .map(|entry| entry.session_id.0.to_string())
 745                .collect::<Vec<_>>();
 746            assert_eq!(second_path_entries, vec!["session-1"]);
 747        });
 748    }
 749
 750    #[gpui::test]
 751    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
 752        cx.update(|cx| {
 753            ThreadStore::init_global(cx);
 754            SidebarThreadMetadataStore::init_global(cx);
 755        });
 756
 757        // Verify the cache is empty before migration
 758        let list = cx.update(|cx| {
 759            let store = SidebarThreadMetadataStore::global(cx);
 760            store.read(cx).entries().collect::<Vec<_>>()
 761        });
 762        assert_eq!(list.len(), 0);
 763
 764        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 765        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 766        let now = Utc::now();
 767
 768        for index in 0..12 {
 769            let updated_at = now + chrono::Duration::seconds(index as i64);
 770            let session_id = format!("project-a-session-{index}");
 771            let title = format!("Project A Thread {index}");
 772
 773            let save_task = cx.update(|cx| {
 774                let thread_store = ThreadStore::global(cx);
 775                let session_id = session_id.clone();
 776                let title = title.clone();
 777                let project_a_paths = project_a_paths.clone();
 778                thread_store.update(cx, |store, cx| {
 779                    store.save_thread(
 780                        acp::SessionId::new(session_id),
 781                        make_db_thread(&title, updated_at),
 782                        project_a_paths,
 783                        cx,
 784                    )
 785                })
 786            });
 787            save_task.await.unwrap();
 788            cx.run_until_parked();
 789        }
 790
 791        for index in 0..3 {
 792            let updated_at = now + chrono::Duration::seconds(100 + index as i64);
 793            let session_id = format!("project-b-session-{index}");
 794            let title = format!("Project B Thread {index}");
 795
 796            let save_task = cx.update(|cx| {
 797                let thread_store = ThreadStore::global(cx);
 798                let session_id = session_id.clone();
 799                let title = title.clone();
 800                let project_b_paths = project_b_paths.clone();
 801                thread_store.update(cx, |store, cx| {
 802                    store.save_thread(
 803                        acp::SessionId::new(session_id),
 804                        make_db_thread(&title, updated_at),
 805                        project_b_paths,
 806                        cx,
 807                    )
 808                })
 809            });
 810            save_task.await.unwrap();
 811            cx.run_until_parked();
 812        }
 813
 814        let save_projectless = cx.update(|cx| {
 815            let thread_store = ThreadStore::global(cx);
 816            thread_store.update(cx, |store, cx| {
 817                store.save_thread(
 818                    acp::SessionId::new("projectless-session"),
 819                    make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
 820                    PathList::default(),
 821                    cx,
 822                )
 823            })
 824        });
 825        save_projectless.await.unwrap();
 826        cx.run_until_parked();
 827
 828        // Run migration
 829        cx.update(|cx| {
 830            migrate_thread_metadata(cx);
 831        });
 832
 833        cx.run_until_parked();
 834
 835        // Verify the metadata was migrated, limited to 10 per project, and
 836        // projectless threads were skipped.
 837        let list = cx.update(|cx| {
 838            let store = SidebarThreadMetadataStore::global(cx);
 839            store.read(cx).entries().collect::<Vec<_>>()
 840        });
 841        assert_eq!(list.len(), 13);
 842
 843        assert!(
 844            list.iter()
 845                .all(|metadata| !metadata.folder_paths.is_empty())
 846        );
 847        assert!(
 848            list.iter()
 849                .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
 850        );
 851
 852        let project_a_entries = list
 853            .iter()
 854            .filter(|metadata| metadata.folder_paths == project_a_paths)
 855            .collect::<Vec<_>>();
 856        assert_eq!(project_a_entries.len(), 10);
 857        assert_eq!(
 858            project_a_entries
 859                .iter()
 860                .map(|metadata| metadata.session_id.0.as_ref())
 861                .collect::<Vec<_>>(),
 862            vec![
 863                "project-a-session-11",
 864                "project-a-session-10",
 865                "project-a-session-9",
 866                "project-a-session-8",
 867                "project-a-session-7",
 868                "project-a-session-6",
 869                "project-a-session-5",
 870                "project-a-session-4",
 871                "project-a-session-3",
 872                "project-a-session-2",
 873            ]
 874        );
 875        assert!(
 876            project_a_entries
 877                .iter()
 878                .all(|metadata| metadata.agent_id.is_none())
 879        );
 880
 881        let project_b_entries = list
 882            .iter()
 883            .filter(|metadata| metadata.folder_paths == project_b_paths)
 884            .collect::<Vec<_>>();
 885        assert_eq!(project_b_entries.len(), 3);
 886        assert_eq!(
 887            project_b_entries
 888                .iter()
 889                .map(|metadata| metadata.session_id.0.as_ref())
 890                .collect::<Vec<_>>(),
 891            vec![
 892                "project-b-session-2",
 893                "project-b-session-1",
 894                "project-b-session-0",
 895            ]
 896        );
 897        assert!(
 898            project_b_entries
 899                .iter()
 900                .all(|metadata| metadata.agent_id.is_none())
 901        );
 902    }
 903
 904    #[gpui::test]
 905    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
 906        cx.update(|cx| {
 907            ThreadStore::init_global(cx);
 908            SidebarThreadMetadataStore::init_global(cx);
 909        });
 910
 911        // Pre-populate the metadata store with existing data
 912        let existing_metadata = ThreadMetadata {
 913            session_id: acp::SessionId::new("existing-session"),
 914            agent_id: None,
 915            title: "Existing Thread".into(),
 916            updated_at: Utc::now(),
 917            created_at: Some(Utc::now()),
 918            folder_paths: PathList::default(),
 919        };
 920
 921        cx.update(|cx| {
 922            let store = SidebarThreadMetadataStore::global(cx);
 923            store.update(cx, |store, cx| {
 924                store.save(existing_metadata, cx).detach();
 925            });
 926        });
 927
 928        cx.run_until_parked();
 929
 930        // Add an entry to native thread store that should NOT be migrated
 931        let save_task = cx.update(|cx| {
 932            let thread_store = ThreadStore::global(cx);
 933            thread_store.update(cx, |store, cx| {
 934                store.save_thread(
 935                    acp::SessionId::new("native-session"),
 936                    make_db_thread("Native Thread", Utc::now()),
 937                    PathList::default(),
 938                    cx,
 939                )
 940            })
 941        });
 942        save_task.await.unwrap();
 943        cx.run_until_parked();
 944
 945        // Run migration - should skip because metadata store is not empty
 946        cx.update(|cx| {
 947            migrate_thread_metadata(cx);
 948        });
 949
 950        cx.run_until_parked();
 951
 952        // Verify only the existing metadata is present (migration was skipped)
 953        let list = cx.update(|cx| {
 954            let store = SidebarThreadMetadataStore::global(cx);
 955            store.read(cx).entries().collect::<Vec<_>>()
 956        });
 957        assert_eq!(list.len(), 1);
 958        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
 959    }
 960
 961    #[gpui::test]
 962    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
 963        cx.update(|cx| {
 964            let settings_store = settings::SettingsStore::test(cx);
 965            cx.set_global(settings_store);
 966            cx.update_flags(true, vec!["agent-v2".to_string()]);
 967            ThreadStore::init_global(cx);
 968            SidebarThreadMetadataStore::init_global(cx);
 969        });
 970
 971        let fs = FakeFs::new(cx.executor());
 972        let project = Project::test(fs, None::<&Path>, cx).await;
 973        let connection = Rc::new(StubAgentConnection::new());
 974
 975        // Create a regular (non-subagent) AcpThread.
 976        let regular_thread = cx
 977            .update(|cx| {
 978                connection
 979                    .clone()
 980                    .new_session(project.clone(), PathList::default(), cx)
 981            })
 982            .await
 983            .unwrap();
 984
 985        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
 986
 987        // Set a title on the regular thread to trigger a save via handle_thread_update.
 988        cx.update(|cx| {
 989            regular_thread.update(cx, |thread, cx| {
 990                thread.set_title("Regular Thread".into(), cx).detach();
 991            });
 992        });
 993        cx.run_until_parked();
 994
 995        // Create a subagent AcpThread
 996        let subagent_session_id = acp::SessionId::new("subagent-session");
 997        let subagent_thread = cx.update(|cx| {
 998            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 999            cx.new(|cx| {
1000                acp_thread::AcpThread::new(
1001                    Some(regular_session_id.clone()),
1002                    Some("Subagent Thread".into()),
1003                    None,
1004                    connection.clone(),
1005                    project.clone(),
1006                    action_log,
1007                    subagent_session_id.clone(),
1008                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1009                    cx,
1010                )
1011            })
1012        });
1013
1014        // Set a title on the subagent thread to trigger handle_thread_update.
1015        cx.update(|cx| {
1016            subagent_thread.update(cx, |thread, cx| {
1017                thread
1018                    .set_title("Subagent Thread Title".into(), cx)
1019                    .detach();
1020            });
1021        });
1022        cx.run_until_parked();
1023
1024        // List all metadata from the store cache.
1025        let list = cx.update(|cx| {
1026            let store = SidebarThreadMetadataStore::global(cx);
1027            store.read(cx).entries().collect::<Vec<_>>()
1028        });
1029
1030        // The subagent thread should NOT appear in the sidebar metadata.
1031        // Only the regular thread should be listed.
1032        assert_eq!(
1033            list.len(),
1034            1,
1035            "Expected only the regular thread in sidebar metadata, \
1036             but found {} entries (subagent threads are leaking into the sidebar)",
1037            list.len(),
1038        );
1039        assert_eq!(list[0].session_id, regular_session_id);
1040        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1041    }
1042}