thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AcpThreadEvent;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::Context as _;
   7use chrono::{DateTime, Utc};
   8use collections::{HashMap, HashSet};
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    ThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We skip migrating threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    let store = ThreadMetadataStore::global(cx);
  46    let db = store.read(cx).db.clone();
  47
  48    cx.spawn(async move |cx| {
  49        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
  50
  51        let is_first_migration = existing_entries.is_empty();
  52
  53        let mut to_migrate = store.read_with(cx, |_store, cx| {
  54            ThreadStore::global(cx)
  55                .read(cx)
  56                .entries()
  57                .filter_map(|entry| {
  58                    if existing_entries.contains(&entry.id.0) {
  59                        return None;
  60                    }
  61
  62                    Some(ThreadMetadata {
  63                        session_id: entry.id,
  64                        agent_id: ZED_AGENT_ID.clone(),
  65                        title: entry.title,
  66                        updated_at: entry.updated_at,
  67                        created_at: entry.created_at,
  68                        folder_paths: entry.folder_paths,
  69                        main_worktree_paths: PathList::default(),
  70                        archived: true,
  71                    })
  72                })
  73                .collect::<Vec<_>>()
  74        });
  75
  76        if to_migrate.is_empty() {
  77            return anyhow::Ok(());
  78        }
  79
  80        // On the first migration (no entries in DB yet), keep the 5 most
  81        // recent threads per project unarchived.
  82        if is_first_migration {
  83            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
  84            for entry in &mut to_migrate {
  85                if entry.folder_paths.is_empty() {
  86                    continue;
  87                }
  88                per_project
  89                    .entry(entry.folder_paths.clone())
  90                    .or_default()
  91                    .push(entry);
  92            }
  93            for entries in per_project.values_mut() {
  94                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
  95                for entry in entries.iter_mut().take(5) {
  96                    entry.archived = false;
  97                }
  98            }
  99        }
 100
 101        log::info!("Migrating {} thread store entries", to_migrate.len());
 102
 103        // Manually save each entry to the database and call reload, otherwise
 104        // we'll end up triggering lots of reloads after each save
 105        for entry in to_migrate {
 106            db.save(entry).await?;
 107        }
 108
 109        log::info!("Finished migrating thread store entries");
 110
 111        let _ = store.update(cx, |store, cx| store.reload(cx));
 112        anyhow::Ok(())
 113    })
 114    .detach_and_log_err(cx);
 115}
 116
/// Newtype wrapper under which the shared [`ThreadMetadataStore`] entity is
/// registered as an app global (see `init_global`, `try_global` and `global`).
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 119
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Session identifier; also the key under which the store caches the entry.
    pub session_id: acp::SessionId,
    /// Agent the thread belongs to. The native Zed agent (`ZED_AGENT_ID`) is
    /// persisted as a NULL `agent_id` column.
    pub agent_id: AgentId,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Time of the last recorded change; set to "now" whenever a tracked
    /// thread event is handled.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known.
    pub created_at: Option<DateTime<Utc>>,
    /// Absolute paths of the project's visible worktrees for this thread.
    /// May be empty (e.g. a thread started in an empty window).
    pub folder_paths: PathList,
    /// Path list of the project's group key; used to find threads opened in a
    /// linked worktree via their main worktree. Empty when not known.
    pub main_worktree_paths: PathList,
    /// Archived threads are excluded from the per-path listings.
    pub archived: bool,
}
 133
 134impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
 135    fn from(meta: &ThreadMetadata) -> Self {
 136        Self {
 137            session_id: meta.session_id.clone(),
 138            work_dirs: Some(meta.folder_paths.clone()),
 139            title: Some(meta.title.clone()),
 140            updated_at: Some(meta.updated_at),
 141            created_at: meta.created_at,
 142            meta: None,
 143        }
 144    }
 145}
 146
/// The store holds all metadata needed to show threads in the sidebar/the archive.
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct ThreadMetadataStore {
    /// Persistent backing storage for the metadata.
    db: ThreadMetadataDb,
    /// In-memory cache of every known thread, keyed by session id.
    threads: HashMap<acp::SessionId, ThreadMetadata>,
    /// Index from a thread's `folder_paths` to the sessions that use it.
    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// Index from a thread's non-empty `main_worktree_paths` to its sessions.
    threads_by_main_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// Handle to the most recently started `reload`, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed threads; an entry is removed when its
    /// thread entity is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Sending half of the queue of pending database writes.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Background task that drains the queue and applies it to `db`.
    /// NOTE(review): presumably held so the worker lives as long as the
    /// store — confirm against gpui `Task` drop semantics.
    _db_operations_task: Task<()>,
}
 160
