thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AgentSessionInfo;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::{Context as _, Result};
   7use chrono::{DateTime, Utc};
   8use collections::HashMap;
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    SidebarThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We migrate the last 10 threads per project and skip threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
  46
  47    let store = SidebarThreadMetadataStore::global(cx);
  48    let db = store.read(cx).db.clone();
  49
  50    cx.spawn(async move |cx| {
  51        if !db.is_empty()? {
  52            return Ok::<(), anyhow::Error>(());
  53        }
  54
  55        let metadata = store.read_with(cx, |_store, app| {
  56            let mut migrated_threads_per_project = HashMap::default();
  57
  58            ThreadStore::global(app)
  59                .read(app)
  60                .entries()
  61                .filter_map(|entry| {
  62                    if entry.folder_paths.is_empty() {
  63                        return None;
  64                    }
  65
  66                    let migrated_thread_count = migrated_threads_per_project
  67                        .entry(entry.folder_paths.clone())
  68                        .or_insert(0);
  69                    if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
  70                        return None;
  71                    }
  72                    *migrated_thread_count += 1;
  73
  74                    Some(ThreadMetadata {
  75                        session_id: entry.id,
  76                        agent_id: None,
  77                        title: entry.title,
  78                        updated_at: entry.updated_at,
  79                        created_at: entry.created_at,
  80                        folder_paths: entry.folder_paths,
  81                    })
  82                })
  83                .collect::<Vec<_>>()
  84        });
  85
  86        log::info!("Migrating {} thread store entries", metadata.len());
  87
  88        // Manually save each entry to the database and call reload, otherwise
  89        // we'll end up triggering lots of reloads after each save
  90        for entry in metadata {
  91            db.save(entry).await?;
  92        }
  93
  94        log::info!("Finished migrating thread store entries");
  95
  96        let _ = store.update(cx, |store, cx| store.reload(cx));
  97        Ok(())
  98    })
  99    .detach_and_log_err(cx);
 100}
 101
/// Global wrapper holding the app-wide [`SidebarThreadMetadataStore`] entity.
///
/// `SidebarThreadMetadataStore::init_global` registers it with `set_global`, and
/// `SidebarThreadMetadataStore::global` / `try_global` clone the entity back out.
struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 104
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
///
/// One value corresponds to one row of the `sidebar_threads` table.
#[derive(Debug, Clone)]
pub struct ThreadMetadata {
    /// Session identifier; stored as the table's primary key.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Timestamp of the most recent update; persisted as an RFC 3339 string.
    pub updated_at: DateTime<Utc>,
    /// Creation timestamp, if known. Listing falls back to `updated_at` for
    /// ordering when this is `None`.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths of the project the thread belongs to; the store groups
    /// its cached entries by this value.
    pub folder_paths: PathList,
}
 117
 118impl ThreadMetadata {
 119    pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
 120        let session_id = session.session_id.clone();
 121        let title = session.title.clone().unwrap_or_default();
 122        let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
 123        let created_at = session.created_at.unwrap_or(updated_at);
 124        let folder_paths = session.work_dirs.clone().unwrap_or_default();
 125        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 126            None
 127        } else {
 128            Some(agent_id)
 129        };
 130        Self {
 131            session_id,
 132            agent_id,
 133            title,
 134            updated_at,
 135            created_at: Some(created_at),
 136            folder_paths,
 137        }
 138    }
 139
 140    pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
 141        let thread_ref = thread.read(cx);
 142        let session_id = thread_ref.session_id().clone();
 143        let title = thread_ref
 144            .title()
 145            .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 146        let updated_at = Utc::now();
 147
 148        let agent_id = thread_ref.connection().agent_id();
 149
 150        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 151            None
 152        } else {
 153            Some(agent_id)
 154        };
 155
 156        let folder_paths = {
 157            let project = thread_ref.project().read(cx);
 158            let paths: Vec<Arc<Path>> = project
 159                .visible_worktrees(cx)
 160                .map(|worktree| worktree.read(cx).abs_path())
 161                .collect();
 162            PathList::new(&paths)
 163        };
 164
 165        Self {
 166            session_id,
 167            agent_id,
 168            title,
 169            created_at: Some(updated_at), // handled by db `ON CONFLICT`
 170            updated_at,
 171            folder_paths,
 172        }
 173    }
 174}
 175
