thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AgentSessionInfo;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::Context as _;
   7use chrono::{DateTime, Utc};
   8use collections::HashMap;
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    SidebarThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We migrate the last 10 threads per project and skip threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
  46
  47    let store = SidebarThreadMetadataStore::global(cx);
  48    let db = store.read(cx).db.clone();
  49
  50    cx.spawn(async move |cx| {
  51        if !db.is_empty()? {
  52            return Ok::<(), anyhow::Error>(());
  53        }
  54
  55        let metadata = store.read_with(cx, |_store, app| {
  56            let mut migrated_threads_per_project = HashMap::default();
  57
  58            ThreadStore::global(app)
  59                .read(app)
  60                .entries()
  61                .filter_map(|entry| {
  62                    if entry.folder_paths.is_empty() {
  63                        return None;
  64                    }
  65
  66                    let migrated_thread_count = migrated_threads_per_project
  67                        .entry(entry.folder_paths.clone())
  68                        .or_insert(0);
  69                    if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
  70                        return None;
  71                    }
  72                    *migrated_thread_count += 1;
  73
  74                    Some(ThreadMetadata {
  75                        session_id: entry.id,
  76                        agent_id: None,
  77                        title: entry.title,
  78                        updated_at: entry.updated_at,
  79                        created_at: entry.created_at,
  80                        folder_paths: entry.folder_paths,
  81                    })
  82                })
  83                .collect::<Vec<_>>()
  84        });
  85
  86        log::info!("Migrating {} thread store entries", metadata.len());
  87
  88        // Manually save each entry to the database and call reload, otherwise
  89        // we'll end up triggering lots of reloads after each save
  90        for entry in metadata {
  91            db.save(entry).await?;
  92        }
  93
  94        log::info!("Finished migrating thread store entries");
  95
  96        let _ = store.update(cx, |store, cx| store.reload(cx));
  97        Ok(())
  98    })
  99    .detach_and_log_err(cx);
 100}
 101
/// Wrapper under which the store entity is registered as a gpui global; this
/// is the type `try_global` and `global` look up.
struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 104
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Session this metadata belongs to; primary key of the persisted row.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title stored for the thread.
    pub title: SharedString,
    /// Time of the latest update; persisted rows are listed by this, newest first.
    pub updated_at: DateTime<Utc>,
    /// Creation time, when known. Written when the row is first inserted; the
    /// upsert leaves the stored value untouched on later saves.
    pub created_at: Option<DateTime<Utc>>,
    /// Folders the thread is associated with; also the key used to group
    /// cached threads per project.
    pub folder_paths: PathList,
}
 117
 118impl ThreadMetadata {
 119    pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
 120        let session_id = session.session_id.clone();
 121        let title = session.title.clone().unwrap_or_default();
 122        let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
 123        let created_at = session.created_at.unwrap_or(updated_at);
 124        let folder_paths = session.work_dirs.clone().unwrap_or_default();
 125        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 126            None
 127        } else {
 128            Some(agent_id)
 129        };
 130        Self {
 131            session_id,
 132            agent_id,
 133            title,
 134            updated_at,
 135            created_at: Some(created_at),
 136            folder_paths,
 137        }
 138    }
 139
 140    pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
 141        let thread_ref = thread.read(cx);
 142        let session_id = thread_ref.session_id().clone();
 143        let title = thread_ref
 144            .title()
 145            .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 146        let updated_at = Utc::now();
 147
 148        let agent_id = thread_ref.connection().agent_id();
 149
 150        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 151            None
 152        } else {
 153            Some(agent_id)
 154        };
 155
 156        let folder_paths = {
 157            let project = thread_ref.project().read(cx);
 158            let paths: Vec<Arc<Path>> = project
 159                .visible_worktrees(cx)
 160                .map(|worktree| worktree.read(cx).abs_path())
 161                .collect();
 162            PathList::new(&paths)
 163        };
 164
 165        Self {
 166            session_id,
 167            agent_id,
 168            title,
 169            created_at: Some(updated_at), // handled by db `ON CONFLICT`
 170            updated_at,
 171            folder_paths,
 172        }
 173    }
 174}
 175
/// The store holds all metadata needed to show threads in the sidebar.
/// Effectively, all threads stored in here are "non-archived".
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct SidebarThreadMetadataStore {
    /// Handle to the persisted `sidebar_threads` table.
    db: ThreadMetadataDb,
    /// Cached rows, in the order the last reload received them from the database.
    threads: Vec<ThreadMetadata>,
    /// The same cached rows grouped by their `folder_paths`.
    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
    /// Most recently started reload; replaced every time `reload` runs.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed threads, keyed by session id and
    /// removed when the thread entity is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Sending half of the queue of pending database writes.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Task that drains the write queue; held here so it lives as long as the store.
    _db_operations_task: Task<()>,
}
 189
