thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AgentSessionInfo;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::{Context as _, Result};
   7use chrono::{DateTime, Utc};
   8use collections::HashMap;
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    SidebarThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We migrate the last 10 threads per project and skip threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
  46
  47    let store = SidebarThreadMetadataStore::global(cx);
  48    let db = store.read(cx).db.clone();
  49
  50    cx.spawn(async move |cx| {
  51        if !db.is_empty()? {
  52            return Ok::<(), anyhow::Error>(());
  53        }
  54
  55        let metadata = store.read_with(cx, |_store, app| {
  56            let mut migrated_threads_per_project = HashMap::default();
  57
  58            ThreadStore::global(app)
  59                .read(app)
  60                .entries()
  61                .filter_map(|entry| {
  62                    if entry.folder_paths.is_empty() {
  63                        return None;
  64                    }
  65
  66                    let migrated_thread_count = migrated_threads_per_project
  67                        .entry(entry.folder_paths.clone())
  68                        .or_insert(0);
  69                    if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
  70                        return None;
  71                    }
  72                    *migrated_thread_count += 1;
  73
  74                    Some(ThreadMetadata {
  75                        session_id: entry.id,
  76                        agent_id: None,
  77                        title: entry.title,
  78                        updated_at: entry.updated_at,
  79                        created_at: entry.created_at,
  80                        folder_paths: entry.folder_paths,
  81                    })
  82                })
  83                .collect::<Vec<_>>()
  84        });
  85
  86        log::info!("Migrating {} thread store entries", metadata.len());
  87
  88        // Manually save each entry to the database and call reload, otherwise
  89        // we'll end up triggering lots of reloads after each save
  90        for entry in metadata {
  91            db.save(entry).await?;
  92        }
  93
  94        log::info!("Finished migrating thread store entries");
  95
  96        let _ = store.update(cx, |store, cx| store.reload(cx));
  97        Ok(())
  98    })
  99    .detach_and_log_err(cx);
 100}
 101
/// Newtype under which the shared [`SidebarThreadMetadataStore`] entity is
/// registered as a gpui global (see `init_global`, `global` and `try_global`).
struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 104
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone)]
pub struct ThreadMetadata {
    /// Session identifier; this is the primary key of the persisted row.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Time of the most recent update; the database lists rows by this
    /// value, newest first.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known. The database column is nullable, and the
    /// upsert does not overwrite it for an existing row.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths associated with the thread; used as the key for the
    /// per-project lookup in the store.
    pub folder_paths: PathList,
}
 117
 118impl ThreadMetadata {
 119    pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
 120        let session_id = session.session_id.clone();
 121        let title = session.title.clone().unwrap_or_default();
 122        let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
 123        let created_at = session.created_at.unwrap_or(updated_at);
 124        let folder_paths = session.work_dirs.clone().unwrap_or_default();
 125        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 126            None
 127        } else {
 128            Some(agent_id)
 129        };
 130        Self {
 131            session_id,
 132            agent_id,
 133            title,
 134            updated_at,
 135            created_at: Some(created_at),
 136            folder_paths,
 137        }
 138    }
 139
 140    pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
 141        let thread_ref = thread.read(cx);
 142        let session_id = thread_ref.session_id().clone();
 143        let title = thread_ref
 144            .title()
 145            .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 146        let updated_at = Utc::now();
 147
 148        let agent_id = thread_ref.connection().agent_id();
 149
 150        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 151            None
 152        } else {
 153            Some(agent_id)
 154        };
 155
 156        let folder_paths = {
 157            let project = thread_ref.project().read(cx);
 158            let paths: Vec<Arc<Path>> = project
 159                .visible_worktrees(cx)
 160                .map(|worktree| worktree.read(cx).abs_path())
 161                .collect();
 162            PathList::new(&paths)
 163        };
 164
 165        Self {
 166            session_id,
 167            agent_id,
 168            title,
 169            created_at: Some(updated_at), // handled by db `ON CONFLICT`
 170            updated_at,
 171            folder_paths,
 172        }
 173    }
 174}
 175
