thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AgentSessionInfo;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::{Context as _, Result};
   7use chrono::{DateTime, Utc};
   8use collections::HashMap;
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24pub fn init(cx: &mut App) {
  25    SidebarThreadMetadataStore::init_global(cx);
  26
  27    if cx.has_flag::<AgentV2FeatureFlag>() {
  28        migrate_thread_metadata(cx);
  29    }
  30    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  31        if has_flag {
  32            migrate_thread_metadata(cx);
  33        }
  34    })
  35    .detach();
  36}
  37
  38/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  39/// We migrate the last 10 threads per project and skip threads that do not have a project.
  40///
  41/// TODO: Remove this after N weeks of shipping the sidebar
  42fn migrate_thread_metadata(cx: &mut App) {
  43    const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
  44
  45    let store = SidebarThreadMetadataStore::global(cx);
  46    let db = store.read(cx).db.clone();
  47
  48    cx.spawn(async move |cx| {
  49        if !db.is_empty()? {
  50            return Ok::<(), anyhow::Error>(());
  51        }
  52
  53        let metadata = store.read_with(cx, |_store, app| {
  54            let mut migrated_threads_per_project = HashMap::default();
  55
  56            ThreadStore::global(app)
  57                .read(app)
  58                .entries()
  59                .filter_map(|entry| {
  60                    if entry.folder_paths.is_empty() {
  61                        return None;
  62                    }
  63
  64                    let migrated_thread_count = migrated_threads_per_project
  65                        .entry(entry.folder_paths.clone())
  66                        .or_insert(0);
  67                    if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
  68                        return None;
  69                    }
  70                    *migrated_thread_count += 1;
  71
  72                    Some(ThreadMetadata {
  73                        session_id: entry.id,
  74                        agent_id: None,
  75                        title: entry.title,
  76                        updated_at: entry.updated_at,
  77                        created_at: entry.created_at,
  78                        folder_paths: entry.folder_paths,
  79                    })
  80                })
  81                .collect::<Vec<_>>()
  82        });
  83
  84        // Manually save each entry to the database and call reload, otherwise
  85        // we'll end up triggering lots of reloads after each save
  86        for entry in metadata {
  87            db.save(entry).await?;
  88        }
  89
  90        let _ = store.update(cx, |store, cx| store.reload(cx));
  91        Ok(())
  92    })
  93    .detach_and_log_err(cx);
  94}
  95
  96struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
  97impl Global for GlobalThreadMetadataStore {}
  98
  99/// Lightweight metadata for any thread (native or ACP), enough to populate
 100/// the sidebar list and route to the correct load path when clicked.
 101#[derive(Debug, Clone)]
 102pub struct ThreadMetadata {
 103    pub session_id: acp::SessionId,
 104    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
 105    pub agent_id: Option<AgentId>,
 106    pub title: SharedString,
 107    pub updated_at: DateTime<Utc>,
 108    pub created_at: Option<DateTime<Utc>>,
 109    pub folder_paths: PathList,
 110}
 111
 112impl ThreadMetadata {
 113    pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
 114        let session_id = session.session_id.clone();
 115        let title = session.title.clone().unwrap_or_default();
 116        let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
 117        let created_at = session.created_at.unwrap_or(updated_at);
 118        let folder_paths = session.work_dirs.clone().unwrap_or_default();
 119        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 120            None
 121        } else {
 122            Some(agent_id)
 123        };
 124        Self {
 125            session_id,
 126            agent_id,
 127            title,
 128            updated_at,
 129            created_at: Some(created_at),
 130            folder_paths,
 131        }
 132    }
 133
 134    pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
 135        let thread_ref = thread.read(cx);
 136        let session_id = thread_ref.session_id().clone();
 137        let title = thread_ref.title();
 138        let updated_at = Utc::now();
 139
 140        let agent_id = thread_ref.connection().agent_id();
 141
 142        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 143            None
 144        } else {
 145            Some(agent_id)
 146        };
 147
 148        let folder_paths = {
 149            let project = thread_ref.project().read(cx);
 150            let paths: Vec<Arc<Path>> = project
 151                .visible_worktrees(cx)
 152                .map(|worktree| worktree.read(cx).abs_path())
 153                .collect();
 154            PathList::new(&paths)
 155        };
 156
 157        Self {
 158            session_id,
 159            agent_id,
 160            title,
 161            created_at: Some(updated_at), // handled by db `ON CONFLICT`
 162            updated_at,
 163            folder_paths,
 164        }
 165    }
 166}
 167
 168/// The store holds all metadata needed to show threads in the sidebar.
 169/// Effectively, all threads stored in here are "non-archived".
 170///
 171/// Automatically listens to AcpThread events and updates metadata if it has changed.
 172pub struct SidebarThreadMetadataStore {
 173    db: ThreadMetadataDb,
 174    threads: Vec<ThreadMetadata>,
 175    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
 176    reload_task: Option<Shared<Task<()>>>,
 177    session_subscriptions: HashMap<acp::SessionId, Subscription>,
 178}
 179
 180impl SidebarThreadMetadataStore {
 181    #[cfg(not(any(test, feature = "test-support")))]
 182    pub fn init_global(cx: &mut App) {
 183        if cx.has_global::<Self>() {
 184            return;
 185        }
 186
 187        let db = ThreadMetadataDb::global(cx);
 188        let thread_store = cx.new(|cx| Self::new(db, cx));
 189        cx.set_global(GlobalThreadMetadataStore(thread_store));
 190    }
 191
 192    #[cfg(any(test, feature = "test-support"))]
 193    pub fn init_global(cx: &mut App) {
 194        let thread = std::thread::current();
 195        let test_name = thread.name().unwrap_or("unknown_test");
 196        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 197        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 198        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 199        cx.set_global(GlobalThreadMetadataStore(thread_store));
 200    }
 201
 202    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 203        cx.try_global::<GlobalThreadMetadataStore>()
 204            .map(|store| store.0.clone())
 205    }
 206
 207    pub fn global(cx: &App) -> Entity<Self> {
 208        cx.global::<GlobalThreadMetadataStore>().0.clone()
 209    }
 210
 211    pub fn is_empty(&self) -> bool {
 212        self.threads.is_empty()
 213    }
 214
 215    pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
 216        self.threads.iter().cloned()
 217    }
 218
 219    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 220        self.threads.iter().map(|thread| thread.session_id.clone())
 221    }
 222
 223    pub fn entries_for_path(
 224        &self,
 225        path_list: &PathList,
 226    ) -> impl Iterator<Item = ThreadMetadata> + '_ {
 227        self.threads_by_paths
 228            .get(path_list)
 229            .into_iter()
 230            .flatten()
 231            .cloned()
 232    }
 233
 234    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 235        let db = self.db.clone();
 236        self.reload_task.take();
 237
 238        let list_task = cx
 239            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 240
 241        let reload_task = cx
 242            .spawn(async move |this, cx| {
 243                let Some(rows) = list_task.await.log_err() else {
 244                    return;
 245                };
 246
 247                this.update(cx, |this, cx| {
 248                    this.threads.clear();
 249                    this.threads_by_paths.clear();
 250
 251                    for row in rows {
 252                        this.threads_by_paths
 253                            .entry(row.folder_paths.clone())
 254                            .or_default()
 255                            .push(row.clone());
 256                        this.threads.push(row);
 257                    }
 258
 259                    cx.notify();
 260                })
 261                .ok();
 262            })
 263            .shared();
 264        self.reload_task = Some(reload_task.clone());
 265        reload_task
 266    }
 267
 268    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
 269        if !cx.has_flag::<AgentV2FeatureFlag>() {
 270            return Task::ready(Ok(()));
 271        }
 272
 273        let db = self.db.clone();
 274        cx.spawn(async move |this, cx| {
 275            db.save(metadata).await?;
 276            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 277            reload_task.await;
 278            Ok(())
 279        })
 280    }
 281
 282    pub fn delete(
 283        &mut self,
 284        session_id: acp::SessionId,
 285        cx: &mut Context<Self>,
 286    ) -> Task<Result<()>> {
 287        if !cx.has_flag::<AgentV2FeatureFlag>() {
 288            return Task::ready(Ok(()));
 289        }
 290
 291        let db = self.db.clone();
 292        cx.spawn(async move |this, cx| {
 293            db.delete(session_id).await?;
 294            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 295            reload_task.await;
 296            Ok(())
 297        })
 298    }
 299
 300    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 301        let weak_store = cx.weak_entity();
 302
 303        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 304            // Don't track subagent threads in the sidebar.
 305            if thread.parent_session_id().is_some() {
 306                return;
 307            }
 308
 309            let thread_entity = cx.entity();
 310
 311            cx.on_release({
 312                let weak_store = weak_store.clone();
 313                move |thread, cx| {
 314                    weak_store
 315                        .update(cx, |store, _cx| {
 316                            store.session_subscriptions.remove(thread.session_id());
 317                        })
 318                        .ok();
 319                }
 320            })
 321            .detach();
 322
 323            weak_store
 324                .update(cx, |this, cx| {
 325                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
 326                    this.session_subscriptions
 327                        .insert(thread.session_id().clone(), subscription);
 328                })
 329                .ok();
 330        })
 331        .detach();
 332
 333        let mut this = Self {
 334            db,
 335            threads: Vec::new(),
 336            threads_by_paths: HashMap::default(),
 337            reload_task: None,
 338            session_subscriptions: HashMap::default(),
 339        };
 340        let _ = this.reload(cx);
 341        this
 342    }
 343
 344    fn handle_thread_update(
 345        &mut self,
 346        thread: Entity<acp_thread::AcpThread>,
 347        event: &acp_thread::AcpThreadEvent,
 348        cx: &mut Context<Self>,
 349    ) {
 350        // Don't track subagent threads in the sidebar.
 351        if thread.read(cx).parent_session_id().is_some() {
 352            return;
 353        }
 354
 355        match event {
 356            acp_thread::AcpThreadEvent::NewEntry
 357            | acp_thread::AcpThreadEvent::TitleUpdated
 358            | acp_thread::AcpThreadEvent::EntryUpdated(_)
 359            | acp_thread::AcpThreadEvent::EntriesRemoved(_)
 360            | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
 361            | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
 362            | acp_thread::AcpThreadEvent::Retry(_)
 363            | acp_thread::AcpThreadEvent::Stopped(_)
 364            | acp_thread::AcpThreadEvent::Error
 365            | acp_thread::AcpThreadEvent::LoadError(_)
 366            | acp_thread::AcpThreadEvent::Refusal => {
 367                let metadata = ThreadMetadata::from_thread(&thread, cx);
 368                self.save(metadata, cx).detach_and_log_err(cx);
 369            }
 370            _ => {}
 371        }
 372    }
 373}
 374
 375impl Global for SidebarThreadMetadataStore {}
 376
 377struct ThreadMetadataDb(ThreadSafeConnection);
 378
 379impl Domain for ThreadMetadataDb {
 380    const NAME: &str = stringify!(ThreadMetadataDb);
 381
 382    const MIGRATIONS: &[&str] = &[sql!(
 383        CREATE TABLE IF NOT EXISTS sidebar_threads(
 384            session_id TEXT PRIMARY KEY,
 385            agent_id TEXT,
 386            title TEXT NOT NULL,
 387            updated_at TEXT NOT NULL,
 388            created_at TEXT,
 389            folder_paths TEXT,
 390            folder_paths_order TEXT
 391        ) STRICT;
 392    )];
 393}
 394
 395db::static_connection!(ThreadMetadataDb, []);
 396
 397impl ThreadMetadataDb {
 398    pub fn is_empty(&self) -> anyhow::Result<bool> {
 399        self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
 400            .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
 401    }
 402
 403    /// List all sidebar thread metadata, ordered by updated_at descending.
 404    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 405        self.select::<ThreadMetadata>(
 406            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
 407             FROM sidebar_threads \
 408             ORDER BY updated_at DESC"
 409        )?()
 410    }
 411
 412    /// Upsert metadata for a thread.
 413    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 414        let id = row.session_id.0.clone();
 415        let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
 416        let title = row.title.to_string();
 417        let updated_at = row.updated_at.to_rfc3339();
 418        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 419        let serialized = row.folder_paths.serialize();
 420        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 421            (None, None)
 422        } else {
 423            (Some(serialized.paths), Some(serialized.order))
 424        };
 425
 426        self.write(move |conn| {
 427            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
 428                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
 429                       ON CONFLICT(session_id) DO UPDATE SET \
 430                           agent_id = excluded.agent_id, \
 431                           title = excluded.title, \
 432                           updated_at = excluded.updated_at, \
 433                           folder_paths = excluded.folder_paths, \
 434                           folder_paths_order = excluded.folder_paths_order";
 435            let mut stmt = Statement::prepare(conn, sql)?;
 436            let mut i = stmt.bind(&id, 1)?;
 437            i = stmt.bind(&agent_id, i)?;
 438            i = stmt.bind(&title, i)?;
 439            i = stmt.bind(&updated_at, i)?;
 440            i = stmt.bind(&created_at, i)?;
 441            i = stmt.bind(&folder_paths, i)?;
 442            stmt.bind(&folder_paths_order, i)?;
 443            stmt.exec()
 444        })
 445        .await
 446    }
 447
 448    /// Delete metadata for a single thread.
 449    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 450        let id = session_id.0.clone();
 451        self.write(move |conn| {
 452            let mut stmt =
 453                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 454            stmt.bind(&id, 1)?;
 455            stmt.exec()
 456        })
 457        .await
 458    }
 459}
 460
 461impl Column for ThreadMetadata {
 462    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 463        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 464        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 465        let (title, next): (String, i32) = Column::column(statement, next)?;
 466        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 467        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 468        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 469        let (folder_paths_order_str, next): (Option<String>, i32) =
 470            Column::column(statement, next)?;
 471
 472        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 473        let created_at = created_at_str
 474            .as_deref()
 475            .map(DateTime::parse_from_rfc3339)
 476            .transpose()?
 477            .map(|dt| dt.with_timezone(&Utc));
 478
 479        let folder_paths = folder_paths_str
 480            .map(|paths| {
 481                PathList::deserialize(&util::path_list::SerializedPathList {
 482                    paths,
 483                    order: folder_paths_order_str.unwrap_or_default(),
 484                })
 485            })
 486            .unwrap_or_default();
 487
 488        Ok((
 489            ThreadMetadata {
 490                session_id: acp::SessionId::new(id),
 491                agent_id: agent_id.map(|id| AgentId::new(id)),
 492                title: title.into(),
 493                updated_at,
 494                created_at,
 495                folder_paths,
 496            },
 497            next,
 498        ))
 499    }
 500}
 501
 502#[cfg(test)]
 503mod tests {
 504    use super::*;
 505    use acp_thread::{AgentConnection, StubAgentConnection};
 506    use action_log::ActionLog;
 507    use agent::DbThread;
 508    use agent_client_protocol as acp;
 509    use feature_flags::FeatureFlagAppExt;
 510    use gpui::TestAppContext;
 511    use project::FakeFs;
 512    use project::Project;
 513    use std::path::Path;
 514    use std::rc::Rc;
 515
 516    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 517        DbThread {
 518            title: title.to_string().into(),
 519            messages: Vec::new(),
 520            updated_at,
 521            detailed_summary: None,
 522            initial_project_snapshot: None,
 523            cumulative_token_usage: Default::default(),
 524            request_token_usage: Default::default(),
 525            model: None,
 526            profile: None,
 527            imported: false,
 528            subagent_context: None,
 529            speed: None,
 530            thinking_enabled: false,
 531            thinking_effort: None,
 532            draft_prompt: None,
 533            ui_scroll_position: None,
 534        }
 535    }
 536
 537    fn make_metadata(
 538        session_id: &str,
 539        title: &str,
 540        updated_at: DateTime<Utc>,
 541        folder_paths: PathList,
 542    ) -> ThreadMetadata {
 543        ThreadMetadata {
 544            session_id: acp::SessionId::new(session_id),
 545            agent_id: None,
 546            title: title.to_string().into(),
 547            updated_at,
 548            created_at: Some(updated_at),
 549            folder_paths,
 550        }
 551    }
 552
 553    #[gpui::test]
 554    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 555        let first_paths = PathList::new(&[Path::new("/project-a")]);
 556        let second_paths = PathList::new(&[Path::new("/project-b")]);
 557        let now = Utc::now();
 558        let older = now - chrono::Duration::seconds(1);
 559
 560        let thread = std::thread::current();
 561        let test_name = thread.name().unwrap_or("unknown_test");
 562        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 563        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 564            &db_name,
 565        )));
 566
 567        db.save(make_metadata(
 568            "session-1",
 569            "First Thread",
 570            now,
 571            first_paths.clone(),
 572        ))
 573        .await
 574        .unwrap();
 575        db.save(make_metadata(
 576            "session-2",
 577            "Second Thread",
 578            older,
 579            second_paths.clone(),
 580        ))
 581        .await
 582        .unwrap();
 583
 584        cx.update(|cx| {
 585            let settings_store = settings::SettingsStore::test(cx);
 586            cx.set_global(settings_store);
 587            cx.update_flags(true, vec!["agent-v2".to_string()]);
 588            SidebarThreadMetadataStore::init_global(cx);
 589        });
 590
 591        cx.run_until_parked();
 592
 593        cx.update(|cx| {
 594            let store = SidebarThreadMetadataStore::global(cx);
 595            let store = store.read(cx);
 596
 597            let entry_ids = store
 598                .entry_ids()
 599                .map(|session_id| session_id.0.to_string())
 600                .collect::<Vec<_>>();
 601            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 602
 603            let first_path_entries = store
 604                .entries_for_path(&first_paths)
 605                .map(|entry| entry.session_id.0.to_string())
 606                .collect::<Vec<_>>();
 607            assert_eq!(first_path_entries, vec!["session-1"]);
 608
 609            let second_path_entries = store
 610                .entries_for_path(&second_paths)
 611                .map(|entry| entry.session_id.0.to_string())
 612                .collect::<Vec<_>>();
 613            assert_eq!(second_path_entries, vec!["session-2"]);
 614        });
 615    }
 616
 617    #[gpui::test]
 618    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 619        cx.update(|cx| {
 620            let settings_store = settings::SettingsStore::test(cx);
 621            cx.set_global(settings_store);
 622            cx.update_flags(true, vec!["agent-v2".to_string()]);
 623            SidebarThreadMetadataStore::init_global(cx);
 624        });
 625
 626        let first_paths = PathList::new(&[Path::new("/project-a")]);
 627        let second_paths = PathList::new(&[Path::new("/project-b")]);
 628        let initial_time = Utc::now();
 629        let updated_time = initial_time + chrono::Duration::seconds(1);
 630
 631        let initial_metadata = make_metadata(
 632            "session-1",
 633            "First Thread",
 634            initial_time,
 635            first_paths.clone(),
 636        );
 637
 638        let second_metadata = make_metadata(
 639            "session-2",
 640            "Second Thread",
 641            initial_time,
 642            second_paths.clone(),
 643        );
 644
 645        cx.update(|cx| {
 646            let store = SidebarThreadMetadataStore::global(cx);
 647            store.update(cx, |store, cx| {
 648                store.save(initial_metadata, cx).detach();
 649                store.save(second_metadata, cx).detach();
 650            });
 651        });
 652
 653        cx.run_until_parked();
 654
 655        cx.update(|cx| {
 656            let store = SidebarThreadMetadataStore::global(cx);
 657            let store = store.read(cx);
 658
 659            let first_path_entries = store
 660                .entries_for_path(&first_paths)
 661                .map(|entry| entry.session_id.0.to_string())
 662                .collect::<Vec<_>>();
 663            assert_eq!(first_path_entries, vec!["session-1"]);
 664
 665            let second_path_entries = store
 666                .entries_for_path(&second_paths)
 667                .map(|entry| entry.session_id.0.to_string())
 668                .collect::<Vec<_>>();
 669            assert_eq!(second_path_entries, vec!["session-2"]);
 670        });
 671
 672        let moved_metadata = make_metadata(
 673            "session-1",
 674            "First Thread",
 675            updated_time,
 676            second_paths.clone(),
 677        );
 678
 679        cx.update(|cx| {
 680            let store = SidebarThreadMetadataStore::global(cx);
 681            store.update(cx, |store, cx| {
 682                store.save(moved_metadata, cx).detach();
 683            });
 684        });
 685
 686        cx.run_until_parked();
 687
 688        cx.update(|cx| {
 689            let store = SidebarThreadMetadataStore::global(cx);
 690            let store = store.read(cx);
 691
 692            let entry_ids = store
 693                .entry_ids()
 694                .map(|session_id| session_id.0.to_string())
 695                .collect::<Vec<_>>();
 696            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 697
 698            let first_path_entries = store
 699                .entries_for_path(&first_paths)
 700                .map(|entry| entry.session_id.0.to_string())
 701                .collect::<Vec<_>>();
 702            assert!(first_path_entries.is_empty());
 703
 704            let second_path_entries = store
 705                .entries_for_path(&second_paths)
 706                .map(|entry| entry.session_id.0.to_string())
 707                .collect::<Vec<_>>();
 708            assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
 709        });
 710
 711        cx.update(|cx| {
 712            let store = SidebarThreadMetadataStore::global(cx);
 713            store.update(cx, |store, cx| {
 714                store.delete(acp::SessionId::new("session-2"), cx).detach();
 715            });
 716        });
 717
 718        cx.run_until_parked();
 719
 720        cx.update(|cx| {
 721            let store = SidebarThreadMetadataStore::global(cx);
 722            let store = store.read(cx);
 723
 724            let entry_ids = store
 725                .entry_ids()
 726                .map(|session_id| session_id.0.to_string())
 727                .collect::<Vec<_>>();
 728            assert_eq!(entry_ids, vec!["session-1"]);
 729
 730            let second_path_entries = store
 731                .entries_for_path(&second_paths)
 732                .map(|entry| entry.session_id.0.to_string())
 733                .collect::<Vec<_>>();
 734            assert_eq!(second_path_entries, vec!["session-1"]);
 735        });
 736    }
 737
 738    #[gpui::test]
 739    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
 740        cx.update(|cx| {
 741            ThreadStore::init_global(cx);
 742            SidebarThreadMetadataStore::init_global(cx);
 743        });
 744
 745        // Verify the cache is empty before migration
 746        let list = cx.update(|cx| {
 747            let store = SidebarThreadMetadataStore::global(cx);
 748            store.read(cx).entries().collect::<Vec<_>>()
 749        });
 750        assert_eq!(list.len(), 0);
 751
 752        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 753        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 754        let now = Utc::now();
 755
 756        for index in 0..12 {
 757            let updated_at = now + chrono::Duration::seconds(index as i64);
 758            let session_id = format!("project-a-session-{index}");
 759            let title = format!("Project A Thread {index}");
 760
 761            let save_task = cx.update(|cx| {
 762                let thread_store = ThreadStore::global(cx);
 763                let session_id = session_id.clone();
 764                let title = title.clone();
 765                let project_a_paths = project_a_paths.clone();
 766                thread_store.update(cx, |store, cx| {
 767                    store.save_thread(
 768                        acp::SessionId::new(session_id),
 769                        make_db_thread(&title, updated_at),
 770                        project_a_paths,
 771                        cx,
 772                    )
 773                })
 774            });
 775            save_task.await.unwrap();
 776            cx.run_until_parked();
 777        }
 778
 779        for index in 0..3 {
 780            let updated_at = now + chrono::Duration::seconds(100 + index as i64);
 781            let session_id = format!("project-b-session-{index}");
 782            let title = format!("Project B Thread {index}");
 783
 784            let save_task = cx.update(|cx| {
 785                let thread_store = ThreadStore::global(cx);
 786                let session_id = session_id.clone();
 787                let title = title.clone();
 788                let project_b_paths = project_b_paths.clone();
 789                thread_store.update(cx, |store, cx| {
 790                    store.save_thread(
 791                        acp::SessionId::new(session_id),
 792                        make_db_thread(&title, updated_at),
 793                        project_b_paths,
 794                        cx,
 795                    )
 796                })
 797            });
 798            save_task.await.unwrap();
 799            cx.run_until_parked();
 800        }
 801
 802        let save_projectless = cx.update(|cx| {
 803            let thread_store = ThreadStore::global(cx);
 804            thread_store.update(cx, |store, cx| {
 805                store.save_thread(
 806                    acp::SessionId::new("projectless-session"),
 807                    make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
 808                    PathList::default(),
 809                    cx,
 810                )
 811            })
 812        });
 813        save_projectless.await.unwrap();
 814        cx.run_until_parked();
 815
 816        // Run migration
 817        cx.update(|cx| {
 818            migrate_thread_metadata(cx);
 819        });
 820
 821        cx.run_until_parked();
 822
 823        // Verify the metadata was migrated, limited to 10 per project, and
 824        // projectless threads were skipped.
 825        let list = cx.update(|cx| {
 826            let store = SidebarThreadMetadataStore::global(cx);
 827            store.read(cx).entries().collect::<Vec<_>>()
 828        });
 829        assert_eq!(list.len(), 13);
 830
 831        assert!(
 832            list.iter()
 833                .all(|metadata| !metadata.folder_paths.is_empty())
 834        );
 835        assert!(
 836            list.iter()
 837                .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
 838        );
 839
 840        let project_a_entries = list
 841            .iter()
 842            .filter(|metadata| metadata.folder_paths == project_a_paths)
 843            .collect::<Vec<_>>();
 844        assert_eq!(project_a_entries.len(), 10);
 845        assert_eq!(
 846            project_a_entries
 847                .iter()
 848                .map(|metadata| metadata.session_id.0.as_ref())
 849                .collect::<Vec<_>>(),
 850            vec![
 851                "project-a-session-11",
 852                "project-a-session-10",
 853                "project-a-session-9",
 854                "project-a-session-8",
 855                "project-a-session-7",
 856                "project-a-session-6",
 857                "project-a-session-5",
 858                "project-a-session-4",
 859                "project-a-session-3",
 860                "project-a-session-2",
 861            ]
 862        );
 863        assert!(
 864            project_a_entries
 865                .iter()
 866                .all(|metadata| metadata.agent_id.is_none())
 867        );
 868
 869        let project_b_entries = list
 870            .iter()
 871            .filter(|metadata| metadata.folder_paths == project_b_paths)
 872            .collect::<Vec<_>>();
 873        assert_eq!(project_b_entries.len(), 3);
 874        assert_eq!(
 875            project_b_entries
 876                .iter()
 877                .map(|metadata| metadata.session_id.0.as_ref())
 878                .collect::<Vec<_>>(),
 879            vec![
 880                "project-b-session-2",
 881                "project-b-session-1",
 882                "project-b-session-0",
 883            ]
 884        );
 885        assert!(
 886            project_b_entries
 887                .iter()
 888                .all(|metadata| metadata.agent_id.is_none())
 889        );
 890    }
 891
 892    #[gpui::test]
 893    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
 894        cx.update(|cx| {
 895            ThreadStore::init_global(cx);
 896            SidebarThreadMetadataStore::init_global(cx);
 897        });
 898
 899        // Pre-populate the metadata store with existing data
 900        let existing_metadata = ThreadMetadata {
 901            session_id: acp::SessionId::new("existing-session"),
 902            agent_id: None,
 903            title: "Existing Thread".into(),
 904            updated_at: Utc::now(),
 905            created_at: Some(Utc::now()),
 906            folder_paths: PathList::default(),
 907        };
 908
 909        cx.update(|cx| {
 910            let store = SidebarThreadMetadataStore::global(cx);
 911            store.update(cx, |store, cx| {
 912                store.save(existing_metadata, cx).detach();
 913            });
 914        });
 915
 916        cx.run_until_parked();
 917
 918        // Add an entry to native thread store that should NOT be migrated
 919        let save_task = cx.update(|cx| {
 920            let thread_store = ThreadStore::global(cx);
 921            thread_store.update(cx, |store, cx| {
 922                store.save_thread(
 923                    acp::SessionId::new("native-session"),
 924                    make_db_thread("Native Thread", Utc::now()),
 925                    PathList::default(),
 926                    cx,
 927                )
 928            })
 929        });
 930        save_task.await.unwrap();
 931        cx.run_until_parked();
 932
 933        // Run migration - should skip because metadata store is not empty
 934        cx.update(|cx| {
 935            migrate_thread_metadata(cx);
 936        });
 937
 938        cx.run_until_parked();
 939
 940        // Verify only the existing metadata is present (migration was skipped)
 941        let list = cx.update(|cx| {
 942            let store = SidebarThreadMetadataStore::global(cx);
 943            store.read(cx).entries().collect::<Vec<_>>()
 944        });
 945        assert_eq!(list.len(), 1);
 946        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
 947    }
 948
 949    #[gpui::test]
 950    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
 951        cx.update(|cx| {
 952            let settings_store = settings::SettingsStore::test(cx);
 953            cx.set_global(settings_store);
 954            cx.update_flags(true, vec!["agent-v2".to_string()]);
 955            ThreadStore::init_global(cx);
 956            SidebarThreadMetadataStore::init_global(cx);
 957        });
 958
 959        let fs = FakeFs::new(cx.executor());
 960        let project = Project::test(fs, None::<&Path>, cx).await;
 961        let connection = Rc::new(StubAgentConnection::new());
 962
 963        // Create a regular (non-subagent) AcpThread.
 964        let regular_thread = cx
 965            .update(|cx| {
 966                connection
 967                    .clone()
 968                    .new_session(project.clone(), PathList::default(), cx)
 969            })
 970            .await
 971            .unwrap();
 972
 973        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
 974
 975        // Set a title on the regular thread to trigger a save via handle_thread_update.
 976        cx.update(|cx| {
 977            regular_thread.update(cx, |thread, cx| {
 978                thread.set_title("Regular Thread".into(), cx).detach();
 979            });
 980        });
 981        cx.run_until_parked();
 982
 983        // Create a subagent AcpThread
 984        let subagent_session_id = acp::SessionId::new("subagent-session");
 985        let subagent_thread = cx.update(|cx| {
 986            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 987            cx.new(|cx| {
 988                acp_thread::AcpThread::new(
 989                    Some(regular_session_id.clone()),
 990                    "Subagent Thread",
 991                    None,
 992                    connection.clone(),
 993                    project.clone(),
 994                    action_log,
 995                    subagent_session_id.clone(),
 996                    watch::Receiver::constant(acp::PromptCapabilities::new()),
 997                    cx,
 998                )
 999            })
1000        });
1001
1002        // Set a title on the subagent thread to trigger handle_thread_update.
1003        cx.update(|cx| {
1004            subagent_thread.update(cx, |thread, cx| {
1005                thread
1006                    .set_title("Subagent Thread Title".into(), cx)
1007                    .detach();
1008            });
1009        });
1010        cx.run_until_parked();
1011
1012        // List all metadata from the store cache.
1013        let list = cx.update(|cx| {
1014            let store = SidebarThreadMetadataStore::global(cx);
1015            store.read(cx).entries().collect::<Vec<_>>()
1016        });
1017
1018        // The subagent thread should NOT appear in the sidebar metadata.
1019        // Only the regular thread should be listed.
1020        assert_eq!(
1021            list.len(),
1022            1,
1023            "Expected only the regular thread in sidebar metadata, \
1024             but found {} entries (subagent threads are leaking into the sidebar)",
1025            list.len(),
1026        );
1027        assert_eq!(list[0].session_id, regular_session_id);
1028        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1029    }
1030}