/// A database write queued by the store and applied by its background task.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Upsert the given metadata row.
    Insert(ThreadMetadata),
    /// Remove the row for the given session.
    Delete(acp::SessionId),
}
 195
 196impl DbOperation {
 197    fn id(&self) -> &acp::SessionId {
 198        match self {
 199            DbOperation::Insert(thread) => &thread.session_id,
 200            DbOperation::Delete(session_id) => session_id,
 201        }
 202    }
 203}
 204
 205impl SidebarThreadMetadataStore {
 206    #[cfg(not(any(test, feature = "test-support")))]
 207    pub fn init_global(cx: &mut App) {
 208        if cx.has_global::<Self>() {
 209            return;
 210        }
 211
 212        let db = ThreadMetadataDb::global(cx);
 213        let thread_store = cx.new(|cx| Self::new(db, cx));
 214        cx.set_global(GlobalThreadMetadataStore(thread_store));
 215    }
 216
 217    #[cfg(any(test, feature = "test-support"))]
 218    pub fn init_global(cx: &mut App) {
 219        let thread = std::thread::current();
 220        let test_name = thread.name().unwrap_or("unknown_test");
 221        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 222        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 223        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 224        cx.set_global(GlobalThreadMetadataStore(thread_store));
 225    }
 226
 227    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 228        cx.try_global::<GlobalThreadMetadataStore>()
 229            .map(|store| store.0.clone())
 230    }
 231
 232    pub fn global(cx: &App) -> Entity<Self> {
 233        cx.global::<GlobalThreadMetadataStore>().0.clone()
 234    }
 235
    /// Whether the in-memory cache holds no threads. This reflects the last
    /// completed reload, not the database directly.
    pub fn is_empty(&self) -> bool {
        self.threads.is_empty()
    }
 239
    /// Iterates over clones of all cached threads, in cache order.
    pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
        self.threads.iter().cloned()
    }
 243
    /// Iterates over the session ids of all cached threads, in cache order.
    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
        self.threads.iter().map(|thread| thread.session_id.clone())
    }
 247
 248    pub fn entries_for_path(
 249        &self,
 250        path_list: &PathList,
 251    ) -> impl Iterator<Item = ThreadMetadata> + '_ {
 252        self.threads_by_paths
 253            .get(path_list)
 254            .into_iter()
 255            .flatten()
 256            .cloned()
 257    }
 258
    /// Re-reads every row from the database and replaces the in-memory cache
    /// with the result, notifying observers afterwards.
    ///
    /// The returned shared task completes once the cache has been refreshed
    /// (or the query failed, in which case the error is logged and the cache
    /// is left as it was). Any previously stored reload task is discarded.
    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
        let db = self.db.clone();
        // Let go of the previous reload; only the newest one is kept.
        self.reload_task.take();

        // The query itself runs via `background_spawn`.
        let list_task = cx
            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });

        let reload_task = cx
            .spawn(async move |this, cx| {
                let Some(rows) = list_task.await.log_err() else {
                    return;
                };

                this.update(cx, |this, cx| {
                    this.threads.clear();
                    this.threads_by_paths.clear();

                    // Rebuild both views from the same rows so they stay in sync.
                    for row in rows {
                        this.threads_by_paths
                            .entry(row.folder_paths.clone())
                            .or_default()
                            .push(row.clone());
                        this.threads.push(row);
                    }

                    cx.notify();
                })
                .ok();
            })
            .shared();
        self.reload_task = Some(reload_task.clone());
        reload_task
    }
 292
 293    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 294        if !cx.has_flag::<AgentV2FeatureFlag>() {
 295            return;
 296        }
 297
 298        self.pending_thread_ops_tx
 299            .try_send(DbOperation::Insert(metadata))
 300            .log_err();
 301    }
 302
 303    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 304        if !cx.has_flag::<AgentV2FeatureFlag>() {
 305            return;
 306        }
 307
 308        self.pending_thread_ops_tx
 309            .try_send(DbOperation::Delete(session_id))
 310            .log_err();
 311    }
 312
    /// Builds the store around `db` and wires up its background behaviour:
    ///
    /// * every newly created top-level `AcpThread` is subscribed to, so its
    ///   events flow into `handle_thread_update`;
    /// * a task drains the queue of pending writes, applies them to the
    ///   database and reloads the cache after each batch;
    /// * an initial reload is started to fill the cache.
    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
        let weak_store = cx.weak_entity();

        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
            // Don't track subagent threads in the sidebar.
            if thread.parent_session_id().is_some() {
                return;
            }

            let thread_entity = cx.entity();

            // Drop the stored subscription once the thread entity is released.
            cx.on_release({
                let weak_store = weak_store.clone();
                move |thread, cx| {
                    weak_store
                        .update(cx, |store, _cx| {
                            store.session_subscriptions.remove(thread.session_id());
                        })
                        .ok();
                }
            })
            .detach();

            // Subscribe from the store's context so the handler receives `&mut Self`.
            weak_store
                .update(cx, |this, cx| {
                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
                    this.session_subscriptions
                        .insert(thread.session_id().clone(), subscription);
                })
                .ok();
        })
        .detach();

        let (tx, rx) = smol::channel::unbounded();
        let _db_operations_task = cx.spawn({
            let db = db.clone();
            async move |this, cx| {
                // Wait for one operation, then grab everything else that is
                // already queued so the whole batch costs a single reload.
                while let Ok(first_update) = rx.recv().await {
                    let mut updates = vec![first_update];
                    while let Ok(update) = rx.try_recv() {
                        updates.push(update);
                    }
                    // Only the latest operation per session needs to be applied.
                    let updates = Self::dedup_db_operations(updates);
                    for operation in updates {
                        match operation {
                            DbOperation::Insert(metadata) => {
                                db.save(metadata).await.log_err();
                            }
                            DbOperation::Delete(session_id) => {
                                db.delete(session_id).await.log_err();
                            }
                        }
                    }

                    this.update(cx, |this, cx| this.reload(cx)).ok();
                }
            }
        });

        let mut this = Self {
            db,
            threads: Vec::new(),
            threads_by_paths: HashMap::default(),
            reload_task: None,
            session_subscriptions: HashMap::default(),
            pending_thread_ops_tx: tx,
            _db_operations_task,
        };
        // Kick off the initial load; the task handle is also kept in `reload_task`.
        let _ = this.reload(cx);
        this
    }
 384
 385    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 386        let mut ops = HashMap::default();
 387        for operation in operations.into_iter().rev() {
 388            if ops.contains_key(operation.id()) {
 389                continue;
 390            }
 391            ops.insert(operation.id().clone(), operation);
 392        }
 393        ops.into_values().collect()
 394    }
 395
 396    fn handle_thread_update(
 397        &mut self,
 398        thread: Entity<acp_thread::AcpThread>,
 399        event: &acp_thread::AcpThreadEvent,
 400        cx: &mut Context<Self>,
 401    ) {
 402        // Don't track subagent threads in the sidebar.
 403        if thread.read(cx).parent_session_id().is_some() {
 404            return;
 405        }
 406
 407        match event {
 408            acp_thread::AcpThreadEvent::NewEntry
 409            | acp_thread::AcpThreadEvent::TitleUpdated
 410            | acp_thread::AcpThreadEvent::EntryUpdated(_)
 411            | acp_thread::AcpThreadEvent::EntriesRemoved(_)
 412            | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
 413            | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
 414            | acp_thread::AcpThreadEvent::Retry(_)
 415            | acp_thread::AcpThreadEvent::Stopped(_)
 416            | acp_thread::AcpThreadEvent::Error
 417            | acp_thread::AcpThreadEvent::LoadError(_)
 418            | acp_thread::AcpThreadEvent::Refusal => {
 419                let metadata = ThreadMetadata::from_thread(&thread, cx);
 420                self.save(metadata, cx);
 421            }
 422            _ => {}
 423        }
 424    }
 425}
 426
