thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AcpThreadEvent;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::Context as _;
   7use chrono::{DateTime, Utc};
   8use collections::{HashMap, HashSet};
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    ThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We skip migrating threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    let store = ThreadMetadataStore::global(cx);
  46    let db = store.read(cx).db.clone();
  47
  48    cx.spawn(async move |cx| {
  49        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
  50
  51        let is_first_migration = existing_entries.is_empty();
  52
  53        let mut to_migrate = store.read_with(cx, |_store, cx| {
  54            ThreadStore::global(cx)
  55                .read(cx)
  56                .entries()
  57                .filter_map(|entry| {
  58                    if existing_entries.contains(&entry.id.0) {
  59                        return None;
  60                    }
  61
  62                    Some(ThreadMetadata {
  63                        session_id: entry.id,
  64                        agent_id: ZED_AGENT_ID.clone(),
  65                        title: entry.title,
  66                        updated_at: entry.updated_at,
  67                        created_at: entry.created_at,
  68                        folder_paths: entry.folder_paths,
  69                        main_worktree_paths: PathList::default(),
  70                        archived: true,
  71                    })
  72                })
  73                .collect::<Vec<_>>()
  74        });
  75
  76        if to_migrate.is_empty() {
  77            return anyhow::Ok(());
  78        }
  79
  80        // On the first migration (no entries in DB yet), keep the 5 most
  81        // recent threads per project unarchived.
  82        if is_first_migration {
  83            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
  84            for entry in &mut to_migrate {
  85                if entry.folder_paths.is_empty() {
  86                    continue;
  87                }
  88                per_project
  89                    .entry(entry.folder_paths.clone())
  90                    .or_default()
  91                    .push(entry);
  92            }
  93            for entries in per_project.values_mut() {
  94                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
  95                for entry in entries.iter_mut().take(5) {
  96                    entry.archived = false;
  97                }
  98            }
  99        }
 100
 101        log::info!("Migrating {} thread store entries", to_migrate.len());
 102
 103        // Manually save each entry to the database and call reload, otherwise
 104        // we'll end up triggering lots of reloads after each save
 105        for entry in to_migrate {
 106            db.save(entry).await?;
 107        }
 108
 109        log::info!("Finished migrating thread store entries");
 110
 111        let _ = store.update(cx, |store, cx| store.reload(cx));
 112        anyhow::Ok(())
 113    })
 114    .detach_and_log_err(cx);
 115}
 116
/// Global wrapper under which the shared [`ThreadMetadataStore`] entity is
/// registered; `ThreadMetadataStore::global` and `try_global` read it back.
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 119
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Identifier of the session; also the primary key of the database row.
    pub session_id: acp::SessionId,
    /// Agent that owns the thread. The native Zed agent is stored as `NULL`
    /// in the database and restored as `ZED_AGENT_ID` when read back.
    pub agent_id: AgentId,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Time of the last recorded change.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known.
    pub created_at: Option<DateTime<Utc>>,
    /// Paths the thread is grouped under (`entries_for_path` looks threads up by this list).
    pub folder_paths: PathList,
    /// Original repository paths of linked worktrees; empty when there are none.
    pub main_worktree_paths: PathList,
    /// Archived threads are excluded from the per-path listings.
    pub archived: bool,
}
 133
 134impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
 135    fn from(meta: &ThreadMetadata) -> Self {
 136        Self {
 137            session_id: meta.session_id.clone(),
 138            work_dirs: Some(meta.folder_paths.clone()),
 139            title: Some(meta.title.clone()),
 140            updated_at: Some(meta.updated_at),
 141            created_at: meta.created_at,
 142            meta: None,
 143        }
 144    }
 145}
 146
/// The store holds all metadata needed to show threads in the sidebar/the archive.
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct ThreadMetadataStore {
    /// Database handle used for listing, upserting and deleting rows.
    db: ThreadMetadataDb,
    /// In-memory cache of every known thread, keyed by session id.
    threads: HashMap<acp::SessionId, ThreadMetadata>,
    /// Index from `folder_paths` to the sessions stored under that path list.
    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// Index from non-empty `main_worktree_paths` to the sessions referencing them.
    threads_by_main_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// The most recently started reload of the cache from the database.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for live threads, removed again when the thread is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Queue of pending database writes consumed by `_db_operations_task`.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Background task draining the queue above.
    /// NOTE(review): presumably stored so the task is not cancelled on drop — confirm.
    _db_operations_task: Task<()>,
}
 160
