thread_metadata_store.rs

   1use std::{
   2    path::{Path, PathBuf},
   3    sync::Arc,
   4};
   5
   6use acp_thread::AcpThreadEvent;
   7use agent::{ThreadStore, ZED_AGENT_ID};
   8use agent_client_protocol as acp;
   9use anyhow::Context as _;
  10use chrono::{DateTime, Utc};
  11use collections::{HashMap, HashSet};
  12use db::{
  13    sqlez::{
  14        bindable::Column, domain::Domain, statement::Statement,
  15        thread_safe_connection::ThreadSafeConnection,
  16    },
  17    sqlez_macros::sql,
  18};
  19use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  20use futures::{FutureExt as _, future::Shared};
  21use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  22use project::AgentId;
  23use ui::{App, Context, SharedString};
  24use util::ResultExt as _;
  25use workspace::PathList;
  26
  27use crate::DEFAULT_THREAD_TITLE;
  28
  29pub fn init(cx: &mut App) {
  30    ThreadMetadataStore::init_global(cx);
  31
  32    if cx.has_flag::<AgentV2FeatureFlag>() {
  33        migrate_thread_metadata(cx);
  34    }
  35    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  36        if has_flag {
  37            migrate_thread_metadata(cx);
  38        }
  39    })
  40    .detach();
  41}
  42
  43/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  44/// We skip migrating threads that do not have a project.
  45///
  46/// TODO: Remove this after N weeks of shipping the sidebar
  47fn migrate_thread_metadata(cx: &mut App) {
  48    let store = ThreadMetadataStore::global(cx);
  49    let db = store.read(cx).db.clone();
  50
  51    cx.spawn(async move |cx| {
  52        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
  53
  54        let is_first_migration = existing_entries.is_empty();
  55
  56        let mut to_migrate = store.read_with(cx, |_store, cx| {
  57            ThreadStore::global(cx)
  58                .read(cx)
  59                .entries()
  60                .filter_map(|entry| {
  61                    if existing_entries.contains(&entry.id.0) || entry.folder_paths.is_empty() {
  62                        return None;
  63                    }
  64
  65                    Some(ThreadMetadata {
  66                        session_id: entry.id,
  67                        agent_id: ZED_AGENT_ID.clone(),
  68                        title: entry.title,
  69                        updated_at: entry.updated_at,
  70                        created_at: entry.created_at,
  71                        folder_paths: entry.folder_paths,
  72                        archived: true,
  73                    })
  74                })
  75                .collect::<Vec<_>>()
  76        });
  77
  78        if to_migrate.is_empty() {
  79            return anyhow::Ok(());
  80        }
  81
  82        // On the first migration (no entries in DB yet), keep the 5 most
  83        // recent threads per project unarchived.
  84        if is_first_migration {
  85            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
  86            for entry in &mut to_migrate {
  87                per_project
  88                    .entry(entry.folder_paths.clone())
  89                    .or_default()
  90                    .push(entry);
  91            }
  92            for entries in per_project.values_mut() {
  93                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
  94                for entry in entries.iter_mut().take(5) {
  95                    entry.archived = false;
  96                }
  97            }
  98        }
  99
 100        log::info!("Migrating {} thread store entries", to_migrate.len());
 101
 102        // Manually save each entry to the database and call reload, otherwise
 103        // we'll end up triggering lots of reloads after each save
 104        for entry in to_migrate {
 105            db.save(entry).await?;
 106        }
 107
 108        log::info!("Finished migrating thread store entries");
 109
 110        let _ = store.update(cx, |store, cx| store.reload(cx));
 111        anyhow::Ok(())
 112    })
 113    .detach_and_log_err(cx);
 114}
 115
/// Newtype wrapper under which the shared [`ThreadMetadataStore`] entity is
/// registered as an app-wide global (see `init_global`, `global`, `try_global`).
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 118
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Session identifier; the key of the in-memory store and the primary key
    /// of the `sidebar_threads` table.
    pub session_id: acp::SessionId,
    /// Agent the thread belongs to. The native agent (`ZED_AGENT_ID`) is
    /// persisted as `NULL` in the database.
    pub agent_id: AgentId,
    /// Title shown for the thread.
    pub title: SharedString,
    /// Time of the last recorded change; database listings are ordered by it.
    pub updated_at: DateTime<Utc>,
    /// Creation time, when known.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths of the project the thread is associated with; used to
    /// group threads per project.
    pub folder_paths: PathList,
    /// Whether the thread is archived. Archived threads are excluded from
    /// `ThreadMetadataStore::entries_for_path`.
    pub archived: bool,
}
 131
 132impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
 133    fn from(meta: &ThreadMetadata) -> Self {
 134        Self {
 135            session_id: meta.session_id.clone(),
 136            work_dirs: Some(meta.folder_paths.clone()),
 137            title: Some(meta.title.clone()),
 138            updated_at: Some(meta.updated_at),
 139            created_at: meta.created_at,
 140            meta: None,
 141        }
 142    }
 143}
 144
/// Record of a git worktree that was archived (deleted from disk) when its last thread was archived.
/// Lives in this module because it shares the same SQLite database as thread metadata.
pub struct ArchivedGitWorktree {
    /// Row id in the `archived_git_worktrees` table.
    pub id: i64,
    /// Path of the archived worktree; unique across records.
    pub worktree_path: PathBuf,
    /// Path of the main repository recorded for the worktree.
    pub main_repo_path: PathBuf,
    /// Branch name recorded for the worktree, if any.
    pub branch_name: Option<String>,
    /// Commit hash recorded when the worktree was archived.
    pub commit_hash: String,
    /// `true` once the record has been marked as restored
    /// (see `update_archived_worktree_restored`).
    pub restored: bool,
}
 155
/// The store holds all metadata needed to show threads in the sidebar/the archive.
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct ThreadMetadataStore {
    /// Handle to the SQLite-backed persistence layer.
    db: ThreadMetadataDb,
    /// All known threads, keyed by session id.
    threads: HashMap<acp::SessionId, ThreadMetadata>,
    /// Index from a project's folder paths to the sessions recorded for it.
    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// Most recently started reload of `threads` from the database, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed threads; an entry is removed when its
    /// thread is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Sending half of the queue of database writes consumed by
    /// `_db_operations_task`.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Background task that drains the queue and applies the operations to
    /// the database. NOTE(review): presumably held so the task lives as long
    /// as the store — confirm against gpui `Task` drop semantics.
    _db_operations_task: Task<()>,
}
 168