/// The store holds all metadata needed to show threads in the sidebar.
/// Effectively, all threads stored in here are "non-archived".
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct SidebarThreadMetadataStore {
    /// Database that persists the metadata rows.
    db: ThreadMetadataDb,
    /// In-memory copy of every row, in the order the database returned them
    /// on the last reload.
    threads: Vec<ThreadMetadata>,
    /// The same rows grouped by their `folder_paths`.
    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
    /// Handle to the most recently started reload, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed threads, keyed by session id; an
    /// entry is removed when its thread entity is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
}
 187
 188impl SidebarThreadMetadataStore {
 189    #[cfg(not(any(test, feature = "test-support")))]
 190    pub fn init_global(cx: &mut App) {
 191        if cx.has_global::<Self>() {
 192            return;
 193        }
 194
 195        let db = ThreadMetadataDb::global(cx);
 196        let thread_store = cx.new(|cx| Self::new(db, cx));
 197        cx.set_global(GlobalThreadMetadataStore(thread_store));
 198    }
 199
 200    #[cfg(any(test, feature = "test-support"))]
 201    pub fn init_global(cx: &mut App) {
 202        let thread = std::thread::current();
 203        let test_name = thread.name().unwrap_or("unknown_test");
 204        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 205        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 206        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 207        cx.set_global(GlobalThreadMetadataStore(thread_store));
 208    }
 209
 210    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 211        cx.try_global::<GlobalThreadMetadataStore>()
 212            .map(|store| store.0.clone())
 213    }
 214
 215    pub fn global(cx: &App) -> Entity<Self> {
 216        cx.global::<GlobalThreadMetadataStore>().0.clone()
 217    }
 218
 219    pub fn is_empty(&self) -> bool {
 220        self.threads.is_empty()
 221    }
 222
 223    pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
 224        self.threads.iter().cloned()
 225    }
 226
 227    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 228        self.threads.iter().map(|thread| thread.session_id.clone())
 229    }
 230
 231    pub fn entries_for_path(
 232        &self,
 233        path_list: &PathList,
 234    ) -> impl Iterator<Item = ThreadMetadata> + '_ {
 235        self.threads_by_paths
 236            .get(path_list)
 237            .into_iter()
 238            .flatten()
 239            .cloned()
 240    }
 241
 242    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 243        let db = self.db.clone();
 244        self.reload_task.take();
 245
 246        let list_task = cx
 247            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 248
 249        let reload_task = cx
 250            .spawn(async move |this, cx| {
 251                let Some(rows) = list_task.await.log_err() else {
 252                    return;
 253                };
 254
 255                this.update(cx, |this, cx| {
 256                    this.threads.clear();
 257                    this.threads_by_paths.clear();
 258
 259                    for row in rows {
 260                        this.threads_by_paths
 261                            .entry(row.folder_paths.clone())
 262                            .or_default()
 263                            .push(row.clone());
 264                        this.threads.push(row);
 265                    }
 266
 267                    cx.notify();
 268                })
 269                .ok();
 270            })
 271            .shared();
 272        self.reload_task = Some(reload_task.clone());
 273        reload_task
 274    }
 275
 276    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
 277        if !cx.has_flag::<AgentV2FeatureFlag>() {
 278            return Task::ready(Ok(()));
 279        }
 280
 281        let db = self.db.clone();
 282        cx.spawn(async move |this, cx| {
 283            db.save(metadata).await?;
 284            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 285            reload_task.await;
 286            Ok(())
 287        })
 288    }
 289
 290    pub fn delete(
 291        &mut self,
 292        session_id: acp::SessionId,
 293        cx: &mut Context<Self>,
 294    ) -> Task<Result<()>> {
 295        if !cx.has_flag::<AgentV2FeatureFlag>() {
 296            return Task::ready(Ok(()));
 297        }
 298
 299        let db = self.db.clone();
 300        cx.spawn(async move |this, cx| {
 301            db.delete(session_id).await?;
 302            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 303            reload_task.await;
 304            Ok(())
 305        })
 306    }
 307
 308    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 309        let weak_store = cx.weak_entity();
 310
 311        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 312            // Don't track subagent threads in the sidebar.
 313            if thread.parent_session_id().is_some() {
 314                return;
 315            }
 316
 317            let thread_entity = cx.entity();
 318
 319            cx.on_release({
 320                let weak_store = weak_store.clone();
 321                move |thread, cx| {
 322                    weak_store
 323                        .update(cx, |store, _cx| {
 324                            store.session_subscriptions.remove(thread.session_id());
 325                        })
 326                        .ok();
 327                }
 328            })
 329            .detach();
 330
 331            weak_store
 332                .update(cx, |this, cx| {
 333                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
 334                    this.session_subscriptions
 335                        .insert(thread.session_id().clone(), subscription);
 336                })
 337                .ok();
 338        })
 339        .detach();
 340
 341        let mut this = Self {
 342            db,
 343            threads: Vec::new(),
 344            threads_by_paths: HashMap::default(),
 345            reload_task: None,
 346            session_subscriptions: HashMap::default(),
 347        };
 348        let _ = this.reload(cx);
 349        this
 350    }
 351
 352    fn handle_thread_update(
 353        &mut self,
 354        thread: Entity<acp_thread::AcpThread>,
 355        event: &acp_thread::AcpThreadEvent,
 356        cx: &mut Context<Self>,
 357    ) {
 358        // Don't track subagent threads in the sidebar.
 359        if thread.read(cx).parent_session_id().is_some() {
 360            return;
 361        }
 362
 363        match event {
 364            acp_thread::AcpThreadEvent::NewEntry
 365            | acp_thread::AcpThreadEvent::TitleUpdated
 366            | acp_thread::AcpThreadEvent::EntryUpdated(_)
 367            | acp_thread::AcpThreadEvent::EntriesRemoved(_)
 368            | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
 369            | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
 370            | acp_thread::AcpThreadEvent::Retry(_)
 371            | acp_thread::AcpThreadEvent::Stopped(_)
 372            | acp_thread::AcpThreadEvent::Error
 373            | acp_thread::AcpThreadEvent::LoadError(_)
 374            | acp_thread::AcpThreadEvent::Refusal => {
 375                let metadata = ThreadMetadata::from_thread(&thread, cx);
 376                self.save(metadata, cx).detach_and_log_err(cx);
 377            }
 378            _ => {}
 379        }
 380    }
 381}
 382