/// A database write queued by the store and applied by its background task.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Insert the row or overwrite the existing one for the same session.
    Upsert(ThreadMetadata),
    /// Remove the row of the given session.
    Delete(acp::SessionId),
}
 166
 167impl DbOperation {
 168    fn id(&self) -> &acp::SessionId {
 169        match self {
 170            DbOperation::Upsert(thread) => &thread.session_id,
 171            DbOperation::Delete(session_id) => session_id,
 172        }
 173    }
 174}
 175
 176impl ThreadMetadataStore {
 177    #[cfg(not(any(test, feature = "test-support")))]
 178    pub fn init_global(cx: &mut App) {
 179        if cx.has_global::<Self>() {
 180            return;
 181        }
 182
 183        let db = ThreadMetadataDb::global(cx);
 184        let thread_store = cx.new(|cx| Self::new(db, cx));
 185        cx.set_global(GlobalThreadMetadataStore(thread_store));
 186    }
 187
 188    #[cfg(any(test, feature = "test-support"))]
 189    pub fn init_global(cx: &mut App) {
 190        let thread = std::thread::current();
 191        let test_name = thread.name().unwrap_or("unknown_test");
 192        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 193        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 194        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 195        cx.set_global(GlobalThreadMetadataStore(thread_store));
 196    }
 197
 198    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 199        cx.try_global::<GlobalThreadMetadataStore>()
 200            .map(|store| store.0.clone())
 201    }
 202
 203    pub fn global(cx: &App) -> Entity<Self> {
 204        cx.global::<GlobalThreadMetadataStore>().0.clone()
 205    }
 206
    /// Returns `true` when the in-memory cache holds no threads at all
    /// (archived threads count as present).
    pub fn is_empty(&self) -> bool {
        self.threads.is_empty()
    }
 210
    /// Returns the session ids of all cached threads, archived ones included,
    /// in no particular order.
    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
        self.threads.keys().cloned()
    }
 215
    /// Returns the cached metadata for a specific thread, if it exists.
    pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
        self.threads.get(session_id)
    }
 220
    /// Returns all cached threads, archived ones included, in no particular order.
    pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
        self.threads.values()
    }
 225
 226    /// Returns all archived threads.
 227    pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 228        self.entries().filter(|t| t.archived)
 229    }
 230
 231    /// Returns all threads for the given path list, excluding archived threads.
 232    pub fn entries_for_path(
 233        &self,
 234        path_list: &PathList,
 235    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 236        self.threads_by_paths
 237            .get(path_list)
 238            .into_iter()
 239            .flatten()
 240            .filter_map(|s| self.threads.get(s))
 241            .filter(|s| !s.archived)
 242    }
 243
 244    /// Returns threads whose `main_worktree_paths` matches the given path list,
 245    /// excluding archived threads. This finds threads that were opened in a
 246    /// linked worktree but are associated with the given main worktree.
 247    pub fn entries_for_main_worktree_path(
 248        &self,
 249        path_list: &PathList,
 250    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 251        self.threads_by_main_paths
 252            .get(path_list)
 253            .into_iter()
 254            .flatten()
 255            .filter_map(|s| self.threads.get(s))
 256            .filter(|s| !s.archived)
 257    }
 258
 259    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 260        let db = self.db.clone();
 261        self.reload_task.take();
 262
 263        let list_task = cx
 264            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 265
 266        let reload_task = cx
 267            .spawn(async move |this, cx| {
 268                let Some(rows) = list_task.await.log_err() else {
 269                    return;
 270                };
 271
 272                this.update(cx, |this, cx| {
 273                    this.threads.clear();
 274                    this.threads_by_paths.clear();
 275                    this.threads_by_main_paths.clear();
 276
 277                    for row in rows {
 278                        this.threads_by_paths
 279                            .entry(row.folder_paths.clone())
 280                            .or_default()
 281                            .insert(row.session_id.clone());
 282                        if !row.main_worktree_paths.is_empty() {
 283                            this.threads_by_main_paths
 284                                .entry(row.main_worktree_paths.clone())
 285                                .or_default()
 286                                .insert(row.session_id.clone());
 287                        }
 288                        this.threads.insert(row.session_id.clone(), row);
 289                    }
 290
 291                    cx.notify();
 292                })
 293                .ok();
 294            })
 295            .shared();
 296        self.reload_task = Some(reload_task.clone());
 297        reload_task
 298    }
 299
 300    pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
 301        if !cx.has_flag::<AgentV2FeatureFlag>() {
 302            return;
 303        }
 304
 305        for metadata in metadata {
 306            self.save_internal(metadata);
 307        }
 308        cx.notify();
 309    }
 310
    /// Test-only entry point that exposes the private [`Self::save`] so tests
    /// can insert metadata directly.
    #[cfg(any(test, feature = "test-support"))]
    pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
        self.save(metadata, cx)
    }
 315
 316    fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 317        if !cx.has_flag::<AgentV2FeatureFlag>() {
 318            return;
 319        }
 320
 321        self.save_internal(metadata);
 322        cx.notify();
 323    }
 324
 325    fn save_internal(&mut self, metadata: ThreadMetadata) {
 326        if let Some(thread) = self.threads.get(&metadata.session_id) {
 327            if thread.folder_paths != metadata.folder_paths {
 328                if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
 329                    session_ids.remove(&metadata.session_id);
 330                }
 331            }
 332            if thread.main_worktree_paths != metadata.main_worktree_paths
 333                && !thread.main_worktree_paths.is_empty()
 334            {
 335                if let Some(session_ids) = self
 336                    .threads_by_main_paths
 337                    .get_mut(&thread.main_worktree_paths)
 338                {
 339                    session_ids.remove(&metadata.session_id);
 340                }
 341            }
 342        }
 343
 344        self.threads
 345            .insert(metadata.session_id.clone(), metadata.clone());
 346
 347        self.threads_by_paths
 348            .entry(metadata.folder_paths.clone())
 349            .or_default()
 350            .insert(metadata.session_id.clone());
 351
 352        if !metadata.main_worktree_paths.is_empty() {
 353            self.threads_by_main_paths
 354                .entry(metadata.main_worktree_paths.clone())
 355                .or_default()
 356                .insert(metadata.session_id.clone());
 357        }
 358
 359        self.pending_thread_ops_tx
 360            .try_send(DbOperation::Upsert(metadata))
 361            .log_err();
 362    }
 363
 364    pub fn update_working_directories(
 365        &mut self,
 366        session_id: &acp::SessionId,
 367        work_dirs: PathList,
 368        cx: &mut Context<Self>,
 369    ) {
 370        if !cx.has_flag::<AgentV2FeatureFlag>() {
 371            return;
 372        }
 373
 374        if let Some(thread) = self.threads.get(session_id) {
 375            self.save_internal(ThreadMetadata {
 376                folder_paths: work_dirs,
 377                ..thread.clone()
 378            });
 379            cx.notify();
 380        }
 381    }
 382
    /// Marks the thread as archived; see [`Self::update_archived`] for the
    /// conditions under which this is a no-op.
    pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
        self.update_archived(session_id, true, cx);
    }
 386
    /// Clears the archived flag of the thread; see [`Self::update_archived`]
    /// for the conditions under which this is a no-op.
    pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
        self.update_archived(session_id, false, cx);
    }
 390
 391    fn update_archived(
 392        &mut self,
 393        session_id: &acp::SessionId,
 394        archived: bool,
 395        cx: &mut Context<Self>,
 396    ) {
 397        if !cx.has_flag::<AgentV2FeatureFlag>() {
 398            return;
 399        }
 400
 401        if let Some(thread) = self.threads.get(session_id) {
 402            self.save_internal(ThreadMetadata {
 403                archived,
 404                ..thread.clone()
 405            });
 406            cx.notify();
 407        }
 408    }
 409
 410    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 411        if !cx.has_flag::<AgentV2FeatureFlag>() {
 412            return;
 413        }
 414
 415        if let Some(thread) = self.threads.get(&session_id) {
 416            if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
 417                session_ids.remove(&session_id);
 418            }
 419            if !thread.main_worktree_paths.is_empty() {
 420                if let Some(session_ids) = self
 421                    .threads_by_main_paths
 422                    .get_mut(&thread.main_worktree_paths)
 423                {
 424                    session_ids.remove(&session_id);
 425                }
 426            }
 427        }
 428        self.threads.remove(&session_id);
 429        self.pending_thread_ops_tx
 430            .try_send(DbOperation::Delete(session_id))
 431            .log_err();
 432        cx.notify();
 433    }
 434
    /// Builds the store around `db` and wires up everything it needs:
    ///
    /// - an observer that subscribes to every newly created top-level
    ///   `AcpThread` and cleans up when that thread is released,
    /// - a background task that applies queued [`DbOperation`]s,
    /// - an initial reload of the cache from the database.
    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
        let weak_store = cx.weak_entity();

        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
            // Don't track subagent threads in the sidebar.
            if thread.parent_session_id().is_some() {
                return;
            }

            let thread_entity = cx.entity();

            // When the thread entity is released, drop its subscription and
            // remove metadata for threads that never received any entries.
            cx.on_release({
                let weak_store = weak_store.clone();
                move |thread, cx| {
                    weak_store
                        .update(cx, |store, cx| {
                            let session_id = thread.session_id().clone();
                            store.session_subscriptions.remove(&session_id);
                            if thread.entries().is_empty() {
                                // Empty threads can be unloaded without ever being
                                // durably persisted by the underlying agent.
                                store.delete(session_id, cx);
                            }
                        })
                        .ok();
                }
            })
            .detach();

            // Route the thread's events to `handle_thread_event` and keep the
            // subscription keyed by session id.
            weak_store
                .update(cx, |this, cx| {
                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
                    this.session_subscriptions
                        .insert(thread.session_id().clone(), subscription);
                })
                .ok();
        })
        .detach();

        let (tx, rx) = smol::channel::unbounded();
        let _db_operations_task = cx.background_spawn({
            let db = db.clone();
            async move {
                // Wait for one operation, then drain whatever else is already
                // queued so the whole batch can be deduplicated per session.
                while let Ok(first_update) = rx.recv().await {
                    let mut updates = vec![first_update];
                    while let Ok(update) = rx.try_recv() {
                        updates.push(update);
                    }
                    let updates = Self::dedup_db_operations(updates);
                    for operation in updates {
                        // Failures are logged and do not stop the remaining operations.
                        match operation {
                            DbOperation::Upsert(metadata) => {
                                db.save(metadata).await.log_err();
                            }
                            DbOperation::Delete(session_id) => {
                                db.delete(session_id).await.log_err();
                            }
                        }
                    }
                }
            }
        });

        let mut this = Self {
            db,
            threads: HashMap::default(),
            threads_by_paths: HashMap::default(),
            threads_by_main_paths: HashMap::default(),
            reload_task: None,
            session_subscriptions: HashMap::default(),
            pending_thread_ops_tx: tx,
            _db_operations_task,
        };
        // Start populating the cache; the task handle is also stored in `reload_task`.
        let _ = this.reload(cx);
        this
    }
 511
 512    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 513        let mut ops = HashMap::default();
 514        for operation in operations.into_iter().rev() {
 515            if ops.contains_key(operation.id()) {
 516                continue;
 517            }
 518            ops.insert(operation.id().clone(), operation);
 519        }
 520        ops.into_values().collect()
 521    }
 522
 523    fn handle_thread_event(
 524        &mut self,
 525        thread: Entity<acp_thread::AcpThread>,
 526        event: &AcpThreadEvent,
 527        cx: &mut Context<Self>,
 528    ) {
 529        // Don't track subagent threads in the sidebar.
 530        if thread.read(cx).parent_session_id().is_some() {
 531            return;
 532        }
 533
 534        match event {
 535            AcpThreadEvent::NewEntry
 536            | AcpThreadEvent::TitleUpdated
 537            | AcpThreadEvent::EntryUpdated(_)
 538            | AcpThreadEvent::EntriesRemoved(_)
 539            | AcpThreadEvent::ToolAuthorizationRequested(_)
 540            | AcpThreadEvent::ToolAuthorizationReceived(_)
 541            | AcpThreadEvent::Retry(_)
 542            | AcpThreadEvent::Stopped(_)
 543            | AcpThreadEvent::Error
 544            | AcpThreadEvent::LoadError(_)
 545            | AcpThreadEvent::Refusal
 546            | AcpThreadEvent::WorkingDirectoriesUpdated => {
 547                let thread_ref = thread.read(cx);
 548                let existing_thread = self.threads.get(thread_ref.session_id());
 549                let session_id = thread_ref.session_id().clone();
 550                let title = thread_ref
 551                    .title()
 552                    .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 553
 554                let updated_at = Utc::now();
 555
 556                let created_at = existing_thread
 557                    .and_then(|t| t.created_at)
 558                    .unwrap_or_else(|| updated_at);
 559
 560                let agent_id = thread_ref.connection().agent_id();
 561
 562                let folder_paths = {
 563                    let project = thread_ref.project().read(cx);
 564                    let paths: Vec<Arc<Path>> = project
 565                        .visible_worktrees(cx)
 566                        .map(|worktree| worktree.read(cx).abs_path())
 567                        .collect();
 568                    PathList::new(&paths)
 569                };
 570
 571                let main_worktree_paths = {
 572                    let project = thread_ref.project().read(cx);
 573                    let mut main_paths: Vec<Arc<Path>> = Vec::new();
 574                    for repo in project.repositories(cx).values() {
 575                        let snapshot = repo.read(cx).snapshot();
 576                        if snapshot.is_linked_worktree() {
 577                            main_paths.push(snapshot.original_repo_abs_path.clone());
 578                        }
 579                    }
 580                    main_paths.sort();
 581                    main_paths.dedup();
 582                    PathList::new(&main_paths)
 583                };
 584
 585                // Threads without a folder path (e.g. started in an empty
 586                // window) are archived by default so they don't get lost,
 587                // because they won't show up in the sidebar. Users can reload
 588                // them from the archive.
 589                let archived = existing_thread
 590                    .map(|t| t.archived)
 591                    .unwrap_or(folder_paths.is_empty());
 592
 593                let metadata = ThreadMetadata {
 594                    session_id,
 595                    agent_id,
 596                    title,
 597                    created_at: Some(created_at),
 598                    updated_at,
 599                    folder_paths,
 600                    main_worktree_paths,
 601                    archived,
 602                };
 603
 604                self.save(metadata, cx);
 605            }
 606            AcpThreadEvent::TokenUsageUpdated
 607            | AcpThreadEvent::SubagentSpawned(_)
 608            | AcpThreadEvent::PromptCapabilitiesUpdated
 609            | AcpThreadEvent::AvailableCommandsUpdated(_)
 610            | AcpThreadEvent::ModeUpdated(_)
 611            | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
 612        }
 613    }
 614}
 615