/// A database write queued by the store and applied by its background task.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Insert the metadata, or overwrite the row with the same session id.
    Upsert(ThreadMetadata),
    /// Remove the row for the given session id.
    Delete(acp::SessionId),
}
 174
 175impl DbOperation {
 176    fn id(&self) -> &acp::SessionId {
 177        match self {
 178            DbOperation::Upsert(thread) => &thread.session_id,
 179            DbOperation::Delete(session_id) => session_id,
 180        }
 181    }
 182}
 183
 184impl ThreadMetadataStore {
 185    #[cfg(not(any(test, feature = "test-support")))]
 186    pub fn init_global(cx: &mut App) {
 187        if cx.has_global::<Self>() {
 188            return;
 189        }
 190
 191        let db = ThreadMetadataDb::global(cx);
 192        let thread_store = cx.new(|cx| Self::new(db, cx));
 193        cx.set_global(GlobalThreadMetadataStore(thread_store));
 194    }
 195
 196    #[cfg(any(test, feature = "test-support"))]
 197    pub fn init_global(cx: &mut App) {
 198        let thread = std::thread::current();
 199        let test_name = thread.name().unwrap_or("unknown_test");
 200        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 201        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 202        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 203        cx.set_global(GlobalThreadMetadataStore(thread_store));
 204    }
 205
 206    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 207        cx.try_global::<GlobalThreadMetadataStore>()
 208            .map(|store| store.0.clone())
 209    }
 210
 211    pub fn global(cx: &App) -> Entity<Self> {
 212        cx.global::<GlobalThreadMetadataStore>().0.clone()
 213    }
 214
    /// Returns `true` when the store currently holds no thread metadata in
    /// memory (archived threads count as entries).
    pub fn is_empty(&self) -> bool {
        self.threads.is_empty()
    }
 218
    /// Returns the session ids of all threads held in memory, archived ones
    /// included, in no particular order.
    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
        self.threads.keys().cloned()
    }
 223
    /// Returns the metadata for a specific thread, if it exists. Archived
    /// threads are returned as well.
    pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
        self.threads.get(session_id)
    }
 228
    /// Returns all threads held in memory, archived or not, in no particular
    /// order.
    pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
        self.threads.values()
    }
 233
 234    /// Returns all archived threads.
 235    pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 236        self.entries().filter(|t| t.archived)
 237    }
 238
 239    /// Returns all threads for the given path list, excluding archived threads.
 240    pub fn entries_for_path(
 241        &self,
 242        path_list: &PathList,
 243    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 244        self.threads_by_paths
 245            .get(path_list)
 246            .into_iter()
 247            .flatten()
 248            .filter_map(|s| self.threads.get(s))
 249            .filter(|s| !s.archived)
 250    }
 251
 252    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 253        let db = self.db.clone();
 254        self.reload_task.take();
 255
 256        let list_task = cx
 257            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 258
 259        let reload_task = cx
 260            .spawn(async move |this, cx| {
 261                let Some(rows) = list_task.await.log_err() else {
 262                    return;
 263                };
 264
 265                this.update(cx, |this, cx| {
 266                    this.threads.clear();
 267                    this.threads_by_paths.clear();
 268
 269                    for row in rows {
 270                        this.threads_by_paths
 271                            .entry(row.folder_paths.clone())
 272                            .or_default()
 273                            .insert(row.session_id.clone());
 274                        this.threads.insert(row.session_id.clone(), row);
 275                    }
 276
 277                    cx.notify();
 278                })
 279                .ok();
 280            })
 281            .shared();
 282        self.reload_task = Some(reload_task.clone());
 283        reload_task
 284    }
 285
 286    pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
 287        if !cx.has_flag::<AgentV2FeatureFlag>() {
 288            return;
 289        }
 290
 291        for metadata in metadata {
 292            self.save_internal(metadata);
 293        }
 294        cx.notify();
 295    }
 296
    /// Test-only entry point that forwards to the private `save`, letting
    /// tests insert metadata without going through thread events.
    #[cfg(any(test, feature = "test-support"))]
    pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
        self.save(metadata, cx)
    }
 301
 302    fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 303        if !cx.has_flag::<AgentV2FeatureFlag>() {
 304            return;
 305        }
 306
 307        self.save_internal(metadata);
 308        cx.notify();
 309    }
 310
 311    fn save_internal(&mut self, metadata: ThreadMetadata) {
 312        // If the folder paths have changed, we need to clear the old entry
 313        if let Some(thread) = self.threads.get(&metadata.session_id)
 314            && thread.folder_paths != metadata.folder_paths
 315            && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
 316        {
 317            session_ids.remove(&metadata.session_id);
 318        }
 319
 320        self.threads
 321            .insert(metadata.session_id.clone(), metadata.clone());
 322
 323        self.threads_by_paths
 324            .entry(metadata.folder_paths.clone())
 325            .or_default()
 326            .insert(metadata.session_id.clone());
 327
 328        self.pending_thread_ops_tx
 329            .try_send(DbOperation::Upsert(metadata))
 330            .log_err();
 331    }
 332
    /// Marks the thread as archived. Does nothing when the thread is unknown
    /// or the agent v2 feature flag is disabled.
    pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
        self.update_archived(session_id, true, cx);
    }
 336
    /// Clears the thread's archived flag. Does nothing when the thread is
    /// unknown or the agent v2 feature flag is disabled.
    pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
        self.update_archived(session_id, false, cx);
    }
 340
 341    fn update_archived(
 342        &mut self,
 343        session_id: &acp::SessionId,
 344        archived: bool,
 345        cx: &mut Context<Self>,
 346    ) {
 347        if !cx.has_flag::<AgentV2FeatureFlag>() {
 348            return;
 349        }
 350
 351        if let Some(thread) = self.threads.get(session_id) {
 352            self.save_internal(ThreadMetadata {
 353                archived,
 354                ..thread.clone()
 355            });
 356            cx.notify();
 357        }
 358    }
 359
 360    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 361        if !cx.has_flag::<AgentV2FeatureFlag>() {
 362            return;
 363        }
 364
 365        if let Some(thread) = self.threads.get(&session_id)
 366            && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
 367        {
 368            session_ids.remove(&session_id);
 369        }
 370        self.threads.remove(&session_id);
 371        self.pending_thread_ops_tx
 372            .try_send(DbOperation::Delete(session_id))
 373            .log_err();
 374        cx.notify();
 375    }
 376
 377    pub fn create_archived_worktree(
 378        &self,
 379        worktree_path: String,
 380        main_repo_path: String,
 381        branch_name: Option<String>,
 382        commit_hash: String,
 383        cx: &mut Context<Self>,
 384    ) -> Task<anyhow::Result<i64>> {
 385        let db = self.db.clone();
 386        cx.background_spawn(async move {
 387            db.create_archived_worktree(
 388                &worktree_path,
 389                &main_repo_path,
 390                branch_name.as_deref(),
 391                &commit_hash,
 392            )
 393            .await
 394        })
 395    }
 396
 397    pub fn get_archived_worktree_by_path(
 398        &self,
 399        worktree_path: String,
 400        cx: &mut Context<Self>,
 401    ) -> Task<anyhow::Result<Option<ArchivedGitWorktree>>> {
 402        let db = self.db.clone();
 403        cx.background_spawn(async move { db.get_archived_worktree_by_path(&worktree_path).await })
 404    }
 405
 406    pub fn delete_archived_worktree(
 407        &self,
 408        id: i64,
 409        cx: &mut Context<Self>,
 410    ) -> Task<anyhow::Result<()>> {
 411        let db = self.db.clone();
 412        cx.background_spawn(async move { db.delete_archived_worktree(id).await })
 413    }
 414
 415    pub fn update_archived_worktree_restored(
 416        &self,
 417        id: i64,
 418        worktree_path: String,
 419        branch_name: Option<String>,
 420        cx: &mut Context<Self>,
 421    ) -> Task<anyhow::Result<()>> {
 422        let db = self.db.clone();
 423        cx.background_spawn(async move {
 424            db.update_archived_worktree_restored(id, &worktree_path, branch_name.as_deref())
 425                .await
 426        })
 427    }
 428
    /// Builds the store around `db`.
    ///
    /// Besides constructing the struct this:
    /// - registers an observer for new `AcpThread` entities that subscribes
    ///   the store to each non-subagent thread's events,
    /// - spawns the background task that applies queued [`DbOperation`]s, and
    /// - starts an initial reload from the database.
    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
        let weak_store = cx.weak_entity();

        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
            // Don't track subagent threads in the sidebar.
            if thread.parent_session_id().is_some() {
                return;
            }

            let thread_entity = cx.entity();

            // When the thread is released, drop its subscription and clean up
            // metadata for threads that never received any entries.
            cx.on_release({
                let weak_store = weak_store.clone();
                move |thread, cx| {
                    weak_store
                        .update(cx, |store, cx| {
                            let session_id = thread.session_id().clone();
                            store.session_subscriptions.remove(&session_id);
                            if thread.entries().is_empty() {
                                // Empty threads can be unloaded without ever being
                                // durably persisted by the underlying agent.
                                store.delete(session_id, cx);
                            }
                        })
                        .ok();
                }
            })
            .detach();

            // Route the thread's events to `handle_thread_event`; the
            // subscription is kept in `session_subscriptions`.
            weak_store
                .update(cx, |this, cx| {
                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
                    this.session_subscriptions
                        .insert(thread.session_id().clone(), subscription);
                })
                .ok();
        })
        .detach();

        let (tx, rx) = smol::channel::unbounded();
        let _db_operations_task = cx.background_spawn({
            let db = db.clone();
            async move {
                // Wait for one operation, then take everything else that is
                // already queued so the whole batch can be deduplicated
                // before it is written.
                while let Ok(first_update) = rx.recv().await {
                    let mut updates = vec![first_update];
                    while let Ok(update) = rx.try_recv() {
                        updates.push(update);
                    }
                    let updates = Self::dedup_db_operations(updates);
                    for operation in updates {
                        // Failures are logged and do not stop the loop.
                        match operation {
                            DbOperation::Upsert(metadata) => {
                                db.save(metadata).await.log_err();
                            }
                            DbOperation::Delete(session_id) => {
                                db.delete(session_id).await.log_err();
                            }
                        }
                    }
                }
            }
        });

        let mut this = Self {
            db,
            threads: HashMap::default(),
            threads_by_paths: HashMap::default(),
            reload_task: None,
            session_subscriptions: HashMap::default(),
            pending_thread_ops_tx: tx,
            _db_operations_task,
        };
        // Start the initial load; the handle is also stored in `reload_task`.
        let _ = this.reload(cx);
        this
    }
 504
 505    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 506        let mut ops = HashMap::default();
 507        for operation in operations.into_iter().rev() {
 508            if ops.contains_key(operation.id()) {
 509                continue;
 510            }
 511            ops.insert(operation.id().clone(), operation);
 512        }
 513        ops.into_values().collect()
 514    }
 515
 516    fn handle_thread_event(
 517        &mut self,
 518        thread: Entity<acp_thread::AcpThread>,
 519        event: &AcpThreadEvent,
 520        cx: &mut Context<Self>,
 521    ) {
 522        // Don't track subagent threads in the sidebar.
 523        if thread.read(cx).parent_session_id().is_some() {
 524            return;
 525        }
 526
 527        match event {
 528            AcpThreadEvent::NewEntry
 529            | AcpThreadEvent::TitleUpdated
 530            | AcpThreadEvent::EntryUpdated(_)
 531            | AcpThreadEvent::EntriesRemoved(_)
 532            | AcpThreadEvent::ToolAuthorizationRequested(_)
 533            | AcpThreadEvent::ToolAuthorizationReceived(_)
 534            | AcpThreadEvent::Retry(_)
 535            | AcpThreadEvent::Stopped(_)
 536            | AcpThreadEvent::Error
 537            | AcpThreadEvent::LoadError(_)
 538            | AcpThreadEvent::Refusal
 539            | AcpThreadEvent::WorkingDirectoriesUpdated => {
 540                let thread_ref = thread.read(cx);
 541                let existing_thread = self.threads.get(thread_ref.session_id());
 542                let session_id = thread_ref.session_id().clone();
 543                let title = thread_ref
 544                    .title()
 545                    .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 546
 547                let updated_at = Utc::now();
 548
 549                let created_at = existing_thread
 550                    .and_then(|t| t.created_at)
 551                    .unwrap_or_else(|| updated_at);
 552
 553                let agent_id = thread_ref.connection().agent_id();
 554
 555                let folder_paths = {
 556                    let project = thread_ref.project().read(cx);
 557                    let paths: Vec<Arc<Path>> = project
 558                        .visible_worktrees(cx)
 559                        .map(|worktree| worktree.read(cx).abs_path())
 560                        .collect();
 561                    PathList::new(&paths)
 562                };
 563
 564                let archived = existing_thread.map(|t| t.archived).unwrap_or(false);
 565
 566                let metadata = ThreadMetadata {
 567                    session_id,
 568                    agent_id,
 569                    title,
 570                    created_at: Some(created_at),
 571                    updated_at,
 572                    folder_paths,
 573                    archived,
 574                };
 575
 576                self.save(metadata, cx);
 577            }
 578            AcpThreadEvent::TokenUsageUpdated
 579            | AcpThreadEvent::SubagentSpawned(_)
 580            | AcpThreadEvent::PromptCapabilitiesUpdated
 581            | AcpThreadEvent::AvailableCommandsUpdated(_)
 582            | AcpThreadEvent::ModeUpdated(_)
 583            | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
 584        }
 585    }
 586}
 587
