thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AcpThreadEvent;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::Context as _;
   7use chrono::{DateTime, Utc};
   8use collections::{HashMap, HashSet};
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    ThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We skip migrating threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    let store = ThreadMetadataStore::global(cx);
  46    let db = store.read(cx).db.clone();
  47
  48    cx.spawn(async move |cx| {
  49        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
  50
  51        let is_first_migration = existing_entries.is_empty();
  52
  53        let mut to_migrate = store.read_with(cx, |_store, cx| {
  54            ThreadStore::global(cx)
  55                .read(cx)
  56                .entries()
  57                .filter_map(|entry| {
  58                    if existing_entries.contains(&entry.id.0) {
  59                        return None;
  60                    }
  61
  62                    Some(ThreadMetadata {
  63                        session_id: entry.id,
  64                        agent_id: ZED_AGENT_ID.clone(),
  65                        title: entry.title,
  66                        updated_at: entry.updated_at,
  67                        created_at: entry.created_at,
  68                        folder_paths: entry.folder_paths,
  69                        main_worktree_paths: PathList::default(),
  70                        archived: true,
  71                    })
  72                })
  73                .collect::<Vec<_>>()
  74        });
  75
  76        if to_migrate.is_empty() {
  77            return anyhow::Ok(());
  78        }
  79
  80        // On the first migration (no entries in DB yet), keep the 5 most
  81        // recent threads per project unarchived.
  82        if is_first_migration {
  83            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
  84            for entry in &mut to_migrate {
  85                if entry.folder_paths.is_empty() {
  86                    continue;
  87                }
  88                per_project
  89                    .entry(entry.folder_paths.clone())
  90                    .or_default()
  91                    .push(entry);
  92            }
  93            for entries in per_project.values_mut() {
  94                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
  95                for entry in entries.iter_mut().take(5) {
  96                    entry.archived = false;
  97                }
  98            }
  99        }
 100
 101        log::info!("Migrating {} thread store entries", to_migrate.len());
 102
 103        // Manually save each entry to the database and call reload, otherwise
 104        // we'll end up triggering lots of reloads after each save
 105        for entry in to_migrate {
 106            db.save(entry).await?;
 107        }
 108
 109        log::info!("Finished migrating thread store entries");
 110
 111        let _ = store.update(cx, |store, cx| store.reload(cx));
 112        anyhow::Ok(())
 113    })
 114    .detach_and_log_err(cx);
 115}
 116
    /// Newtype under which the shared [`ThreadMetadataStore`] entity is
    /// registered as a GPUI global (set in `init_global`, read back by
    /// `global` and `try_global`).
 117struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
 118impl Global for GlobalThreadMetadataStore {}
 119
 120/// Lightweight metadata for any thread (native or ACP), enough to populate
 121/// the sidebar list and route to the correct load path when clicked.
 122#[derive(Debug, Clone, PartialEq)]
 123pub struct ThreadMetadata {
        /// Session id of the thread; also the primary key of its database row.
 124    pub session_id: acp::SessionId,
        /// Agent the thread belongs to. Persisted as an absent value when it
        /// equals `ZED_AGENT_ID`, and read back as `ZED_AGENT_ID` in that case.
 125    pub agent_id: AgentId,
        /// Title of the thread.
 126    pub title: SharedString,
        /// Last update time; persisted as an RFC 3339 string.
 127    pub updated_at: DateTime<Utc>,
        /// Creation time, if known; persisted as an RFC 3339 string.
 128    pub created_at: Option<DateTime<Utc>>,
        /// Folder paths the thread is associated with. When derived from a live
        /// thread these are the absolute paths of the project's visible worktrees.
 129    pub folder_paths: PathList,
        /// Original repository paths of linked worktrees in the thread's
        /// project; empty when there are none.
 130    pub main_worktree_paths: PathList,
        /// Archived threads are left out of the per-path lookups and are
        /// returned by `archived_entries` instead.
 131    pub archived: bool,
 132}
 133
 134impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
 135    fn from(meta: &ThreadMetadata) -> Self {
 136        Self {
 137            session_id: meta.session_id.clone(),
 138            work_dirs: Some(meta.folder_paths.clone()),
 139            title: Some(meta.title.clone()),
 140            updated_at: Some(meta.updated_at),
 141            created_at: meta.created_at,
 142            meta: None,
 143        }
 144    }
 145}
 146
 147/// The store holds all metadata needed to show threads in the sidebar/the archive.
 148///
 149/// Automatically listens to AcpThread events and updates metadata if it has changed.
 150pub struct ThreadMetadataStore {
        /// Database handle used to load rows when reloading the caches.
 151    db: ThreadMetadataDb,
        /// In-memory cache of every known thread, keyed by session id.
 152    threads: HashMap<acp::SessionId, ThreadMetadata>,
        /// Index from a thread's `folder_paths` to the session ids using it.
 153    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
        /// Index from a thread's `main_worktree_paths` to the session ids using
        /// it; threads with an empty list are not indexed here.
 154    threads_by_main_paths: HashMap<PathList, HashSet<acp::SessionId>>,
        /// Handle to the most recently started reload, if any.
 155    reload_task: Option<Shared<Task<()>>>,
        /// Event subscriptions for observed threads, keyed by session id and
        /// removed again when the thread entity is released.
 156    session_subscriptions: HashMap<acp::SessionId, Subscription>,
        /// Sending half of the queue of pending database writes.
 157    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
        /// Background task that drains the queue above and applies the
        /// operations to the database.
 158    _db_operations_task: Task<()>,
 159}
 160
 161#[derive(Debug, PartialEq)]
 162enum DbOperation {
 163    Upsert(ThreadMetadata),
 164    Delete(acp::SessionId),
 165}
 166
 167impl DbOperation {
 168    fn id(&self) -> &acp::SessionId {
 169        match self {
 170            DbOperation::Upsert(thread) => &thread.session_id,
 171            DbOperation::Delete(session_id) => session_id,
 172        }
 173    }
 174}
 175
 176impl ThreadMetadataStore {
 177    #[cfg(not(any(test, feature = "test-support")))]
 178    pub fn init_global(cx: &mut App) {
 179        if cx.has_global::<Self>() {
 180            return;
 181        }
 182
 183        let db = ThreadMetadataDb::global(cx);
 184        let thread_store = cx.new(|cx| Self::new(db, cx));
 185        cx.set_global(GlobalThreadMetadataStore(thread_store));
 186    }
 187
 188    #[cfg(any(test, feature = "test-support"))]
 189    pub fn init_global(cx: &mut App) {
 190        let thread = std::thread::current();
 191        let test_name = thread.name().unwrap_or("unknown_test");
 192        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 193        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 194        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 195        cx.set_global(GlobalThreadMetadataStore(thread_store));
 196    }
 197
 198    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 199        cx.try_global::<GlobalThreadMetadataStore>()
 200            .map(|store| store.0.clone())
 201    }
 202
 203    pub fn global(cx: &App) -> Entity<Self> {
 204        cx.global::<GlobalThreadMetadataStore>().0.clone()
 205    }
 206
 207    pub fn is_empty(&self) -> bool {
 208        self.threads.is_empty()
 209    }
 210
 211    /// Returns all thread IDs.
 212    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 213        self.threads.keys().cloned()
 214    }
 215
 216    /// Returns the metadata for a specific thread, if it exists.
 217    pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
 218        self.threads.get(session_id)
 219    }
 220
 221    /// Returns all threads.
 222    pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 223        self.threads.values()
 224    }
 225
 226    /// Returns all archived threads.
 227    pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 228        self.entries().filter(|t| t.archived)
 229    }
 230
 231    /// Returns all threads for the given path list, excluding archived threads.
 232    pub fn entries_for_path(
 233        &self,
 234        path_list: &PathList,
 235    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 236        self.threads_by_paths
 237            .get(path_list)
 238            .into_iter()
 239            .flatten()
 240            .filter_map(|s| self.threads.get(s))
 241            .filter(|s| !s.archived)
 242    }
 243
 244    /// Returns threads whose `main_worktree_paths` matches the given path list,
 245    /// excluding archived threads. This finds threads that were opened in a
 246    /// linked worktree but are associated with the given main worktree.
 247    pub fn entries_for_main_worktree_path(
 248        &self,
 249        path_list: &PathList,
 250    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 251        self.threads_by_main_paths
 252            .get(path_list)
 253            .into_iter()
 254            .flatten()
 255            .filter_map(|s| self.threads.get(s))
 256            .filter(|s| !s.archived)
 257    }
 258
 259    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 260        let db = self.db.clone();
 261        self.reload_task.take();
 262
 263        let list_task = cx
 264            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 265
 266        let reload_task = cx
 267            .spawn(async move |this, cx| {
 268                let Some(rows) = list_task.await.log_err() else {
 269                    return;
 270                };
 271
 272                this.update(cx, |this, cx| {
 273                    this.threads.clear();
 274                    this.threads_by_paths.clear();
 275                    this.threads_by_main_paths.clear();
 276
 277                    for row in rows {
 278                        this.threads_by_paths
 279                            .entry(row.folder_paths.clone())
 280                            .or_default()
 281                            .insert(row.session_id.clone());
 282                        if !row.main_worktree_paths.is_empty() {
 283                            this.threads_by_main_paths
 284                                .entry(row.main_worktree_paths.clone())
 285                                .or_default()
 286                                .insert(row.session_id.clone());
 287                        }
 288                        this.threads.insert(row.session_id.clone(), row);
 289                    }
 290
 291                    cx.notify();
 292                })
 293                .ok();
 294            })
 295            .shared();
 296        self.reload_task = Some(reload_task.clone());
 297        reload_task
 298    }
 299
 300    pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
 301        if !cx.has_flag::<AgentV2FeatureFlag>() {
 302            return;
 303        }
 304
 305        for metadata in metadata {
 306            self.save_internal(metadata);
 307        }
 308        cx.notify();
 309    }
 310
 311    #[cfg(any(test, feature = "test-support"))]
 312    pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 313        self.save(metadata, cx)
 314    }
 315
 316    fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 317        if !cx.has_flag::<AgentV2FeatureFlag>() {
 318            return;
 319        }
 320
 321        self.save_internal(metadata);
 322        cx.notify();
 323    }
 324
 325    fn save_internal(&mut self, metadata: ThreadMetadata) {
 326        if let Some(thread) = self.threads.get(&metadata.session_id) {
 327            if thread.folder_paths != metadata.folder_paths {
 328                if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
 329                    session_ids.remove(&metadata.session_id);
 330                }
 331            }
 332            if thread.main_worktree_paths != metadata.main_worktree_paths
 333                && !thread.main_worktree_paths.is_empty()
 334            {
 335                if let Some(session_ids) = self
 336                    .threads_by_main_paths
 337                    .get_mut(&thread.main_worktree_paths)
 338                {
 339                    session_ids.remove(&metadata.session_id);
 340                }
 341            }
 342        }
 343
 344        self.threads
 345            .insert(metadata.session_id.clone(), metadata.clone());
 346
 347        self.threads_by_paths
 348            .entry(metadata.folder_paths.clone())
 349            .or_default()
 350            .insert(metadata.session_id.clone());
 351
 352        if !metadata.main_worktree_paths.is_empty() {
 353            self.threads_by_main_paths
 354                .entry(metadata.main_worktree_paths.clone())
 355                .or_default()
 356                .insert(metadata.session_id.clone());
 357        }
 358
 359        self.pending_thread_ops_tx
 360            .try_send(DbOperation::Upsert(metadata))
 361            .log_err();
 362    }
 363
 364    pub fn update_working_directories(
 365        &mut self,
 366        session_id: &acp::SessionId,
 367        work_dirs: PathList,
 368        cx: &mut Context<Self>,
 369    ) {
 370        if !cx.has_flag::<AgentV2FeatureFlag>() {
 371            return;
 372        }
 373
 374        if let Some(thread) = self.threads.get(session_id) {
 375            self.save_internal(ThreadMetadata {
 376                folder_paths: work_dirs,
 377                ..thread.clone()
 378            });
 379            cx.notify();
 380        }
 381    }
 382
 383    pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 384        self.update_archived(session_id, true, cx);
 385    }
 386
 387    pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 388        self.update_archived(session_id, false, cx);
 389    }
 390
 391    fn update_archived(
 392        &mut self,
 393        session_id: &acp::SessionId,
 394        archived: bool,
 395        cx: &mut Context<Self>,
 396    ) {
 397        if !cx.has_flag::<AgentV2FeatureFlag>() {
 398            return;
 399        }
 400
 401        if let Some(thread) = self.threads.get(session_id) {
 402            self.save_internal(ThreadMetadata {
 403                archived,
 404                ..thread.clone()
 405            });
 406            cx.notify();
 407        }
 408    }
 409
 410    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 411        if !cx.has_flag::<AgentV2FeatureFlag>() {
 412            return;
 413        }
 414
 415        if let Some(thread) = self.threads.get(&session_id) {
 416            if let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths) {
 417                session_ids.remove(&session_id);
 418            }
 419            if !thread.main_worktree_paths.is_empty() {
 420                if let Some(session_ids) = self
 421                    .threads_by_main_paths
 422                    .get_mut(&thread.main_worktree_paths)
 423                {
 424                    session_ids.remove(&session_id);
 425                }
 426            }
 427        }
 428        self.threads.remove(&session_id);
 429        self.pending_thread_ops_tx
 430            .try_send(DbOperation::Delete(session_id))
 431            .log_err();
 432        cx.notify();
 433    }
 434
 435    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 436        let weak_store = cx.weak_entity();
 437
 438        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 439            // Don't track subagent threads in the sidebar.
 440            if thread.parent_session_id().is_some() {
 441                return;
 442            }
 443
 444            let thread_entity = cx.entity();
 445
 446            cx.on_release({
 447                let weak_store = weak_store.clone();
 448                move |thread, cx| {
 449                    weak_store
 450                        .update(cx, |store, _cx| {
 451                            let session_id = thread.session_id().clone();
 452                            store.session_subscriptions.remove(&session_id);
 453                        })
 454                        .ok();
 455                }
 456            })
 457            .detach();
 458
 459            weak_store
 460                .update(cx, |this, cx| {
 461                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
 462                    this.session_subscriptions
 463                        .insert(thread.session_id().clone(), subscription);
 464                })
 465                .ok();
 466        })
 467        .detach();
 468
 469        let (tx, rx) = smol::channel::unbounded();
 470        let _db_operations_task = cx.background_spawn({
 471            let db = db.clone();
 472            async move {
 473                while let Ok(first_update) = rx.recv().await {
 474                    let mut updates = vec![first_update];
 475                    while let Ok(update) = rx.try_recv() {
 476                        updates.push(update);
 477                    }
 478                    let updates = Self::dedup_db_operations(updates);
 479                    for operation in updates {
 480                        match operation {
 481                            DbOperation::Upsert(metadata) => {
 482                                db.save(metadata).await.log_err();
 483                            }
 484                            DbOperation::Delete(session_id) => {
 485                                db.delete(session_id).await.log_err();
 486                            }
 487                        }
 488                    }
 489                }
 490            }
 491        });
 492
 493        let mut this = Self {
 494            db,
 495            threads: HashMap::default(),
 496            threads_by_paths: HashMap::default(),
 497            threads_by_main_paths: HashMap::default(),
 498            reload_task: None,
 499            session_subscriptions: HashMap::default(),
 500            pending_thread_ops_tx: tx,
 501            _db_operations_task,
 502        };
 503        let _ = this.reload(cx);
 504        this
 505    }
 506
 507    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 508        let mut ops = HashMap::default();
 509        for operation in operations.into_iter().rev() {
 510            if ops.contains_key(operation.id()) {
 511                continue;
 512            }
 513            ops.insert(operation.id().clone(), operation);
 514        }
 515        ops.into_values().collect()
 516    }
 517
 518    fn handle_thread_event(
 519        &mut self,
 520        thread: Entity<acp_thread::AcpThread>,
 521        event: &AcpThreadEvent,
 522        cx: &mut Context<Self>,
 523    ) {
 524        // Don't track subagent threads in the sidebar.
 525        if thread.read(cx).parent_session_id().is_some() {
 526            return;
 527        }
 528
 529        match event {
 530            AcpThreadEvent::NewEntry
 531            | AcpThreadEvent::TitleUpdated
 532            | AcpThreadEvent::EntryUpdated(_)
 533            | AcpThreadEvent::EntriesRemoved(_)
 534            | AcpThreadEvent::ToolAuthorizationRequested(_)
 535            | AcpThreadEvent::ToolAuthorizationReceived(_)
 536            | AcpThreadEvent::Retry(_)
 537            | AcpThreadEvent::Stopped(_)
 538            | AcpThreadEvent::Error
 539            | AcpThreadEvent::LoadError(_)
 540            | AcpThreadEvent::Refusal
 541            | AcpThreadEvent::WorkingDirectoriesUpdated => {
 542                let thread_ref = thread.read(cx);
 543                if thread_ref.entries().is_empty() {
 544                    return;
 545                }
 546
 547                let existing_thread = self.threads.get(thread_ref.session_id());
 548                let session_id = thread_ref.session_id().clone();
 549                let title = thread_ref
 550                    .title()
 551                    .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 552
 553                let updated_at = Utc::now();
 554
 555                let created_at = existing_thread
 556                    .and_then(|t| t.created_at)
 557                    .unwrap_or_else(|| updated_at);
 558
 559                let agent_id = thread_ref.connection().agent_id();
 560
 561                let folder_paths = {
 562                    let project = thread_ref.project().read(cx);
 563                    let paths: Vec<Arc<Path>> = project
 564                        .visible_worktrees(cx)
 565                        .map(|worktree| worktree.read(cx).abs_path())
 566                        .collect();
 567                    PathList::new(&paths)
 568                };
 569
 570                let main_worktree_paths = {
 571                    let project = thread_ref.project().read(cx);
 572                    let mut main_paths: Vec<Arc<Path>> = Vec::new();
 573                    for repo in project.repositories(cx).values() {
 574                        let snapshot = repo.read(cx).snapshot();
 575                        if snapshot.is_linked_worktree() {
 576                            main_paths.push(snapshot.original_repo_abs_path.clone());
 577                        }
 578                    }
 579                    main_paths.sort();
 580                    main_paths.dedup();
 581                    PathList::new(&main_paths)
 582                };
 583
 584                // Threads without a folder path (e.g. started in an empty
 585                // window) are archived by default so they don't get lost,
 586                // because they won't show up in the sidebar. Users can reload
 587                // them from the archive.
 588                let archived = existing_thread
 589                    .map(|t| t.archived)
 590                    .unwrap_or(folder_paths.is_empty());
 591
 592                let metadata = ThreadMetadata {
 593                    session_id,
 594                    agent_id,
 595                    title,
 596                    created_at: Some(created_at),
 597                    updated_at,
 598                    folder_paths,
 599                    main_worktree_paths,
 600                    archived,
 601                };
 602
 603                self.save(metadata, cx);
 604            }
 605            AcpThreadEvent::TokenUsageUpdated
 606            | AcpThreadEvent::SubagentSpawned(_)
 607            | AcpThreadEvent::PromptCapabilitiesUpdated
 608            | AcpThreadEvent::AvailableCommandsUpdated(_)
 609            | AcpThreadEvent::ModeUpdated(_)
 610            | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
 611        }
 612    }
 613}
 614
    // NOTE(review): nothing visible in this file ever *sets* `ThreadMetadataStore`
    // itself as a global — the store entity is registered through the
    // `GlobalThreadMetadataStore` wrapper instead. This impl may be vestigial;
    // confirm that no other code relies on it before removing it.
 615impl Global for ThreadMetadataStore {}
 616
    /// Database domain holding the `sidebar_threads` table, one row per thread.
 617struct ThreadMetadataDb(ThreadSafeConnection);
 618
 619impl Domain for ThreadMetadataDb {
 620    const NAME: &str = stringify!(ThreadMetadataDb);
 621
        // Schema history of `sidebar_threads`, one entry per migration step.
        // NOTE(review): presumably already-shipped entries must never be edited,
        // only appended to — confirm against the `db` crate's migration rules.
 622    const MIGRATIONS: &[&str] = &[
            // Initial table. Timestamps are stored as text, and a path list is
            // stored as a pair of text columns (`*_paths` and `*_paths_order`).
 623        sql!(
 624            CREATE TABLE IF NOT EXISTS sidebar_threads(
 625                session_id TEXT PRIMARY KEY,
 626                agent_id TEXT,
 627                title TEXT NOT NULL,
 628                updated_at TEXT NOT NULL,
 629                created_at TEXT,
 630                folder_paths TEXT,
 631                folder_paths_order TEXT
 632            ) STRICT;
 633        ),
            // Archived flag; defaults to 0 (not archived).
 634        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
            // Main worktree path list, stored like `folder_paths`.
 635        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths TEXT),
 636        sql!(ALTER TABLE sidebar_threads ADD COLUMN main_worktree_paths_order TEXT),
 637    ];
 638}
 639
    // NOTE(review): presumably this macro provides the `ThreadMetadataDb::global(cx)`
    // accessor used by the non-test `init_global` — confirm in the `db` crate.
 640db::static_connection!(ThreadMetadataDb, []);
 641
 642impl ThreadMetadataDb {
 643    pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
 644        self.select::<Arc<str>>(
 645            "SELECT session_id FROM sidebar_threads \
 646             ORDER BY updated_at DESC",
 647        )?()
 648    }
 649
 650    /// List all sidebar thread metadata, ordered by updated_at descending.
 651    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 652        self.select::<ThreadMetadata>(
 653            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order \
 654             FROM sidebar_threads \
 655             ORDER BY updated_at DESC"
 656        )?()
 657    }
 658
 659    /// Upsert metadata for a thread.
 660    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 661        let id = row.session_id.0.clone();
 662        let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 663            None
 664        } else {
 665            Some(row.agent_id.to_string())
 666        };
 667        let title = row.title.to_string();
 668        let updated_at = row.updated_at.to_rfc3339();
 669        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 670        let serialized = row.folder_paths.serialize();
 671        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 672            (None, None)
 673        } else {
 674            (Some(serialized.paths), Some(serialized.order))
 675        };
 676        let main_serialized = row.main_worktree_paths.serialize();
 677        let (main_worktree_paths, main_worktree_paths_order) = if row.main_worktree_paths.is_empty()
 678        {
 679            (None, None)
 680        } else {
 681            (Some(main_serialized.paths), Some(main_serialized.order))
 682        };
 683        let archived = row.archived;
 684
 685        self.write(move |conn| {
 686            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived, main_worktree_paths, main_worktree_paths_order) \
 687                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) \
 688                       ON CONFLICT(session_id) DO UPDATE SET \
 689                           agent_id = excluded.agent_id, \
 690                           title = excluded.title, \
 691                           updated_at = excluded.updated_at, \
 692                           created_at = excluded.created_at, \
 693                           folder_paths = excluded.folder_paths, \
 694                           folder_paths_order = excluded.folder_paths_order, \
 695                           archived = excluded.archived, \
 696                           main_worktree_paths = excluded.main_worktree_paths, \
 697                           main_worktree_paths_order = excluded.main_worktree_paths_order";
 698            let mut stmt = Statement::prepare(conn, sql)?;
 699            let mut i = stmt.bind(&id, 1)?;
 700            i = stmt.bind(&agent_id, i)?;
 701            i = stmt.bind(&title, i)?;
 702            i = stmt.bind(&updated_at, i)?;
 703            i = stmt.bind(&created_at, i)?;
 704            i = stmt.bind(&folder_paths, i)?;
 705            i = stmt.bind(&folder_paths_order, i)?;
 706            i = stmt.bind(&archived, i)?;
 707            i = stmt.bind(&main_worktree_paths, i)?;
 708            stmt.bind(&main_worktree_paths_order, i)?;
 709            stmt.exec()
 710        })
 711        .await
 712    }
 713
 714    /// Delete metadata for a single thread.
 715    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 716        let id = session_id.0.clone();
 717        self.write(move |conn| {
 718            let mut stmt =
 719                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 720            stmt.bind(&id, 1)?;
 721            stmt.exec()
 722        })
 723        .await
 724    }
 725}
 726
 727impl Column for ThreadMetadata {
 728    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 729        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 730        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 731        let (title, next): (String, i32) = Column::column(statement, next)?;
 732        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 733        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 734        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 735        let (folder_paths_order_str, next): (Option<String>, i32) =
 736            Column::column(statement, next)?;
 737        let (archived, next): (bool, i32) = Column::column(statement, next)?;
 738        let (main_worktree_paths_str, next): (Option<String>, i32) =
 739            Column::column(statement, next)?;
 740        let (main_worktree_paths_order_str, next): (Option<String>, i32) =
 741            Column::column(statement, next)?;
 742
 743        let agent_id = agent_id
 744            .map(|id| AgentId::new(id))
 745            .unwrap_or(ZED_AGENT_ID.clone());
 746
 747        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 748        let created_at = created_at_str
 749            .as_deref()
 750            .map(DateTime::parse_from_rfc3339)
 751            .transpose()?
 752            .map(|dt| dt.with_timezone(&Utc));
 753
 754        let folder_paths = folder_paths_str
 755            .map(|paths| {
 756                PathList::deserialize(&util::path_list::SerializedPathList {
 757                    paths,
 758                    order: folder_paths_order_str.unwrap_or_default(),
 759                })
 760            })
 761            .unwrap_or_default();
 762
 763        let main_worktree_paths = main_worktree_paths_str
 764            .map(|paths| {
 765                PathList::deserialize(&util::path_list::SerializedPathList {
 766                    paths,
 767                    order: main_worktree_paths_order_str.unwrap_or_default(),
 768                })
 769            })
 770            .unwrap_or_default();
 771
 772        Ok((
 773            ThreadMetadata {
 774                session_id: acp::SessionId::new(id),
 775                agent_id,
 776                title: title.into(),
 777                updated_at,
 778                created_at,
 779                folder_paths,
 780                main_worktree_paths,
 781                archived,
 782            },
 783            next,
 784        ))
 785    }
 786}
 787
 788#[cfg(test)]
 789mod tests {
 790    use super::*;
 791    use acp_thread::{AgentConnection, StubAgentConnection};
 792    use action_log::ActionLog;
 793    use agent::DbThread;
 794    use agent_client_protocol as acp;
 795    use feature_flags::FeatureFlagAppExt;
 796    use gpui::TestAppContext;
 797    use project::FakeFs;
 798    use project::Project;
 799    use std::path::Path;
 800    use std::rc::Rc;
 801
 802    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 803        DbThread {
 804            title: title.to_string().into(),
 805            messages: Vec::new(),
 806            updated_at,
 807            detailed_summary: None,
 808            initial_project_snapshot: None,
 809            cumulative_token_usage: Default::default(),
 810            request_token_usage: Default::default(),
 811            model: None,
 812            profile: None,
 813            imported: false,
 814            subagent_context: None,
 815            speed: None,
 816            thinking_enabled: false,
 817            thinking_effort: None,
 818            draft_prompt: None,
 819            ui_scroll_position: None,
 820        }
 821    }
 822
 823    fn make_metadata(
 824        session_id: &str,
 825        title: &str,
 826        updated_at: DateTime<Utc>,
 827        folder_paths: PathList,
 828    ) -> ThreadMetadata {
 829        ThreadMetadata {
 830            archived: false,
 831            session_id: acp::SessionId::new(session_id),
 832            agent_id: agent::ZED_AGENT_ID.clone(),
 833            title: title.to_string().into(),
 834            updated_at,
 835            created_at: Some(updated_at),
 836            folder_paths,
 837            main_worktree_paths: PathList::default(),
 838        }
 839    }
 840
 841    fn init_test(cx: &mut TestAppContext) {
 842        cx.update(|cx| {
 843            let settings_store = settings::SettingsStore::test(cx);
 844            cx.set_global(settings_store);
 845            cx.update_flags(true, vec!["agent-v2".to_string()]);
 846            ThreadMetadataStore::init_global(cx);
 847            ThreadStore::init_global(cx);
 848        });
 849        cx.run_until_parked();
 850    }
 851
 852    #[gpui::test]
 853    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 854        let first_paths = PathList::new(&[Path::new("/project-a")]);
 855        let second_paths = PathList::new(&[Path::new("/project-b")]);
 856        let now = Utc::now();
 857        let older = now - chrono::Duration::seconds(1);
 858
 859        let thread = std::thread::current();
 860        let test_name = thread.name().unwrap_or("unknown_test");
 861        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 862        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 863            &db_name,
 864        )));
 865
 866        db.save(make_metadata(
 867            "session-1",
 868            "First Thread",
 869            now,
 870            first_paths.clone(),
 871        ))
 872        .await
 873        .unwrap();
 874        db.save(make_metadata(
 875            "session-2",
 876            "Second Thread",
 877            older,
 878            second_paths.clone(),
 879        ))
 880        .await
 881        .unwrap();
 882
 883        cx.update(|cx| {
 884            let settings_store = settings::SettingsStore::test(cx);
 885            cx.set_global(settings_store);
 886            cx.update_flags(true, vec!["agent-v2".to_string()]);
 887            ThreadMetadataStore::init_global(cx);
 888        });
 889
 890        cx.run_until_parked();
 891
 892        cx.update(|cx| {
 893            let store = ThreadMetadataStore::global(cx);
 894            let store = store.read(cx);
 895
 896            let entry_ids = store
 897                .entry_ids()
 898                .map(|session_id| session_id.0.to_string())
 899                .collect::<Vec<_>>();
 900            assert_eq!(entry_ids.len(), 2);
 901            assert!(entry_ids.contains(&"session-1".to_string()));
 902            assert!(entry_ids.contains(&"session-2".to_string()));
 903
 904            let first_path_entries = store
 905                .entries_for_path(&first_paths)
 906                .map(|entry| entry.session_id.0.to_string())
 907                .collect::<Vec<_>>();
 908            assert_eq!(first_path_entries, vec!["session-1"]);
 909
 910            let second_path_entries = store
 911                .entries_for_path(&second_paths)
 912                .map(|entry| entry.session_id.0.to_string())
 913                .collect::<Vec<_>>();
 914            assert_eq!(second_path_entries, vec!["session-2"]);
 915        });
 916    }
 917
 918    #[gpui::test]
 919    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 920        init_test(cx);
 921
 922        let first_paths = PathList::new(&[Path::new("/project-a")]);
 923        let second_paths = PathList::new(&[Path::new("/project-b")]);
 924        let initial_time = Utc::now();
 925        let updated_time = initial_time + chrono::Duration::seconds(1);
 926
 927        let initial_metadata = make_metadata(
 928            "session-1",
 929            "First Thread",
 930            initial_time,
 931            first_paths.clone(),
 932        );
 933
 934        let second_metadata = make_metadata(
 935            "session-2",
 936            "Second Thread",
 937            initial_time,
 938            second_paths.clone(),
 939        );
 940
 941        cx.update(|cx| {
 942            let store = ThreadMetadataStore::global(cx);
 943            store.update(cx, |store, cx| {
 944                store.save(initial_metadata, cx);
 945                store.save(second_metadata, cx);
 946            });
 947        });
 948
 949        cx.run_until_parked();
 950
 951        cx.update(|cx| {
 952            let store = ThreadMetadataStore::global(cx);
 953            let store = store.read(cx);
 954
 955            let first_path_entries = store
 956                .entries_for_path(&first_paths)
 957                .map(|entry| entry.session_id.0.to_string())
 958                .collect::<Vec<_>>();
 959            assert_eq!(first_path_entries, vec!["session-1"]);
 960
 961            let second_path_entries = store
 962                .entries_for_path(&second_paths)
 963                .map(|entry| entry.session_id.0.to_string())
 964                .collect::<Vec<_>>();
 965            assert_eq!(second_path_entries, vec!["session-2"]);
 966        });
 967
 968        let moved_metadata = make_metadata(
 969            "session-1",
 970            "First Thread",
 971            updated_time,
 972            second_paths.clone(),
 973        );
 974
 975        cx.update(|cx| {
 976            let store = ThreadMetadataStore::global(cx);
 977            store.update(cx, |store, cx| {
 978                store.save(moved_metadata, cx);
 979            });
 980        });
 981
 982        cx.run_until_parked();
 983
 984        cx.update(|cx| {
 985            let store = ThreadMetadataStore::global(cx);
 986            let store = store.read(cx);
 987
 988            let entry_ids = store
 989                .entry_ids()
 990                .map(|session_id| session_id.0.to_string())
 991                .collect::<Vec<_>>();
 992            assert_eq!(entry_ids.len(), 2);
 993            assert!(entry_ids.contains(&"session-1".to_string()));
 994            assert!(entry_ids.contains(&"session-2".to_string()));
 995
 996            let first_path_entries = store
 997                .entries_for_path(&first_paths)
 998                .map(|entry| entry.session_id.0.to_string())
 999                .collect::<Vec<_>>();
1000            assert!(first_path_entries.is_empty());
1001
1002            let second_path_entries = store
1003                .entries_for_path(&second_paths)
1004                .map(|entry| entry.session_id.0.to_string())
1005                .collect::<Vec<_>>();
1006            assert_eq!(second_path_entries.len(), 2);
1007            assert!(second_path_entries.contains(&"session-1".to_string()));
1008            assert!(second_path_entries.contains(&"session-2".to_string()));
1009        });
1010
1011        cx.update(|cx| {
1012            let store = ThreadMetadataStore::global(cx);
1013            store.update(cx, |store, cx| {
1014                store.delete(acp::SessionId::new("session-2"), cx);
1015            });
1016        });
1017
1018        cx.run_until_parked();
1019
1020        cx.update(|cx| {
1021            let store = ThreadMetadataStore::global(cx);
1022            let store = store.read(cx);
1023
1024            let entry_ids = store
1025                .entry_ids()
1026                .map(|session_id| session_id.0.to_string())
1027                .collect::<Vec<_>>();
1028            assert_eq!(entry_ids, vec!["session-1"]);
1029
1030            let second_path_entries = store
1031                .entries_for_path(&second_paths)
1032                .map(|entry| entry.session_id.0.to_string())
1033                .collect::<Vec<_>>();
1034            assert_eq!(second_path_entries, vec!["session-1"]);
1035        });
1036    }
1037
1038    #[gpui::test]
1039    async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1040        init_test(cx);
1041
1042        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1043        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1044        let now = Utc::now();
1045
1046        let existing_metadata = ThreadMetadata {
1047            session_id: acp::SessionId::new("a-session-0"),
1048            agent_id: agent::ZED_AGENT_ID.clone(),
1049            title: "Existing Metadata".into(),
1050            updated_at: now - chrono::Duration::seconds(10),
1051            created_at: Some(now - chrono::Duration::seconds(10)),
1052            folder_paths: project_a_paths.clone(),
1053            main_worktree_paths: PathList::default(),
1054            archived: false,
1055        };
1056
1057        cx.update(|cx| {
1058            let store = ThreadMetadataStore::global(cx);
1059            store.update(cx, |store, cx| {
1060                store.save(existing_metadata, cx);
1061            });
1062        });
1063        cx.run_until_parked();
1064
1065        let threads_to_save = vec![
1066            (
1067                "a-session-0",
1068                "Thread A0 From Native Store",
1069                project_a_paths.clone(),
1070                now,
1071            ),
1072            (
1073                "a-session-1",
1074                "Thread A1",
1075                project_a_paths.clone(),
1076                now + chrono::Duration::seconds(1),
1077            ),
1078            (
1079                "b-session-0",
1080                "Thread B0",
1081                project_b_paths.clone(),
1082                now + chrono::Duration::seconds(2),
1083            ),
1084            (
1085                "projectless",
1086                "Projectless",
1087                PathList::default(),
1088                now + chrono::Duration::seconds(3),
1089            ),
1090        ];
1091
1092        for (session_id, title, paths, updated_at) in &threads_to_save {
1093            let save_task = cx.update(|cx| {
1094                let thread_store = ThreadStore::global(cx);
1095                let session_id = session_id.to_string();
1096                let title = title.to_string();
1097                let paths = paths.clone();
1098                thread_store.update(cx, |store, cx| {
1099                    store.save_thread(
1100                        acp::SessionId::new(session_id),
1101                        make_db_thread(&title, *updated_at),
1102                        paths,
1103                        cx,
1104                    )
1105                })
1106            });
1107            save_task.await.unwrap();
1108            cx.run_until_parked();
1109        }
1110
1111        cx.update(|cx| migrate_thread_metadata(cx));
1112        cx.run_until_parked();
1113
1114        let list = cx.update(|cx| {
1115            let store = ThreadMetadataStore::global(cx);
1116            store.read(cx).entries().cloned().collect::<Vec<_>>()
1117        });
1118
1119        assert_eq!(list.len(), 4);
1120        assert!(
1121            list.iter()
1122                .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1123        );
1124
1125        let existing_metadata = list
1126            .iter()
1127            .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1128            .unwrap();
1129        assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1130        assert!(!existing_metadata.archived);
1131
1132        let migrated_session_ids = list
1133            .iter()
1134            .map(|metadata| metadata.session_id.0.as_ref())
1135            .collect::<Vec<_>>();
1136        assert!(migrated_session_ids.contains(&"a-session-1"));
1137        assert!(migrated_session_ids.contains(&"b-session-0"));
1138        assert!(migrated_session_ids.contains(&"projectless"));
1139
1140        let migrated_entries = list
1141            .iter()
1142            .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1143            .collect::<Vec<_>>();
1144        assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1145    }
1146
1147    #[gpui::test]
1148    async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1149        cx: &mut TestAppContext,
1150    ) {
1151        init_test(cx);
1152
1153        let project_paths = PathList::new(&[Path::new("/project-a")]);
1154        let existing_updated_at = Utc::now();
1155
1156        let existing_metadata = ThreadMetadata {
1157            session_id: acp::SessionId::new("existing-session"),
1158            agent_id: agent::ZED_AGENT_ID.clone(),
1159            title: "Existing Metadata".into(),
1160            updated_at: existing_updated_at,
1161            created_at: Some(existing_updated_at),
1162            folder_paths: project_paths.clone(),
1163            main_worktree_paths: PathList::default(),
1164            archived: false,
1165        };
1166
1167        cx.update(|cx| {
1168            let store = ThreadMetadataStore::global(cx);
1169            store.update(cx, |store, cx| {
1170                store.save(existing_metadata, cx);
1171            });
1172        });
1173        cx.run_until_parked();
1174
1175        let save_task = cx.update(|cx| {
1176            let thread_store = ThreadStore::global(cx);
1177            thread_store.update(cx, |store, cx| {
1178                store.save_thread(
1179                    acp::SessionId::new("existing-session"),
1180                    make_db_thread(
1181                        "Updated Native Thread Title",
1182                        existing_updated_at + chrono::Duration::seconds(1),
1183                    ),
1184                    project_paths.clone(),
1185                    cx,
1186                )
1187            })
1188        });
1189        save_task.await.unwrap();
1190        cx.run_until_parked();
1191
1192        cx.update(|cx| migrate_thread_metadata(cx));
1193        cx.run_until_parked();
1194
1195        let list = cx.update(|cx| {
1196            let store = ThreadMetadataStore::global(cx);
1197            store.read(cx).entries().cloned().collect::<Vec<_>>()
1198        });
1199
1200        assert_eq!(list.len(), 1);
1201        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1202    }
1203
1204    #[gpui::test]
1205    async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1206        cx: &mut TestAppContext,
1207    ) {
1208        init_test(cx);
1209
1210        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1211        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1212        let now = Utc::now();
1213
1214        // Create 7 threads for project A and 3 for project B
1215        let mut threads_to_save = Vec::new();
1216        for i in 0..7 {
1217            threads_to_save.push((
1218                format!("a-session-{i}"),
1219                format!("Thread A{i}"),
1220                project_a_paths.clone(),
1221                now + chrono::Duration::seconds(i as i64),
1222            ));
1223        }
1224        for i in 0..3 {
1225            threads_to_save.push((
1226                format!("b-session-{i}"),
1227                format!("Thread B{i}"),
1228                project_b_paths.clone(),
1229                now + chrono::Duration::seconds(i as i64),
1230            ));
1231        }
1232
1233        for (session_id, title, paths, updated_at) in &threads_to_save {
1234            let save_task = cx.update(|cx| {
1235                let thread_store = ThreadStore::global(cx);
1236                let session_id = session_id.to_string();
1237                let title = title.to_string();
1238                let paths = paths.clone();
1239                thread_store.update(cx, |store, cx| {
1240                    store.save_thread(
1241                        acp::SessionId::new(session_id),
1242                        make_db_thread(&title, *updated_at),
1243                        paths,
1244                        cx,
1245                    )
1246                })
1247            });
1248            save_task.await.unwrap();
1249            cx.run_until_parked();
1250        }
1251
1252        cx.update(|cx| migrate_thread_metadata(cx));
1253        cx.run_until_parked();
1254
1255        let list = cx.update(|cx| {
1256            let store = ThreadMetadataStore::global(cx);
1257            store.read(cx).entries().cloned().collect::<Vec<_>>()
1258        });
1259
1260        assert_eq!(list.len(), 10);
1261
1262        // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1263        let mut project_a_entries: Vec<_> = list
1264            .iter()
1265            .filter(|m| m.folder_paths == project_a_paths)
1266            .collect();
1267        assert_eq!(project_a_entries.len(), 7);
1268        project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1269
1270        for entry in &project_a_entries[..5] {
1271            assert!(
1272                !entry.archived,
1273                "Expected {} to be unarchived (top 5 most recent)",
1274                entry.session_id.0
1275            );
1276        }
1277        for entry in &project_a_entries[5..] {
1278            assert!(
1279                entry.archived,
1280                "Expected {} to be archived (older than top 5)",
1281                entry.session_id.0
1282            );
1283        }
1284
1285        // Project B: all 3 should be unarchived (under the limit)
1286        let project_b_entries: Vec<_> = list
1287            .iter()
1288            .filter(|m| m.folder_paths == project_b_paths)
1289            .collect();
1290        assert_eq!(project_b_entries.len(), 3);
1291        assert!(project_b_entries.iter().all(|m| !m.archived));
1292    }
1293
1294    #[gpui::test]
1295    async fn test_empty_thread_events_do_not_create_metadata(cx: &mut TestAppContext) {
1296        init_test(cx);
1297
1298        let fs = FakeFs::new(cx.executor());
1299        let project = Project::test(fs, None::<&Path>, cx).await;
1300        let connection = Rc::new(StubAgentConnection::new());
1301
1302        let thread = cx
1303            .update(|cx| {
1304                connection
1305                    .clone()
1306                    .new_session(project.clone(), PathList::default(), cx)
1307            })
1308            .await
1309            .unwrap();
1310        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1311
1312        cx.update(|cx| {
1313            thread.update(cx, |thread, cx| {
1314                thread.set_title("Draft Thread".into(), cx).detach();
1315            });
1316        });
1317        cx.run_until_parked();
1318
1319        let metadata_ids = cx.update(|cx| {
1320            ThreadMetadataStore::global(cx)
1321                .read(cx)
1322                .entry_ids()
1323                .collect::<Vec<_>>()
1324        });
1325        assert!(
1326            metadata_ids.is_empty(),
1327            "expected empty draft thread title updates to be ignored"
1328        );
1329
1330        cx.update(|cx| {
1331            thread.update(cx, |thread, cx| {
1332                thread.push_user_content_block(None, "Hello".into(), cx);
1333            });
1334        });
1335        cx.run_until_parked();
1336
1337        let metadata_ids = cx.update(|cx| {
1338            ThreadMetadataStore::global(cx)
1339                .read(cx)
1340                .entry_ids()
1341                .collect::<Vec<_>>()
1342        });
1343        assert_eq!(metadata_ids, vec![session_id]);
1344    }
1345
1346    #[gpui::test]
1347    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1348        init_test(cx);
1349
1350        let fs = FakeFs::new(cx.executor());
1351        let project = Project::test(fs, None::<&Path>, cx).await;
1352        let connection = Rc::new(StubAgentConnection::new());
1353
1354        let thread = cx
1355            .update(|cx| {
1356                connection
1357                    .clone()
1358                    .new_session(project.clone(), PathList::default(), cx)
1359            })
1360            .await
1361            .unwrap();
1362        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1363
1364        cx.update(|cx| {
1365            thread.update(cx, |thread, cx| {
1366                thread.push_user_content_block(None, "Hello".into(), cx);
1367            });
1368        });
1369        cx.run_until_parked();
1370
1371        let metadata_ids = cx.update(|cx| {
1372            ThreadMetadataStore::global(cx)
1373                .read(cx)
1374                .entry_ids()
1375                .collect::<Vec<_>>()
1376        });
1377        assert_eq!(metadata_ids, vec![session_id.clone()]);
1378
1379        drop(thread);
1380        cx.update(|_| {});
1381        cx.run_until_parked();
1382
1383        let metadata_ids = cx.update(|cx| {
1384            ThreadMetadataStore::global(cx)
1385                .read(cx)
1386                .entry_ids()
1387                .collect::<Vec<_>>()
1388        });
1389        assert_eq!(metadata_ids, vec![session_id]);
1390    }
1391
1392    #[gpui::test]
1393    async fn test_threads_without_project_association_are_archived_by_default(
1394        cx: &mut TestAppContext,
1395    ) {
1396        init_test(cx);
1397
1398        let fs = FakeFs::new(cx.executor());
1399        let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
1400        let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
1401        let connection = Rc::new(StubAgentConnection::new());
1402
1403        let thread_without_worktree = cx
1404            .update(|cx| {
1405                connection.clone().new_session(
1406                    project_without_worktree.clone(),
1407                    PathList::default(),
1408                    cx,
1409                )
1410            })
1411            .await
1412            .unwrap();
1413        let session_without_worktree =
1414            cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());
1415
1416        cx.update(|cx| {
1417            thread_without_worktree.update(cx, |thread, cx| {
1418                thread.push_user_content_block(None, "content".into(), cx);
1419                thread.set_title("No Project Thread".into(), cx).detach();
1420            });
1421        });
1422        cx.run_until_parked();
1423
1424        let thread_with_worktree = cx
1425            .update(|cx| {
1426                connection.clone().new_session(
1427                    project_with_worktree.clone(),
1428                    PathList::default(),
1429                    cx,
1430                )
1431            })
1432            .await
1433            .unwrap();
1434        let session_with_worktree =
1435            cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());
1436
1437        cx.update(|cx| {
1438            thread_with_worktree.update(cx, |thread, cx| {
1439                thread.push_user_content_block(None, "content".into(), cx);
1440                thread.set_title("Project Thread".into(), cx).detach();
1441            });
1442        });
1443        cx.run_until_parked();
1444
1445        cx.update(|cx| {
1446            let store = ThreadMetadataStore::global(cx);
1447            let store = store.read(cx);
1448
1449            let without_worktree = store
1450                .entry(&session_without_worktree)
1451                .expect("missing metadata for thread without project association");
1452            assert!(without_worktree.folder_paths.is_empty());
1453            assert!(
1454                without_worktree.archived,
1455                "expected thread without project association to be archived"
1456            );
1457
1458            let with_worktree = store
1459                .entry(&session_with_worktree)
1460                .expect("missing metadata for thread with project association");
1461            assert_eq!(
1462                with_worktree.folder_paths,
1463                PathList::new(&[Path::new("/project-a")])
1464            );
1465            assert!(
1466                !with_worktree.archived,
1467                "expected thread with project association to remain unarchived"
1468            );
1469        });
1470    }
1471
1472    #[gpui::test]
1473    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1474        init_test(cx);
1475
1476        let fs = FakeFs::new(cx.executor());
1477        let project = Project::test(fs, None::<&Path>, cx).await;
1478        let connection = Rc::new(StubAgentConnection::new());
1479
1480        // Create a regular (non-subagent) AcpThread.
1481        let regular_thread = cx
1482            .update(|cx| {
1483                connection
1484                    .clone()
1485                    .new_session(project.clone(), PathList::default(), cx)
1486            })
1487            .await
1488            .unwrap();
1489
1490        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1491
1492        // Set a title on the regular thread to trigger a save via handle_thread_update.
1493        cx.update(|cx| {
1494            regular_thread.update(cx, |thread, cx| {
1495                thread.push_user_content_block(None, "content".into(), cx);
1496                thread.set_title("Regular Thread".into(), cx).detach();
1497            });
1498        });
1499        cx.run_until_parked();
1500
1501        // Create a subagent AcpThread
1502        let subagent_session_id = acp::SessionId::new("subagent-session");
1503        let subagent_thread = cx.update(|cx| {
1504            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1505            cx.new(|cx| {
1506                acp_thread::AcpThread::new(
1507                    Some(regular_session_id.clone()),
1508                    Some("Subagent Thread".into()),
1509                    None,
1510                    connection.clone(),
1511                    project.clone(),
1512                    action_log,
1513                    subagent_session_id.clone(),
1514                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1515                    cx,
1516                )
1517            })
1518        });
1519
1520        // Set a title on the subagent thread to trigger handle_thread_update.
1521        cx.update(|cx| {
1522            subagent_thread.update(cx, |thread, cx| {
1523                thread
1524                    .set_title("Subagent Thread Title".into(), cx)
1525                    .detach();
1526            });
1527        });
1528        cx.run_until_parked();
1529
1530        // List all metadata from the store cache.
1531        let list = cx.update(|cx| {
1532            let store = ThreadMetadataStore::global(cx);
1533            store.read(cx).entries().cloned().collect::<Vec<_>>()
1534        });
1535
1536        // The subagent thread should NOT appear in the sidebar metadata.
1537        // Only the regular thread should be listed.
1538        assert_eq!(
1539            list.len(),
1540            1,
1541            "Expected only the regular thread in sidebar metadata, \
1542             but found {} entries (subagent threads are leaking into the sidebar)",
1543            list.len(),
1544        );
1545        assert_eq!(list[0].session_id, regular_session_id);
1546        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1547    }
1548
1549    #[test]
1550    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1551        let now = Utc::now();
1552
1553        let operations = vec![
1554            DbOperation::Upsert(make_metadata(
1555                "session-1",
1556                "First Thread",
1557                now,
1558                PathList::default(),
1559            )),
1560            DbOperation::Delete(acp::SessionId::new("session-1")),
1561        ];
1562
1563        let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1564
1565        assert_eq!(deduped.len(), 1);
1566        assert_eq!(
1567            deduped[0],
1568            DbOperation::Delete(acp::SessionId::new("session-1"))
1569        );
1570    }
1571
1572    #[test]
1573    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1574        let now = Utc::now();
1575        let later = now + chrono::Duration::seconds(1);
1576
1577        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1578        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1579
1580        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1581            DbOperation::Upsert(old_metadata),
1582            DbOperation::Upsert(new_metadata.clone()),
1583        ]);
1584
1585        assert_eq!(deduped.len(), 1);
1586        assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1587    }
1588
1589    #[test]
1590    fn test_dedup_db_operations_preserves_distinct_sessions() {
1591        let now = Utc::now();
1592
1593        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1594        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1595        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1596            DbOperation::Upsert(metadata1.clone()),
1597            DbOperation::Upsert(metadata2.clone()),
1598        ]);
1599
1600        assert_eq!(deduped.len(), 2);
1601        assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1602        assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1603    }
1604
1605    #[gpui::test]
1606    async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1607        init_test(cx);
1608
1609        let paths = PathList::new(&[Path::new("/project-a")]);
1610        let now = Utc::now();
1611        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1612
1613        cx.update(|cx| {
1614            let store = ThreadMetadataStore::global(cx);
1615            store.update(cx, |store, cx| {
1616                store.save(metadata, cx);
1617            });
1618        });
1619
1620        cx.run_until_parked();
1621
1622        cx.update(|cx| {
1623            let store = ThreadMetadataStore::global(cx);
1624            let store = store.read(cx);
1625
1626            let path_entries = store
1627                .entries_for_path(&paths)
1628                .map(|e| e.session_id.0.to_string())
1629                .collect::<Vec<_>>();
1630            assert_eq!(path_entries, vec!["session-1"]);
1631
1632            let archived = store
1633                .archived_entries()
1634                .map(|e| e.session_id.0.to_string())
1635                .collect::<Vec<_>>();
1636            assert!(archived.is_empty());
1637        });
1638
1639        cx.update(|cx| {
1640            let store = ThreadMetadataStore::global(cx);
1641            store.update(cx, |store, cx| {
1642                store.archive(&acp::SessionId::new("session-1"), cx);
1643            });
1644        });
1645
1646        cx.run_until_parked();
1647
1648        cx.update(|cx| {
1649            let store = ThreadMetadataStore::global(cx);
1650            let store = store.read(cx);
1651
1652            let path_entries = store
1653                .entries_for_path(&paths)
1654                .map(|e| e.session_id.0.to_string())
1655                .collect::<Vec<_>>();
1656            assert!(path_entries.is_empty());
1657
1658            let archived = store.archived_entries().collect::<Vec<_>>();
1659            assert_eq!(archived.len(), 1);
1660            assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1661            assert!(archived[0].archived);
1662        });
1663
1664        cx.update(|cx| {
1665            let store = ThreadMetadataStore::global(cx);
1666            store.update(cx, |store, cx| {
1667                store.unarchive(&acp::SessionId::new("session-1"), cx);
1668            });
1669        });
1670
1671        cx.run_until_parked();
1672
1673        cx.update(|cx| {
1674            let store = ThreadMetadataStore::global(cx);
1675            let store = store.read(cx);
1676
1677            let path_entries = store
1678                .entries_for_path(&paths)
1679                .map(|e| e.session_id.0.to_string())
1680                .collect::<Vec<_>>();
1681            assert_eq!(path_entries, vec!["session-1"]);
1682
1683            let archived = store
1684                .archived_entries()
1685                .map(|e| e.session_id.0.to_string())
1686                .collect::<Vec<_>>();
1687            assert!(archived.is_empty());
1688        });
1689    }
1690
1691    #[gpui::test]
1692    async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1693        init_test(cx);
1694
1695        let paths = PathList::new(&[Path::new("/project-a")]);
1696        let now = Utc::now();
1697
1698        let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1699        let metadata2 = make_metadata(
1700            "session-2",
1701            "Archived Thread",
1702            now - chrono::Duration::seconds(1),
1703            paths.clone(),
1704        );
1705
1706        cx.update(|cx| {
1707            let store = ThreadMetadataStore::global(cx);
1708            store.update(cx, |store, cx| {
1709                store.save(metadata1, cx);
1710                store.save(metadata2, cx);
1711            });
1712        });
1713
1714        cx.run_until_parked();
1715
1716        cx.update(|cx| {
1717            let store = ThreadMetadataStore::global(cx);
1718            store.update(cx, |store, cx| {
1719                store.archive(&acp::SessionId::new("session-2"), cx);
1720            });
1721        });
1722
1723        cx.run_until_parked();
1724
1725        cx.update(|cx| {
1726            let store = ThreadMetadataStore::global(cx);
1727            let store = store.read(cx);
1728
1729            let path_entries = store
1730                .entries_for_path(&paths)
1731                .map(|e| e.session_id.0.to_string())
1732                .collect::<Vec<_>>();
1733            assert_eq!(path_entries, vec!["session-1"]);
1734
1735            let all_entries = store
1736                .entries()
1737                .map(|e| e.session_id.0.to_string())
1738                .collect::<Vec<_>>();
1739            assert_eq!(all_entries.len(), 2);
1740            assert!(all_entries.contains(&"session-1".to_string()));
1741            assert!(all_entries.contains(&"session-2".to_string()));
1742
1743            let archived = store
1744                .archived_entries()
1745                .map(|e| e.session_id.0.to_string())
1746                .collect::<Vec<_>>();
1747            assert_eq!(archived, vec!["session-2"]);
1748        });
1749    }
1750
1751    #[gpui::test]
1752    async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1753        init_test(cx);
1754
1755        let paths = PathList::new(&[Path::new("/project-a")]);
1756        let now = Utc::now();
1757
1758        let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1759        let m2 = make_metadata(
1760            "session-2",
1761            "Thread Two",
1762            now - chrono::Duration::seconds(1),
1763            paths.clone(),
1764        );
1765        let m3 = make_metadata(
1766            "session-3",
1767            "Thread Three",
1768            now - chrono::Duration::seconds(2),
1769            paths,
1770        );
1771
1772        cx.update(|cx| {
1773            let store = ThreadMetadataStore::global(cx);
1774            store.update(cx, |store, cx| {
1775                store.save_all(vec![m1, m2, m3], cx);
1776            });
1777        });
1778
1779        cx.run_until_parked();
1780
1781        cx.update(|cx| {
1782            let store = ThreadMetadataStore::global(cx);
1783            let store = store.read(cx);
1784
1785            let all_entries = store
1786                .entries()
1787                .map(|e| e.session_id.0.to_string())
1788                .collect::<Vec<_>>();
1789            assert_eq!(all_entries.len(), 3);
1790            assert!(all_entries.contains(&"session-1".to_string()));
1791            assert!(all_entries.contains(&"session-2".to_string()));
1792            assert!(all_entries.contains(&"session-3".to_string()));
1793
1794            let entry_ids = store.entry_ids().collect::<Vec<_>>();
1795            assert_eq!(entry_ids.len(), 3);
1796        });
1797    }
1798
1799    #[gpui::test]
1800    async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1801        init_test(cx);
1802
1803        let paths = PathList::new(&[Path::new("/project-a")]);
1804        let now = Utc::now();
1805        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1806
1807        cx.update(|cx| {
1808            let store = ThreadMetadataStore::global(cx);
1809            store.update(cx, |store, cx| {
1810                store.save(metadata, cx);
1811            });
1812        });
1813
1814        cx.run_until_parked();
1815
1816        cx.update(|cx| {
1817            let store = ThreadMetadataStore::global(cx);
1818            store.update(cx, |store, cx| {
1819                store.archive(&acp::SessionId::new("session-1"), cx);
1820            });
1821        });
1822
1823        cx.run_until_parked();
1824
1825        cx.update(|cx| {
1826            let store = ThreadMetadataStore::global(cx);
1827            store.update(cx, |store, cx| {
1828                let _ = store.reload(cx);
1829            });
1830        });
1831
1832        cx.run_until_parked();
1833
1834        cx.update(|cx| {
1835            let store = ThreadMetadataStore::global(cx);
1836            let store = store.read(cx);
1837
1838            let thread = store
1839                .entries()
1840                .find(|e| e.session_id.0.as_ref() == "session-1")
1841                .expect("thread should exist after reload");
1842            assert!(thread.archived);
1843
1844            let path_entries = store
1845                .entries_for_path(&paths)
1846                .map(|e| e.session_id.0.to_string())
1847                .collect::<Vec<_>>();
1848            assert!(path_entries.is_empty());
1849
1850            let archived = store
1851                .archived_entries()
1852                .map(|e| e.session_id.0.to_string())
1853                .collect::<Vec<_>>();
1854            assert_eq!(archived, vec!["session-1"]);
1855        });
1856    }
1857
1858    #[gpui::test]
1859    async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1860        init_test(cx);
1861
1862        cx.run_until_parked();
1863
1864        cx.update(|cx| {
1865            let store = ThreadMetadataStore::global(cx);
1866            store.update(cx, |store, cx| {
1867                store.archive(&acp::SessionId::new("nonexistent"), cx);
1868            });
1869        });
1870
1871        cx.run_until_parked();
1872
1873        cx.update(|cx| {
1874            let store = ThreadMetadataStore::global(cx);
1875            let store = store.read(cx);
1876
1877            assert!(store.is_empty());
1878            assert_eq!(store.entries().count(), 0);
1879            assert_eq!(store.archived_entries().count(), 0);
1880        });
1881    }
1882
1883    #[gpui::test]
1884    async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1885        init_test(cx);
1886
1887        let paths = PathList::new(&[Path::new("/project-a")]);
1888        let now = Utc::now();
1889        let metadata = make_metadata("session-1", "Thread 1", now, paths);
1890        let session_id = metadata.session_id.clone();
1891
1892        cx.update(|cx| {
1893            let store = ThreadMetadataStore::global(cx);
1894            store.update(cx, |store, cx| {
1895                store.save(metadata.clone(), cx);
1896                store.archive(&session_id, cx);
1897            });
1898        });
1899
1900        cx.run_until_parked();
1901
1902        cx.update(|cx| {
1903            let store = ThreadMetadataStore::global(cx);
1904            let store = store.read(cx);
1905
1906            let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1907            pretty_assertions::assert_eq!(
1908                entries,
1909                vec![ThreadMetadata {
1910                    archived: true,
1911                    ..metadata
1912                }]
1913            );
1914        });
1915    }
1916}