// NOTE(review): nothing in this file calls `set_global` with the store
// itself; the entity is registered through `GlobalThreadMetadataStore`.
// Confirm whether this impl is still needed.
impl Global for SidebarThreadMetadataStore {}
 384
/// Database handle for the `sidebar_threads` table, wrapping a
/// [`ThreadSafeConnection`].
struct ThreadMetadataDb(ThreadSafeConnection);
 386
/// Schema definition for the sidebar metadata database.
///
/// The single migration creates `sidebar_threads`, keyed by `session_id`.
/// Timestamps are stored as text (written as RFC 3339 by `save`), and
/// `folder_paths` / `folder_paths_order` hold the two parts of the
/// serialized `PathList`; both are NULL when the path list is empty.
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}
 402
// NOTE(review): `global`, `clone`, `select` and `write` are used on
// `ThreadMetadataDb` in this file without being defined here; presumably this
// macro supplies them (directly or via deref to the connection) — confirm in
// the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
 404
 405impl ThreadMetadataDb {
 406    pub fn is_empty(&self) -> anyhow::Result<bool> {
 407        self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
 408            .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
 409    }
 410
 411    /// List all sidebar thread metadata, ordered by updated_at descending.
 412    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 413        self.select::<ThreadMetadata>(
 414            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
 415             FROM sidebar_threads \
 416             ORDER BY updated_at DESC"
 417        )?()
 418    }
 419
 420    /// Upsert metadata for a thread.
 421    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 422        let id = row.session_id.0.clone();
 423        let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
 424        let title = row.title.to_string();
 425        let updated_at = row.updated_at.to_rfc3339();
 426        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 427        let serialized = row.folder_paths.serialize();
 428        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 429            (None, None)
 430        } else {
 431            (Some(serialized.paths), Some(serialized.order))
 432        };
 433
 434        self.write(move |conn| {
 435            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
 436                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
 437                       ON CONFLICT(session_id) DO UPDATE SET \
 438                           agent_id = excluded.agent_id, \
 439                           title = excluded.title, \
 440                           updated_at = excluded.updated_at, \
 441                           folder_paths = excluded.folder_paths, \
 442                           folder_paths_order = excluded.folder_paths_order";
 443            let mut stmt = Statement::prepare(conn, sql)?;
 444            let mut i = stmt.bind(&id, 1)?;
 445            i = stmt.bind(&agent_id, i)?;
 446            i = stmt.bind(&title, i)?;
 447            i = stmt.bind(&updated_at, i)?;
 448            i = stmt.bind(&created_at, i)?;
 449            i = stmt.bind(&folder_paths, i)?;
 450            stmt.bind(&folder_paths_order, i)?;
 451            stmt.exec()
 452        })
 453        .await
 454    }
 455
 456    /// Delete metadata for a single thread.
 457    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 458        let id = session_id.0.clone();
 459        self.write(move |conn| {
 460            let mut stmt =
 461                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 462            stmt.bind(&id, 1)?;
 463            stmt.exec()
 464        })
 465        .await
 466    }
 467}
 468
 469impl Column for ThreadMetadata {
 470    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 471        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 472        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 473        let (title, next): (String, i32) = Column::column(statement, next)?;
 474        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 475        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 476        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 477        let (folder_paths_order_str, next): (Option<String>, i32) =
 478            Column::column(statement, next)?;
 479
 480        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 481        let created_at = created_at_str
 482            .as_deref()
 483            .map(DateTime::parse_from_rfc3339)
 484            .transpose()?
 485            .map(|dt| dt.with_timezone(&Utc));
 486
 487        let folder_paths = folder_paths_str
 488            .map(|paths| {
 489                PathList::deserialize(&util::path_list::SerializedPathList {
 490                    paths,
 491                    order: folder_paths_order_str.unwrap_or_default(),
 492                })
 493            })
 494            .unwrap_or_default();
 495
 496        Ok((
 497            ThreadMetadata {
 498                session_id: acp::SessionId::new(id),
 499                agent_id: agent_id.map(|id| AgentId::new(id)),
 500                title: title.into(),
 501                updated_at,
 502                created_at,
 503                folder_paths,
 504            },
 505            next,
 506        ))
 507    }
 508}
 509
 510#[cfg(test)]
 511mod tests {
 512    use super::*;
 513    use acp_thread::{AgentConnection, StubAgentConnection};
 514    use action_log::ActionLog;
 515    use agent::DbThread;
 516    use agent_client_protocol as acp;
 517    use feature_flags::FeatureFlagAppExt;
 518    use gpui::TestAppContext;
 519    use project::FakeFs;
 520    use project::Project;
 521    use std::path::Path;
 522    use std::rc::Rc;
 523
 524    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 525        DbThread {
 526            title: title.to_string().into(),
 527            messages: Vec::new(),
 528            updated_at,
 529            detailed_summary: None,
 530            initial_project_snapshot: None,
 531            cumulative_token_usage: Default::default(),
 532            request_token_usage: Default::default(),
 533            model: None,
 534            profile: None,
 535            imported: false,
 536            subagent_context: None,
 537            speed: None,
 538            thinking_enabled: false,
 539            thinking_effort: None,
 540            draft_prompt: None,
 541            ui_scroll_position: None,
 542        }
 543    }
 544
 545    fn make_metadata(
 546        session_id: &str,
 547        title: &str,
 548        updated_at: DateTime<Utc>,
 549        folder_paths: PathList,
 550    ) -> ThreadMetadata {
 551        ThreadMetadata {
 552            session_id: acp::SessionId::new(session_id),
 553            agent_id: None,
 554            title: title.to_string().into(),
 555            updated_at,
 556            created_at: Some(updated_at),
 557            folder_paths,
 558        }
 559    }
 560
    /// Rows written to the database before the store exists must show up in
    /// the store's caches after initialization, newest first.
    #[gpui::test]
    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
        let first_paths = PathList::new(&[Path::new("/project-a")]);
        let second_paths = PathList::new(&[Path::new("/project-b")]);
        let now = Utc::now();
        let older = now - chrono::Duration::seconds(1);

        // Build the same database name that the test `init_global` derives
        // from the current thread's name.
        let thread = std::thread::current();
        let test_name = thread.name().unwrap_or("unknown_test");
        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
            &db_name,
        )));

        // Seed two rows directly, bypassing the store.
        db.save(make_metadata(
            "session-1",
            "First Thread",
            now,
            first_paths.clone(),
        ))
        .await
        .unwrap();
        db.save(make_metadata(
            "session-2",
            "Second Thread",
            older,
            second_paths.clone(),
        ))
        .await
        .unwrap();

        cx.update(|cx| {
            let settings_store = settings::SettingsStore::test(cx);
            cx.set_global(settings_store);
            cx.update_flags(true, vec!["agent-v2".to_string()]);
            SidebarThreadMetadataStore::init_global(cx);
        });

        // Let the initial reload finish.
        cx.run_until_parked();

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            let store = store.read(cx);

            // Ordered by `updated_at` descending.
            let entry_ids = store
                .entry_ids()
                .map(|session_id| session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(entry_ids, vec!["session-1", "session-2"]);

            let first_path_entries = store
                .entries_for_path(&first_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(first_path_entries, vec!["session-1"]);

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-2"]);
        });
    }
 624
    /// Saving, re-saving with different folder paths, and deleting through
    /// the store must each be reflected in both caches after the reload.
    #[gpui::test]
    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
        cx.update(|cx| {
            let settings_store = settings::SettingsStore::test(cx);
            cx.set_global(settings_store);
            // `save` and `delete` are no-ops unless this flag is enabled.
            cx.update_flags(true, vec!["agent-v2".to_string()]);
            SidebarThreadMetadataStore::init_global(cx);
        });

        let first_paths = PathList::new(&[Path::new("/project-a")]);
        let second_paths = PathList::new(&[Path::new("/project-b")]);
        let initial_time = Utc::now();
        let updated_time = initial_time + chrono::Duration::seconds(1);

        let initial_metadata = make_metadata(
            "session-1",
            "First Thread",
            initial_time,
            first_paths.clone(),
        );

        let second_metadata = make_metadata(
            "session-2",
            "Second Thread",
            initial_time,
            second_paths.clone(),
        );

        // Step 1: save one thread per project.
        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(initial_metadata, cx).detach();
                store.save(second_metadata, cx).detach();
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let first_path_entries = store
                .entries_for_path(&first_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(first_path_entries, vec!["session-1"]);

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-2"]);
        });

        // Step 2: re-save session-1 with project-b's paths and a newer
        // timestamp; it should move between the path groups.
        let moved_metadata = make_metadata(
            "session-1",
            "First Thread",
            updated_time,
            second_paths.clone(),
        );

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.save(moved_metadata, cx).detach();
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let entry_ids = store
                .entry_ids()
                .map(|session_id| session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(entry_ids, vec!["session-1", "session-2"]);

            let first_path_entries = store
                .entries_for_path(&first_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert!(first_path_entries.is_empty());

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
        });

        // Step 3: delete session-2; only session-1 should remain.
        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            store.update(cx, |store, cx| {
                store.delete(acp::SessionId::new("session-2"), cx).detach();
            });
        });

        cx.run_until_parked();

        cx.update(|cx| {
            let store = SidebarThreadMetadataStore::global(cx);
            let store = store.read(cx);

            let entry_ids = store
                .entry_ids()
                .map(|session_id| session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(entry_ids, vec!["session-1"]);

            let second_path_entries = store
                .entries_for_path(&second_paths)
                .map(|entry| entry.session_id.0.to_string())
                .collect::<Vec<_>>();
            assert_eq!(second_path_entries, vec!["session-1"]);
        });
    }
 745
 746    #[gpui::test]
 747    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
 748        cx.update(|cx| {
 749            ThreadStore::init_global(cx);
 750            SidebarThreadMetadataStore::init_global(cx);
 751        });
 752
 753        // Verify the cache is empty before migration
 754        let list = cx.update(|cx| {
 755            let store = SidebarThreadMetadataStore::global(cx);
 756            store.read(cx).entries().collect::<Vec<_>>()
 757        });
 758        assert_eq!(list.len(), 0);
 759
 760        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 761        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 762        let now = Utc::now();
 763
 764        for index in 0..12 {
 765            let updated_at = now + chrono::Duration::seconds(index as i64);
 766            let session_id = format!("project-a-session-{index}");
 767            let title = format!("Project A Thread {index}");
 768
 769            let save_task = cx.update(|cx| {
 770                let thread_store = ThreadStore::global(cx);
 771                let session_id = session_id.clone();
 772                let title = title.clone();
 773                let project_a_paths = project_a_paths.clone();
 774                thread_store.update(cx, |store, cx| {
 775                    store.save_thread(
 776                        acp::SessionId::new(session_id),
 777                        make_db_thread(&title, updated_at),
 778                        project_a_paths,
 779                        cx,
 780                    )
 781                })
 782            });
 783            save_task.await.unwrap();
 784            cx.run_until_parked();
 785        }
 786
 787        for index in 0..3 {
 788            let updated_at = now + chrono::Duration::seconds(100 + index as i64);
 789            let session_id = format!("project-b-session-{index}");
 790            let title = format!("Project B Thread {index}");
 791
 792            let save_task = cx.update(|cx| {
 793                let thread_store = ThreadStore::global(cx);
 794                let session_id = session_id.clone();
 795                let title = title.clone();
 796                let project_b_paths = project_b_paths.clone();
 797                thread_store.update(cx, |store, cx| {
 798                    store.save_thread(
 799                        acp::SessionId::new(session_id),
 800                        make_db_thread(&title, updated_at),
 801                        project_b_paths,
 802                        cx,
 803                    )
 804                })
 805            });
 806            save_task.await.unwrap();
 807            cx.run_until_parked();
 808        }
 809
 810        let save_projectless = cx.update(|cx| {
 811            let thread_store = ThreadStore::global(cx);
 812            thread_store.update(cx, |store, cx| {
 813                store.save_thread(
 814                    acp::SessionId::new("projectless-session"),
 815                    make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
 816                    PathList::default(),
 817                    cx,
 818                )
 819            })
 820        });
 821        save_projectless.await.unwrap();
 822        cx.run_until_parked();
 823
 824        // Run migration
 825        cx.update(|cx| {
 826            migrate_thread_metadata(cx);
 827        });
 828
 829        cx.run_until_parked();
 830
 831        // Verify the metadata was migrated, limited to 10 per project, and
 832        // projectless threads were skipped.
 833        let list = cx.update(|cx| {
 834            let store = SidebarThreadMetadataStore::global(cx);
 835            store.read(cx).entries().collect::<Vec<_>>()
 836        });
 837        assert_eq!(list.len(), 13);
 838
 839        assert!(
 840            list.iter()
 841                .all(|metadata| !metadata.folder_paths.is_empty())
 842        );
 843        assert!(
 844            list.iter()
 845                .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
 846        );
 847
 848        let project_a_entries = list
 849            .iter()
 850            .filter(|metadata| metadata.folder_paths == project_a_paths)
 851            .collect::<Vec<_>>();
 852        assert_eq!(project_a_entries.len(), 10);
 853        assert_eq!(
 854            project_a_entries
 855                .iter()
 856                .map(|metadata| metadata.session_id.0.as_ref())
 857                .collect::<Vec<_>>(),
 858            vec![
 859                "project-a-session-11",
 860                "project-a-session-10",
 861                "project-a-session-9",
 862                "project-a-session-8",
 863                "project-a-session-7",
 864                "project-a-session-6",
 865                "project-a-session-5",
 866                "project-a-session-4",
 867                "project-a-session-3",
 868                "project-a-session-2",
 869            ]
 870        );
 871        assert!(
 872            project_a_entries
 873                .iter()
 874                .all(|metadata| metadata.agent_id.is_none())
 875        );
 876
 877        let project_b_entries = list
 878            .iter()
 879            .filter(|metadata| metadata.folder_paths == project_b_paths)
 880            .collect::<Vec<_>>();
 881        assert_eq!(project_b_entries.len(), 3);
 882        assert_eq!(
 883            project_b_entries
 884                .iter()
 885                .map(|metadata| metadata.session_id.0.as_ref())
 886                .collect::<Vec<_>>(),
 887            vec![
 888                "project-b-session-2",
 889                "project-b-session-1",
 890                "project-b-session-0",
 891            ]
 892        );
 893        assert!(
 894            project_b_entries
 895                .iter()
 896                .all(|metadata| metadata.agent_id.is_none())
 897        );
 898    }
 899
 900    #[gpui::test]
 901    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
 902        cx.update(|cx| {
 903            ThreadStore::init_global(cx);
 904            SidebarThreadMetadataStore::init_global(cx);
 905        });
 906
 907        // Pre-populate the metadata store with existing data
 908        let existing_metadata = ThreadMetadata {
 909            session_id: acp::SessionId::new("existing-session"),
 910            agent_id: None,
 911            title: "Existing Thread".into(),
 912            updated_at: Utc::now(),
 913            created_at: Some(Utc::now()),
 914            folder_paths: PathList::default(),
 915        };
 916
 917        cx.update(|cx| {
 918            let store = SidebarThreadMetadataStore::global(cx);
 919            store.update(cx, |store, cx| {
 920                store.save(existing_metadata, cx).detach();
 921            });
 922        });
 923
 924        cx.run_until_parked();
 925
 926        // Add an entry to native thread store that should NOT be migrated
 927        let save_task = cx.update(|cx| {
 928            let thread_store = ThreadStore::global(cx);
 929            thread_store.update(cx, |store, cx| {
 930                store.save_thread(
 931                    acp::SessionId::new("native-session"),
 932                    make_db_thread("Native Thread", Utc::now()),
 933                    PathList::default(),
 934                    cx,
 935                )
 936            })
 937        });
 938        save_task.await.unwrap();
 939        cx.run_until_parked();
 940
 941        // Run migration - should skip because metadata store is not empty
 942        cx.update(|cx| {
 943            migrate_thread_metadata(cx);
 944        });
 945
 946        cx.run_until_parked();
 947
 948        // Verify only the existing metadata is present (migration was skipped)
 949        let list = cx.update(|cx| {
 950            let store = SidebarThreadMetadataStore::global(cx);
 951            store.read(cx).entries().collect::<Vec<_>>()
 952        });
 953        assert_eq!(list.len(), 1);
 954        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
 955    }
 956
 957    #[gpui::test]
 958    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
 959        cx.update(|cx| {
 960            let settings_store = settings::SettingsStore::test(cx);
 961            cx.set_global(settings_store);
 962            cx.update_flags(true, vec!["agent-v2".to_string()]);
 963            ThreadStore::init_global(cx);
 964            SidebarThreadMetadataStore::init_global(cx);
 965        });
 966
 967        let fs = FakeFs::new(cx.executor());
 968        let project = Project::test(fs, None::<&Path>, cx).await;
 969        let connection = Rc::new(StubAgentConnection::new());
 970
 971        // Create a regular (non-subagent) AcpThread.
 972        let regular_thread = cx
 973            .update(|cx| {
 974                connection
 975                    .clone()
 976                    .new_session(project.clone(), PathList::default(), cx)
 977            })
 978            .await
 979            .unwrap();
 980
 981        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
 982
 983        // Set a title on the regular thread to trigger a save via handle_thread_update.
 984        cx.update(|cx| {
 985            regular_thread.update(cx, |thread, cx| {
 986                thread.set_title("Regular Thread".into(), cx).detach();
 987            });
 988        });
 989        cx.run_until_parked();
 990
 991        // Create a subagent AcpThread
 992        let subagent_session_id = acp::SessionId::new("subagent-session");
 993        let subagent_thread = cx.update(|cx| {
 994            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 995            cx.new(|cx| {
 996                acp_thread::AcpThread::new(
 997                    Some(regular_session_id.clone()),
 998                    Some("Subagent Thread".into()),
 999                    None,
1000                    connection.clone(),
1001                    project.clone(),
1002                    action_log,
1003                    subagent_session_id.clone(),
1004                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1005                    cx,
1006                )
1007            })
1008        });
1009
1010        // Set a title on the subagent thread to trigger handle_thread_update.
1011        cx.update(|cx| {
1012            subagent_thread.update(cx, |thread, cx| {
1013                thread
1014                    .set_title("Subagent Thread Title".into(), cx)
1015                    .detach();
1016            });
1017        });
1018        cx.run_until_parked();
1019
1020        // List all metadata from the store cache.
1021        let list = cx.update(|cx| {
1022            let store = SidebarThreadMetadataStore::global(cx);
1023            store.read(cx).entries().collect::<Vec<_>>()
1024        });
1025
1026        // The subagent thread should NOT appear in the sidebar metadata.
1027        // Only the regular thread should be listed.
1028        assert_eq!(
1029            list.len(),
1030            1,
1031            "Expected only the regular thread in sidebar metadata, \
1032             but found {} entries (subagent threads are leaking into the sidebar)",
1033            list.len(),
1034        );
1035        assert_eq!(list[0].session_id, regular_session_id);
1036        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1037    }
1038}