/// The store holds all metadata needed to show threads in the sidebar.
/// Effectively, all threads stored in here are "non-archived".
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct SidebarThreadMetadataStore {
    /// Database handle used for listing, saving, and deleting rows.
    db: ThreadMetadataDb,
    /// Cached rows from the most recent reload, in the order the database returned them.
    threads: Vec<ThreadMetadata>,
    /// The same cached rows, grouped by their `folder_paths`.
    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
    /// Handle to the most recently started reload, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed threads, keyed by session id; an entry
    /// is removed when its thread is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
}
 187
 188impl SidebarThreadMetadataStore {
 189    #[cfg(not(any(test, feature = "test-support")))]
 190    pub fn init_global(cx: &mut App) {
 191        if cx.has_global::<Self>() {
 192            return;
 193        }
 194
 195        let db = ThreadMetadataDb::global(cx);
 196        let thread_store = cx.new(|cx| Self::new(db, cx));
 197        cx.set_global(GlobalThreadMetadataStore(thread_store));
 198    }
 199
 200    #[cfg(any(test, feature = "test-support"))]
 201    pub fn init_global(cx: &mut App) {
 202        let thread = std::thread::current();
 203        let test_name = thread.name().unwrap_or("unknown_test");
 204        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 205        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 206        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 207        cx.set_global(GlobalThreadMetadataStore(thread_store));
 208    }
 209
 210    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 211        cx.try_global::<GlobalThreadMetadataStore>()
 212            .map(|store| store.0.clone())
 213    }
 214
 215    pub fn global(cx: &App) -> Entity<Self> {
 216        cx.global::<GlobalThreadMetadataStore>().0.clone()
 217    }
 218
 219    pub fn is_empty(&self) -> bool {
 220        self.threads.is_empty()
 221    }
 222
 223    pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
 224        self.threads.iter().cloned()
 225    }
 226
 227    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 228        self.threads.iter().map(|thread| thread.session_id.clone())
 229    }
 230
 231    pub fn has_entries_for_path(&self, path_list: &PathList) -> bool {
 232        self.threads_by_paths
 233            .get(path_list)
 234            .is_some_and(|entries| !entries.is_empty())
 235    }
 236
 237    pub fn entries_for_path(
 238        &self,
 239        path_list: &PathList,
 240    ) -> impl Iterator<Item = ThreadMetadata> + '_ {
 241        self.threads_by_paths
 242            .get(path_list)
 243            .into_iter()
 244            .flatten()
 245            .cloned()
 246    }
 247
 248    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 249        let db = self.db.clone();
 250        self.reload_task.take();
 251
 252        let list_task = cx
 253            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 254
 255        let reload_task = cx
 256            .spawn(async move |this, cx| {
 257                let Some(rows) = list_task.await.log_err() else {
 258                    return;
 259                };
 260
 261                this.update(cx, |this, cx| {
 262                    this.threads.clear();
 263                    this.threads_by_paths.clear();
 264
 265                    for row in rows {
 266                        this.threads_by_paths
 267                            .entry(row.folder_paths.clone())
 268                            .or_default()
 269                            .push(row.clone());
 270                        this.threads.push(row);
 271                    }
 272
 273                    cx.notify();
 274                })
 275                .ok();
 276            })
 277            .shared();
 278        self.reload_task = Some(reload_task.clone());
 279        reload_task
 280    }
 281
 282    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
 283        if !cx.has_flag::<AgentV2FeatureFlag>() {
 284            return Task::ready(Ok(()));
 285        }
 286
 287        let db = self.db.clone();
 288        cx.spawn(async move |this, cx| {
 289            db.save(metadata).await?;
 290            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 291            reload_task.await;
 292            Ok(())
 293        })
 294    }
 295
 296    pub fn delete(
 297        &mut self,
 298        session_id: acp::SessionId,
 299        cx: &mut Context<Self>,
 300    ) -> Task<Result<()>> {
 301        if !cx.has_flag::<AgentV2FeatureFlag>() {
 302            return Task::ready(Ok(()));
 303        }
 304
 305        let db = self.db.clone();
 306        cx.spawn(async move |this, cx| {
 307            db.delete(session_id).await?;
 308            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 309            reload_task.await;
 310            Ok(())
 311        })
 312    }
 313
 314    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 315        let weak_store = cx.weak_entity();
 316
 317        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 318            // Don't track subagent threads in the sidebar.
 319            if thread.parent_session_id().is_some() {
 320                return;
 321            }
 322
 323            let thread_entity = cx.entity();
 324
 325            cx.on_release({
 326                let weak_store = weak_store.clone();
 327                move |thread, cx| {
 328                    weak_store
 329                        .update(cx, |store, _cx| {
 330                            store.session_subscriptions.remove(thread.session_id());
 331                        })
 332                        .ok();
 333                }
 334            })
 335            .detach();
 336
 337            weak_store
 338                .update(cx, |this, cx| {
 339                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
 340                    this.session_subscriptions
 341                        .insert(thread.session_id().clone(), subscription);
 342                })
 343                .ok();
 344        })
 345        .detach();
 346
 347        let mut this = Self {
 348            db,
 349            threads: Vec::new(),
 350            threads_by_paths: HashMap::default(),
 351            reload_task: None,
 352            session_subscriptions: HashMap::default(),
 353        };
 354        let _ = this.reload(cx);
 355        this
 356    }
 357
 358    fn handle_thread_update(
 359        &mut self,
 360        thread: Entity<acp_thread::AcpThread>,
 361        event: &acp_thread::AcpThreadEvent,
 362        cx: &mut Context<Self>,
 363    ) {
 364        // Don't track subagent threads in the sidebar.
 365        if thread.read(cx).parent_session_id().is_some() {
 366            return;
 367        }
 368
 369        match event {
 370            acp_thread::AcpThreadEvent::NewEntry
 371            | acp_thread::AcpThreadEvent::TitleUpdated
 372            | acp_thread::AcpThreadEvent::EntryUpdated(_)
 373            | acp_thread::AcpThreadEvent::EntriesRemoved(_)
 374            | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
 375            | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
 376            | acp_thread::AcpThreadEvent::Retry(_)
 377            | acp_thread::AcpThreadEvent::Stopped(_)
 378            | acp_thread::AcpThreadEvent::Error
 379            | acp_thread::AcpThreadEvent::LoadError(_)
 380            | acp_thread::AcpThreadEvent::Refusal => {
 381                let metadata = ThreadMetadata::from_thread(&thread, cx);
 382                self.save(metadata, cx).detach_and_log_err(cx);
 383            }
 384            _ => {}
 385        }
 386    }
 387}
 388