// Marker impl that lets `ThreadMetadataStore` itself be used as a gpui global
// type. Note that the shared store entity is published through
// `GlobalThreadMetadataStore`, not through this type.
impl Global for ThreadMetadataStore {}
 589
/// Persistence layer for thread metadata and archived-worktree records,
/// wrapping the SQLite connection both tables live in.
struct ThreadMetadataDb(ThreadSafeConnection);
 591
/// Schema definition for the tables owned by [`ThreadMetadataDb`].
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    // NOTE(review): migrations are presumably applied in order and tracked by
    // index, so existing entries should only ever be appended to — confirm
    // against the `Domain` trait documentation.
    const MIGRATIONS: &[&str] = &[
        // Initial `sidebar_threads` table: one row per thread, keyed by
        // session id.
        sql!(
            CREATE TABLE IF NOT EXISTS sidebar_threads(
                session_id TEXT PRIMARY KEY,
                agent_id TEXT,
                title TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                created_at TEXT,
                folder_paths TEXT,
                folder_paths_order TEXT
            ) STRICT;
        ),
        // Adds the archived flag; the column is nullable and defaults to 0.
        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
        // Table of archived git worktrees, with at most one record per
        // worktree path.
        sql!(
            CREATE TABLE IF NOT EXISTS archived_git_worktrees(
                id INTEGER PRIMARY KEY,
                worktree_path TEXT NOT NULL,
                main_repo_path TEXT NOT NULL,
                branch_name TEXT,
                commit_hash TEXT NOT NULL,
                restored INTEGER NOT NULL DEFAULT 0
            ) STRICT;

            CREATE UNIQUE INDEX IF NOT EXISTS idx_archived_worktrees_path
                ON archived_git_worktrees(worktree_path);
        ),
    ];
}
 623