/// A pending write against the metadata database, queued by the store and
/// applied by its background task.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Insert the row, or overwrite the existing row for the same session.
    Upsert(ThreadMetadata),
    /// Remove the row for the given session.
    Delete(acp::SessionId),
}
 166
 167impl DbOperation {
 168    fn id(&self) -> &acp::SessionId {
 169        match self {
 170            DbOperation::Upsert(thread) => &thread.session_id,
 171            DbOperation::Delete(session_id) => session_id,
 172        }
 173    }
 174}
 175
 176impl ThreadMetadataStore {
 177    #[cfg(not(any(test, feature = "test-support")))]
 178    pub fn init_global(cx: &mut App) {
 179        if cx.has_global::<Self>() {
 180            return;
 181        }
 182
 183        let db = ThreadMetadataDb::global(cx);
 184        let thread_store = cx.new(|cx| Self::new(db, cx));
 185        cx.set_global(GlobalThreadMetadataStore(thread_store));
 186    }
 187
 188    #[cfg(any(test, feature = "test-support"))]
 189    pub fn init_global(cx: &mut App) {
 190        let thread = std::thread::current();
 191        let test_name = thread.name().unwrap_or("unknown_test");
 192        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 193        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 194        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 195        cx.set_global(GlobalThreadMetadataStore(thread_store));
 196    }
 197
 198    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 199        cx.try_global::<GlobalThreadMetadataStore>()
 200            .map(|store| store.0.clone())
 201    }
 202
 203    pub fn global(cx: &App) -> Entity<Self> {
 204        cx.global::<GlobalThreadMetadataStore>().0.clone()
 205    }
 206
    /// Returns `true` when the in-memory cache holds no thread metadata.
    ///
    /// The cache starts out empty and is filled asynchronously by `reload`.
    pub fn is_empty(&self) -> bool {
        self.threads.is_empty()
    }
 210
    /// Returns the session IDs of all cached threads (archived ones included),
    /// in no particular order.
    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
        self.threads.keys().cloned()
    }
 215
    /// Returns the cached metadata for a specific thread, or `None` if the
    /// session is unknown to the store. Archived threads are returned too.
    pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
        self.threads.get(session_id)
    }
 220
    /// Returns all cached threads, archived or not, in no particular order.
    pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
        self.threads.values()
    }
 225
 226    /// Returns all archived threads.
 227    pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 228        self.entries().filter(|t| t.archived)
 229    }
 230
 231    /// Returns all threads for the given path list, excluding archived threads.
 232    pub fn entries_for_path(
 233        &self,
 234        path_list: &PathList,
 235    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 236        self.threads_by_paths
 237            .get(path_list)
 238            .into_iter()
 239            .flatten()
 240            .filter_map(|s| self.threads.get(s))
 241            .filter(|s| !s.archived)
 242    }
 243
 244    /// Returns threads whose `main_worktree_paths` matches the given path list,
 245    /// excluding archived threads. This finds threads that were opened in a
 246    /// linked worktree but are associated with the given main worktree.
 247    pub fn entries_for_main_worktree_path(
 248        &self,
 249        path_list: &PathList,
 250    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 251        self.threads_by_main_paths
 252            .get(path_list)
 253            .into_iter()
 254            .flatten()
 255            .filter_map(|s| self.threads.get(s))
 256            .filter(|s| !s.archived)
 257    }
 258
 259    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 260        let db = self.db.clone();
 261        self.reload_task.take();
 262
 263        let list_task = cx
 264            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 265
 266        let reload_task = cx
 267            .spawn(async move |this, cx| {
 268                let Some(rows) = list_task.await.log_err() else {
 269                    return;
 270                };
 271
 272                this.update(cx, |this, cx| {
 273                    this.threads.clear();
 274                    this.threads_by_paths.clear();
 275                    this.threads_by_main_paths.clear();
 276
 277                    for row in rows {
 278                        this.threads_by_paths
 279                            .entry(row.folder_paths.clone())
 280                            .or_default()
 281                            .insert(row.session_id.clone());
 282                        if !row.main_worktree_paths.is_empty() {
 283                            this.threads_by_main_paths
 284                                .entry(row.main_worktree_paths.clone())
 285                                .or_default()
 286                                .insert(row.session_id.clone());
 287                        }
 288                        this.threads.insert(row.session_id.clone(), row);
 289                    }
 290
 291                    cx.notify();
 292                })
 293                .ok();
 294            })
 295            .shared();
 296        self.reload_task = Some(reload_task.clone());
 297        reload_task
 298    }
 299
 300    pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
 301        if !cx.has_flag::<AgentV2FeatureFlag>() {
 302            return;
 303        }
 304
 305        for metadata in metadata {
 306            self.save_internal(metadata);
 307        }
 308        cx.notify();
 309    }
 310
    /// Test-only entry point that exposes the private [`Self::save`], so tests
    /// can insert metadata without going through thread events.
    #[cfg(any(test, feature = "test-support"))]
    pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
        self.save(metadata, cx)
    }
 315
 316    fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 317        if !cx.has_flag::<AgentV2FeatureFlag>() {
 318            return;
 319        }
 320
 321        self.save_internal(metadata);
 322        cx.notify();
 323    }
 324
 325    fn save_internal(&mut self, metadata: ThreadMetadata) {
 326        if let Some(thread) = self.threads.get(&metadata.session_id) {
 327            if thread.folder_paths != metadata.folder_paths {
 328                if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
 329                    session_ids.remove(&metadata.session_id);
 330                }
 331            }
 332            if thread.main_worktree_paths != metadata.main_worktree_paths
 333                && !thread.main_worktree_paths.is_empty()
 334            {
 335                if let Some(session_ids) = self
 336                    .threads_by_main_paths
 337                    .get_mut(&thread.main_worktree_paths)
 338                {
 339                    session_ids.remove(&metadata.session_id);
 340                }
 341            }
 342        }
 343
 344        self.threads
 345            .insert(metadata.session_id.clone(), metadata.clone());
 346
 347        self.threads_by_paths
 348            .entry(metadata.folder_paths.clone())
 349            .or_default()
 350            .insert(metadata.session_id.clone());
 351
 352        if !metadata.main_worktree_paths.is_empty() {
 353            self.threads_by_main_paths
 354                .entry(metadata.main_worktree_paths.clone())
 355                .or_default()
 356                .insert(metadata.session_id.clone());
 357        }
 358
 359        self.pending_thread_ops_tx
 360            .try_send(DbOperation::Upsert(metadata))
 361            .log_err();
 362    }
 363
 364    pub fn update_working_directories(
 365        &mut self,
 366        session_id: &acp::SessionId,
 367        work_dirs: PathList,
 368        cx: &mut Context<Self>,
 369    ) {
 370        if !cx.has_flag::<AgentV2FeatureFlag>() {
 371            return;
 372        }
 373
 374        if let Some(thread) = self.threads.get(session_id) {
 375            self.save_internal(ThreadMetadata {
 376                folder_paths: work_dirs,
 377                ..thread.clone()
 378            });
 379            cx.notify();
 380        }
 381    }
 382
    /// Marks the thread as archived. No-op when the agent-v2 feature flag is
    /// off or the session is unknown (see `update_archived`).
    pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
        self.update_archived(session_id, true, cx);
    }
 386
    /// Clears the thread's archived flag. No-op when the agent-v2 feature flag
    /// is off or the session is unknown (see `update_archived`).
    pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
        self.update_archived(session_id, false, cx);
    }
 390
 391    fn update_archived(
 392        &mut self,
 393        session_id: &acp::SessionId,
 394        archived: bool,
 395        cx: &mut Context<Self>,
 396    ) {
 397        if !cx.has_flag::<AgentV2FeatureFlag>() {
 398            return;
 399        }
 400
 401        if let Some(thread) = self.threads.get(session_id) {
 402            self.save_internal(ThreadMetadata {
 403                archived,
 404                ..thread.clone()
 405            });
 406            cx.notify();
 407        }
 408    }
 409
 410    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 411        if !cx.has_flag::<AgentV2FeatureFlag>() {
 412            return;
 413        }
 414
 415        if let Some(thread) = self.threads.get(&session_id) {
 416            if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
 417                session_ids.remove(&session_id);
 418            }
 419            if !thread.main_worktree_paths.is_empty() {
 420                if let Some(session_ids) = self
 421                    .threads_by_main_paths
 422                    .get_mut(&thread.main_worktree_paths)
 423                {
 424                    session_ids.remove(&session_id);
 425                }
 426            }
 427        }
 428        self.threads.remove(&session_id);
 429        self.pending_thread_ops_tx
 430            .try_send(DbOperation::Delete(session_id))
 431            .log_err();
 432        cx.notify();
 433    }
 434
    /// Builds the store around `db`.
    ///
    /// Construction wires up three things:
    /// - an observer that subscribes to every newly created top-level
    ///   `AcpThread` (threads with a parent session are skipped) and routes
    ///   its events to `handle_thread_event`;
    /// - a background task that drains queued `DbOperation`s from an
    ///   unbounded channel, de-duplicates each batch per session, and applies
    ///   the result to `db`, logging failures;
    /// - an initial `reload` that fills the in-memory cache from `db`.
    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
        let weak_store = cx.weak_entity();

        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
            // Don't track subagent threads in the sidebar.
            if thread.parent_session_id().is_some() {
                return;
            }

            let thread_entity = cx.entity();

            // When the thread entity is released, forget its subscription so
            // the map does not accumulate entries for dead threads.
            cx.on_release({
                let weak_store = weak_store.clone();
                move |thread, cx| {
                    weak_store
                        .update(cx, |store, _cx| {
                            let session_id = thread.session_id().clone();
                            store.session_subscriptions.remove(&session_id);
                        })
                        .ok();
                }
            })
            .detach();

            // Subscribe the store to the thread's events; the subscription is
            // kept in the store, keyed by session id.
            weak_store
                .update(cx, |this, cx| {
                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
                    this.session_subscriptions
                        .insert(thread.session_id().clone(), subscription);
                })
                .ok();
        })
        .detach();

        let (tx, rx) = smol::channel::unbounded();
        let _db_operations_task = cx.background_spawn({
            let db = db.clone();
            async move {
                // Wait for one operation, then grab everything else that is
                // already queued so the whole batch can be de-duplicated.
                while let Ok(first_update) = rx.recv().await {
                    let mut updates = vec![first_update];
                    while let Ok(update) = rx.try_recv() {
                        updates.push(update);
                    }
                    let updates = Self::dedup_db_operations(updates);
                    for operation in updates {
                        match operation {
                            DbOperation::Upsert(metadata) => {
                                db.save(metadata).await.log_err();
                            }
                            DbOperation::Delete(session_id) => {
                                db.delete(session_id).await.log_err();
                            }
                        }
                    }
                }
            }
        });

        let mut this = Self {
            db,
            threads: HashMap::default(),
            threads_by_paths: HashMap::default(),
            threads_by_main_paths: HashMap::default(),
            reload_task: None,
            session_subscriptions: HashMap::default(),
            pending_thread_ops_tx: tx,
            _db_operations_task,
        };
        // Kick off the initial load; the task handle is also kept in `reload_task`.
        let _ = this.reload(cx);
        this
    }
 506
 507    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 508        let mut ops = HashMap::default();
 509        for operation in operations.into_iter().rev() {
 510            if ops.contains_key(operation.id()) {
 511                continue;
 512            }
 513            ops.insert(operation.id().clone(), operation);
 514        }
 515        ops.into_values().collect()
 516    }
 517
 518    fn handle_thread_event(
 519        &mut self,
 520        thread: Entity<acp_thread::AcpThread>,
 521        event: &AcpThreadEvent,
 522        cx: &mut Context<Self>,
 523    ) {
 524        // Don't track subagent threads in the sidebar.
 525        if thread.read(cx).parent_session_id().is_some() {
 526            return;
 527        }
 528
 529        match event {
 530            AcpThreadEvent::NewEntry
 531            | AcpThreadEvent::TitleUpdated
 532            | AcpThreadEvent::EntryUpdated(_)
 533            | AcpThreadEvent::EntriesRemoved(_)
 534            | AcpThreadEvent::ToolAuthorizationRequested(_)
 535            | AcpThreadEvent::ToolAuthorizationReceived(_)
 536            | AcpThreadEvent::Retry(_)
 537            | AcpThreadEvent::Stopped(_)
 538            | AcpThreadEvent::Error
 539            | AcpThreadEvent::LoadError(_)
 540            | AcpThreadEvent::Refusal
 541            | AcpThreadEvent::WorkingDirectoriesUpdated => {
 542                let thread_ref = thread.read(cx);
 543                if thread_ref.entries().is_empty() {
 544                    return;
 545                }
 546
 547                let existing_thread = self.threads.get(thread_ref.session_id());
 548                let session_id = thread_ref.session_id().clone();
 549                let title = thread_ref
 550                    .title()
 551                    .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 552
 553                let updated_at = Utc::now();
 554
 555                let created_at = existing_thread
 556                    .and_then(|t| t.created_at)
 557                    .unwrap_or_else(|| updated_at);
 558
 559                let agent_id = thread_ref.connection().agent_id();
 560
 561                let folder_paths = {
 562                    let project = thread_ref.project().read(cx);
 563                    let paths: Vec<Arc<Path>> = project
 564                        .visible_worktrees(cx)
 565                        .map(|worktree| worktree.read(cx).abs_path())
 566                        .collect();
 567                    PathList::new(&paths)
 568                };
 569
 570                let main_worktree_paths = thread_ref
 571                    .project()
 572                    .read(cx)
 573                    .project_group_key(cx)
 574                    .path_list()
 575                    .clone();
 576
 577                // Threads without a folder path (e.g. started in an empty
 578                // window) are archived by default so they don't get lost,
 579                // because they won't show up in the sidebar. Users can reload
 580                // them from the archive.
 581                let archived = existing_thread
 582                    .map(|t| t.archived)
 583                    .unwrap_or(folder_paths.is_empty());
 584
 585                let metadata = ThreadMetadata {
 586                    session_id,
 587                    agent_id,
 588                    title,
 589                    created_at: Some(created_at),
 590                    updated_at,
 591                    folder_paths,
 592                    main_worktree_paths,
 593                    archived,
 594                };
 595
 596                self.save(metadata, cx);
 597            }
 598            AcpThreadEvent::TokenUsageUpdated
 599            | AcpThreadEvent::SubagentSpawned(_)
 600            | AcpThreadEvent::PromptCapabilitiesUpdated
 601            | AcpThreadEvent::AvailableCommandsUpdated(_)
 602            | AcpThreadEvent::ModeUpdated(_)
 603            | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
 604        }
 605    }
 606}
 607