// NOTE(review): within this file the store entity is only ever registered
// through the `GlobalThreadMetadataStore` wrapper, never as a
// `ThreadMetadataStore` value. Confirm whether this impl is still needed.
impl Global for ThreadMetadataStore {}
 617
/// Database domain wrapper around the connection that stores the
/// `sidebar_threads` table.
struct ThreadMetadataDb(ThreadSafeConnection);
 619
/// Schema of the `sidebar_threads` table, expressed as an ordered list of
/// migrations. New columns are appended as further entries.
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    const MIGRATIONS: &[&str] = &[
        // Initial table. `agent_id` is written as NULL for the native Zed
        // agent, and the folder path columns are NULL for an empty path list
        // (see `ThreadMetadataDb::save`).
        sql!(
            CREATE TABLE IF NOT EXISTS sidebar_threads(
                session_id TEXT PRIMARY KEY,
                agent_id TEXT,
                title TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                created_at TEXT,
                folder_paths TEXT,
                folder_paths_order TEXT
            ) STRICT;
        ),
        // Archive flag, defaulting to "not archived".
        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
        // Serialized main worktree path list (paths plus their order).
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths TEXT),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths_order TEXT),
    ];
}
 640
// NOTE(review): presumably this generates the shared connection accessor used
// as `ThreadMetadataDb::global(cx)` in `init_global` — confirm against the
// `db` crate.
db::static_connection!(ThreadMetadataDb, []);
 642
 643impl ThreadMetadataDb {
    /// List the session ids of all stored threads, ordered by updated_at descending.
    pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
        self.select::<Arc<str>>(
            "SELECT session_id FROM sidebar_threads \
             ORDER BY updated_at DESC",
        )?()
    }
 650
    /// List all sidebar thread metadata, ordered by updated_at descending.
    ///
    /// The column order of the query must match the read order in
    /// `impl Column for ThreadMetadata`.
    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
        self.select::<ThreadMetadata>(
            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order \
             FROM sidebar_threads \
             ORDER BY updated_at DESC"
        )?()
    }
 659
 660    /// Upsert metadata for a thread.
 661    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 662        let id = row.session_id.0.clone();
 663        let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 664            None
 665        } else {
 666            Some(row.agent_id.to_string())
 667        };
 668        let title = row.title.to_string();
 669        let updated_at = row.updated_at.to_rfc3339();
 670        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 671        let serialized = row.folder_paths.serialize();
 672        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 673            (None, None)
 674        } else {
 675            (Some(serialized.paths), Some(serialized.order))
 676        };
 677        let main_serialized = row.main_worktree_paths.serialize();
 678        let (main_worktree_paths, main_worktree_paths_order) = if row.main_worktree_paths.is_empty()
 679        {
 680            (None, None)
 681        } else {
 682            (Some(main_serialized.paths), Some(main_serialized.order))
 683        };
 684        let archived = row.archived;
 685
 686        self.write(move |conn| {
 687            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order) \
 688                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) \
 689                       ON CONFLICT(session_id) DO UPDATE SET \
 690                           agent_id = excluded.agent_id, \
 691                           title = excluded.title, \
 692                           updated_at = excluded.updated_at, \
 693                           created_at = excluded.created_at, \
 694                           folder_paths = excluded.folder_paths, \
 695                           folder_paths_order = excluded.folder_paths_order, \
 696                           archived = excluded.archived, \
 697                           main_worktree_paths = excluded.main_worktree_paths, \
 698                           main_worktree_paths_order = excluded.main_worktree_paths_order";
 699            let mut stmt = Statement::prepare(conn, sql)?;
 700            let mut i = stmt.bind(&id, 1)?;
 701            i = stmt.bind(&agent_id, i)?;
 702            i = stmt.bind(&title, i)?;
 703            i = stmt.bind(&updated_at, i)?;
 704            i = stmt.bind(&created_at, i)?;
 705            i = stmt.bind(&folder_paths, i)?;
 706            i = stmt.bind(&folder_paths_order, i)?;
 707            i = stmt.bind(&archived, i)?;
 708            i = stmt.bind(&main_worktree_paths, i)?;
 709            stmt.bind(&main_worktree_paths_order, i)?;
 710            stmt.exec()
 711        })
 712        .await
 713    }
 714
 715    /// Delete metadata for a single thread.
 716    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 717        let id = session_id.0.clone();
 718        self.write(move |conn| {
 719            let mut stmt =
 720                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 721            stmt.bind(&id, 1)?;
 722            stmt.exec()
 723        })
 724        .await
 725    }
 726}
 727
 728impl Column for ThreadMetadata {
 729    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 730        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 731        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 732        let (title, next): (String, i32) = Column::column(statement, next)?;
 733        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 734        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 735        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 736        let (folder_paths_order_str, next): (Option<String>, i32) =
 737            Column::column(statement, next)?;
 738        let (archived, next): (bool, i32) = Column::column(statement, next)?;
 739        let (main_worktree_paths_str, next): (Option<String>, i32) =
 740            Column::column(statement, next)?;
 741        let (main_worktree_paths_order_str, next): (Option<String>, i32) =
 742            Column::column(statement, next)?;
 743
 744        let agent_id = agent_id
 745            .map(|id| AgentId::new(id))
 746            .unwrap_or(ZED_AGENT_ID.clone());
 747
 748        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 749        let created_at = created_at_str
 750            .as_deref()
 751            .map(DateTime::parse_from_rfc3339)
 752            .transpose()?
 753            .map(|dt| dt.with_timezone(&Utc));
 754
 755        let folder_paths = folder_paths_str
 756            .map(|paths| {
 757                PathList::deserialize(&util::path_list::SerializedPathList {
 758                    paths,
 759                    order: folder_paths_order_str.unwrap_or_default(),
 760                })
 761            })
 762            .unwrap_or_default();
 763
 764        let main_worktree_paths = main_worktree_paths_str
 765            .map(|paths| {
 766                PathList::deserialize(&util::path_list::SerializedPathList {
 767                    paths,
 768                    order: main_worktree_paths_order_str.unwrap_or_default(),
 769                })
 770            })
 771            .unwrap_or_default();
 772
 773        Ok((
 774            ThreadMetadata {
 775                session_id: acp::SessionId::new(id),
 776                agent_id,
 777                title: title.into(),
 778                updated_at,
 779                created_at,
 780                folder_paths,
 781                main_worktree_paths,
 782                archived,
 783            },
 784            next,
 785        ))
 786    }
 787}
 788
 789#[cfg(test)]
 790mod tests {
 791    use super::*;
 792    use acp_thread::{AgentConnection, StubAgentConnection};
 793    use action_log::ActionLog;
 794    use agent::DbThread;
 795    use agent_client_protocol as acp;
 796    use feature_flags::FeatureFlagAppExt;
 797    use gpui::TestAppContext;
 798    use project::FakeFs;
 799    use project::Project;
 800    use std::path::Path;
 801    use std::rc::Rc;
 802
 803    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 804        DbThread {
 805            title: title.to_string().into(),
 806            messages: Vec::new(),
 807            updated_at,
 808            detailed_summary: None,
 809            initial_project_snapshot: None,
 810            cumulative_token_usage: Default::default(),
 811            request_token_usage: Default::default(),
 812            model: None,
 813            profile: None,
 814            imported: false,
 815            subagent_context: None,
 816            speed: None,
 817            thinking_enabled: false,
 818            thinking_effort: None,
 819            draft_prompt: None,
 820            ui_scroll_position: None,
 821        }
 822    }
 823
 824    fn make_metadata(
 825        session_id: &str,
 826        title: &str,
 827        updated_at: DateTime<Utc>,
 828        folder_paths: PathList,
 829    ) -> ThreadMetadata {
 830        ThreadMetadata {
 831            archived: false,
 832            session_id: acp::SessionId::new(session_id),
 833            agent_id: agent::ZED_AGENT_ID.clone(),
 834            title: title.to_string().into(),
 835            updated_at,
 836            created_at: Some(updated_at),
 837            folder_paths,
 838            main_worktree_paths: PathList::default(),
 839        }
 840    }
 841
 842    fn init_test(cx: &mut TestAppContext) {
 843        cx.update(|cx| {
 844            let settings_store = settings::SettingsStore::test(cx);
 845            cx.set_global(settings_store);
 846            cx.update_flags(true, vec!["agent-v2".to_string()]);
 847            ThreadMetadataStore::init_global(cx);
 848            ThreadStore::init_global(cx);
 849        });
 850        cx.run_until_parked();
 851    }
 852
 853    #[gpui::test]
 854    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 855        let first_paths = PathList::new(&[Path::new("/project-a")]);
 856        let second_paths = PathList::new(&[Path::new("/project-b")]);
 857        let now = Utc::now();
 858        let older = now - chrono::Duration::seconds(1);
 859
 860        let thread = std::thread::current();
 861        let test_name = thread.name().unwrap_or("unknown_test");
 862        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 863        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 864            &db_name,
 865        )));
 866
 867        db.save(make_metadata(
 868            "session-1",
 869            "First Thread",
 870            now,
 871            first_paths.clone(),
 872        ))
 873        .await
 874        .unwrap();
 875        db.save(make_metadata(
 876            "session-2",
 877            "Second Thread",
 878            older,
 879            second_paths.clone(),
 880        ))
 881        .await
 882        .unwrap();
 883
 884        cx.update(|cx| {
 885            let settings_store = settings::SettingsStore::test(cx);
 886            cx.set_global(settings_store);
 887            cx.update_flags(true, vec!["agent-v2".to_string()]);
 888            ThreadMetadataStore::init_global(cx);
 889        });
 890
 891        cx.run_until_parked();
 892
 893        cx.update(|cx| {
 894            let store = ThreadMetadataStore::global(cx);
 895            let store = store.read(cx);
 896
 897            let entry_ids = store
 898                .entry_ids()
 899                .map(|session_id| session_id.0.to_string())
 900                .collect::<Vec<_>>();
 901            assert_eq!(entry_ids.len(), 2);
 902            assert!(entry_ids.contains(&"session-1".to_string()));
 903            assert!(entry_ids.contains(&"session-2".to_string()));
 904
 905            let first_path_entries = store
 906                .entries_for_path(&first_paths)
 907                .map(|entry| entry.session_id.0.to_string())
 908                .collect::<Vec<_>>();
 909            assert_eq!(first_path_entries, vec!["session-1"]);
 910
 911            let second_path_entries = store
 912                .entries_for_path(&second_paths)
 913                .map(|entry| entry.session_id.0.to_string())
 914                .collect::<Vec<_>>();
 915            assert_eq!(second_path_entries, vec!["session-2"]);
 916        });
 917    }
 918
 919    #[gpui::test]
 920    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 921        init_test(cx);
 922
 923        let first_paths = PathList::new(&[Path::new("/project-a")]);
 924        let second_paths = PathList::new(&[Path::new("/project-b")]);
 925        let initial_time = Utc::now();
 926        let updated_time = initial_time + chrono::Duration::seconds(1);
 927
 928        let initial_metadata = make_metadata(
 929            "session-1",
 930            "First Thread",
 931            initial_time,
 932            first_paths.clone(),
 933        );
 934
 935        let second_metadata = make_metadata(
 936            "session-2",
 937            "Second Thread",
 938            initial_time,
 939            second_paths.clone(),
 940        );
 941
 942        cx.update(|cx| {
 943            let store = ThreadMetadataStore::global(cx);
 944            store.update(cx, |store, cx| {
 945                store.save(initial_metadata, cx);
 946                store.save(second_metadata, cx);
 947            });
 948        });
 949
 950        cx.run_until_parked();
 951
 952        cx.update(|cx| {
 953            let store = ThreadMetadataStore::global(cx);
 954            let store = store.read(cx);
 955
 956            let first_path_entries = store
 957                .entries_for_path(&first_paths)
 958                .map(|entry| entry.session_id.0.to_string())
 959                .collect::<Vec<_>>();
 960            assert_eq!(first_path_entries, vec!["session-1"]);
 961
 962            let second_path_entries = store
 963                .entries_for_path(&second_paths)
 964                .map(|entry| entry.session_id.0.to_string())
 965                .collect::<Vec<_>>();
 966            assert_eq!(second_path_entries, vec!["session-2"]);
 967        });
 968
 969        let moved_metadata = make_metadata(
 970            "session-1",
 971            "First Thread",
 972            updated_time,
 973            second_paths.clone(),
 974        );
 975
 976        cx.update(|cx| {
 977            let store = ThreadMetadataStore::global(cx);
 978            store.update(cx, |store, cx| {
 979                store.save(moved_metadata, cx);
 980            });
 981        });
 982
 983        cx.run_until_parked();
 984
 985        cx.update(|cx| {
 986            let store = ThreadMetadataStore::global(cx);
 987            let store = store.read(cx);
 988
 989            let entry_ids = store
 990                .entry_ids()
 991                .map(|session_id| session_id.0.to_string())
 992                .collect::<Vec<_>>();
 993            assert_eq!(entry_ids.len(), 2);
 994            assert!(entry_ids.contains(&"session-1".to_string()));
 995            assert!(entry_ids.contains(&"session-2".to_string()));
 996
 997            let first_path_entries = store
 998                .entries_for_path(&first_paths)
 999                .map(|entry| entry.session_id.0.to_string())
1000                .collect::<Vec<_>>();
1001            assert!(first_path_entries.is_empty());
1002
1003            let second_path_entries = store
1004                .entries_for_path(&second_paths)
1005                .map(|entry| entry.session_id.0.to_string())
1006                .collect::<Vec<_>>();
1007            assert_eq!(second_path_entries.len(), 2);
1008            assert!(second_path_entries.contains(&"session-1".to_string()));
1009            assert!(second_path_entries.contains(&"session-2".to_string()));
1010        });
1011
1012        cx.update(|cx| {
1013            let store = ThreadMetadataStore::global(cx);
1014            store.update(cx, |store, cx| {
1015                store.delete(acp::SessionId::new("session-2"), cx);
1016            });
1017        });
1018
1019        cx.run_until_parked();
1020
1021        cx.update(|cx| {
1022            let store = ThreadMetadataStore::global(cx);
1023            let store = store.read(cx);
1024
1025            let entry_ids = store
1026                .entry_ids()
1027                .map(|session_id| session_id.0.to_string())
1028                .collect::<Vec<_>>();
1029            assert_eq!(entry_ids, vec!["session-1"]);
1030
1031            let second_path_entries = store
1032                .entries_for_path(&second_paths)
1033                .map(|entry| entry.session_id.0.to_string())
1034                .collect::<Vec<_>>();
1035            assert_eq!(second_path_entries, vec!["session-1"]);
1036        });
1037    }
1038
1039    #[gpui::test]
1040    async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1041        init_test(cx);
1042
1043        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1044        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1045        let now = Utc::now();
1046
1047        let existing_metadata = ThreadMetadata {
1048            session_id: acp::SessionId::new("a-session-0"),
1049            agent_id: agent::ZED_AGENT_ID.clone(),
1050            title: "Existing Metadata".into(),
1051            updated_at: now - chrono::Duration::seconds(10),
1052            created_at: Some(now - chrono::Duration::seconds(10)),
1053            folder_paths: project_a_paths.clone(),
1054            main_worktree_paths: PathList::default(),
1055            archived: false,
1056        };
1057
1058        cx.update(|cx| {
1059            let store = ThreadMetadataStore::global(cx);
1060            store.update(cx, |store, cx| {
1061                store.save(existing_metadata, cx);
1062            });
1063        });
1064        cx.run_until_parked();
1065
1066        let threads_to_save = vec![
1067            (
1068                "a-session-0",
1069                "Thread A0 From Native Store",
1070                project_a_paths.clone(),
1071                now,
1072            ),
1073            (
1074                "a-session-1",
1075                "Thread A1",
1076                project_a_paths.clone(),
1077                now + chrono::Duration::seconds(1),
1078            ),
1079            (
1080                "b-session-0",
1081                "Thread B0",
1082                project_b_paths.clone(),
1083                now + chrono::Duration::seconds(2),
1084            ),
1085            (
1086                "projectless",
1087                "Projectless",
1088                PathList::default(),
1089                now + chrono::Duration::seconds(3),
1090            ),
1091        ];
1092
1093        for (session_id, title, paths, updated_at) in &threads_to_save {
1094            let save_task = cx.update(|cx| {
1095                let thread_store = ThreadStore::global(cx);
1096                let session_id = session_id.to_string();
1097                let title = title.to_string();
1098                let paths = paths.clone();
1099                thread_store.update(cx, |store, cx| {
1100                    store.save_thread(
1101                        acp::SessionId::new(session_id),
1102                        make_db_thread(&title, *updated_at),
1103                        paths,
1104                        cx,
1105                    )
1106                })
1107            });
1108            save_task.await.unwrap();
1109            cx.run_until_parked();
1110        }
1111
1112        cx.update(|cx| migrate_thread_metadata(cx));
1113        cx.run_until_parked();
1114
1115        let list = cx.update(|cx| {
1116            let store = ThreadMetadataStore::global(cx);
1117            store.read(cx).entries().cloned().collect::<Vec<_>>()
1118        });
1119
1120        assert_eq!(list.len(), 4);
1121        assert!(
1122            list.iter()
1123                .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1124        );
1125
1126        let existing_metadata = list
1127            .iter()
1128            .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1129            .unwrap();
1130        assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1131        assert!(!existing_metadata.archived);
1132
1133        let migrated_session_ids = list
1134            .iter()
1135            .map(|metadata| metadata.session_id.0.as_ref())
1136            .collect::<Vec<_>>();
1137        assert!(migrated_session_ids.contains(&"a-session-1"));
1138        assert!(migrated_session_ids.contains(&"b-session-0"));
1139        assert!(migrated_session_ids.contains(&"projectless"));
1140
1141        let migrated_entries = list
1142            .iter()
1143            .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1144            .collect::<Vec<_>>();
1145        assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1146    }
1147
1148    #[gpui::test]
1149    async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1150        cx: &mut TestAppContext,
1151    ) {
1152        init_test(cx);
1153
1154        let project_paths = PathList::new(&[Path::new("/project-a")]);
1155        let existing_updated_at = Utc::now();
1156
1157        let existing_metadata = ThreadMetadata {
1158            session_id: acp::SessionId::new("existing-session"),
1159            agent_id: agent::ZED_AGENT_ID.clone(),
1160            title: "Existing Metadata".into(),
1161            updated_at: existing_updated_at,
1162            created_at: Some(existing_updated_at),
1163            folder_paths: project_paths.clone(),
1164            main_worktree_paths: PathList::default(),
1165            archived: false,
1166        };
1167
1168        cx.update(|cx| {
1169            let store = ThreadMetadataStore::global(cx);
1170            store.update(cx, |store, cx| {
1171                store.save(existing_metadata, cx);
1172            });
1173        });
1174        cx.run_until_parked();
1175
1176        let save_task = cx.update(|cx| {
1177            let thread_store = ThreadStore::global(cx);
1178            thread_store.update(cx, |store, cx| {
1179                store.save_thread(
1180                    acp::SessionId::new("existing-session"),
1181                    make_db_thread(
1182                        "Updated Native Thread Title",
1183                        existing_updated_at + chrono::Duration::seconds(1),
1184                    ),
1185                    project_paths.clone(),
1186                    cx,
1187                )
1188            })
1189        });
1190        save_task.await.unwrap();
1191        cx.run_until_parked();
1192
1193        cx.update(|cx| migrate_thread_metadata(cx));
1194        cx.run_until_parked();
1195
1196        let list = cx.update(|cx| {
1197            let store = ThreadMetadataStore::global(cx);
1198            store.read(cx).entries().cloned().collect::<Vec<_>>()
1199        });
1200
1201        assert_eq!(list.len(), 1);
1202        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1203    }
1204
1205    #[gpui::test]
1206    async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1207        cx: &mut TestAppContext,
1208    ) {
1209        init_test(cx);
1210
1211        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1212        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1213        let now = Utc::now();
1214
1215        // Create 7 threads for project A and 3 for project B
1216        let mut threads_to_save = Vec::new();
1217        for i in 0..7 {
1218            threads_to_save.push((
1219                format!("a-session-{i}"),
1220                format!("Thread A{i}"),
1221                project_a_paths.clone(),
1222                now + chrono::Duration::seconds(i as i64),
1223            ));
1224        }
1225        for i in 0..3 {
1226            threads_to_save.push((
1227                format!("b-session-{i}"),
1228                format!("Thread B{i}"),
1229                project_b_paths.clone(),
1230                now + chrono::Duration::seconds(i as i64),
1231            ));
1232        }
1233
1234        for (session_id, title, paths, updated_at) in &threads_to_save {
1235            let save_task = cx.update(|cx| {
1236                let thread_store = ThreadStore::global(cx);
1237                let session_id = session_id.to_string();
1238                let title = title.to_string();
1239                let paths = paths.clone();
1240                thread_store.update(cx, |store, cx| {
1241                    store.save_thread(
1242                        acp::SessionId::new(session_id),
1243                        make_db_thread(&title, *updated_at),
1244                        paths,
1245                        cx,
1246                    )
1247                })
1248            });
1249            save_task.await.unwrap();
1250            cx.run_until_parked();
1251        }
1252
1253        cx.update(|cx| migrate_thread_metadata(cx));
1254        cx.run_until_parked();
1255
1256        let list = cx.update(|cx| {
1257            let store = ThreadMetadataStore::global(cx);
1258            store.read(cx).entries().cloned().collect::<Vec<_>>()
1259        });
1260
1261        assert_eq!(list.len(), 10);
1262
1263        // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1264        let mut project_a_entries: Vec<_> = list
1265            .iter()
1266            .filter(|m| m.folder_paths == project_a_paths)
1267            .collect();
1268        assert_eq!(project_a_entries.len(), 7);
1269        project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1270
1271        for entry in &project_a_entries[..5] {
1272            assert!(
1273                !entry.archived,
1274                "Expected {} to be unarchived (top 5 most recent)",
1275                entry.session_id.0
1276            );
1277        }
1278        for entry in &project_a_entries[5..] {
1279            assert!(
1280                entry.archived,
1281                "Expected {} to be archived (older than top 5)",
1282                entry.session_id.0
1283            );
1284        }
1285
1286        // Project B: all 3 should be unarchived (under the limit)
1287        let project_b_entries: Vec<_> = list
1288            .iter()
1289            .filter(|m| m.folder_paths == project_b_paths)
1290            .collect();
1291        assert_eq!(project_b_entries.len(), 3);
1292        assert!(project_b_entries.iter().all(|m| !m.archived));
1293    }
1294
1295    #[gpui::test]
1296    async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1297        init_test(cx);
1298
1299        let fs = FakeFs::new(cx.executor());
1300        let project = Project::test(fs, None::<&Path>, cx).await;
1301        let connection = Rc::new(StubAgentConnection::new());
1302
1303        let thread = cx
1304            .update(|cx| {
1305                connection
1306                    .clone()
1307                    .new_session(project.clone(), PathList::default(), cx)
1308            })
1309            .await
1310            .unwrap();
1311        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1312
1313        cx.update(|cx| {
1314            thread.update(cx, |thread, cx| {
1315                thread.set_title("Draft Thread".into(), cx).detach();
1316            });
1317        });
1318        cx.run_until_parked();
1319
1320        let metadata_ids = cx.update(|cx| {
1321            ThreadMetadataStore::global(cx)
1322                .read(cx)
1323                .entry_ids()
1324                .collect::<Vec<_>>()
1325        });
1326        assert_eq!(metadata_ids, vec![session_id]);
1327
1328        drop(thread);
1329        cx.update(|_| {});
1330        cx.run_until_parked();
1331        cx.run_until_parked();
1332
1333        let metadata_ids = cx.update(|cx| {
1334            ThreadMetadataStore::global(cx)
1335                .read(cx)
1336                .entry_ids()
1337                .collect::<Vec<_>>()
1338        });
1339        assert!(
1340            metadata_ids.is_empty(),
1341            "expected empty draft thread metadata to be deleted on release"
1342        );
1343    }
1344
1345    #[gpui::test]
1346    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1347        init_test(cx);
1348
1349        let fs = FakeFs::new(cx.executor());
1350        let project = Project::test(fs, None::<&Path>, cx).await;
1351        let connection = Rc::new(StubAgentConnection::new());
1352
1353        let thread = cx
1354            .update(|cx| {
1355                connection
1356                    .clone()
1357                    .new_session(project.clone(), PathList::default(), cx)
1358            })
1359            .await
1360            .unwrap();
1361        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1362
1363        cx.update(|cx| {
1364            thread.update(cx, |thread, cx| {
1365                thread.push_user_content_block(None, "Hello".into(), cx);
1366            });
1367        });
1368        cx.run_until_parked();
1369
1370        let metadata_ids = cx.update(|cx| {
1371            ThreadMetadataStore::global(cx)
1372                .read(cx)
1373                .entry_ids()
1374                .collect::<Vec<_>>()
1375        });
1376        assert_eq!(metadata_ids, vec![session_id.clone()]);
1377
1378        drop(thread);
1379        cx.update(|_| {});
1380        cx.run_until_parked();
1381
1382        let metadata_ids = cx.update(|cx| {
1383            ThreadMetadataStore::global(cx)
1384                .read(cx)
1385                .entry_ids()
1386                .collect::<Vec<_>>()
1387        });
1388        assert_eq!(metadata_ids, vec![session_id]);
1389    }
1390
1391    #[gpui::test]
1392    async fn test_threads_without_project_association_are_archived_by_default(
1393        cx: &mut TestAppContext,
1394    ) {
1395        init_test(cx);
1396
1397        let fs = FakeFs::new(cx.executor());
1398        let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
1399        let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
1400        let connection = Rc::new(StubAgentConnection::new());
1401
1402        let thread_without_worktree = cx
1403            .update(|cx| {
1404                connection.clone().new_session(
1405                    project_without_worktree.clone(),
1406                    PathList::default(),
1407                    cx,
1408                )
1409            })
1410            .await
1411            .unwrap();
1412        let session_without_worktree =
1413            cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());
1414
1415        cx.update(|cx| {
1416            thread_without_worktree.update(cx, |thread, cx| {
1417                thread.set_title("No Project Thread".into(), cx).detach();
1418            });
1419        });
1420        cx.run_until_parked();
1421
1422        let thread_with_worktree = cx
1423            .update(|cx| {
1424                connection.clone().new_session(
1425                    project_with_worktree.clone(),
1426                    PathList::default(),
1427                    cx,
1428                )
1429            })
1430            .await
1431            .unwrap();
1432        let session_with_worktree =
1433            cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());
1434
1435        cx.update(|cx| {
1436            thread_with_worktree.update(cx, |thread, cx| {
1437                thread.set_title("Project Thread".into(), cx).detach();
1438            });
1439        });
1440        cx.run_until_parked();
1441
1442        cx.update(|cx| {
1443            let store = ThreadMetadataStore::global(cx);
1444            let store = store.read(cx);
1445
1446            let without_worktree = store
1447                .entry(&session_without_worktree)
1448                .expect("missing metadata for thread without project association");
1449            assert!(without_worktree.folder_paths.is_empty());
1450            assert!(
1451                without_worktree.archived,
1452                "expected thread without project association to be archived"
1453            );
1454
1455            let with_worktree = store
1456                .entry(&session_with_worktree)
1457                .expect("missing metadata for thread with project association");
1458            assert_eq!(
1459                with_worktree.folder_paths,
1460                PathList::new(&[Path::new("/project-a")])
1461            );
1462            assert!(
1463                !with_worktree.archived,
1464                "expected thread with project association to remain unarchived"
1465            );
1466        });
1467    }
1468
1469    #[gpui::test]
1470    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1471        init_test(cx);
1472
1473        let fs = FakeFs::new(cx.executor());
1474        let project = Project::test(fs, None::<&Path>, cx).await;
1475        let connection = Rc::new(StubAgentConnection::new());
1476
1477        // Create a regular (non-subagent) AcpThread.
1478        let regular_thread = cx
1479            .update(|cx| {
1480                connection
1481                    .clone()
1482                    .new_session(project.clone(), PathList::default(), cx)
1483            })
1484            .await
1485            .unwrap();
1486
1487        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1488
1489        // Set a title on the regular thread to trigger a save via handle_thread_update.
1490        cx.update(|cx| {
1491            regular_thread.update(cx, |thread, cx| {
1492                thread.set_title("Regular Thread".into(), cx).detach();
1493            });
1494        });
1495        cx.run_until_parked();
1496
1497        // Create a subagent AcpThread
1498        let subagent_session_id = acp::SessionId::new("subagent-session");
1499        let subagent_thread = cx.update(|cx| {
1500            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1501            cx.new(|cx| {
1502                acp_thread::AcpThread::new(
1503                    Some(regular_session_id.clone()),
1504                    Some("Subagent Thread".into()),
1505                    None,
1506                    connection.clone(),
1507                    project.clone(),
1508                    action_log,
1509                    subagent_session_id.clone(),
1510                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1511                    cx,
1512                )
1513            })
1514        });
1515
1516        // Set a title on the subagent thread to trigger handle_thread_update.
1517        cx.update(|cx| {
1518            subagent_thread.update(cx, |thread, cx| {
1519                thread
1520                    .set_title("Subagent Thread Title".into(), cx)
1521                    .detach();
1522            });
1523        });
1524        cx.run_until_parked();
1525
1526        // List all metadata from the store cache.
1527        let list = cx.update(|cx| {
1528            let store = ThreadMetadataStore::global(cx);
1529            store.read(cx).entries().cloned().collect::<Vec<_>>()
1530        });
1531
1532        // The subagent thread should NOT appear in the sidebar metadata.
1533        // Only the regular thread should be listed.
1534        assert_eq!(
1535            list.len(),
1536            1,
1537            "Expected only the regular thread in sidebar metadata, \
1538             but found {} entries (subagent threads are leaking into the sidebar)",
1539            list.len(),
1540        );
1541        assert_eq!(list[0].session_id, regular_session_id);
1542        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1543    }
1544
1545    #[test]
1546    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1547        let now = Utc::now();
1548
1549        let operations = vec![
1550            DbOperation::Upsert(make_metadata(
1551                "session-1",
1552                "First Thread",
1553                now,
1554                PathList::default(),
1555            )),
1556            DbOperation::Delete(acp::SessionId::new("session-1")),
1557        ];
1558
1559        let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1560
1561        assert_eq!(deduped.len(), 1);
1562        assert_eq!(
1563            deduped[0],
1564            DbOperation::Delete(acp::SessionId::new("session-1"))
1565        );
1566    }
1567
1568    #[test]
1569    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1570        let now = Utc::now();
1571        let later = now + chrono::Duration::seconds(1);
1572
1573        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1574        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1575
1576        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1577            DbOperation::Upsert(old_metadata),
1578            DbOperation::Upsert(new_metadata.clone()),
1579        ]);
1580
1581        assert_eq!(deduped.len(), 1);
1582        assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1583    }
1584
1585    #[test]
1586    fn test_dedup_db_operations_preserves_distinct_sessions() {
1587        let now = Utc::now();
1588
1589        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1590        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1591        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1592            DbOperation::Upsert(metadata1.clone()),
1593            DbOperation::Upsert(metadata2.clone()),
1594        ]);
1595
1596        assert_eq!(deduped.len(), 2);
1597        assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1598        assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1599    }
1600
1601    #[gpui::test]
1602    async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1603        init_test(cx);
1604
1605        let paths = PathList::new(&[Path::new("/project-a")]);
1606        let now = Utc::now();
1607        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1608
1609        cx.update(|cx| {
1610            let store = ThreadMetadataStore::global(cx);
1611            store.update(cx, |store, cx| {
1612                store.save(metadata, cx);
1613            });
1614        });
1615
1616        cx.run_until_parked();
1617
1618        cx.update(|cx| {
1619            let store = ThreadMetadataStore::global(cx);
1620            let store = store.read(cx);
1621
1622            let path_entries = store
1623                .entries_for_path(&paths)
1624                .map(|e| e.session_id.0.to_string())
1625                .collect::<Vec<_>>();
1626            assert_eq!(path_entries, vec!["session-1"]);
1627
1628            let archived = store
1629                .archived_entries()
1630                .map(|e| e.session_id.0.to_string())
1631                .collect::<Vec<_>>();
1632            assert!(archived.is_empty());
1633        });
1634
1635        cx.update(|cx| {
1636            let store = ThreadMetadataStore::global(cx);
1637            store.update(cx, |store, cx| {
1638                store.archive(&acp::SessionId::new("session-1"), cx);
1639            });
1640        });
1641
1642        cx.run_until_parked();
1643
1644        cx.update(|cx| {
1645            let store = ThreadMetadataStore::global(cx);
1646            let store = store.read(cx);
1647
1648            let path_entries = store
1649                .entries_for_path(&paths)
1650                .map(|e| e.session_id.0.to_string())
1651                .collect::<Vec<_>>();
1652            assert!(path_entries.is_empty());
1653
1654            let archived = store.archived_entries().collect::<Vec<_>>();
1655            assert_eq!(archived.len(), 1);
1656            assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1657            assert!(archived[0].archived);
1658        });
1659
1660        cx.update(|cx| {
1661            let store = ThreadMetadataStore::global(cx);
1662            store.update(cx, |store, cx| {
1663                store.unarchive(&acp::SessionId::new("session-1"), cx);
1664            });
1665        });
1666
1667        cx.run_until_parked();
1668
1669        cx.update(|cx| {
1670            let store = ThreadMetadataStore::global(cx);
1671            let store = store.read(cx);
1672
1673            let path_entries = store
1674                .entries_for_path(&paths)
1675                .map(|e| e.session_id.0.to_string())
1676                .collect::<Vec<_>>();
1677            assert_eq!(path_entries, vec!["session-1"]);
1678
1679            let archived = store
1680                .archived_entries()
1681                .map(|e| e.session_id.0.to_string())
1682                .collect::<Vec<_>>();
1683            assert!(archived.is_empty());
1684        });
1685    }
1686
1687    #[gpui::test]
1688    async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1689        init_test(cx);
1690
1691        let paths = PathList::new(&[Path::new("/project-a")]);
1692        let now = Utc::now();
1693
1694        let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1695        let metadata2 = make_metadata(
1696            "session-2",
1697            "Archived Thread",
1698            now - chrono::Duration::seconds(1),
1699            paths.clone(),
1700        );
1701
1702        cx.update(|cx| {
1703            let store = ThreadMetadataStore::global(cx);
1704            store.update(cx, |store, cx| {
1705                store.save(metadata1, cx);
1706                store.save(metadata2, cx);
1707            });
1708        });
1709
1710        cx.run_until_parked();
1711
1712        cx.update(|cx| {
1713            let store = ThreadMetadataStore::global(cx);
1714            store.update(cx, |store, cx| {
1715                store.archive(&acp::SessionId::new("session-2"), cx);
1716            });
1717        });
1718
1719        cx.run_until_parked();
1720
1721        cx.update(|cx| {
1722            let store = ThreadMetadataStore::global(cx);
1723            let store = store.read(cx);
1724
1725            let path_entries = store
1726                .entries_for_path(&paths)
1727                .map(|e| e.session_id.0.to_string())
1728                .collect::<Vec<_>>();
1729            assert_eq!(path_entries, vec!["session-1"]);
1730
1731            let all_entries = store
1732                .entries()
1733                .map(|e| e.session_id.0.to_string())
1734                .collect::<Vec<_>>();
1735            assert_eq!(all_entries.len(), 2);
1736            assert!(all_entries.contains(&"session-1".to_string()));
1737            assert!(all_entries.contains(&"session-2".to_string()));
1738
1739            let archived = store
1740                .archived_entries()
1741                .map(|e| e.session_id.0.to_string())
1742                .collect::<Vec<_>>();
1743            assert_eq!(archived, vec!["session-2"]);
1744        });
1745    }
1746
1747    #[gpui::test]
1748    async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1749        init_test(cx);
1750
1751        let paths = PathList::new(&[Path::new("/project-a")]);
1752        let now = Utc::now();
1753
1754        let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1755        let m2 = make_metadata(
1756            "session-2",
1757            "Thread Two",
1758            now - chrono::Duration::seconds(1),
1759            paths.clone(),
1760        );
1761        let m3 = make_metadata(
1762            "session-3",
1763            "Thread Three",
1764            now - chrono::Duration::seconds(2),
1765            paths,
1766        );
1767
1768        cx.update(|cx| {
1769            let store = ThreadMetadataStore::global(cx);
1770            store.update(cx, |store, cx| {
1771                store.save_all(vec![m1, m2, m3], cx);
1772            });
1773        });
1774
1775        cx.run_until_parked();
1776
1777        cx.update(|cx| {
1778            let store = ThreadMetadataStore::global(cx);
1779            let store = store.read(cx);
1780
1781            let all_entries = store
1782                .entries()
1783                .map(|e| e.session_id.0.to_string())
1784                .collect::<Vec<_>>();
1785            assert_eq!(all_entries.len(), 3);
1786            assert!(all_entries.contains(&"session-1".to_string()));
1787            assert!(all_entries.contains(&"session-2".to_string()));
1788            assert!(all_entries.contains(&"session-3".to_string()));
1789
1790            let entry_ids = store.entry_ids().collect::<Vec<_>>();
1791            assert_eq!(entry_ids.len(), 3);
1792        });
1793    }
1794
1795    #[gpui::test]
1796    async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1797        init_test(cx);
1798
1799        let paths = PathList::new(&[Path::new("/project-a")]);
1800        let now = Utc::now();
1801        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1802
1803        cx.update(|cx| {
1804            let store = ThreadMetadataStore::global(cx);
1805            store.update(cx, |store, cx| {
1806                store.save(metadata, cx);
1807            });
1808        });
1809
1810        cx.run_until_parked();
1811
1812        cx.update(|cx| {
1813            let store = ThreadMetadataStore::global(cx);
1814            store.update(cx, |store, cx| {
1815                store.archive(&acp::SessionId::new("session-1"), cx);
1816            });
1817        });
1818
1819        cx.run_until_parked();
1820
1821        cx.update(|cx| {
1822            let store = ThreadMetadataStore::global(cx);
1823            store.update(cx, |store, cx| {
1824                let _ = store.reload(cx);
1825            });
1826        });
1827
1828        cx.run_until_parked();
1829
1830        cx.update(|cx| {
1831            let store = ThreadMetadataStore::global(cx);
1832            let store = store.read(cx);
1833
1834            let thread = store
1835                .entries()
1836                .find(|e| e.session_id.0.as_ref() == "session-1")
1837                .expect("thread should exist after reload");
1838            assert!(thread.archived);
1839
1840            let path_entries = store
1841                .entries_for_path(&paths)
1842                .map(|e| e.session_id.0.to_string())
1843                .collect::<Vec<_>>();
1844            assert!(path_entries.is_empty());
1845
1846            let archived = store
1847                .archived_entries()
1848                .map(|e| e.session_id.0.to_string())
1849                .collect::<Vec<_>>();
1850            assert_eq!(archived, vec!["session-1"]);
1851        });
1852    }
1853
1854    #[gpui::test]
1855    async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1856        init_test(cx);
1857
1858        cx.run_until_parked();
1859
1860        cx.update(|cx| {
1861            let store = ThreadMetadataStore::global(cx);
1862            store.update(cx, |store, cx| {
1863                store.archive(&acp::SessionId::new("nonexistent"), cx);
1864            });
1865        });
1866
1867        cx.run_until_parked();
1868
1869        cx.update(|cx| {
1870            let store = ThreadMetadataStore::global(cx);
1871            let store = store.read(cx);
1872
1873            assert!(store.is_empty());
1874            assert_eq!(store.entries().count(), 0);
1875            assert_eq!(store.archived_entries().count(), 0);
1876        });
1877    }
1878
1879    #[gpui::test]
1880    async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1881        init_test(cx);
1882
1883        let paths = PathList::new(&[Path::new("/project-a")]);
1884        let now = Utc::now();
1885        let metadata = make_metadata("session-1", "Thread 1", now, paths);
1886        let session_id = metadata.session_id.clone();
1887
1888        cx.update(|cx| {
1889            let store = ThreadMetadataStore::global(cx);
1890            store.update(cx, |store, cx| {
1891                store.save(metadata.clone(), cx);
1892                store.archive(&session_id, cx);
1893            });
1894        });
1895
1896        cx.run_until_parked();
1897
1898        cx.update(|cx| {
1899            let store = ThreadMetadataStore::global(cx);
1900            let store = store.read(cx);
1901
1902            let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1903            pretty_assertions::assert_eq!(
1904                entries,
1905                vec![ThreadMetadata {
1906                    archived: true,
1907                    ..metadata
1908                }]
1909            );
1910        });
1911    }
1912}