// Lets `SidebarThreadMetadataStore` itself be used with gpui's global APIs.
// NOTE(review): within this file the store entity is only ever registered
// under `GlobalThreadMetadataStore`; confirm whether anything still needs this
// impl.
impl Global for SidebarThreadMetadataStore {}
 428
/// Newtype around the connection that stores the `sidebar_threads` table.
struct ThreadMetadataDb(ThreadSafeConnection);

impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    // One row per thread, keyed by `session_id`. Timestamps are stored as
    // RFC 3339 text; `folder_paths` / `folder_paths_order` hold the serialized
    // `PathList` and are NULL for threads without folders.
    // NOTE(review): presumably applied migrations must not be edited after
    // shipping — confirm against sqlez before changing this statement.
    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}

// NOTE(review): presumably generates the shared connection behind
// `ThreadMetadataDb::global`; the meaning of the empty list is not visible
// here — confirm against the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
 448
 449impl ThreadMetadataDb {
 450    pub fn is_empty(&self) -> anyhow::Result<bool> {
 451        self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
 452            .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
 453    }
 454
 455    /// List all sidebar thread metadata, ordered by updated_at descending.
 456    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 457        self.select::<ThreadMetadata>(
 458            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
 459             FROM sidebar_threads \
 460             ORDER BY updated_at DESC"
 461        )?()
 462    }
 463
 464    /// Upsert metadata for a thread.
 465    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 466        let id = row.session_id.0.clone();
 467        let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
 468        let title = row.title.to_string();
 469        let updated_at = row.updated_at.to_rfc3339();
 470        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 471        let serialized = row.folder_paths.serialize();
 472        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 473            (None, None)
 474        } else {
 475            (Some(serialized.paths), Some(serialized.order))
 476        };
 477
 478        self.write(move |conn| {
 479            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
 480                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
 481                       ON CONFLICT(session_id) DO UPDATE SET \
 482                           agent_id = excluded.agent_id, \
 483                           title = excluded.title, \
 484                           updated_at = excluded.updated_at, \
 485                           folder_paths = excluded.folder_paths, \
 486                           folder_paths_order = excluded.folder_paths_order";
 487            let mut stmt = Statement::prepare(conn, sql)?;
 488            let mut i = stmt.bind(&id, 1)?;
 489            i = stmt.bind(&agent_id, i)?;
 490            i = stmt.bind(&title, i)?;
 491            i = stmt.bind(&updated_at, i)?;
 492            i = stmt.bind(&created_at, i)?;
 493            i = stmt.bind(&folder_paths, i)?;
 494            stmt.bind(&folder_paths_order, i)?;
 495            stmt.exec()
 496        })
 497        .await
 498    }
 499
 500    /// Delete metadata for a single thread.
 501    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 502        let id = session_id.0.clone();
 503        self.write(move |conn| {
 504            let mut stmt =
 505                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 506            stmt.bind(&id, 1)?;
 507            stmt.exec()
 508        })
 509        .await
 510    }
 511}
 512
 513impl Column for ThreadMetadata {
 514    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 515        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 516        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 517        let (title, next): (String, i32) = Column::column(statement, next)?;
 518        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 519        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 520        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 521        let (folder_paths_order_str, next): (Option<String>, i32) =
 522            Column::column(statement, next)?;
 523
 524        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 525        let created_at = created_at_str
 526            .as_deref()
 527            .map(DateTime::parse_from_rfc3339)
 528            .transpose()?
 529            .map(|dt| dt.with_timezone(&Utc));
 530
 531        let folder_paths = folder_paths_str
 532            .map(|paths| {
 533                PathList::deserialize(&util::path_list::SerializedPathList {
 534                    paths,
 535                    order: folder_paths_order_str.unwrap_or_default(),
 536                })
 537            })
 538            .unwrap_or_default();
 539
 540        Ok((
 541            ThreadMetadata {
 542                session_id: acp::SessionId::new(id),
 543                agent_id: agent_id.map(|id| AgentId::new(id)),
 544                title: title.into(),
 545                updated_at,
 546                created_at,
 547                folder_paths,
 548            },
 549            next,
 550        ))
 551    }
 552}
 553
 554#[cfg(test)]
 555mod tests {
 556    use super::*;
 557    use acp_thread::{AgentConnection, StubAgentConnection};
 558    use action_log::ActionLog;
 559    use agent::DbThread;
 560    use agent_client_protocol as acp;
 561    use feature_flags::FeatureFlagAppExt;
 562    use gpui::TestAppContext;
 563    use project::FakeFs;
 564    use project::Project;
 565    use std::path::Path;
 566    use std::rc::Rc;
 567
    /// Builds a minimal `DbThread` with the given title and update time; every
    /// other field is empty, `None`, `false`, or its default.
    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
        DbThread {
            title: title.to_string().into(),
            messages: Vec::new(),
            updated_at,
            detailed_summary: None,
            initial_project_snapshot: None,
            cumulative_token_usage: Default::default(),
            request_token_usage: Default::default(),
            model: None,
            profile: None,
            imported: false,
            subagent_context: None,
            speed: None,
            thinking_enabled: false,
            thinking_effort: None,
            draft_prompt: None,
            ui_scroll_position: None,
        }
    }
 588
 589    fn make_metadata(
 590        session_id: &str,
 591        title: &str,
 592        updated_at: DateTime<Utc>,
 593        folder_paths: PathList,
 594    ) -> ThreadMetadata {
 595        ThreadMetadata {
 596            session_id: acp::SessionId::new(session_id),
 597            agent_id: None,
 598            title: title.to_string().into(),
 599            updated_at,
 600            created_at: Some(updated_at),
 601            folder_paths,
 602        }
 603    }
 604
 605    #[gpui::test]
 606    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 607        let first_paths = PathList::new(&[Path::new("/project-a")]);
 608        let second_paths = PathList::new(&[Path::new("/project-b")]);
 609        let now = Utc::now();
 610        let older = now - chrono::Duration::seconds(1);
 611
 612        let thread = std::thread::current();
 613        let test_name = thread.name().unwrap_or("unknown_test");
 614        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 615        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 616            &db_name,
 617        )));
 618
 619        db.save(make_metadata(
 620            "session-1",
 621            "First Thread",
 622            now,
 623            first_paths.clone(),
 624        ))
 625        .await
 626        .unwrap();
 627        db.save(make_metadata(
 628            "session-2",
 629            "Second Thread",
 630            older,
 631            second_paths.clone(),
 632        ))
 633        .await
 634        .unwrap();
 635
 636        cx.update(|cx| {
 637            let settings_store = settings::SettingsStore::test(cx);
 638            cx.set_global(settings_store);
 639            cx.update_flags(true, vec!["agent-v2".to_string()]);
 640            SidebarThreadMetadataStore::init_global(cx);
 641        });
 642
 643        cx.run_until_parked();
 644
 645        cx.update(|cx| {
 646            let store = SidebarThreadMetadataStore::global(cx);
 647            let store = store.read(cx);
 648
 649            let entry_ids = store
 650                .entry_ids()
 651                .map(|session_id| session_id.0.to_string())
 652                .collect::<Vec<_>>();
 653            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 654
 655            let first_path_entries = store
 656                .entries_for_path(&first_paths)
 657                .map(|entry| entry.session_id.0.to_string())
 658                .collect::<Vec<_>>();
 659            assert_eq!(first_path_entries, vec!["session-1"]);
 660
 661            let second_path_entries = store
 662                .entries_for_path(&second_paths)
 663                .map(|entry| entry.session_id.0.to_string())
 664                .collect::<Vec<_>>();
 665            assert_eq!(second_path_entries, vec!["session-2"]);
 666        });
 667    }
 668
 669    #[gpui::test]
 670    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 671        cx.update(|cx| {
 672            let settings_store = settings::SettingsStore::test(cx);
 673            cx.set_global(settings_store);
 674            cx.update_flags(true, vec!["agent-v2".to_string()]);
 675            SidebarThreadMetadataStore::init_global(cx);
 676        });
 677
 678        let first_paths = PathList::new(&[Path::new("/project-a")]);
 679        let second_paths = PathList::new(&[Path::new("/project-b")]);
 680        let initial_time = Utc::now();
 681        let updated_time = initial_time + chrono::Duration::seconds(1);
 682
 683        let initial_metadata = make_metadata(
 684            "session-1",
 685            "First Thread",
 686            initial_time,
 687            first_paths.clone(),
 688        );
 689
 690        let second_metadata = make_metadata(
 691            "session-2",
 692            "Second Thread",
 693            initial_time,
 694            second_paths.clone(),
 695        );
 696
 697        cx.update(|cx| {
 698            let store = SidebarThreadMetadataStore::global(cx);
 699            store.update(cx, |store, cx| {
 700                store.save(initial_metadata, cx);
 701                store.save(second_metadata, cx);
 702            });
 703        });
 704
 705        cx.run_until_parked();
 706
 707        cx.update(|cx| {
 708            let store = SidebarThreadMetadataStore::global(cx);
 709            let store = store.read(cx);
 710
 711            let first_path_entries = store
 712                .entries_for_path(&first_paths)
 713                .map(|entry| entry.session_id.0.to_string())
 714                .collect::<Vec<_>>();
 715            assert_eq!(first_path_entries, vec!["session-1"]);
 716
 717            let second_path_entries = store
 718                .entries_for_path(&second_paths)
 719                .map(|entry| entry.session_id.0.to_string())
 720                .collect::<Vec<_>>();
 721            assert_eq!(second_path_entries, vec!["session-2"]);
 722        });
 723
 724        let moved_metadata = make_metadata(
 725            "session-1",
 726            "First Thread",
 727            updated_time,
 728            second_paths.clone(),
 729        );
 730
 731        cx.update(|cx| {
 732            let store = SidebarThreadMetadataStore::global(cx);
 733            store.update(cx, |store, cx| {
 734                store.save(moved_metadata, cx);
 735            });
 736        });
 737
 738        cx.run_until_parked();
 739
 740        cx.update(|cx| {
 741            let store = SidebarThreadMetadataStore::global(cx);
 742            let store = store.read(cx);
 743
 744            let entry_ids = store
 745                .entry_ids()
 746                .map(|session_id| session_id.0.to_string())
 747                .collect::<Vec<_>>();
 748            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 749
 750            let first_path_entries = store
 751                .entries_for_path(&first_paths)
 752                .map(|entry| entry.session_id.0.to_string())
 753                .collect::<Vec<_>>();
 754            assert!(first_path_entries.is_empty());
 755
 756            let second_path_entries = store
 757                .entries_for_path(&second_paths)
 758                .map(|entry| entry.session_id.0.to_string())
 759                .collect::<Vec<_>>();
 760            assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
 761        });
 762
 763        cx.update(|cx| {
 764            let store = SidebarThreadMetadataStore::global(cx);
 765            store.update(cx, |store, cx| {
 766                store.delete(acp::SessionId::new("session-2"), cx);
 767            });
 768        });
 769
 770        cx.run_until_parked();
 771
 772        cx.update(|cx| {
 773            let store = SidebarThreadMetadataStore::global(cx);
 774            let store = store.read(cx);
 775
 776            let entry_ids = store
 777                .entry_ids()
 778                .map(|session_id| session_id.0.to_string())
 779                .collect::<Vec<_>>();
 780            assert_eq!(entry_ids, vec!["session-1"]);
 781
 782            let second_path_entries = store
 783                .entries_for_path(&second_paths)
 784                .map(|entry| entry.session_id.0.to_string())
 785                .collect::<Vec<_>>();
 786            assert_eq!(second_path_entries, vec!["session-1"]);
 787        });
 788    }
 789
 790    #[gpui::test]
 791    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
 792        cx.update(|cx| {
 793            ThreadStore::init_global(cx);
 794            SidebarThreadMetadataStore::init_global(cx);
 795        });
 796
 797        // Verify the cache is empty before migration
 798        let list = cx.update(|cx| {
 799            let store = SidebarThreadMetadataStore::global(cx);
 800            store.read(cx).entries().collect::<Vec<_>>()
 801        });
 802        assert_eq!(list.len(), 0);
 803
 804        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 805        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 806        let now = Utc::now();
 807
 808        for index in 0..12 {
 809            let updated_at = now + chrono::Duration::seconds(index as i64);
 810            let session_id = format!("project-a-session-{index}");
 811            let title = format!("Project A Thread {index}");
 812
 813            let save_task = cx.update(|cx| {
 814                let thread_store = ThreadStore::global(cx);
 815                let session_id = session_id.clone();
 816                let title = title.clone();
 817                let project_a_paths = project_a_paths.clone();
 818                thread_store.update(cx, |store, cx| {
 819                    store.save_thread(
 820                        acp::SessionId::new(session_id),
 821                        make_db_thread(&title, updated_at),
 822                        project_a_paths,
 823                        cx,
 824                    )
 825                })
 826            });
 827            save_task.await.unwrap();
 828            cx.run_until_parked();
 829        }
 830
 831        for index in 0..3 {
 832            let updated_at = now + chrono::Duration::seconds(100 + index as i64);
 833            let session_id = format!("project-b-session-{index}");
 834            let title = format!("Project B Thread {index}");
 835
 836            let save_task = cx.update(|cx| {
 837                let thread_store = ThreadStore::global(cx);
 838                let session_id = session_id.clone();
 839                let title = title.clone();
 840                let project_b_paths = project_b_paths.clone();
 841                thread_store.update(cx, |store, cx| {
 842                    store.save_thread(
 843                        acp::SessionId::new(session_id),
 844                        make_db_thread(&title, updated_at),
 845                        project_b_paths,
 846                        cx,
 847                    )
 848                })
 849            });
 850            save_task.await.unwrap();
 851            cx.run_until_parked();
 852        }
 853
 854        let save_projectless = cx.update(|cx| {
 855            let thread_store = ThreadStore::global(cx);
 856            thread_store.update(cx, |store, cx| {
 857                store.save_thread(
 858                    acp::SessionId::new("projectless-session"),
 859                    make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
 860                    PathList::default(),
 861                    cx,
 862                )
 863            })
 864        });
 865        save_projectless.await.unwrap();
 866        cx.run_until_parked();
 867
 868        // Run migration
 869        cx.update(|cx| {
 870            migrate_thread_metadata(cx);
 871        });
 872
 873        cx.run_until_parked();
 874
 875        // Verify the metadata was migrated, limited to 10 per project, and
 876        // projectless threads were skipped.
 877        let list = cx.update(|cx| {
 878            let store = SidebarThreadMetadataStore::global(cx);
 879            store.read(cx).entries().collect::<Vec<_>>()
 880        });
 881        assert_eq!(list.len(), 13);
 882
 883        assert!(
 884            list.iter()
 885                .all(|metadata| !metadata.folder_paths.is_empty())
 886        );
 887        assert!(
 888            list.iter()
 889                .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
 890        );
 891
 892        let project_a_entries = list
 893            .iter()
 894            .filter(|metadata| metadata.folder_paths == project_a_paths)
 895            .collect::<Vec<_>>();
 896        assert_eq!(project_a_entries.len(), 10);
 897        assert_eq!(
 898            project_a_entries
 899                .iter()
 900                .map(|metadata| metadata.session_id.0.as_ref())
 901                .collect::<Vec<_>>(),
 902            vec![
 903                "project-a-session-11",
 904                "project-a-session-10",
 905                "project-a-session-9",
 906                "project-a-session-8",
 907                "project-a-session-7",
 908                "project-a-session-6",
 909                "project-a-session-5",
 910                "project-a-session-4",
 911                "project-a-session-3",
 912                "project-a-session-2",
 913            ]
 914        );
 915        assert!(
 916            project_a_entries
 917                .iter()
 918                .all(|metadata| metadata.agent_id.is_none())
 919        );
 920
 921        let project_b_entries = list
 922            .iter()
 923            .filter(|metadata| metadata.folder_paths == project_b_paths)
 924            .collect::<Vec<_>>();
 925        assert_eq!(project_b_entries.len(), 3);
 926        assert_eq!(
 927            project_b_entries
 928                .iter()
 929                .map(|metadata| metadata.session_id.0.as_ref())
 930                .collect::<Vec<_>>(),
 931            vec![
 932                "project-b-session-2",
 933                "project-b-session-1",
 934                "project-b-session-0",
 935            ]
 936        );
 937        assert!(
 938            project_b_entries
 939                .iter()
 940                .all(|metadata| metadata.agent_id.is_none())
 941        );
 942    }
 943
 944    #[gpui::test]
 945    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
 946        cx.update(|cx| {
 947            ThreadStore::init_global(cx);
 948            SidebarThreadMetadataStore::init_global(cx);
 949        });
 950
 951        // Pre-populate the metadata store with existing data
 952        let existing_metadata = ThreadMetadata {
 953            session_id: acp::SessionId::new("existing-session"),
 954            agent_id: None,
 955            title: "Existing Thread".into(),
 956            updated_at: Utc::now(),
 957            created_at: Some(Utc::now()),
 958            folder_paths: PathList::default(),
 959        };
 960
 961        cx.update(|cx| {
 962            let store = SidebarThreadMetadataStore::global(cx);
 963            store.update(cx, |store, cx| {
 964                store.save(existing_metadata, cx);
 965            });
 966        });
 967
 968        cx.run_until_parked();
 969
 970        // Add an entry to native thread store that should NOT be migrated
 971        let save_task = cx.update(|cx| {
 972            let thread_store = ThreadStore::global(cx);
 973            thread_store.update(cx, |store, cx| {
 974                store.save_thread(
 975                    acp::SessionId::new("native-session"),
 976                    make_db_thread("Native Thread", Utc::now()),
 977                    PathList::default(),
 978                    cx,
 979                )
 980            })
 981        });
 982        save_task.await.unwrap();
 983        cx.run_until_parked();
 984
 985        // Run migration - should skip because metadata store is not empty
 986        cx.update(|cx| {
 987            migrate_thread_metadata(cx);
 988        });
 989
 990        cx.run_until_parked();
 991
 992        // Verify only the existing metadata is present (migration was skipped)
 993        let list = cx.update(|cx| {
 994            let store = SidebarThreadMetadataStore::global(cx);
 995            store.read(cx).entries().collect::<Vec<_>>()
 996        });
 997        assert_eq!(list.len(), 1);
 998        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
 999    }