// NOTE(review): the visible code registers the store through the
// `GlobalThreadMetadataStore` wrapper and never sets `SidebarThreadMetadataStore`
// itself as a global — confirm whether anything still relies on this impl.
impl Global for SidebarThreadMetadataStore {}
 390
/// Database handle for the `sidebar_threads` table, wrapping a
/// [`ThreadSafeConnection`].
struct ThreadMetadataDb(ThreadSafeConnection);
 392
/// Declares the database domain for sidebar thread metadata: its name and the
/// migrations that create its schema.
impl Domain for ThreadMetadataDb {
    /// Domain name; expands to the literal `"ThreadMetadataDb"`.
    const NAME: &str = stringify!(ThreadMetadataDb);

    /// Schema migrations. The single migration creates `sidebar_threads`,
    /// keyed by `session_id`. `title` and `updated_at` are required; the
    /// remaining columns are nullable. Timestamps are written as RFC 3339
    /// text by `ThreadMetadataDb::save`.
    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}
 408
// NOTE(review): presumably this generates the shared connection plumbing for
// `ThreadMetadataDb` (e.g. the `ThreadMetadataDb::global` used by `init_global`)
// — confirm against the `db` crate's `static_connection!` macro.
db::static_connection!(ThreadMetadataDb, []);
 410
 411impl ThreadMetadataDb {
 412    pub fn is_empty(&self) -> anyhow::Result<bool> {
 413        self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
 414            .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
 415    }
 416
 417    /// Returns all thread metadata, sorted by most recently created first.
 418    /// When `created_at` is NULL, falls back to `updated_at`.
 419    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 420        self.select::<ThreadMetadata>(
 421            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
 422             FROM sidebar_threads \
 423             ORDER BY COALESCE(created_at, updated_at) DESC"
 424        )?()
 425    }
 426
 427    /// Upsert metadata for a thread.
 428    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 429        let id = row.session_id.0.clone();
 430        let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
 431        let title = row.title.to_string();
 432        let updated_at = row.updated_at.to_rfc3339();
 433        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 434        let serialized = row.folder_paths.serialize();
 435        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 436            (None, None)
 437        } else {
 438            (Some(serialized.paths), Some(serialized.order))
 439        };
 440
 441        self.write(move |conn| {
 442            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
 443                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
 444                       ON CONFLICT(session_id) DO UPDATE SET \
 445                           agent_id = excluded.agent_id, \
 446                           title = excluded.title, \
 447                           updated_at = excluded.updated_at, \
 448                           folder_paths = excluded.folder_paths, \
 449                           folder_paths_order = excluded.folder_paths_order";
 450            let mut stmt = Statement::prepare(conn, sql)?;
 451            let mut i = stmt.bind(&id, 1)?;
 452            i = stmt.bind(&agent_id, i)?;
 453            i = stmt.bind(&title, i)?;
 454            i = stmt.bind(&updated_at, i)?;
 455            i = stmt.bind(&created_at, i)?;
 456            i = stmt.bind(&folder_paths, i)?;
 457            stmt.bind(&folder_paths_order, i)?;
 458            stmt.exec()
 459        })
 460        .await
 461    }
 462
 463    /// Delete metadata for a single thread.
 464    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 465        let id = session_id.0.clone();
 466        self.write(move |conn| {
 467            let mut stmt =
 468                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 469            stmt.bind(&id, 1)?;
 470            stmt.exec()
 471        })
 472        .await
 473    }
 474}
 475
 476impl Column for ThreadMetadata {
 477    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 478        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 479        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 480        let (title, next): (String, i32) = Column::column(statement, next)?;
 481        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 482        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 483        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 484        let (folder_paths_order_str, next): (Option<String>, i32) =
 485            Column::column(statement, next)?;
 486
 487        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 488        let created_at = created_at_str
 489            .as_deref()
 490            .map(DateTime::parse_from_rfc3339)
 491            .transpose()?
 492            .map(|dt| dt.with_timezone(&Utc));
 493
 494        let folder_paths = folder_paths_str
 495            .map(|paths| {
 496                PathList::deserialize(&util::path_list::SerializedPathList {
 497                    paths,
 498                    order: folder_paths_order_str.unwrap_or_default(),
 499                })
 500            })
 501            .unwrap_or_default();
 502
 503        Ok((
 504            ThreadMetadata {
 505                session_id: acp::SessionId::new(id),
 506                agent_id: agent_id.map(|id| AgentId::new(id)),
 507                title: title.into(),
 508                updated_at,
 509                created_at,
 510                folder_paths,
 511            },
 512            next,
 513        ))
 514    }
 515}
 516
 517#[cfg(test)]
 518mod tests {
 519    use super::*;
 520    use acp_thread::{AgentConnection, StubAgentConnection};
 521    use action_log::ActionLog;
 522    use agent::DbThread;
 523    use agent_client_protocol as acp;
 524    use feature_flags::FeatureFlagAppExt;
 525    use gpui::TestAppContext;
 526    use project::FakeFs;
 527    use project::Project;
 528    use std::path::Path;
 529    use std::rc::Rc;
 530
 531    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 532        DbThread {
 533            title: title.to_string().into(),
 534            messages: Vec::new(),
 535            updated_at,
 536            detailed_summary: None,
 537            initial_project_snapshot: None,
 538            cumulative_token_usage: Default::default(),
 539            request_token_usage: Default::default(),
 540            model: None,
 541            profile: None,
 542            imported: false,
 543            subagent_context: None,
 544            speed: None,
 545            thinking_enabled: false,
 546            thinking_effort: None,
 547            draft_prompt: None,
 548            ui_scroll_position: None,
 549        }
 550    }
 551
 552    fn make_metadata(
 553        session_id: &str,
 554        title: &str,
 555        updated_at: DateTime<Utc>,
 556        folder_paths: PathList,
 557    ) -> ThreadMetadata {
 558        ThreadMetadata {
 559            session_id: acp::SessionId::new(session_id),
 560            agent_id: None,
 561            title: title.to_string().into(),
 562            updated_at,
 563            created_at: Some(updated_at),
 564            folder_paths,
 565        }
 566    }
 567
 568    #[gpui::test]
 569    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 570        let first_paths = PathList::new(&[Path::new("/project-a")]);
 571        let second_paths = PathList::new(&[Path::new("/project-b")]);
 572        let now = Utc::now();
 573        let older = now - chrono::Duration::seconds(1);
 574
 575        let thread = std::thread::current();
 576        let test_name = thread.name().unwrap_or("unknown_test");
 577        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 578        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 579            &db_name,
 580        )));
 581
 582        db.save(make_metadata(
 583            "session-1",
 584            "First Thread",
 585            now,
 586            first_paths.clone(),
 587        ))
 588        .await
 589        .unwrap();
 590        db.save(make_metadata(
 591            "session-2",
 592            "Second Thread",
 593            older,
 594            second_paths.clone(),
 595        ))
 596        .await
 597        .unwrap();
 598
 599        cx.update(|cx| {
 600            let settings_store = settings::SettingsStore::test(cx);
 601            cx.set_global(settings_store);
 602            cx.update_flags(true, vec!["agent-v2".to_string()]);
 603            SidebarThreadMetadataStore::init_global(cx);
 604        });
 605
 606        cx.run_until_parked();
 607
 608        cx.update(|cx| {
 609            let store = SidebarThreadMetadataStore::global(cx);
 610            let store = store.read(cx);
 611
 612            let entry_ids = store
 613                .entry_ids()
 614                .map(|session_id| session_id.0.to_string())
 615                .collect::<Vec<_>>();
 616            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 617
 618            let first_path_entries = store
 619                .entries_for_path(&first_paths)
 620                .map(|entry| entry.session_id.0.to_string())
 621                .collect::<Vec<_>>();
 622            assert_eq!(first_path_entries, vec!["session-1"]);
 623
 624            let second_path_entries = store
 625                .entries_for_path(&second_paths)
 626                .map(|entry| entry.session_id.0.to_string())
 627                .collect::<Vec<_>>();
 628            assert_eq!(second_path_entries, vec!["session-2"]);
 629        });
 630    }
 631
 632    #[gpui::test]
 633    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 634        cx.update(|cx| {
 635            let settings_store = settings::SettingsStore::test(cx);
 636            cx.set_global(settings_store);
 637            cx.update_flags(true, vec!["agent-v2".to_string()]);
 638            SidebarThreadMetadataStore::init_global(cx);
 639        });
 640
 641        let first_paths = PathList::new(&[Path::new("/project-a")]);
 642        let second_paths = PathList::new(&[Path::new("/project-b")]);
 643        let initial_time = Utc::now();
 644        let updated_time = initial_time + chrono::Duration::seconds(1);
 645
 646        let initial_metadata = make_metadata(
 647            "session-1",
 648            "First Thread",
 649            initial_time,
 650            first_paths.clone(),
 651        );
 652
 653        let second_metadata = make_metadata(
 654            "session-2",
 655            "Second Thread",
 656            initial_time,
 657            second_paths.clone(),
 658        );
 659
 660        cx.update(|cx| {
 661            let store = SidebarThreadMetadataStore::global(cx);
 662            store.update(cx, |store, cx| {
 663                store.save(initial_metadata, cx).detach();
 664                store.save(second_metadata, cx).detach();
 665            });
 666        });
 667
 668        cx.run_until_parked();
 669
 670        cx.update(|cx| {
 671            let store = SidebarThreadMetadataStore::global(cx);
 672            let store = store.read(cx);
 673
 674            let first_path_entries = store
 675                .entries_for_path(&first_paths)
 676                .map(|entry| entry.session_id.0.to_string())
 677                .collect::<Vec<_>>();
 678            assert_eq!(first_path_entries, vec!["session-1"]);
 679
 680            let second_path_entries = store
 681                .entries_for_path(&second_paths)
 682                .map(|entry| entry.session_id.0.to_string())
 683                .collect::<Vec<_>>();
 684            assert_eq!(second_path_entries, vec!["session-2"]);
 685        });
 686
 687        let moved_metadata = make_metadata(
 688            "session-1",
 689            "First Thread",
 690            updated_time,
 691            second_paths.clone(),
 692        );
 693
 694        cx.update(|cx| {
 695            let store = SidebarThreadMetadataStore::global(cx);
 696            store.update(cx, |store, cx| {
 697                store.save(moved_metadata, cx).detach();
 698            });
 699        });
 700
 701        cx.run_until_parked();
 702
 703        cx.update(|cx| {
 704            let store = SidebarThreadMetadataStore::global(cx);
 705            let store = store.read(cx);
 706
 707            let entry_ids = store
 708                .entry_ids()
 709                .map(|session_id| session_id.0.to_string())
 710                .collect::<Vec<_>>();
 711            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 712
 713            let first_path_entries = store
 714                .entries_for_path(&first_paths)
 715                .map(|entry| entry.session_id.0.to_string())
 716                .collect::<Vec<_>>();
 717            assert!(first_path_entries.is_empty());
 718
 719            let second_path_entries = store
 720                .entries_for_path(&second_paths)
 721                .map(|entry| entry.session_id.0.to_string())
 722                .collect::<Vec<_>>();
 723            assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
 724        });
 725
 726        cx.update(|cx| {
 727            let store = SidebarThreadMetadataStore::global(cx);
 728            store.update(cx, |store, cx| {
 729                store.delete(acp::SessionId::new("session-2"), cx).detach();
 730            });
 731        });
 732
 733        cx.run_until_parked();
 734
 735        cx.update(|cx| {
 736            let store = SidebarThreadMetadataStore::global(cx);
 737            let store = store.read(cx);
 738
 739            let entry_ids = store
 740                .entry_ids()
 741                .map(|session_id| session_id.0.to_string())
 742                .collect::<Vec<_>>();
 743            assert_eq!(entry_ids, vec!["session-1"]);
 744
 745            let second_path_entries = store
 746                .entries_for_path(&second_paths)
 747                .map(|entry| entry.session_id.0.to_string())
 748                .collect::<Vec<_>>();
 749            assert_eq!(second_path_entries, vec!["session-1"]);
 750        });
 751    }
 752
 753    #[gpui::test]
 754    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
 755        cx.update(|cx| {
 756            ThreadStore::init_global(cx);
 757            SidebarThreadMetadataStore::init_global(cx);
 758        });
 759
 760        // Verify the cache is empty before migration
 761        let list = cx.update(|cx| {
 762            let store = SidebarThreadMetadataStore::global(cx);
 763            store.read(cx).entries().collect::<Vec<_>>()
 764        });
 765        assert_eq!(list.len(), 0);
 766
 767        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 768        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 769        let now = Utc::now();
 770
 771        for index in 0..12 {
 772            let updated_at = now + chrono::Duration::seconds(index as i64);
 773            let session_id = format!("project-a-session-{index}");
 774            let title = format!("Project A Thread {index}");
 775
 776            let save_task = cx.update(|cx| {
 777                let thread_store = ThreadStore::global(cx);
 778                let session_id = session_id.clone();
 779                let title = title.clone();
 780                let project_a_paths = project_a_paths.clone();
 781                thread_store.update(cx, |store, cx| {
 782                    store.save_thread(
 783                        acp::SessionId::new(session_id),
 784                        make_db_thread(&title, updated_at),
 785                        project_a_paths,
 786                        cx,
 787                    )
 788                })
 789            });
 790            save_task.await.unwrap();
 791            cx.run_until_parked();
 792        }
 793
 794        for index in 0..3 {
 795            let updated_at = now + chrono::Duration::seconds(100 + index as i64);
 796            let session_id = format!("project-b-session-{index}");
 797            let title = format!("Project B Thread {index}");
 798
 799            let save_task = cx.update(|cx| {
 800                let thread_store = ThreadStore::global(cx);
 801                let session_id = session_id.clone();
 802                let title = title.clone();
 803                let project_b_paths = project_b_paths.clone();
 804                thread_store.update(cx, |store, cx| {
 805                    store.save_thread(
 806                        acp::SessionId::new(session_id),
 807                        make_db_thread(&title, updated_at),
 808                        project_b_paths,
 809                        cx,
 810                    )
 811                })
 812            });
 813            save_task.await.unwrap();
 814            cx.run_until_parked();
 815        }
 816
 817        let save_projectless = cx.update(|cx| {
 818            let thread_store = ThreadStore::global(cx);
 819            thread_store.update(cx, |store, cx| {
 820                store.save_thread(
 821                    acp::SessionId::new("projectless-session"),
 822                    make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
 823                    PathList::default(),
 824                    cx,
 825                )
 826            })
 827        });
 828        save_projectless.await.unwrap();
 829        cx.run_until_parked();
 830
 831        // Run migration
 832        cx.update(|cx| {
 833            migrate_thread_metadata(cx);
 834        });
 835
 836        cx.run_until_parked();
 837
 838        // Verify the metadata was migrated, limited to 10 per project, and
 839        // projectless threads were skipped.
 840        let list = cx.update(|cx| {
 841            let store = SidebarThreadMetadataStore::global(cx);
 842            store.read(cx).entries().collect::<Vec<_>>()
 843        });
 844        assert_eq!(list.len(), 13);
 845
 846        assert!(
 847            list.iter()
 848                .all(|metadata| !metadata.folder_paths.is_empty())
 849        );
 850        assert!(
 851            list.iter()
 852                .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
 853        );
 854
 855        let project_a_entries = list
 856            .iter()
 857            .filter(|metadata| metadata.folder_paths == project_a_paths)
 858            .collect::<Vec<_>>();
 859        assert_eq!(project_a_entries.len(), 10);
 860        assert_eq!(
 861            project_a_entries
 862                .iter()
 863                .map(|metadata| metadata.session_id.0.as_ref())
 864                .collect::<Vec<_>>(),
 865            vec![
 866                "project-a-session-11",
 867                "project-a-session-10",
 868                "project-a-session-9",
 869                "project-a-session-8",
 870                "project-a-session-7",
 871                "project-a-session-6",
 872                "project-a-session-5",
 873                "project-a-session-4",
 874                "project-a-session-3",
 875                "project-a-session-2",
 876            ]
 877        );
 878        assert!(
 879            project_a_entries
 880                .iter()
 881                .all(|metadata| metadata.agent_id.is_none())
 882        );
 883
 884        let project_b_entries = list
 885            .iter()
 886            .filter(|metadata| metadata.folder_paths == project_b_paths)
 887            .collect::<Vec<_>>();
 888        assert_eq!(project_b_entries.len(), 3);
 889        assert_eq!(
 890            project_b_entries
 891                .iter()
 892                .map(|metadata| metadata.session_id.0.as_ref())
 893                .collect::<Vec<_>>(),
 894            vec![
 895                "project-b-session-2",
 896                "project-b-session-1",
 897                "project-b-session-0",
 898            ]
 899        );
 900        assert!(
 901            project_b_entries
 902                .iter()
 903                .all(|metadata| metadata.agent_id.is_none())
 904        );
 905    }
 906
 907    #[gpui::test]
 908    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
 909        cx.update(|cx| {
 910            ThreadStore::init_global(cx);
 911            SidebarThreadMetadataStore::init_global(cx);
 912        });
 913
 914        // Pre-populate the metadata store with existing data
 915        let existing_metadata = ThreadMetadata {
 916            session_id: acp::SessionId::new("existing-session"),
 917            agent_id: None,
 918            title: "Existing Thread".into(),
 919            updated_at: Utc::now(),
 920            created_at: Some(Utc::now()),
 921            folder_paths: PathList::default(),
 922        };
 923
 924        cx.update(|cx| {
 925            let store = SidebarThreadMetadataStore::global(cx);
 926            store.update(cx, |store, cx| {
 927                store.save(existing_metadata, cx).detach();
 928            });
 929        });
 930
 931        cx.run_until_parked();
 932
 933        // Add an entry to native thread store that should NOT be migrated
 934        let save_task = cx.update(|cx| {
 935            let thread_store = ThreadStore::global(cx);
 936            thread_store.update(cx, |store, cx| {
 937                store.save_thread(
 938                    acp::SessionId::new("native-session"),
 939                    make_db_thread("Native Thread", Utc::now()),
 940                    PathList::default(),
 941                    cx,
 942                )
 943            })
 944        });
 945        save_task.await.unwrap();
 946        cx.run_until_parked();
 947
 948        // Run migration - should skip because metadata store is not empty
 949        cx.update(|cx| {
 950            migrate_thread_metadata(cx);
 951        });
 952
 953        cx.run_until_parked();
 954
 955        // Verify only the existing metadata is present (migration was skipped)
 956        let list = cx.update(|cx| {
 957            let store = SidebarThreadMetadataStore::global(cx);
 958            store.read(cx).entries().collect::<Vec<_>>()
 959        });
 960        assert_eq!(list.len(), 1);
 961        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
 962    }
 963
 964    #[gpui::test]
 965    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
 966        cx.update(|cx| {
 967            let settings_store = settings::SettingsStore::test(cx);
 968            cx.set_global(settings_store);
 969            cx.update_flags(true, vec!["agent-v2".to_string()]);
 970            ThreadStore::init_global(cx);
 971            SidebarThreadMetadataStore::init_global(cx);
 972        });
 973
 974        let fs = FakeFs::new(cx.executor());
 975        let project = Project::test(fs, None::<&Path>, cx).await;
 976        let connection = Rc::new(StubAgentConnection::new());
 977
 978        // Create a regular (non-subagent) AcpThread.
 979        let regular_thread = cx
 980            .update(|cx| {
 981                connection
 982                    .clone()
 983                    .new_session(project.clone(), PathList::default(), cx)
 984            })
 985            .await
 986            .unwrap();
 987
 988        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
 989
 990        // Set a title on the regular thread to trigger a save via handle_thread_update.
 991        cx.update(|cx| {
 992            regular_thread.update(cx, |thread, cx| {
 993                thread.set_title("Regular Thread".into(), cx).detach();
 994            });
 995        });
 996        cx.run_until_parked();
 997
 998        // Create a subagent AcpThread
 999        let subagent_session_id = acp::SessionId::new("subagent-session");