// NOTE(review): the store entity is registered through the
// `GlobalThreadMetadataStore` wrapper; nothing visible in this file ever
// stores a `ThreadMetadataStore` value as a global directly. Confirm whether
// this impl is still needed before relying on or removing it.
impl Global for ThreadMetadataStore {}
 609
/// Database handle for the `sidebar_threads` table, wrapping a thread-safe
/// SQLite connection.
struct ThreadMetadataDb(ThreadSafeConnection);
 611
/// Schema definition for the thread metadata database.
///
/// NOTE(review): the migrations are presumably applied in order and must stay
/// append-only once shipped — confirm against the `Domain` trait docs.
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    const MIGRATIONS: &[&str] = &[
        // Initial table. `agent_id` and the path columns are nullable; the
        // save path writes NULL for the native Zed agent and for empty paths.
        sql!(
            CREATE TABLE IF NOT EXISTS sidebar_threads(
                session_id TEXT PRIMARY KEY,
                agent_id TEXT,
                title TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                created_at TEXT,
                folder_paths TEXT,
                folder_paths_order TEXT
            ) STRICT;
        ),
        // Archive flag; existing rows default to not archived.
        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
        // Serialized main worktree path list and its ordering.
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths TEXT),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths_order TEXT),
    ];
}
 632