1000
1001    #[gpui::test]
1002    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1003        cx.update(|cx| {
1004            let settings_store = settings::SettingsStore::test(cx);
1005            cx.set_global(settings_store);
1006            cx.update_flags(true, vec!["agent-v2".to_string()]);
1007            ThreadStore::init_global(cx);
1008            SidebarThreadMetadataStore::init_global(cx);
1009        });
1010
1011        let fs = FakeFs::new(cx.executor());
1012        let project = Project::test(fs, None::<&Path>, cx).await;
1013        let connection = Rc::new(StubAgentConnection::new());
1014
1015        // Create a regular (non-subagent) AcpThread.
1016        let regular_thread = cx
1017            .update(|cx| {
1018                connection
1019                    .clone()
1020                    .new_session(project.clone(), PathList::default(), cx)
1021            })
1022            .await
1023            .unwrap();
1024
1025        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1026
1027        // Set a title on the regular thread to trigger a save via handle_thread_update.
1028        cx.update(|cx| {
1029            regular_thread.update(cx, |thread, cx| {
1030                thread.set_title("Regular Thread".into(), cx).detach();
1031            });
1032        });
1033        cx.run_until_parked();
1034
1035        // Create a subagent AcpThread
1036        let subagent_session_id = acp::SessionId::new("subagent-session");
1037        let subagent_thread = cx.update(|cx| {
1038            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1039            cx.new(|cx| {
1040                acp_thread::AcpThread::new(
1041                    Some(regular_session_id.clone()),
1042                    Some("Subagent Thread".into()),
1043                    None,
1044                    connection.clone(),
1045                    project.clone(),
1046                    action_log,
1047                    subagent_session_id.clone(),
1048                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1049                    cx,
1050                )
1051            })
1052        });
1053
1054        // Set a title on the subagent thread to trigger handle_thread_update.
1055        cx.update(|cx| {
1056            subagent_thread.update(cx, |thread, cx| {
1057                thread
1058                    .set_title("Subagent Thread Title".into(), cx)
1059                    .detach();
1060            });
1061        });
1062        cx.run_until_parked();
1063
1064        // List all metadata from the store cache.
1065        let list = cx.update(|cx| {
1066            let store = SidebarThreadMetadataStore::global(cx);
1067            store.read(cx).entries().collect::<Vec<_>>()
1068        });
1069
1070        // The subagent thread should NOT appear in the sidebar metadata.
1071        // Only the regular thread should be listed.
1072        assert_eq!(
1073            list.len(),
1074            1,
1075            "Expected only the regular thread in sidebar metadata, \
1076             but found {} entries (subagent threads are leaking into the sidebar)",
1077            list.len(),
1078        );
1079        assert_eq!(list[0].session_id, regular_session_id);
1080        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1081    }
1082
1083    #[test]
1084    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1085        let now = Utc::now();
1086
1087        let operations = vec![
1088            DbOperation::Insert(make_metadata(
1089                "session-1",
1090                "First Thread",
1091                now,
1092                PathList::default(),
1093            )),
1094            DbOperation::Delete(acp::SessionId::new("session-1")),
1095        ];
1096
1097        let deduped = SidebarThreadMetadataStore::dedup_db_operations(operations);
1098
1099        assert_eq!(deduped.len(), 1);
1100        assert_eq!(
1101            deduped[0],
1102            DbOperation::Delete(acp::SessionId::new("session-1"))
1103        );
1104    }
1105
1106    #[test]
1107    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1108        let now = Utc::now();
1109        let later = now + chrono::Duration::seconds(1);
1110
1111        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1112        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1113
1114        let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![
1115            DbOperation::Insert(old_metadata),
1116            DbOperation::Insert(new_metadata.clone()),
1117        ]);
1118
1119        assert_eq!(deduped.len(), 1);
1120        assert_eq!(deduped[0], DbOperation::Insert(new_metadata));
1121    }
1122
1123    #[test]
1124    fn test_dedup_db_operations_preserves_distinct_sessions() {
1125        let now = Utc::now();
1126
1127        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1128        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1129        let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![
1130            DbOperation::Insert(metadata1.clone()),
1131            DbOperation::Insert(metadata2.clone()),
1132        ]);
1133
1134        assert_eq!(deduped.len(), 2);
1135        assert!(deduped.contains(&DbOperation::Insert(metadata1)));
1136        assert!(deduped.contains(&DbOperation::Insert(metadata2)));
1137    }
1138}