1000        let subagent_thread = cx.update(|cx| {
1001            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1002            cx.new(|cx| {
1003                acp_thread::AcpThread::new(
1004                    Some(regular_session_id.clone()),
1005                    Some("Subagent Thread".into()),
1006                    None,
1007                    connection.clone(),
1008                    project.clone(),
1009                    action_log,
1010                    subagent_session_id.clone(),
1011                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1012                    cx,
1013                )
1014            })
1015        });
1016
1017        // Set a title on the subagent thread to trigger handle_thread_update.
1018        cx.update(|cx| {
1019            subagent_thread.update(cx, |thread, cx| {
1020                thread
1021                    .set_title("Subagent Thread Title".into(), cx)
1022                    .detach();
1023            });
1024        });
1025        cx.run_until_parked();
1026
1027        // List all metadata from the store cache.
1028        let list = cx.update(|cx| {
1029            let store = SidebarThreadMetadataStore::global(cx);
1030            store.read(cx).entries().collect::<Vec<_>>()
1031        });
1032
1033        // The subagent thread should NOT appear in the sidebar metadata.
1034        // Only the regular thread should be listed.
1035        assert_eq!(
1036            list.len(),
1037            1,
1038            "Expected only the regular thread in sidebar metadata, \
1039             but found {} entries (subagent threads are leaking into the sidebar)",
1040            list.len(),
1041        );
1042        assert_eq!(list[0].session_id, regular_session_id);
1043        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1044    }
1045}