// NOTE(review): presumably generates the shared connection accessor
// (`ThreadMetadataDb::global`, used by `init_global`); the empty list looks
// like "no dependent domains" — confirm against the macro definition.
db::static_connection!(ThreadMetadataDb, []);
 625
 626impl ThreadMetadataDb {
 627    pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
 628        self.select::<Arc<str>>(
 629            "SELECT session_id FROM sidebar_threads \
 630             ORDER BY updated_at DESC",
 631        )?()
 632    }
 633
 634    /// List all sidebar thread metadata, ordered by updated_at descending.
 635    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 636        self.select::<ThreadMetadata>(
 637            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived \
 638             FROM sidebar_threads \
 639             ORDER BY updated_at DESC"
 640        )?()
 641    }
 642
 643    /// Upsert metadata for a thread.
 644    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 645        let id = row.session_id.0.clone();
 646        let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 647            None
 648        } else {
 649            Some(row.agent_id.to_string())
 650        };
 651        let title = row.title.to_string();
 652        let updated_at = row.updated_at.to_rfc3339();
 653        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 654        let serialized = row.folder_paths.serialize();
 655        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 656            (None, None)
 657        } else {
 658            (Some(serialized.paths), Some(serialized.order))
 659        };
 660        let archived = row.archived;
 661
 662        self.write(move |conn| {
 663            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived) \
 664                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
 665                       ON CONFLICT(session_id) DO UPDATE SET \
 666                           agent_id = excluded.agent_id, \
 667                           title = excluded.title, \
 668                           updated_at = excluded.updated_at, \
 669                           created_at = excluded.created_at, \
 670                           folder_paths = excluded.folder_paths, \
 671                           folder_paths_order = excluded.folder_paths_order, \
 672                           archived = excluded.archived";
 673            let mut stmt = Statement::prepare(conn, sql)?;
 674            let mut i = stmt.bind(&id, 1)?;
 675            i = stmt.bind(&agent_id, i)?;
 676            i = stmt.bind(&title, i)?;
 677            i = stmt.bind(&updated_at, i)?;
 678            i = stmt.bind(&created_at, i)?;
 679            i = stmt.bind(&folder_paths, i)?;
 680            i = stmt.bind(&folder_paths_order, i)?;
 681            stmt.bind(&archived, i)?;
 682            stmt.exec()
 683        })
 684        .await
 685    }
 686
 687    /// Delete metadata for a single thread.
 688    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 689        let id = session_id.0.clone();
 690        self.write(move |conn| {
 691            let mut stmt =
 692                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 693            stmt.bind(&id, 1)?;
 694            stmt.exec()
 695        })
 696        .await
 697    }
 698
 699    pub async fn create_archived_worktree(
 700        &self,
 701        worktree_path: &str,
 702        main_repo_path: &str,
 703        branch_name: Option<&str>,
 704        commit_hash: &str,
 705    ) -> anyhow::Result<i64> {
 706        let worktree_path = worktree_path.to_string();
 707        let main_repo_path = main_repo_path.to_string();
 708        let branch_name = branch_name.map(|s| s.to_string());
 709        let commit_hash = commit_hash.to_string();
 710        self.write(move |conn| {
 711            let mut stmt = Statement::prepare(
 712                conn,
 713                "INSERT OR REPLACE INTO archived_git_worktrees(\
 714                     worktree_path, main_repo_path, branch_name, commit_hash\
 715                 ) VALUES (?, ?, ?, ?)",
 716            )?;
 717            let mut i = stmt.bind(&worktree_path, 1)?;
 718            i = stmt.bind(&main_repo_path, i)?;
 719            i = stmt.bind(&branch_name, i)?;
 720            stmt.bind(&commit_hash, i)?;
 721            stmt.exec()?;
 722
 723            let mut id_stmt = Statement::prepare(conn, "SELECT last_insert_rowid()")?;
 724            let id = id_stmt
 725                .maybe_row::<i64>()?
 726                .ok_or_else(|| anyhow::anyhow!("No row ID returned after INSERT"))?;
 727
 728            Ok(id)
 729        })
 730        .await
 731    }
 732
 733    pub async fn get_archived_worktree_by_path(
 734        &self,
 735        worktree_path: &str,
 736    ) -> anyhow::Result<Option<ArchivedGitWorktree>> {
 737        let worktree_path = worktree_path.to_string();
 738        self.select_row_bound::<String, ArchivedGitWorktree>(
 739            "SELECT id, worktree_path, main_repo_path, branch_name, commit_hash, restored \
 740             FROM archived_git_worktrees WHERE worktree_path = ?",
 741        )?(worktree_path)
 742    }
 743
 744    pub async fn delete_archived_worktree(&self, id: i64) -> anyhow::Result<()> {
 745        self.write(move |conn| {
 746            let mut stmt =
 747                Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?;
 748            stmt.bind(&id, 1)?;
 749            stmt.exec()
 750        })
 751        .await
 752    }
 753
 754    pub async fn update_archived_worktree_restored(
 755        &self,
 756        id: i64,
 757        worktree_path: &str,
 758        branch_name: Option<&str>,
 759    ) -> anyhow::Result<()> {
 760        let worktree_path = worktree_path.to_string();
 761        let branch_name = branch_name.map(|s| s.to_string());
 762        self.write(move |conn| {
 763            let mut stmt = Statement::prepare(
 764                conn,
 765                "UPDATE archived_git_worktrees \
 766                 SET restored = 1, worktree_path = ?, branch_name = ? \
 767                 WHERE id = ?",
 768            )?;
 769            let mut i = stmt.bind(&worktree_path, 1)?;
 770            i = stmt.bind(&branch_name, i)?;
 771            stmt.bind(&id, i)?;
 772            stmt.exec()
 773        })
 774        .await
 775    }
 776}
 777
 778impl Column for ThreadMetadata {
 779    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 780        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 781        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 782        let (title, next): (String, i32) = Column::column(statement, next)?;
 783        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 784        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 785        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 786        let (folder_paths_order_str, next): (Option<String>, i32) =
 787            Column::column(statement, next)?;
 788        let (archived, next): (bool, i32) = Column::column(statement, next)?;
 789
 790        let agent_id = agent_id
 791            .map(|id| AgentId::new(id))
 792            .unwrap_or(ZED_AGENT_ID.clone());
 793
 794        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 795        let created_at = created_at_str
 796            .as_deref()
 797            .map(DateTime::parse_from_rfc3339)
 798            .transpose()?
 799            .map(|dt| dt.with_timezone(&Utc));
 800
 801        let folder_paths = folder_paths_str
 802            .map(|paths| {
 803                PathList::deserialize(&util::path_list::SerializedPathList {
 804                    paths,
 805                    order: folder_paths_order_str.unwrap_or_default(),
 806                })
 807            })
 808            .unwrap_or_default();
 809
 810        Ok((
 811            ThreadMetadata {
 812                session_id: acp::SessionId::new(id),
 813                agent_id,
 814                title: title.into(),
 815                updated_at,
 816                created_at,
 817                folder_paths,
 818                archived,
 819            },
 820            next,
 821        ))
 822    }
 823}
 824
 825impl Column for ArchivedGitWorktree {
 826    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 827        let (id, next): (i64, i32) = Column::column(statement, start_index)?;
 828        let (worktree_path_str, next): (String, i32) = Column::column(statement, next)?;
 829        let (main_repo_path_str, next): (String, i32) = Column::column(statement, next)?;
 830        let (branch_name, next): (Option<String>, i32) = Column::column(statement, next)?;
 831        let (commit_hash, next): (String, i32) = Column::column(statement, next)?;
 832        let (restored_int, next): (i64, i32) = Column::column(statement, next)?;
 833        Ok((
 834            ArchivedGitWorktree {
 835                id,
 836                worktree_path: PathBuf::from(worktree_path_str),
 837                main_repo_path: PathBuf::from(main_repo_path_str),
 838                branch_name,
 839                commit_hash,
 840                restored: restored_int != 0,
 841            },
 842            next,
 843        ))
 844    }
 845}
 846
 847#[cfg(test)]
 848mod tests {
 849    use super::*;
 850    use acp_thread::{AgentConnection, StubAgentConnection};
 851    use action_log::ActionLog;
 852    use agent::DbThread;
 853    use agent_client_protocol as acp;
 854    use feature_flags::FeatureFlagAppExt;
 855    use gpui::TestAppContext;
 856    use project::FakeFs;
 857    use project::Project;
 858    use std::path::Path;
 859    use std::rc::Rc;
 860
 861    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 862        DbThread {
 863            title: title.to_string().into(),
 864            messages: Vec::new(),
 865            updated_at,
 866            detailed_summary: None,
 867            initial_project_snapshot: None,
 868            cumulative_token_usage: Default::default(),
 869            request_token_usage: Default::default(),
 870            model: None,
 871            profile: None,
 872            imported: false,
 873            subagent_context: None,
 874            speed: None,
 875            thinking_enabled: false,
 876            thinking_effort: None,
 877            draft_prompt: None,
 878            ui_scroll_position: None,
 879        }
 880    }
 881
 882    fn make_metadata(
 883        session_id: &str,
 884        title: &str,
 885        updated_at: DateTime<Utc>,
 886        folder_paths: PathList,
 887    ) -> ThreadMetadata {
 888        ThreadMetadata {
 889            archived: false,
 890            session_id: acp::SessionId::new(session_id),
 891            agent_id: agent::ZED_AGENT_ID.clone(),
 892            title: title.to_string().into(),
 893            updated_at,
 894            created_at: Some(updated_at),
 895            folder_paths,
 896        }
 897    }
 898
 899    fn init_test(cx: &mut TestAppContext) {
 900        cx.update(|cx| {
 901            let settings_store = settings::SettingsStore::test(cx);
 902            cx.set_global(settings_store);
 903            cx.update_flags(true, vec!["agent-v2".to_string()]);
 904            ThreadMetadataStore::init_global(cx);
 905            ThreadStore::init_global(cx);
 906        });
 907        cx.run_until_parked();
 908    }
 909
 910    #[gpui::test]
 911    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 912        let first_paths = PathList::new(&[Path::new("/project-a")]);
 913        let second_paths = PathList::new(&[Path::new("/project-b")]);
 914        let now = Utc::now();
 915        let older = now - chrono::Duration::seconds(1);
 916
 917        let thread = std::thread::current();
 918        let test_name = thread.name().unwrap_or("unknown_test");
 919        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 920        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 921            &db_name,
 922        )));
 923
 924        db.save(make_metadata(
 925            "session-1",
 926            "First Thread",
 927            now,
 928            first_paths.clone(),
 929        ))
 930        .await
 931        .unwrap();
 932        db.save(make_metadata(
 933            "session-2",
 934            "Second Thread",
 935            older,
 936            second_paths.clone(),
 937        ))
 938        .await
 939        .unwrap();
 940
 941        cx.update(|cx| {
 942            let settings_store = settings::SettingsStore::test(cx);
 943            cx.set_global(settings_store);
 944            cx.update_flags(true, vec!["agent-v2".to_string()]);
 945            ThreadMetadataStore::init_global(cx);
 946        });
 947
 948        cx.run_until_parked();
 949
 950        cx.update(|cx| {
 951            let store = ThreadMetadataStore::global(cx);
 952            let store = store.read(cx);
 953
 954            let entry_ids = store
 955                .entry_ids()
 956                .map(|session_id| session_id.0.to_string())
 957                .collect::<Vec<_>>();
 958            assert_eq!(entry_ids.len(), 2);
 959            assert!(entry_ids.contains(&"session-1".to_string()));
 960            assert!(entry_ids.contains(&"session-2".to_string()));
 961
 962            let first_path_entries = store
 963                .entries_for_path(&first_paths)
 964                .map(|entry| entry.session_id.0.to_string())
 965                .collect::<Vec<_>>();
 966            assert_eq!(first_path_entries, vec!["session-1"]);
 967
 968            let second_path_entries = store
 969                .entries_for_path(&second_paths)
 970                .map(|entry| entry.session_id.0.to_string())
 971                .collect::<Vec<_>>();
 972            assert_eq!(second_path_entries, vec!["session-2"]);
 973        });
 974    }
 975
 976    #[gpui::test]
 977    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 978        init_test(cx);
 979
 980        let first_paths = PathList::new(&[Path::new("/project-a")]);
 981        let second_paths = PathList::new(&[Path::new("/project-b")]);
 982        let initial_time = Utc::now();
 983        let updated_time = initial_time + chrono::Duration::seconds(1);
 984
 985        let initial_metadata = make_metadata(
 986            "session-1",
 987            "First Thread",
 988            initial_time,
 989            first_paths.clone(),
 990        );
 991
 992        let second_metadata = make_metadata(
 993            "session-2",
 994            "Second Thread",
 995            initial_time,
 996            second_paths.clone(),
 997        );
 998
 999        cx.update(|cx| {
1000            let store = ThreadMetadataStore::global(cx);
1001            store.update(cx, |store, cx| {
1002                store.save(initial_metadata, cx);
1003                store.save(second_metadata, cx);
1004            });
1005        });
1006
1007        cx.run_until_parked();
1008
1009        cx.update(|cx| {
1010            let store = ThreadMetadataStore::global(cx);
1011            let store = store.read(cx);
1012
1013            let first_path_entries = store
1014                .entries_for_path(&first_paths)
1015                .map(|entry| entry.session_id.0.to_string())
1016                .collect::<Vec<_>>();
1017            assert_eq!(first_path_entries, vec!["session-1"]);
1018
1019            let second_path_entries = store
1020                .entries_for_path(&second_paths)
1021                .map(|entry| entry.session_id.0.to_string())
1022                .collect::<Vec<_>>();
1023            assert_eq!(second_path_entries, vec!["session-2"]);
1024        });
1025
1026        let moved_metadata = make_metadata(
1027            "session-1",
1028            "First Thread",
1029            updated_time,
1030            second_paths.clone(),
1031        );
1032
1033        cx.update(|cx| {
1034            let store = ThreadMetadataStore::global(cx);
1035            store.update(cx, |store, cx| {
1036                store.save(moved_metadata, cx);
1037            });
1038        });
1039
1040        cx.run_until_parked();
1041
1042        cx.update(|cx| {
1043            let store = ThreadMetadataStore::global(cx);
1044            let store = store.read(cx);
1045
1046            let entry_ids = store
1047                .entry_ids()
1048                .map(|session_id| session_id.0.to_string())
1049                .collect::<Vec<_>>();
1050            assert_eq!(entry_ids.len(), 2);
1051            assert!(entry_ids.contains(&"session-1".to_string()));
1052            assert!(entry_ids.contains(&"session-2".to_string()));
1053
1054            let first_path_entries = store
1055                .entries_for_path(&first_paths)
1056                .map(|entry| entry.session_id.0.to_string())
1057                .collect::<Vec<_>>();
1058            assert!(first_path_entries.is_empty());
1059
1060            let second_path_entries = store
1061                .entries_for_path(&second_paths)
1062                .map(|entry| entry.session_id.0.to_string())
1063                .collect::<Vec<_>>();
1064            assert_eq!(second_path_entries.len(), 2);
1065            assert!(second_path_entries.contains(&"session-1".to_string()));
1066            assert!(second_path_entries.contains(&"session-2".to_string()));
1067        });
1068
1069        cx.update(|cx| {
1070            let store = ThreadMetadataStore::global(cx);
1071            store.update(cx, |store, cx| {
1072                store.delete(acp::SessionId::new("session-2"), cx);
1073            });
1074        });
1075
1076        cx.run_until_parked();
1077
1078        cx.update(|cx| {
1079            let store = ThreadMetadataStore::global(cx);
1080            let store = store.read(cx);
1081
1082            let entry_ids = store
1083                .entry_ids()
1084                .map(|session_id| session_id.0.to_string())
1085                .collect::<Vec<_>>();
1086            assert_eq!(entry_ids, vec!["session-1"]);
1087
1088            let second_path_entries = store
1089                .entries_for_path(&second_paths)
1090                .map(|entry| entry.session_id.0.to_string())
1091                .collect::<Vec<_>>();
1092            assert_eq!(second_path_entries, vec!["session-1"]);
1093        });
1094    }
1095
1096    #[gpui::test]
1097    async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1098        init_test(cx);
1099
1100        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1101        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1102        let now = Utc::now();
1103
1104        let existing_metadata = ThreadMetadata {
1105            session_id: acp::SessionId::new("a-session-0"),
1106            agent_id: agent::ZED_AGENT_ID.clone(),
1107            title: "Existing Metadata".into(),
1108            updated_at: now - chrono::Duration::seconds(10),
1109            created_at: Some(now - chrono::Duration::seconds(10)),
1110            folder_paths: project_a_paths.clone(),
1111            archived: false,
1112        };
1113
1114        cx.update(|cx| {
1115            let store = ThreadMetadataStore::global(cx);
1116            store.update(cx, |store, cx| {
1117                store.save(existing_metadata, cx);
1118            });
1119        });
1120        cx.run_until_parked();
1121
1122        let threads_to_save = vec![
1123            (
1124                "a-session-0",
1125                "Thread A0 From Native Store",
1126                project_a_paths.clone(),
1127                now,
1128            ),
1129            (
1130                "a-session-1",
1131                "Thread A1",
1132                project_a_paths.clone(),
1133                now + chrono::Duration::seconds(1),
1134            ),
1135            (
1136                "b-session-0",
1137                "Thread B0",
1138                project_b_paths.clone(),
1139                now + chrono::Duration::seconds(2),
1140            ),
1141            (
1142                "projectless",
1143                "Projectless",
1144                PathList::default(),
1145                now + chrono::Duration::seconds(3),
1146            ),
1147        ];
1148
1149        for (session_id, title, paths, updated_at) in &threads_to_save {
1150            let save_task = cx.update(|cx| {
1151                let thread_store = ThreadStore::global(cx);
1152                let session_id = session_id.to_string();
1153                let title = title.to_string();
1154                let paths = paths.clone();
1155                thread_store.update(cx, |store, cx| {
1156                    store.save_thread(
1157                        acp::SessionId::new(session_id),
1158                        make_db_thread(&title, *updated_at),
1159                        paths,
1160                        cx,
1161                    )
1162                })
1163            });
1164            save_task.await.unwrap();
1165            cx.run_until_parked();
1166        }
1167
1168        cx.update(|cx| migrate_thread_metadata(cx));
1169        cx.run_until_parked();
1170
1171        let list = cx.update(|cx| {
1172            let store = ThreadMetadataStore::global(cx);
1173            store.read(cx).entries().cloned().collect::<Vec<_>>()
1174        });
1175
1176        assert_eq!(list.len(), 3);
1177        assert!(
1178            list.iter()
1179                .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1180        );
1181
1182        let existing_metadata = list
1183            .iter()
1184            .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1185            .unwrap();
1186        assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1187        assert!(!existing_metadata.archived);
1188
1189        let migrated_session_ids = list
1190            .iter()
1191            .map(|metadata| metadata.session_id.0.as_ref())
1192            .collect::<Vec<_>>();
1193        assert!(migrated_session_ids.contains(&"a-session-1"));
1194        assert!(migrated_session_ids.contains(&"b-session-0"));
1195        assert!(!migrated_session_ids.contains(&"projectless"));
1196
1197        let migrated_entries = list
1198            .iter()
1199            .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1200            .collect::<Vec<_>>();
1201        assert!(
1202            migrated_entries
1203                .iter()
1204                .all(|metadata| !metadata.folder_paths.is_empty())
1205        );
1206        assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1207    }
1208
1209    #[gpui::test]
1210    async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1211        cx: &mut TestAppContext,
1212    ) {
1213        init_test(cx);
1214
1215        let project_paths = PathList::new(&[Path::new("/project-a")]);
1216        let existing_updated_at = Utc::now();
1217
1218        let existing_metadata = ThreadMetadata {
1219            session_id: acp::SessionId::new("existing-session"),
1220            agent_id: agent::ZED_AGENT_ID.clone(),
1221            title: "Existing Metadata".into(),
1222            updated_at: existing_updated_at,
1223            created_at: Some(existing_updated_at),
1224            folder_paths: project_paths.clone(),
1225            archived: false,
1226        };
1227
1228        cx.update(|cx| {
1229            let store = ThreadMetadataStore::global(cx);
1230            store.update(cx, |store, cx| {
1231                store.save(existing_metadata, cx);
1232            });
1233        });
1234        cx.run_until_parked();
1235
1236        let save_task = cx.update(|cx| {
1237            let thread_store = ThreadStore::global(cx);
1238            thread_store.update(cx, |store, cx| {
1239                store.save_thread(
1240                    acp::SessionId::new("existing-session"),
1241                    make_db_thread(
1242                        "Updated Native Thread Title",
1243                        existing_updated_at + chrono::Duration::seconds(1),
1244                    ),
1245                    project_paths.clone(),
1246                    cx,
1247                )
1248            })
1249        });
1250        save_task.await.unwrap();
1251        cx.run_until_parked();
1252
1253        cx.update(|cx| migrate_thread_metadata(cx));
1254        cx.run_until_parked();
1255
1256        let list = cx.update(|cx| {
1257            let store = ThreadMetadataStore::global(cx);
1258            store.read(cx).entries().cloned().collect::<Vec<_>>()
1259        });
1260
1261        assert_eq!(list.len(), 1);
1262        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1263    }
1264
1265    #[gpui::test]
1266    async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1267        cx: &mut TestAppContext,
1268    ) {
1269        init_test(cx);
1270
1271        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1272        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1273        let now = Utc::now();
1274
1275        // Create 7 threads for project A and 3 for project B
1276        let mut threads_to_save = Vec::new();
1277        for i in 0..7 {
1278            threads_to_save.push((
1279                format!("a-session-{i}"),
1280                format!("Thread A{i}"),
1281                project_a_paths.clone(),
1282                now + chrono::Duration::seconds(i as i64),
1283            ));
1284        }
1285        for i in 0..3 {
1286            threads_to_save.push((
1287                format!("b-session-{i}"),
1288                format!("Thread B{i}"),
1289                project_b_paths.clone(),
1290                now + chrono::Duration::seconds(i as i64),
1291            ));
1292        }
1293
1294        for (session_id, title, paths, updated_at) in &threads_to_save {
1295            let save_task = cx.update(|cx| {
1296                let thread_store = ThreadStore::global(cx);
1297                let session_id = session_id.to_string();
1298                let title = title.to_string();
1299                let paths = paths.clone();
1300                thread_store.update(cx, |store, cx| {
1301                    store.save_thread(
1302                        acp::SessionId::new(session_id),
1303                        make_db_thread(&title, *updated_at),
1304                        paths,
1305                        cx,
1306                    )
1307                })
1308            });
1309            save_task.await.unwrap();
1310            cx.run_until_parked();
1311        }
1312
1313        cx.update(|cx| migrate_thread_metadata(cx));
1314        cx.run_until_parked();
1315
1316        let list = cx.update(|cx| {
1317            let store = ThreadMetadataStore::global(cx);
1318            store.read(cx).entries().cloned().collect::<Vec<_>>()
1319        });
1320
1321        assert_eq!(list.len(), 10);
1322
1323        // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1324        let mut project_a_entries: Vec<_> = list
1325            .iter()
1326            .filter(|m| m.folder_paths == project_a_paths)
1327            .collect();
1328        assert_eq!(project_a_entries.len(), 7);
1329        project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1330
1331        for entry in &project_a_entries[..5] {
1332            assert!(
1333                !entry.archived,
1334                "Expected {} to be unarchived (top 5 most recent)",
1335                entry.session_id.0
1336            );
1337        }
1338        for entry in &project_a_entries[5..] {
1339            assert!(
1340                entry.archived,
1341                "Expected {} to be archived (older than top 5)",
1342                entry.session_id.0
1343            );
1344        }
1345
1346        // Project B: all 3 should be unarchived (under the limit)
1347        let project_b_entries: Vec<_> = list
1348            .iter()
1349            .filter(|m| m.folder_paths == project_b_paths)
1350            .collect();
1351        assert_eq!(project_b_entries.len(), 3);
1352        assert!(project_b_entries.iter().all(|m| !m.archived));
1353    }
1354
1355    #[gpui::test]
1356    async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1357        init_test(cx);
1358
1359        let fs = FakeFs::new(cx.executor());
1360        let project = Project::test(fs, None::<&Path>, cx).await;
1361        let connection = Rc::new(StubAgentConnection::new());
1362
1363        let thread = cx
1364            .update(|cx| {
1365                connection
1366                    .clone()
1367                    .new_session(project.clone(), PathList::default(), cx)
1368            })
1369            .await
1370            .unwrap();
1371        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1372
1373        cx.update(|cx| {
1374            thread.update(cx, |thread, cx| {
1375                thread.set_title("Draft Thread".into(), cx).detach();
1376            });
1377        });
1378        cx.run_until_parked();
1379
1380        let metadata_ids = cx.update(|cx| {
1381            ThreadMetadataStore::global(cx)
1382                .read(cx)
1383                .entry_ids()
1384                .collect::<Vec<_>>()
1385        });
1386        assert_eq!(metadata_ids, vec![session_id]);
1387
1388        drop(thread);
1389        cx.update(|_| {});
1390        cx.run_until_parked();
1391        cx.run_until_parked();
1392
1393        let metadata_ids = cx.update(|cx| {
1394            ThreadMetadataStore::global(cx)
1395                .read(cx)
1396                .entry_ids()
1397                .collect::<Vec<_>>()
1398        });
1399        assert!(
1400            metadata_ids.is_empty(),
1401            "expected empty draft thread metadata to be deleted on release"
1402        );
1403    }
1404
1405    #[gpui::test]
1406    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1407        init_test(cx);
1408
1409        let fs = FakeFs::new(cx.executor());
1410        let project = Project::test(fs, None::<&Path>, cx).await;
1411        let connection = Rc::new(StubAgentConnection::new());
1412
1413        let thread = cx
1414            .update(|cx| {
1415                connection
1416                    .clone()
1417                    .new_session(project.clone(), PathList::default(), cx)
1418            })
1419            .await
1420            .unwrap();
1421        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1422
1423        cx.update(|cx| {
1424            thread.update(cx, |thread, cx| {
1425                thread.push_user_content_block(None, "Hello".into(), cx);
1426            });
1427        });
1428        cx.run_until_parked();
1429
1430        let metadata_ids = cx.update(|cx| {
1431            ThreadMetadataStore::global(cx)
1432                .read(cx)
1433                .entry_ids()
1434                .collect::<Vec<_>>()
1435        });
1436        assert_eq!(metadata_ids, vec![session_id.clone()]);
1437
1438        drop(thread);
1439        cx.update(|_| {});
1440        cx.run_until_parked();
1441
1442        let metadata_ids = cx.update(|cx| {
1443            ThreadMetadataStore::global(cx)
1444                .read(cx)
1445                .entry_ids()
1446                .collect::<Vec<_>>()
1447        });
1448        assert_eq!(metadata_ids, vec![session_id]);
1449    }
1450
1451    #[gpui::test]
1452    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1453        init_test(cx);
1454
1455        let fs = FakeFs::new(cx.executor());
1456        let project = Project::test(fs, None::<&Path>, cx).await;
1457        let connection = Rc::new(StubAgentConnection::new());
1458
1459        // Create a regular (non-subagent) AcpThread.
1460        let regular_thread = cx
1461            .update(|cx| {
1462                connection
1463                    .clone()
1464                    .new_session(project.clone(), PathList::default(), cx)
1465            })
1466            .await
1467            .unwrap();
1468
1469        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1470
1471        // Set a title on the regular thread to trigger a save via handle_thread_update.
1472        cx.update(|cx| {
1473            regular_thread.update(cx, |thread, cx| {
1474                thread.set_title("Regular Thread".into(), cx).detach();
1475            });
1476        });
1477        cx.run_until_parked();
1478
1479        // Create a subagent AcpThread
1480        let subagent_session_id = acp::SessionId::new("subagent-session");
1481        let subagent_thread = cx.update(|cx| {
1482            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1483            cx.new(|cx| {
1484                acp_thread::AcpThread::new(
1485                    Some(regular_session_id.clone()),
1486                    Some("Subagent Thread".into()),
1487                    None,
1488                    connection.clone(),
1489                    project.clone(),
1490                    action_log,
1491                    subagent_session_id.clone(),
1492                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1493                    cx,
1494                )
1495            })
1496        });
1497
1498        // Set a title on the subagent thread to trigger handle_thread_update.
1499        cx.update(|cx| {
1500            subagent_thread.update(cx, |thread, cx| {
1501                thread
1502                    .set_title("Subagent Thread Title".into(), cx)
1503                    .detach();
1504            });
1505        });
1506        cx.run_until_parked();
1507
1508        // List all metadata from the store cache.
1509        let list = cx.update(|cx| {
1510            let store = ThreadMetadataStore::global(cx);
1511            store.read(cx).entries().cloned().collect::<Vec<_>>()
1512        });
1513
1514        // The subagent thread should NOT appear in the sidebar metadata.
1515        // Only the regular thread should be listed.
1516        assert_eq!(
1517            list.len(),
1518            1,
1519            "Expected only the regular thread in sidebar metadata, \
1520             but found {} entries (subagent threads are leaking into the sidebar)",
1521            list.len(),
1522        );
1523        assert_eq!(list[0].session_id, regular_session_id);
1524        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1525    }
1526
1527    #[test]
1528    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1529        let now = Utc::now();
1530
1531        let operations = vec![
1532            DbOperation::Upsert(make_metadata(
1533                "session-1",
1534                "First Thread",
1535                now,
1536                PathList::default(),
1537            )),
1538            DbOperation::Delete(acp::SessionId::new("session-1")),
1539        ];
1540
1541        let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1542
1543        assert_eq!(deduped.len(), 1);
1544        assert_eq!(
1545            deduped[0],
1546            DbOperation::Delete(acp::SessionId::new("session-1"))
1547        );
1548    }
1549
1550    #[test]
1551    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1552        let now = Utc::now();
1553        let later = now + chrono::Duration::seconds(1);
1554
1555        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1556        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1557
1558        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1559            DbOperation::Upsert(old_metadata),
1560            DbOperation::Upsert(new_metadata.clone()),
1561        ]);
1562
1563        assert_eq!(deduped.len(), 1);
1564        assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1565    }
1566
1567    #[test]
1568    fn test_dedup_db_operations_preserves_distinct_sessions() {
1569        let now = Utc::now();
1570
1571        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1572        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1573        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1574            DbOperation::Upsert(metadata1.clone()),
1575            DbOperation::Upsert(metadata2.clone()),
1576        ]);
1577
1578        assert_eq!(deduped.len(), 2);
1579        assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1580        assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1581    }
1582
1583    #[gpui::test]
1584    async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1585        init_test(cx);
1586
1587        let paths = PathList::new(&[Path::new("/project-a")]);
1588        let now = Utc::now();
1589        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1590
1591        cx.update(|cx| {
1592            let store = ThreadMetadataStore::global(cx);
1593            store.update(cx, |store, cx| {
1594                store.save(metadata, cx);
1595            });
1596        });
1597
1598        cx.run_until_parked();
1599
1600        cx.update(|cx| {
1601            let store = ThreadMetadataStore::global(cx);
1602            let store = store.read(cx);
1603
1604            let path_entries = store
1605                .entries_for_path(&paths)
1606                .map(|e| e.session_id.0.to_string())
1607                .collect::<Vec<_>>();
1608            assert_eq!(path_entries, vec!["session-1"]);
1609
1610            let archived = store
1611                .archived_entries()
1612                .map(|e| e.session_id.0.to_string())
1613                .collect::<Vec<_>>();
1614            assert!(archived.is_empty());
1615        });
1616
1617        cx.update(|cx| {
1618            let store = ThreadMetadataStore::global(cx);
1619            store.update(cx, |store, cx| {
1620                store.archive(&acp::SessionId::new("session-1"), cx);
1621            });
1622        });
1623
1624        cx.run_until_parked();
1625
1626        cx.update(|cx| {
1627            let store = ThreadMetadataStore::global(cx);
1628            let store = store.read(cx);
1629
1630            let path_entries = store
1631                .entries_for_path(&paths)
1632                .map(|e| e.session_id.0.to_string())
1633                .collect::<Vec<_>>();
1634            assert!(path_entries.is_empty());
1635
1636            let archived = store.archived_entries().collect::<Vec<_>>();
1637            assert_eq!(archived.len(), 1);
1638            assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1639            assert!(archived[0].archived);
1640        });
1641
1642        cx.update(|cx| {
1643            let store = ThreadMetadataStore::global(cx);
1644            store.update(cx, |store, cx| {
1645                store.unarchive(&acp::SessionId::new("session-1"), cx);
1646            });
1647        });
1648
1649        cx.run_until_parked();
1650
1651        cx.update(|cx| {
1652            let store = ThreadMetadataStore::global(cx);
1653            let store = store.read(cx);
1654
1655            let path_entries = store
1656                .entries_for_path(&paths)
1657                .map(|e| e.session_id.0.to_string())
1658                .collect::<Vec<_>>();
1659            assert_eq!(path_entries, vec!["session-1"]);
1660
1661            let archived = store
1662                .archived_entries()
1663                .map(|e| e.session_id.0.to_string())
1664                .collect::<Vec<_>>();
1665            assert!(archived.is_empty());
1666        });
1667    }
1668
1669    #[gpui::test]
1670    async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1671        init_test(cx);
1672
1673        let paths = PathList::new(&[Path::new("/project-a")]);
1674        let now = Utc::now();
1675
1676        let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1677        let metadata2 = make_metadata(
1678            "session-2",
1679            "Archived Thread",
1680            now - chrono::Duration::seconds(1),
1681            paths.clone(),
1682        );
1683
1684        cx.update(|cx| {
1685            let store = ThreadMetadataStore::global(cx);
1686            store.update(cx, |store, cx| {
1687                store.save(metadata1, cx);
1688                store.save(metadata2, cx);
1689            });
1690        });
1691
1692        cx.run_until_parked();
1693
1694        cx.update(|cx| {
1695            let store = ThreadMetadataStore::global(cx);
1696            store.update(cx, |store, cx| {
1697                store.archive(&acp::SessionId::new("session-2"), cx);
1698            });
1699        });
1700
1701        cx.run_until_parked();
1702
1703        cx.update(|cx| {
1704            let store = ThreadMetadataStore::global(cx);
1705            let store = store.read(cx);
1706
1707            let path_entries = store
1708                .entries_for_path(&paths)
1709                .map(|e| e.session_id.0.to_string())
1710                .collect::<Vec<_>>();
1711            assert_eq!(path_entries, vec!["session-1"]);
1712
1713            let all_entries = store
1714                .entries()
1715                .map(|e| e.session_id.0.to_string())
1716                .collect::<Vec<_>>();
1717            assert_eq!(all_entries.len(), 2);
1718            assert!(all_entries.contains(&"session-1".to_string()));
1719            assert!(all_entries.contains(&"session-2".to_string()));
1720
1721            let archived = store
1722                .archived_entries()
1723                .map(|e| e.session_id.0.to_string())
1724                .collect::<Vec<_>>();
1725            assert_eq!(archived, vec!["session-2"]);
1726        });
1727    }
1728
1729    #[gpui::test]
1730    async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1731        init_test(cx);
1732
1733        let paths = PathList::new(&[Path::new("/project-a")]);
1734        let now = Utc::now();
1735
1736        let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1737        let m2 = make_metadata(
1738            "session-2",
1739            "Thread Two",
1740            now - chrono::Duration::seconds(1),
1741            paths.clone(),
1742        );
1743        let m3 = make_metadata(
1744            "session-3",
1745            "Thread Three",
1746            now - chrono::Duration::seconds(2),
1747            paths,
1748        );
1749
1750        cx.update(|cx| {
1751            let store = ThreadMetadataStore::global(cx);
1752            store.update(cx, |store, cx| {
1753                store.save_all(vec![m1, m2, m3], cx);
1754            });
1755        });
1756
1757        cx.run_until_parked();
1758
1759        cx.update(|cx| {
1760            let store = ThreadMetadataStore::global(cx);
1761            let store = store.read(cx);
1762
1763            let all_entries = store
1764                .entries()
1765                .map(|e| e.session_id.0.to_string())
1766                .collect::<Vec<_>>();
1767            assert_eq!(all_entries.len(), 3);
1768            assert!(all_entries.contains(&"session-1".to_string()));
1769            assert!(all_entries.contains(&"session-2".to_string()));
1770            assert!(all_entries.contains(&"session-3".to_string()));
1771
1772            let entry_ids = store.entry_ids().collect::<Vec<_>>();
1773            assert_eq!(entry_ids.len(), 3);
1774        });
1775    }
1776
1777    #[gpui::test]
1778    async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1779        init_test(cx);
1780
1781        let paths = PathList::new(&[Path::new("/project-a")]);
1782        let now = Utc::now();
1783        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1784
1785        cx.update(|cx| {
1786            let store = ThreadMetadataStore::global(cx);
1787            store.update(cx, |store, cx| {
1788                store.save(metadata, cx);
1789            });
1790        });
1791
1792        cx.run_until_parked();
1793
1794        cx.update(|cx| {
1795            let store = ThreadMetadataStore::global(cx);
1796            store.update(cx, |store, cx| {
1797                store.archive(&acp::SessionId::new("session-1"), cx);
1798            });
1799        });
1800
1801        cx.run_until_parked();
1802
1803        cx.update(|cx| {
1804            let store = ThreadMetadataStore::global(cx);
1805            store.update(cx, |store, cx| {
1806                let _ = store.reload(cx);
1807            });
1808        });
1809
1810        cx.run_until_parked();
1811
1812        cx.update(|cx| {
1813            let store = ThreadMetadataStore::global(cx);
1814            let store = store.read(cx);
1815
1816            let thread = store
1817                .entries()
1818                .find(|e| e.session_id.0.as_ref() == "session-1")
1819                .expect("thread should exist after reload");
1820            assert!(thread.archived);
1821
1822            let path_entries = store
1823                .entries_for_path(&paths)
1824                .map(|e| e.session_id.0.to_string())
1825                .collect::<Vec<_>>();
1826            assert!(path_entries.is_empty());
1827
1828            let archived = store
1829                .archived_entries()
1830                .map(|e| e.session_id.0.to_string())
1831                .collect::<Vec<_>>();
1832            assert_eq!(archived, vec!["session-1"]);
1833        });
1834    }
1835
1836    #[gpui::test]
1837    async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1838        init_test(cx);
1839
1840        cx.run_until_parked();
1841
1842        cx.update(|cx| {
1843            let store = ThreadMetadataStore::global(cx);
1844            store.update(cx, |store, cx| {
1845                store.archive(&acp::SessionId::new("nonexistent"), cx);
1846            });
1847        });
1848
1849        cx.run_until_parked();
1850
1851        cx.update(|cx| {
1852            let store = ThreadMetadataStore::global(cx);
1853            let store = store.read(cx);
1854
1855            assert!(store.is_empty());
1856            assert_eq!(store.entries().count(), 0);
1857            assert_eq!(store.archived_entries().count(), 0);
1858        });
1859    }
1860
1861    #[gpui::test]
1862    async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1863        init_test(cx);
1864
1865        let paths = PathList::new(&[Path::new("/project-a")]);
1866        let now = Utc::now();
1867        let metadata = make_metadata("session-1", "Thread 1", now, paths);
1868        let session_id = metadata.session_id.clone();
1869
1870        cx.update(|cx| {
1871            let store = ThreadMetadataStore::global(cx);
1872            store.update(cx, |store, cx| {
1873                store.save(metadata.clone(), cx);
1874                store.archive(&session_id, cx);
1875            });
1876        });
1877
1878        cx.run_until_parked();
1879
1880        cx.update(|cx| {
1881            let store = ThreadMetadataStore::global(cx);
1882            let store = store.read(cx);
1883
1884            let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1885            pretty_assertions::assert_eq!(
1886                entries,
1887                vec![ThreadMetadata {
1888                    archived: true,
1889                    ..metadata
1890                }]
1891            );
1892        });
1893    }
1894}