// NOTE(review): presumably this macro generates the shared connection
// accessor (`ThreadMetadataDb::global`, used by `init_global`) along with the
// `Clone`/deref plumbing the methods below rely on — confirm against the
// macro definition in the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
 634
 635impl ThreadMetadataDb {
    /// List the session IDs of all stored threads, ordered by updated_at
    /// descending.
    ///
    /// Runs the query synchronously on the calling thread.
    pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
        // `select` yields a callable query; the trailing `()` executes it.
        self.select::<Arc<str>>(
            "SELECT session_id FROM sidebar_threads \
             ORDER BY updated_at DESC",
        )?()
    }
 642
    /// List all sidebar thread metadata, ordered by updated_at descending.
    ///
    /// The column order in the query must match the order in which
    /// `Column for ThreadMetadata` reads them. Runs synchronously on the
    /// calling thread.
    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
        // `select` yields a callable query; the trailing `()` executes it.
        self.select::<ThreadMetadata>(
            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order \
             FROM sidebar_threads \
             ORDER BY updated_at DESC"
        )?()
    }
 651
 652    /// Upsert metadata for a thread.
 653    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 654        let id = row.session_id.0.clone();
 655        let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 656            None
 657        } else {
 658            Some(row.agent_id.to_string())
 659        };
 660        let title = row.title.to_string();
 661        let updated_at = row.updated_at.to_rfc3339();
 662        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 663        let serialized = row.folder_paths.serialize();
 664        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 665            (None, None)
 666        } else {
 667            (Some(serialized.paths), Some(serialized.order))
 668        };
 669        let main_serialized = row.main_worktree_paths.serialize();
 670        let (main_worktree_paths, main_worktree_paths_order) = if row.main_worktree_paths.is_empty()
 671        {
 672            (None, None)
 673        } else {
 674            (Some(main_serialized.paths), Some(main_serialized.order))
 675        };
 676        let archived = row.archived;
 677
 678        self.write(move |conn| {
 679            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order) \
 680                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) \
 681                       ON CONFLICT(session_id) DO UPDATE SET \
 682                           agent_id = excluded.agent_id, \
 683                           title = excluded.title, \
 684                           updated_at = excluded.updated_at, \
 685                           created_at = excluded.created_at, \
 686                           folder_paths = excluded.folder_paths, \
 687                           folder_paths_order = excluded.folder_paths_order, \
 688                           archived = excluded.archived, \
 689                           main_worktree_paths = excluded.main_worktree_paths, \
 690                           main_worktree_paths_order = excluded.main_worktree_paths_order";
 691            let mut stmt = Statement::prepare(conn, sql)?;
 692            let mut i = stmt.bind(&id, 1)?;
 693            i = stmt.bind(&agent_id, i)?;
 694            i = stmt.bind(&title, i)?;
 695            i = stmt.bind(&updated_at, i)?;
 696            i = stmt.bind(&created_at, i)?;
 697            i = stmt.bind(&folder_paths, i)?;
 698            i = stmt.bind(&folder_paths_order, i)?;
 699            i = stmt.bind(&archived, i)?;
 700            i = stmt.bind(&main_worktree_paths, i)?;
 701            stmt.bind(&main_worktree_paths_order, i)?;
 702            stmt.exec()
 703        })
 704        .await
 705    }
 706
 707    /// Delete metadata for a single thread.
 708    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 709        let id = session_id.0.clone();
 710        self.write(move |conn| {
 711            let mut stmt =
 712                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 713            stmt.bind(&id, 1)?;
 714            stmt.exec()
 715        })
 716        .await
 717    }
 718}
 719
 720impl Column for ThreadMetadata {
 721    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 722        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 723        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 724        let (title, next): (String, i32) = Column::column(statement, next)?;
 725        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 726        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 727        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 728        let (folder_paths_order_str, next): (Option<String>, i32) =
 729            Column::column(statement, next)?;
 730        let (archived, next): (bool, i32) = Column::column(statement, next)?;
 731        let (main_worktree_paths_str, next): (Option<String>, i32) =
 732            Column::column(statement, next)?;
 733        let (main_worktree_paths_order_str, next): (Option<String>, i32) =
 734            Column::column(statement, next)?;
 735
 736        let agent_id = agent_id
 737            .map(|id| AgentId::new(id))
 738            .unwrap_or(ZED_AGENT_ID.clone());
 739
 740        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 741        let created_at = created_at_str
 742            .as_deref()
 743            .map(DateTime::parse_from_rfc3339)
 744            .transpose()?
 745            .map(|dt| dt.with_timezone(&Utc));
 746
 747        let folder_paths = folder_paths_str
 748            .map(|paths| {
 749                PathList::deserialize(&util::path_list::SerializedPathList {
 750                    paths,
 751                    order: folder_paths_order_str.unwrap_or_default(),
 752                })
 753            })
 754            .unwrap_or_default();
 755
 756        let main_worktree_paths = main_worktree_paths_str
 757            .map(|paths| {
 758                PathList::deserialize(&util::path_list::SerializedPathList {
 759                    paths,
 760                    order: main_worktree_paths_order_str.unwrap_or_default(),
 761                })
 762            })
 763            .unwrap_or_default();
 764
 765        Ok((
 766            ThreadMetadata {
 767                session_id: acp::SessionId::new(id),
 768                agent_id,
 769                title: title.into(),
 770                updated_at,
 771                created_at,
 772                folder_paths,
 773                main_worktree_paths,
 774                archived,
 775            },
 776            next,
 777        ))
 778    }
 779}
 780
 781#[cfg(test)]
 782mod tests {
 783    use super::*;
 784    use acp_thread::{AgentConnection, StubAgentConnection};
 785    use action_log::ActionLog;
 786    use agent::DbThread;
 787    use agent_client_protocol as acp;
 788    use feature_flags::FeatureFlagAppExt;
 789    use gpui::TestAppContext;
 790    use project::FakeFs;
 791    use project::Project;
 792    use std::path::Path;
 793    use std::rc::Rc;
 794
    /// Builds a minimal native-agent `DbThread` with the given title and
    /// timestamp; every other field is empty, `None`, `false` or its default.
    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
        DbThread {
            title: title.to_string().into(),
            messages: Vec::new(),
            updated_at,
            detailed_summary: None,
            initial_project_snapshot: None,
            cumulative_token_usage: Default::default(),
            request_token_usage: Default::default(),
            model: None,
            profile: None,
            imported: false,
            subagent_context: None,
            speed: None,
            thinking_enabled: false,
            thinking_effort: None,
            draft_prompt: None,
            ui_scroll_position: None,
        }
    }
 815
 816    fn make_metadata(
 817        session_id: &str,
 818        title: &str,
 819        updated_at: DateTime<Utc>,
 820        folder_paths: PathList,
 821    ) -> ThreadMetadata {
 822        ThreadMetadata {
 823            archived: false,
 824            session_id: acp::SessionId::new(session_id),
 825            agent_id: agent::ZED_AGENT_ID.clone(),
 826            title: title.to_string().into(),
 827            updated_at,
 828            created_at: Some(updated_at),
 829            folder_paths,
 830            main_worktree_paths: PathList::default(),
 831        }
 832    }
 833
 834    fn init_test(cx: &mut TestAppContext) {
 835        cx.update(|cx| {
 836            let settings_store = settings::SettingsStore::test(cx);
 837            cx.set_global(settings_store);
 838            cx.update_flags(true, vec!["agent-v2".to_string()]);
 839            ThreadMetadataStore::init_global(cx);
 840            ThreadStore::init_global(cx);
 841        });
 842        cx.run_until_parked();
 843    }
 844
 845    #[gpui::test]
 846    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 847        let first_paths = PathList::new(&[Path::new("/project-a")]);
 848        let second_paths = PathList::new(&[Path::new("/project-b")]);
 849        let now = Utc::now();
 850        let older = now - chrono::Duration::seconds(1);
 851
 852        let thread = std::thread::current();
 853        let test_name = thread.name().unwrap_or("unknown_test");
 854        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 855        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 856            &db_name,
 857        )));
 858
 859        db.save(make_metadata(
 860            "session-1",
 861            "First Thread",
 862            now,
 863            first_paths.clone(),
 864        ))
 865        .await
 866        .unwrap();
 867        db.save(make_metadata(
 868            "session-2",
 869            "Second Thread",
 870            older,
 871            second_paths.clone(),
 872        ))
 873        .await
 874        .unwrap();
 875
 876        cx.update(|cx| {
 877            let settings_store = settings::SettingsStore::test(cx);
 878            cx.set_global(settings_store);
 879            cx.update_flags(true, vec!["agent-v2".to_string()]);
 880            ThreadMetadataStore::init_global(cx);
 881        });
 882
 883        cx.run_until_parked();
 884
 885        cx.update(|cx| {
 886            let store = ThreadMetadataStore::global(cx);
 887            let store = store.read(cx);
 888
 889            let entry_ids = store
 890                .entry_ids()
 891                .map(|session_id| session_id.0.to_string())
 892                .collect::<Vec<_>>();
 893            assert_eq!(entry_ids.len(), 2);
 894            assert!(entry_ids.contains(&"session-1".to_string()));
 895            assert!(entry_ids.contains(&"session-2".to_string()));
 896
 897            let first_path_entries = store
 898                .entries_for_path(&first_paths)
 899                .map(|entry| entry.session_id.0.to_string())
 900                .collect::<Vec<_>>();
 901            assert_eq!(first_path_entries, vec!["session-1"]);
 902
 903            let second_path_entries = store
 904                .entries_for_path(&second_paths)
 905                .map(|entry| entry.session_id.0.to_string())
 906                .collect::<Vec<_>>();
 907            assert_eq!(second_path_entries, vec!["session-2"]);
 908        });
 909    }
 910
 911    #[gpui::test]
 912    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 913        init_test(cx);
 914
 915        let first_paths = PathList::new(&[Path::new("/project-a")]);
 916        let second_paths = PathList::new(&[Path::new("/project-b")]);
 917        let initial_time = Utc::now();
 918        let updated_time = initial_time + chrono::Duration::seconds(1);
 919
 920        let initial_metadata = make_metadata(
 921            "session-1",
 922            "First Thread",
 923            initial_time,
 924            first_paths.clone(),
 925        );
 926
 927        let second_metadata = make_metadata(
 928            "session-2",
 929            "Second Thread",
 930            initial_time,
 931            second_paths.clone(),
 932        );
 933
 934        cx.update(|cx| {
 935            let store = ThreadMetadataStore::global(cx);
 936            store.update(cx, |store, cx| {
 937                store.save(initial_metadata, cx);
 938                store.save(second_metadata, cx);
 939            });
 940        });
 941
 942        cx.run_until_parked();
 943
 944        cx.update(|cx| {
 945            let store = ThreadMetadataStore::global(cx);
 946            let store = store.read(cx);
 947
 948            let first_path_entries = store
 949                .entries_for_path(&first_paths)
 950                .map(|entry| entry.session_id.0.to_string())
 951                .collect::<Vec<_>>();
 952            assert_eq!(first_path_entries, vec!["session-1"]);
 953
 954            let second_path_entries = store
 955                .entries_for_path(&second_paths)
 956                .map(|entry| entry.session_id.0.to_string())
 957                .collect::<Vec<_>>();
 958            assert_eq!(second_path_entries, vec!["session-2"]);
 959        });
 960
 961        let moved_metadata = make_metadata(
 962            "session-1",
 963            "First Thread",
 964            updated_time,
 965            second_paths.clone(),
 966        );
 967
 968        cx.update(|cx| {
 969            let store = ThreadMetadataStore::global(cx);
 970            store.update(cx, |store, cx| {
 971                store.save(moved_metadata, cx);
 972            });
 973        });
 974
 975        cx.run_until_parked();
 976
 977        cx.update(|cx| {
 978            let store = ThreadMetadataStore::global(cx);
 979            let store = store.read(cx);
 980
 981            let entry_ids = store
 982                .entry_ids()
 983                .map(|session_id| session_id.0.to_string())
 984                .collect::<Vec<_>>();
 985            assert_eq!(entry_ids.len(), 2);
 986            assert!(entry_ids.contains(&"session-1".to_string()));
 987            assert!(entry_ids.contains(&"session-2".to_string()));
 988
 989            let first_path_entries = store
 990                .entries_for_path(&first_paths)
 991                .map(|entry| entry.session_id.0.to_string())
 992                .collect::<Vec<_>>();
 993            assert!(first_path_entries.is_empty());
 994
 995            let second_path_entries = store
 996                .entries_for_path(&second_paths)
 997                .map(|entry| entry.session_id.0.to_string())
 998                .collect::<Vec<_>>();
 999            assert_eq!(second_path_entries.len(), 2);
1000            assert!(second_path_entries.contains(&"session-1".to_string()));
1001            assert!(second_path_entries.contains(&"session-2".to_string()));
1002        });
1003
1004        cx.update(|cx| {
1005            let store = ThreadMetadataStore::global(cx);
1006            store.update(cx, |store, cx| {
1007                store.delete(acp::SessionId::new("session-2"), cx);
1008            });
1009        });
1010
1011        cx.run_until_parked();
1012
1013        cx.update(|cx| {
1014            let store = ThreadMetadataStore::global(cx);
1015            let store = store.read(cx);
1016
1017            let entry_ids = store
1018                .entry_ids()
1019                .map(|session_id| session_id.0.to_string())
1020                .collect::<Vec<_>>();
1021            assert_eq!(entry_ids, vec!["session-1"]);
1022
1023            let second_path_entries = store
1024                .entries_for_path(&second_paths)
1025                .map(|entry| entry.session_id.0.to_string())
1026                .collect::<Vec<_>>();
1027            assert_eq!(second_path_entries, vec!["session-1"]);
1028        });
1029    }
1030
1031    #[gpui::test]
1032    async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1033        init_test(cx);
1034
1035        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1036        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1037        let now = Utc::now();
1038
1039        let existing_metadata = ThreadMetadata {
1040            session_id: acp::SessionId::new("a-session-0"),
1041            agent_id: agent::ZED_AGENT_ID.clone(),
1042            title: "Existing Metadata".into(),
1043            updated_at: now - chrono::Duration::seconds(10),
1044            created_at: Some(now - chrono::Duration::seconds(10)),
1045            folder_paths: project_a_paths.clone(),
1046            main_worktree_paths: PathList::default(),
1047            archived: false,
1048        };
1049
1050        cx.update(|cx| {
1051            let store = ThreadMetadataStore::global(cx);
1052            store.update(cx, |store, cx| {
1053                store.save(existing_metadata, cx);
1054            });
1055        });
1056        cx.run_until_parked();
1057
1058        let threads_to_save = vec![
1059            (
1060                "a-session-0",
1061                "Thread A0 From Native Store",
1062                project_a_paths.clone(),
1063                now,
1064            ),
1065            (
1066                "a-session-1",
1067                "Thread A1",
1068                project_a_paths.clone(),
1069                now + chrono::Duration::seconds(1),
1070            ),
1071            (
1072                "b-session-0",
1073                "Thread B0",
1074                project_b_paths.clone(),
1075                now + chrono::Duration::seconds(2),
1076            ),
1077            (
1078                "projectless",
1079                "Projectless",
1080                PathList::default(),
1081                now + chrono::Duration::seconds(3),
1082            ),
1083        ];
1084
1085        for (session_id, title, paths, updated_at) in &threads_to_save {
1086            let save_task = cx.update(|cx| {
1087                let thread_store = ThreadStore::global(cx);
1088                let session_id = session_id.to_string();
1089                let title = title.to_string();
1090                let paths = paths.clone();
1091                thread_store.update(cx, |store, cx| {
1092                    store.save_thread(
1093                        acp::SessionId::new(session_id),
1094                        make_db_thread(&title, *updated_at),
1095                        paths,
1096                        cx,
1097                    )
1098                })
1099            });
1100            save_task.await.unwrap();
1101            cx.run_until_parked();
1102        }
1103
1104        cx.update(|cx| migrate_thread_metadata(cx));
1105        cx.run_until_parked();
1106
1107        let list = cx.update(|cx| {
1108            let store = ThreadMetadataStore::global(cx);
1109            store.read(cx).entries().cloned().collect::<Vec<_>>()
1110        });
1111
1112        assert_eq!(list.len(), 4);
1113        assert!(
1114            list.iter()
1115                .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1116        );
1117
1118        let existing_metadata = list
1119            .iter()
1120            .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1121            .unwrap();
1122        assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1123        assert!(!existing_metadata.archived);
1124
1125        let migrated_session_ids = list
1126            .iter()
1127            .map(|metadata| metadata.session_id.0.as_ref())
1128            .collect::<Vec<_>>();
1129        assert!(migrated_session_ids.contains(&"a-session-1"));
1130        assert!(migrated_session_ids.contains(&"b-session-0"));
1131        assert!(migrated_session_ids.contains(&"projectless"));
1132
1133        let migrated_entries = list
1134            .iter()
1135            .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1136            .collect::<Vec<_>>();
1137        assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1138    }
1139
1140    #[gpui::test]
1141    async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1142        cx: &mut TestAppContext,
1143    ) {
1144        init_test(cx);
1145
1146        let project_paths = PathList::new(&[Path::new("/project-a")]);
1147        let existing_updated_at = Utc::now();
1148
1149        let existing_metadata = ThreadMetadata {
1150            session_id: acp::SessionId::new("existing-session"),
1151            agent_id: agent::ZED_AGENT_ID.clone(),
1152            title: "Existing Metadata".into(),
1153            updated_at: existing_updated_at,
1154            created_at: Some(existing_updated_at),
1155            folder_paths: project_paths.clone(),
1156            main_worktree_paths: PathList::default(),
1157            archived: false,
1158        };
1159
1160        cx.update(|cx| {
1161            let store = ThreadMetadataStore::global(cx);
1162            store.update(cx, |store, cx| {
1163                store.save(existing_metadata, cx);
1164            });
1165        });
1166        cx.run_until_parked();
1167
1168        let save_task = cx.update(|cx| {
1169            let thread_store = ThreadStore::global(cx);
1170            thread_store.update(cx, |store, cx| {
1171                store.save_thread(
1172                    acp::SessionId::new("existing-session"),
1173                    make_db_thread(
1174                        "Updated Native Thread Title",
1175                        existing_updated_at + chrono::Duration::seconds(1),
1176                    ),
1177                    project_paths.clone(),
1178                    cx,
1179                )
1180            })
1181        });
1182        save_task.await.unwrap();
1183        cx.run_until_parked();
1184
1185        cx.update(|cx| migrate_thread_metadata(cx));
1186        cx.run_until_parked();
1187
1188        let list = cx.update(|cx| {
1189            let store = ThreadMetadataStore::global(cx);
1190            store.read(cx).entries().cloned().collect::<Vec<_>>()
1191        });
1192
1193        assert_eq!(list.len(), 1);
1194        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1195    }
1196
1197    #[gpui::test]
1198    async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1199        cx: &mut TestAppContext,
1200    ) {
1201        init_test(cx);
1202
1203        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1204        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1205        let now = Utc::now();
1206
1207        // Create 7 threads for project A and 3 for project B
1208        let mut threads_to_save = Vec::new();
1209        for i in 0..7 {
1210            threads_to_save.push((
1211                format!("a-session-{i}"),
1212                format!("Thread A{i}"),
1213                project_a_paths.clone(),
1214                now + chrono::Duration::seconds(i as i64),
1215            ));
1216        }
1217        for i in 0..3 {
1218            threads_to_save.push((
1219                format!("b-session-{i}"),
1220                format!("Thread B{i}"),
1221                project_b_paths.clone(),
1222                now + chrono::Duration::seconds(i as i64),
1223            ));
1224        }
1225
1226        for (session_id, title, paths, updated_at) in &threads_to_save {
1227            let save_task = cx.update(|cx| {
1228                let thread_store = ThreadStore::global(cx);
1229                let session_id = session_id.to_string();
1230                let title = title.to_string();
1231                let paths = paths.clone();
1232                thread_store.update(cx, |store, cx| {
1233                    store.save_thread(
1234                        acp::SessionId::new(session_id),
1235                        make_db_thread(&title, *updated_at),
1236                        paths,
1237                        cx,
1238                    )
1239                })
1240            });
1241            save_task.await.unwrap();
1242            cx.run_until_parked();
1243        }
1244
1245        cx.update(|cx| migrate_thread_metadata(cx));
1246        cx.run_until_parked();
1247
1248        let list = cx.update(|cx| {
1249            let store = ThreadMetadataStore::global(cx);
1250            store.read(cx).entries().cloned().collect::<Vec<_>>()
1251        });
1252
1253        assert_eq!(list.len(), 10);
1254
1255        // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1256        let mut project_a_entries: Vec<_> = list
1257            .iter()
1258            .filter(|m| m.folder_paths == project_a_paths)
1259            .collect();
1260        assert_eq!(project_a_entries.len(), 7);
1261        project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1262
1263        for entry in &project_a_entries[..5] {
1264            assert!(
1265                !entry.archived,
1266                "Expected {} to be unarchived (top 5 most recent)",
1267                entry.session_id.0
1268            );
1269        }
1270        for entry in &project_a_entries[5..] {
1271            assert!(
1272                entry.archived,
1273                "Expected {} to be archived (older than top 5)",
1274                entry.session_id.0
1275            );
1276        }
1277
1278        // Project B: all 3 should be unarchived (under the limit)
1279        let project_b_entries: Vec<_> = list
1280            .iter()
1281            .filter(|m| m.folder_paths == project_b_paths)
1282            .collect();
1283        assert_eq!(project_b_entries.len(), 3);
1284        assert!(project_b_entries.iter().all(|m| !m.archived));
1285    }
1286
1287    #[gpui::test]
1288    async fn test_empty_thread_events_do_not_create_metadata(cx: &mut TestAppContext) {
1289        init_test(cx);
1290
1291        let fs = FakeFs::new(cx.executor());
1292        let project = Project::test(fs, None::<&Path>, cx).await;
1293        let connection = Rc::new(StubAgentConnection::new());
1294
1295        let thread = cx
1296            .update(|cx| {
1297                connection
1298                    .clone()
1299                    .new_session(project.clone(), PathList::default(), cx)
1300            })
1301            .await
1302            .unwrap();
1303        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1304
1305        cx.update(|cx| {
1306            thread.update(cx, |thread, cx| {
1307                thread.set_title("Draft Thread".into(), cx).detach();
1308            });
1309        });
1310        cx.run_until_parked();
1311
1312        let metadata_ids = cx.update(|cx| {
1313            ThreadMetadataStore::global(cx)
1314                .read(cx)
1315                .entry_ids()
1316                .collect::<Vec<_>>()
1317        });
1318        assert!(
1319            metadata_ids.is_empty(),
1320            "expected empty draft thread title updates to be ignored"
1321        );
1322
1323        cx.update(|cx| {
1324            thread.update(cx, |thread, cx| {
1325                thread.push_user_content_block(None, "Hello".into(), cx);
1326            });
1327        });
1328        cx.run_until_parked();
1329
1330        let metadata_ids = cx.update(|cx| {
1331            ThreadMetadataStore::global(cx)
1332                .read(cx)
1333                .entry_ids()
1334                .collect::<Vec<_>>()
1335        });
1336        assert_eq!(metadata_ids, vec![session_id]);
1337    }
1338
1339    #[gpui::test]
1340    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1341        init_test(cx);
1342
1343        let fs = FakeFs::new(cx.executor());
1344        let project = Project::test(fs, None::<&Path>, cx).await;
1345        let connection = Rc::new(StubAgentConnection::new());
1346
1347        let thread = cx
1348            .update(|cx| {
1349                connection
1350                    .clone()
1351                    .new_session(project.clone(), PathList::default(), cx)
1352            })
1353            .await
1354            .unwrap();
1355        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1356
1357        cx.update(|cx| {
1358            thread.update(cx, |thread, cx| {
1359                thread.push_user_content_block(None, "Hello".into(), cx);
1360            });
1361        });
1362        cx.run_until_parked();
1363
1364        let metadata_ids = cx.update(|cx| {
1365            ThreadMetadataStore::global(cx)
1366                .read(cx)
1367                .entry_ids()
1368                .collect::<Vec<_>>()
1369        });
1370        assert_eq!(metadata_ids, vec![session_id.clone()]);
1371
1372        drop(thread);
1373        cx.update(|_| {});
1374        cx.run_until_parked();
1375
1376        let metadata_ids = cx.update(|cx| {
1377            ThreadMetadataStore::global(cx)
1378                .read(cx)
1379                .entry_ids()
1380                .collect::<Vec<_>>()
1381        });
1382        assert_eq!(metadata_ids, vec![session_id]);
1383    }
1384
1385    #[gpui::test]
1386    async fn test_threads_without_project_association_are_archived_by_default(
1387        cx: &mut TestAppContext,
1388    ) {
1389        init_test(cx);
1390
1391        let fs = FakeFs::new(cx.executor());
1392        let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
1393        let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
1394        let connection = Rc::new(StubAgentConnection::new());
1395
1396        let thread_without_worktree = cx
1397            .update(|cx| {
1398                connection.clone().new_session(
1399                    project_without_worktree.clone(),
1400                    PathList::default(),
1401                    cx,
1402                )
1403            })
1404            .await
1405            .unwrap();
1406        let session_without_worktree =
1407            cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());
1408
1409        cx.update(|cx| {
1410            thread_without_worktree.update(cx, |thread, cx| {
1411                thread.push_user_content_block(None, "content".into(), cx);
1412                thread.set_title("No Project Thread".into(), cx).detach();
1413            });
1414        });
1415        cx.run_until_parked();
1416
1417        let thread_with_worktree = cx
1418            .update(|cx| {
1419                connection.clone().new_session(
1420                    project_with_worktree.clone(),
1421                    PathList::default(),
1422                    cx,
1423                )
1424            })
1425            .await
1426            .unwrap();
1427        let session_with_worktree =
1428            cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());
1429
1430        cx.update(|cx| {
1431            thread_with_worktree.update(cx, |thread, cx| {
1432                thread.push_user_content_block(None, "content".into(), cx);
1433                thread.set_title("Project Thread".into(), cx).detach();
1434            });
1435        });
1436        cx.run_until_parked();
1437
1438        cx.update(|cx| {
1439            let store = ThreadMetadataStore::global(cx);
1440            let store = store.read(cx);
1441
1442            let without_worktree = store
1443                .entry(&session_without_worktree)
1444                .expect("missing metadata for thread without project association");
1445            assert!(without_worktree.folder_paths.is_empty());
1446            assert!(
1447                without_worktree.archived,
1448                "expected thread without project association to be archived"
1449            );
1450
1451            let with_worktree = store
1452                .entry(&session_with_worktree)
1453                .expect("missing metadata for thread with project association");
1454            assert_eq!(
1455                with_worktree.folder_paths,
1456                PathList::new(&[Path::new("/project-a")])
1457            );
1458            assert!(
1459                !with_worktree.archived,
1460                "expected thread with project association to remain unarchived"
1461            );
1462        });
1463    }
1464
1465    #[gpui::test]
1466    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1467        init_test(cx);
1468
1469        let fs = FakeFs::new(cx.executor());
1470        let project = Project::test(fs, None::<&Path>, cx).await;
1471        let connection = Rc::new(StubAgentConnection::new());
1472
1473        // Create a regular (non-subagent) AcpThread.
1474        let regular_thread = cx
1475            .update(|cx| {
1476                connection
1477                    .clone()
1478                    .new_session(project.clone(), PathList::default(), cx)
1479            })
1480            .await
1481            .unwrap();
1482
1483        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1484
1485        // Set a title on the regular thread to trigger a save via handle_thread_update.
1486        cx.update(|cx| {
1487            regular_thread.update(cx, |thread, cx| {
1488                thread.push_user_content_block(None, "content".into(), cx);
1489                thread.set_title("Regular Thread".into(), cx).detach();
1490            });
1491        });
1492        cx.run_until_parked();
1493
1494        // Create a subagent AcpThread
1495        let subagent_session_id = acp::SessionId::new("subagent-session");
1496        let subagent_thread = cx.update(|cx| {
1497            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1498            cx.new(|cx| {
1499                acp_thread::AcpThread::new(
1500                    Some(regular_session_id.clone()),
1501                    Some("Subagent Thread".into()),
1502                    None,
1503                    connection.clone(),
1504                    project.clone(),
1505                    action_log,
1506                    subagent_session_id.clone(),
1507                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1508                    cx,
1509                )
1510            })
1511        });
1512
1513        // Set a title on the subagent thread to trigger handle_thread_update.
1514        cx.update(|cx| {
1515            subagent_thread.update(cx, |thread, cx| {
1516                thread
1517                    .set_title("Subagent Thread Title".into(), cx)
1518                    .detach();
1519            });
1520        });
1521        cx.run_until_parked();
1522
1523        // List all metadata from the store cache.
1524        let list = cx.update(|cx| {
1525            let store = ThreadMetadataStore::global(cx);
1526            store.read(cx).entries().cloned().collect::<Vec<_>>()
1527        });
1528
1529        // The subagent thread should NOT appear in the sidebar metadata.
1530        // Only the regular thread should be listed.
1531        assert_eq!(
1532            list.len(),
1533            1,
1534            "Expected only the regular thread in sidebar metadata, \
1535             but found {} entries (subagent threads are leaking into the sidebar)",
1536            list.len(),
1537        );
1538        assert_eq!(list[0].session_id, regular_session_id);
1539        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1540    }
1541
1542    #[test]
1543    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1544        let now = Utc::now();
1545
1546        let operations = vec![
1547            DbOperation::Upsert(make_metadata(
1548                "session-1",
1549                "First Thread",
1550                now,
1551                PathList::default(),
1552            )),
1553            DbOperation::Delete(acp::SessionId::new("session-1")),
1554        ];
1555
1556        let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1557
1558        assert_eq!(deduped.len(), 1);
1559        assert_eq!(
1560            deduped[0],
1561            DbOperation::Delete(acp::SessionId::new("session-1"))
1562        );
1563    }
1564
1565    #[test]
1566    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1567        let now = Utc::now();
1568        let later = now + chrono::Duration::seconds(1);
1569
1570        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1571        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1572
1573        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1574            DbOperation::Upsert(old_metadata),
1575            DbOperation::Upsert(new_metadata.clone()),
1576        ]);
1577
1578        assert_eq!(deduped.len(), 1);
1579        assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1580    }
1581
1582    #[test]
1583    fn test_dedup_db_operations_preserves_distinct_sessions() {
1584        let now = Utc::now();
1585
1586        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1587        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1588        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1589            DbOperation::Upsert(metadata1.clone()),
1590            DbOperation::Upsert(metadata2.clone()),
1591        ]);
1592
1593        assert_eq!(deduped.len(), 2);
1594        assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1595        assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1596    }
1597
1598    #[gpui::test]
1599    async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1600        init_test(cx);
1601
1602        let paths = PathList::new(&[Path::new("/project-a")]);
1603        let now = Utc::now();
1604        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1605
1606        cx.update(|cx| {
1607            let store = ThreadMetadataStore::global(cx);
1608            store.update(cx, |store, cx| {
1609                store.save(metadata, cx);
1610            });
1611        });
1612
1613        cx.run_until_parked();
1614
1615        cx.update(|cx| {
1616            let store = ThreadMetadataStore::global(cx);
1617            let store = store.read(cx);
1618
1619            let path_entries = store
1620                .entries_for_path(&paths)
1621                .map(|e| e.session_id.0.to_string())
1622                .collect::<Vec<_>>();
1623            assert_eq!(path_entries, vec!["session-1"]);
1624
1625            let archived = store
1626                .archived_entries()
1627                .map(|e| e.session_id.0.to_string())
1628                .collect::<Vec<_>>();
1629            assert!(archived.is_empty());
1630        });
1631
1632        cx.update(|cx| {
1633            let store = ThreadMetadataStore::global(cx);
1634            store.update(cx, |store, cx| {
1635                store.archive(&acp::SessionId::new("session-1"), cx);
1636            });
1637        });
1638
1639        cx.run_until_parked();
1640
1641        cx.update(|cx| {
1642            let store = ThreadMetadataStore::global(cx);
1643            let store = store.read(cx);
1644
1645            let path_entries = store
1646                .entries_for_path(&paths)
1647                .map(|e| e.session_id.0.to_string())
1648                .collect::<Vec<_>>();
1649            assert!(path_entries.is_empty());
1650
1651            let archived = store.archived_entries().collect::<Vec<_>>();
1652            assert_eq!(archived.len(), 1);
1653            assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1654            assert!(archived[0].archived);
1655        });
1656
1657        cx.update(|cx| {
1658            let store = ThreadMetadataStore::global(cx);
1659            store.update(cx, |store, cx| {
1660                store.unarchive(&acp::SessionId::new("session-1"), cx);
1661            });
1662        });
1663
1664        cx.run_until_parked();
1665
1666        cx.update(|cx| {
1667            let store = ThreadMetadataStore::global(cx);
1668            let store = store.read(cx);
1669
1670            let path_entries = store
1671                .entries_for_path(&paths)
1672                .map(|e| e.session_id.0.to_string())
1673                .collect::<Vec<_>>();
1674            assert_eq!(path_entries, vec!["session-1"]);
1675
1676            let archived = store
1677                .archived_entries()
1678                .map(|e| e.session_id.0.to_string())
1679                .collect::<Vec<_>>();
1680            assert!(archived.is_empty());
1681        });
1682    }
1683
1684    #[gpui::test]
1685    async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1686        init_test(cx);
1687
1688        let paths = PathList::new(&[Path::new("/project-a")]);
1689        let now = Utc::now();
1690
1691        let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1692        let metadata2 = make_metadata(
1693            "session-2",
1694            "Archived Thread",
1695            now - chrono::Duration::seconds(1),
1696            paths.clone(),
1697        );
1698
1699        cx.update(|cx| {
1700            let store = ThreadMetadataStore::global(cx);
1701            store.update(cx, |store, cx| {
1702                store.save(metadata1, cx);
1703                store.save(metadata2, cx);
1704            });
1705        });
1706
1707        cx.run_until_parked();
1708
1709        cx.update(|cx| {
1710            let store = ThreadMetadataStore::global(cx);
1711            store.update(cx, |store, cx| {
1712                store.archive(&acp::SessionId::new("session-2"), cx);
1713            });
1714        });
1715
1716        cx.run_until_parked();
1717
1718        cx.update(|cx| {
1719            let store = ThreadMetadataStore::global(cx);
1720            let store = store.read(cx);
1721
1722            let path_entries = store
1723                .entries_for_path(&paths)
1724                .map(|e| e.session_id.0.to_string())
1725                .collect::<Vec<_>>();
1726            assert_eq!(path_entries, vec!["session-1"]);
1727
1728            let all_entries = store
1729                .entries()
1730                .map(|e| e.session_id.0.to_string())
1731                .collect::<Vec<_>>();
1732            assert_eq!(all_entries.len(), 2);
1733            assert!(all_entries.contains(&"session-1".to_string()));
1734            assert!(all_entries.contains(&"session-2".to_string()));
1735
1736            let archived = store
1737                .archived_entries()
1738                .map(|e| e.session_id.0.to_string())
1739                .collect::<Vec<_>>();
1740            assert_eq!(archived, vec!["session-2"]);
1741        });
1742    }
1743
1744    #[gpui::test]
1745    async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1746        init_test(cx);
1747
1748        let paths = PathList::new(&[Path::new("/project-a")]);
1749        let now = Utc::now();
1750
1751        let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1752        let m2 = make_metadata(
1753            "session-2",
1754            "Thread Two",
1755            now - chrono::Duration::seconds(1),
1756            paths.clone(),
1757        );
1758        let m3 = make_metadata(
1759            "session-3",
1760            "Thread Three",
1761            now - chrono::Duration::seconds(2),
1762            paths,
1763        );
1764
1765        cx.update(|cx| {
1766            let store = ThreadMetadataStore::global(cx);
1767            store.update(cx, |store, cx| {
1768                store.save_all(vec![m1, m2, m3], cx);
1769            });
1770        });
1771
1772        cx.run_until_parked();
1773
1774        cx.update(|cx| {
1775            let store = ThreadMetadataStore::global(cx);
1776            let store = store.read(cx);
1777
1778            let all_entries = store
1779                .entries()
1780                .map(|e| e.session_id.0.to_string())
1781                .collect::<Vec<_>>();
1782            assert_eq!(all_entries.len(), 3);
1783            assert!(all_entries.contains(&"session-1".to_string()));
1784            assert!(all_entries.contains(&"session-2".to_string()));
1785            assert!(all_entries.contains(&"session-3".to_string()));
1786
1787            let entry_ids = store.entry_ids().collect::<Vec<_>>();
1788            assert_eq!(entry_ids.len(), 3);
1789        });
1790    }
1791
1792    #[gpui::test]
1793    async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1794        init_test(cx);
1795
1796        let paths = PathList::new(&[Path::new("/project-a")]);
1797        let now = Utc::now();
1798        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1799
1800        cx.update(|cx| {
1801            let store = ThreadMetadataStore::global(cx);
1802            store.update(cx, |store, cx| {
1803                store.save(metadata, cx);
1804            });
1805        });
1806
1807        cx.run_until_parked();
1808
1809        cx.update(|cx| {
1810            let store = ThreadMetadataStore::global(cx);
1811            store.update(cx, |store, cx| {
1812                store.archive(&acp::SessionId::new("session-1"), cx);
1813            });
1814        });
1815
1816        cx.run_until_parked();
1817
1818        cx.update(|cx| {
1819            let store = ThreadMetadataStore::global(cx);
1820            store.update(cx, |store, cx| {
1821                let _ = store.reload(cx);
1822            });
1823        });
1824
1825        cx.run_until_parked();
1826
1827        cx.update(|cx| {
1828            let store = ThreadMetadataStore::global(cx);
1829            let store = store.read(cx);
1830
1831            let thread = store
1832                .entries()
1833                .find(|e| e.session_id.0.as_ref() == "session-1")
1834                .expect("thread should exist after reload");
1835            assert!(thread.archived);
1836
1837            let path_entries = store
1838                .entries_for_path(&paths)
1839                .map(|e| e.session_id.0.to_string())
1840                .collect::<Vec<_>>();
1841            assert!(path_entries.is_empty());
1842
1843            let archived = store
1844                .archived_entries()
1845                .map(|e| e.session_id.0.to_string())
1846                .collect::<Vec<_>>();
1847            assert_eq!(archived, vec!["session-1"]);
1848        });
1849    }
1850
1851    #[gpui::test]
1852    async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1853        init_test(cx);
1854
1855        cx.run_until_parked();
1856
1857        cx.update(|cx| {
1858            let store = ThreadMetadataStore::global(cx);
1859            store.update(cx, |store, cx| {
1860                store.archive(&acp::SessionId::new("nonexistent"), cx);
1861            });
1862        });
1863
1864        cx.run_until_parked();
1865
1866        cx.update(|cx| {
1867            let store = ThreadMetadataStore::global(cx);
1868            let store = store.read(cx);
1869
1870            assert!(store.is_empty());
1871            assert_eq!(store.entries().count(), 0);
1872            assert_eq!(store.archived_entries().count(), 0);
1873        });
1874    }
1875
1876    #[gpui::test]
1877    async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1878        init_test(cx);
1879
1880        let paths = PathList::new(&[Path::new("/project-a")]);
1881        let now = Utc::now();
1882        let metadata = make_metadata("session-1", "Thread 1", now, paths);
1883        let session_id = metadata.session_id.clone();
1884
1885        cx.update(|cx| {
1886            let store = ThreadMetadataStore::global(cx);
1887            store.update(cx, |store, cx| {
1888                store.save(metadata.clone(), cx);
1889                store.archive(&session_id, cx);
1890            });
1891        });
1892
1893        cx.run_until_parked();
1894
1895        cx.update(|cx| {
1896            let store = ThreadMetadataStore::global(cx);
1897            let store = store.read(cx);
1898
1899            let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1900            pretty_assertions::assert_eq!(
1901                entries,
1902                vec![ThreadMetadata {
1903                    archived: true,
1904                    ..metadata
1905                }]
1906            );
1907        });
1908    }
1909}