thread_metadata_store.rs

   1use std::{
   2    path::{Path, PathBuf},
   3    sync::Arc,
   4};
   5
   6use acp_thread::AcpThreadEvent;
   7use agent::{ThreadStore, ZED_AGENT_ID};
   8use agent_client_protocol as acp;
   9use anyhow::Context as _;
  10use chrono::{DateTime, Utc};
  11use collections::{HashMap, HashSet};
  12use db::{
  13    sqlez::{
  14        bindable::Column, domain::Domain, statement::Statement,
  15        thread_safe_connection::ThreadSafeConnection,
  16    },
  17    sqlez_macros::sql,
  18};
  19use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  20use futures::{FutureExt as _, future::Shared};
  21use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  22use project::AgentId;
  23use ui::{App, Context, SharedString};
  24use util::ResultExt as _;
  25use workspace::PathList;
  26
  27use crate::DEFAULT_THREAD_TITLE;
  28
  29pub fn init(cx: &mut App) {
  30    ThreadMetadataStore::init_global(cx);
  31
  32    if cx.has_flag::<AgentV2FeatureFlag>() {
  33        migrate_thread_metadata(cx);
  34    }
  35    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  36        if has_flag {
  37            migrate_thread_metadata(cx);
  38        }
  39    })
  40    .detach();
  41}
  42
  43/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  44/// We skip migrating threads that do not have a project.
  45///
  46/// TODO: Remove this after N weeks of shipping the sidebar
  47fn migrate_thread_metadata(cx: &mut App) {
  48    let store = ThreadMetadataStore::global(cx);
  49    let db = store.read(cx).db.clone();
  50
  51    cx.spawn(async move |cx| {
  52        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
  53
  54        let is_first_migration = existing_entries.is_empty();
  55
  56        let mut to_migrate = store.read_with(cx, |_store, cx| {
  57            ThreadStore::global(cx)
  58                .read(cx)
  59                .entries()
  60                .filter_map(|entry| {
  61                    if existing_entries.contains(&entry.id.0) || entry.folder_paths.is_empty() {
  62                        return None;
  63                    }
  64
  65                    Some(ThreadMetadata {
  66                        session_id: entry.id,
  67                        agent_id: ZED_AGENT_ID.clone(),
  68                        title: entry.title,
  69                        updated_at: entry.updated_at,
  70                        created_at: entry.created_at,
  71                        folder_paths: entry.folder_paths,
  72                        archived: true,
  73                    })
  74                })
  75                .collect::<Vec<_>>()
  76        });
  77
  78        if to_migrate.is_empty() {
  79            return anyhow::Ok(());
  80        }
  81
  82        // On the first migration (no entries in DB yet), keep the 5 most
  83        // recent threads per project unarchived.
  84        if is_first_migration {
  85            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
  86            for entry in &mut to_migrate {
  87                per_project
  88                    .entry(entry.folder_paths.clone())
  89                    .or_default()
  90                    .push(entry);
  91            }
  92            for entries in per_project.values_mut() {
  93                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
  94                for entry in entries.iter_mut().take(5) {
  95                    entry.archived = false;
  96                }
  97            }
  98        }
  99
 100        log::info!("Migrating {} thread store entries", to_migrate.len());
 101
 102        // Manually save each entry to the database and call reload, otherwise
 103        // we'll end up triggering lots of reloads after each save
 104        for entry in to_migrate {
 105            db.save(entry).await?;
 106        }
 107
 108        log::info!("Finished migrating thread store entries");
 109
 110        let _ = store.update(cx, |store, cx| store.reload(cx));
 111        anyhow::Ok(())
 112    })
 113    .detach_and_log_err(cx);
 114}
 115
 116struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
 117impl Global for GlobalThreadMetadataStore {}
 118
 119/// Lightweight metadata for any thread (native or ACP), enough to populate
 120/// the sidebar list and route to the correct load path when clicked.
 121#[derive(Debug, Clone, PartialEq)]
 122pub struct ThreadMetadata {
 123    pub session_id: acp::SessionId,
 124    pub agent_id: AgentId,
 125    pub title: SharedString,
 126    pub updated_at: DateTime<Utc>,
 127    pub created_at: Option<DateTime<Utc>>,
 128    pub folder_paths: PathList,
 129    pub archived: bool,
 130}
 131
 132impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
 133    fn from(meta: &ThreadMetadata) -> Self {
 134        Self {
 135            session_id: meta.session_id.clone(),
 136            work_dirs: Some(meta.folder_paths.clone()),
 137            title: Some(meta.title.clone()),
 138            updated_at: Some(meta.updated_at),
 139            created_at: meta.created_at,
 140            meta: None,
 141        }
 142    }
 143}
 144
 145pub struct ArchivedGitWorktree {
 146    pub id: i64,
 147    pub worktree_path: PathBuf,
 148    pub main_repo_path: PathBuf,
 149    pub branch_name: Option<String>,
 150    pub commit_hash: String,
 151    pub restored: bool,
 152}
 153
 154/// The store holds all metadata needed to show threads in the sidebar/the archive.
 155///
 156/// Automatically listens to AcpThread events and updates metadata if it has changed.
 157pub struct ThreadMetadataStore {
 158    db: ThreadMetadataDb,
 159    threads: HashMap<acp::SessionId, ThreadMetadata>,
 160    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
 161    reload_task: Option<Shared<Task<()>>>,
 162    session_subscriptions: HashMap<acp::SessionId, Subscription>,
 163    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
 164    _db_operations_task: Task<()>,
 165}
 166
 167#[derive(Debug, PartialEq)]
 168enum DbOperation {
 169    Upsert(ThreadMetadata),
 170    Delete(acp::SessionId),
 171}
 172
 173impl DbOperation {
 174    fn id(&self) -> &acp::SessionId {
 175        match self {
 176            DbOperation::Upsert(thread) => &thread.session_id,
 177            DbOperation::Delete(session_id) => session_id,
 178        }
 179    }
 180}
 181
 182impl ThreadMetadataStore {
 183    #[cfg(not(any(test, feature = "test-support")))]
 184    pub fn init_global(cx: &mut App) {
 185        if cx.has_global::<Self>() {
 186            return;
 187        }
 188
 189        let db = ThreadMetadataDb::global(cx);
 190        let thread_store = cx.new(|cx| Self::new(db, cx));
 191        cx.set_global(GlobalThreadMetadataStore(thread_store));
 192    }
 193
 194    #[cfg(any(test, feature = "test-support"))]
 195    pub fn init_global(cx: &mut App) {
 196        let thread = std::thread::current();
 197        let test_name = thread.name().unwrap_or("unknown_test");
 198        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 199        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 200        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 201        cx.set_global(GlobalThreadMetadataStore(thread_store));
 202    }
 203
 204    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 205        cx.try_global::<GlobalThreadMetadataStore>()
 206            .map(|store| store.0.clone())
 207    }
 208
 209    pub fn global(cx: &App) -> Entity<Self> {
 210        cx.global::<GlobalThreadMetadataStore>().0.clone()
 211    }
 212
 213    pub fn is_empty(&self) -> bool {
 214        self.threads.is_empty()
 215    }
 216
 217    /// Returns all thread IDs.
 218    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 219        self.threads.keys().cloned()
 220    }
 221
 222    /// Returns the metadata for a specific thread, if it exists.
 223    pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
 224        self.threads.get(session_id)
 225    }
 226
 227    /// Returns all threads.
 228    pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 229        self.threads.values()
 230    }
 231
 232    /// Returns all archived threads.
 233    pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 234        self.entries().filter(|t| t.archived)
 235    }
 236
 237    /// Returns all threads for the given path list, excluding archived threads.
 238    pub fn entries_for_path(
 239        &self,
 240        path_list: &PathList,
 241    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 242        self.threads_by_paths
 243            .get(path_list)
 244            .into_iter()
 245            .flatten()
 246            .filter_map(|s| self.threads.get(s))
 247            .filter(|s| !s.archived)
 248    }
 249
 250    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 251        let db = self.db.clone();
 252        self.reload_task.take();
 253
 254        let list_task = cx
 255            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 256
 257        let reload_task = cx
 258            .spawn(async move |this, cx| {
 259                let Some(rows) = list_task.await.log_err() else {
 260                    return;
 261                };
 262
 263                this.update(cx, |this, cx| {
 264                    this.threads.clear();
 265                    this.threads_by_paths.clear();
 266
 267                    for row in rows {
 268                        this.threads_by_paths
 269                            .entry(row.folder_paths.clone())
 270                            .or_default()
 271                            .insert(row.session_id.clone());
 272                        this.threads.insert(row.session_id.clone(), row);
 273                    }
 274
 275                    cx.notify();
 276                })
 277                .ok();
 278            })
 279            .shared();
 280        self.reload_task = Some(reload_task.clone());
 281        reload_task
 282    }
 283
 284    pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
 285        if !cx.has_flag::<AgentV2FeatureFlag>() {
 286            return;
 287        }
 288
 289        for metadata in metadata {
 290            self.save_internal(metadata);
 291        }
 292        cx.notify();
 293    }
 294
 295    #[cfg(any(test, feature = "test-support"))]
 296    pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 297        self.save(metadata, cx)
 298    }
 299
 300    fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 301        if !cx.has_flag::<AgentV2FeatureFlag>() {
 302            return;
 303        }
 304
 305        self.save_internal(metadata);
 306        cx.notify();
 307    }
 308
 309    fn save_internal(&mut self, metadata: ThreadMetadata) {
 310        // If the folder paths have changed, we need to clear the old entry
 311        if let Some(thread) = self.threads.get(&metadata.session_id)
 312            && thread.folder_paths != metadata.folder_paths
 313            && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
 314        {
 315            session_ids.remove(&metadata.session_id);
 316        }
 317
 318        self.threads
 319            .insert(metadata.session_id.clone(), metadata.clone());
 320
 321        self.threads_by_paths
 322            .entry(metadata.folder_paths.clone())
 323            .or_default()
 324            .insert(metadata.session_id.clone());
 325
 326        self.pending_thread_ops_tx
 327            .try_send(DbOperation::Upsert(metadata))
 328            .log_err();
 329    }
 330
 331    pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 332        self.update_archived(session_id, true, cx);
 333    }
 334
 335    pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 336        self.update_archived(session_id, false, cx);
 337    }
 338
 339    fn update_archived(
 340        &mut self,
 341        session_id: &acp::SessionId,
 342        archived: bool,
 343        cx: &mut Context<Self>,
 344    ) {
 345        if !cx.has_flag::<AgentV2FeatureFlag>() {
 346            return;
 347        }
 348
 349        if let Some(thread) = self.threads.get(session_id) {
 350            self.save_internal(ThreadMetadata {
 351                archived,
 352                ..thread.clone()
 353            });
 354            cx.notify();
 355        }
 356    }
 357
 358    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 359        if !cx.has_flag::<AgentV2FeatureFlag>() {
 360            return;
 361        }
 362
 363        if let Some(thread) = self.threads.get(&session_id)
 364            && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
 365        {
 366            session_ids.remove(&session_id);
 367        }
 368        self.threads.remove(&session_id);
 369        self.pending_thread_ops_tx
 370            .try_send(DbOperation::Delete(session_id))
 371            .log_err();
 372        cx.notify();
 373    }
 374
 375    pub fn create_archived_worktree(
 376        &self,
 377        worktree_path: String,
 378        main_repo_path: String,
 379        branch_name: Option<String>,
 380        commit_hash: String,
 381        cx: &mut Context<Self>,
 382    ) -> Task<anyhow::Result<i64>> {
 383        let db = self.db.clone();
 384        cx.background_spawn(async move {
 385            db.create_archived_worktree(
 386                &worktree_path,
 387                &main_repo_path,
 388                branch_name.as_deref(),
 389                &commit_hash,
 390            )
 391            .await
 392        })
 393    }
 394
 395    pub fn get_archived_worktree_by_path(
 396        &self,
 397        worktree_path: String,
 398        cx: &mut Context<Self>,
 399    ) -> Task<anyhow::Result<Option<ArchivedGitWorktree>>> {
 400        let db = self.db.clone();
 401        cx.background_spawn(async move { db.get_archived_worktree_by_path(&worktree_path).await })
 402    }
 403
 404    pub fn delete_archived_worktree(
 405        &self,
 406        id: i64,
 407        cx: &mut Context<Self>,
 408    ) -> Task<anyhow::Result<()>> {
 409        let db = self.db.clone();
 410        cx.background_spawn(async move { db.delete_archived_worktree(id).await })
 411    }
 412
 413    pub fn update_archived_worktree_restored(
 414        &self,
 415        id: i64,
 416        worktree_path: String,
 417        branch_name: Option<String>,
 418        cx: &mut Context<Self>,
 419    ) -> Task<anyhow::Result<()>> {
 420        let db = self.db.clone();
 421        cx.background_spawn(async move {
 422            db.update_archived_worktree_restored(id, &worktree_path, branch_name.as_deref())
 423                .await
 424        })
 425    }
 426
 427    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 428        let weak_store = cx.weak_entity();
 429
 430        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 431            // Don't track subagent threads in the sidebar.
 432            if thread.parent_session_id().is_some() {
 433                return;
 434            }
 435
 436            let thread_entity = cx.entity();
 437
 438            cx.on_release({
 439                let weak_store = weak_store.clone();
 440                move |thread, cx| {
 441                    weak_store
 442                        .update(cx, |store, cx| {
 443                            let session_id = thread.session_id().clone();
 444                            store.session_subscriptions.remove(&session_id);
 445                            if thread.entries().is_empty() {
 446                                // Empty threads can be unloaded without ever being
 447                                // durably persisted by the underlying agent.
 448                                store.delete(session_id, cx);
 449                            }
 450                        })
 451                        .ok();
 452                }
 453            })
 454            .detach();
 455
 456            weak_store
 457                .update(cx, |this, cx| {
 458                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
 459                    this.session_subscriptions
 460                        .insert(thread.session_id().clone(), subscription);
 461                })
 462                .ok();
 463        })
 464        .detach();
 465
 466        let (tx, rx) = smol::channel::unbounded();
 467        let _db_operations_task = cx.background_spawn({
 468            let db = db.clone();
 469            async move {
 470                while let Ok(first_update) = rx.recv().await {
 471                    let mut updates = vec![first_update];
 472                    while let Ok(update) = rx.try_recv() {
 473                        updates.push(update);
 474                    }
 475                    let updates = Self::dedup_db_operations(updates);
 476                    for operation in updates {
 477                        match operation {
 478                            DbOperation::Upsert(metadata) => {
 479                                db.save(metadata).await.log_err();
 480                            }
 481                            DbOperation::Delete(session_id) => {
 482                                db.delete(session_id).await.log_err();
 483                            }
 484                        }
 485                    }
 486                }
 487            }
 488        });
 489
 490        let mut this = Self {
 491            db,
 492            threads: HashMap::default(),
 493            threads_by_paths: HashMap::default(),
 494            reload_task: None,
 495            session_subscriptions: HashMap::default(),
 496            pending_thread_ops_tx: tx,
 497            _db_operations_task,
 498        };
 499        let _ = this.reload(cx);
 500        this
 501    }
 502
 503    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 504        let mut ops = HashMap::default();
 505        for operation in operations.into_iter().rev() {
 506            if ops.contains_key(operation.id()) {
 507                continue;
 508            }
 509            ops.insert(operation.id().clone(), operation);
 510        }
 511        ops.into_values().collect()
 512    }
 513
 514    fn handle_thread_event(
 515        &mut self,
 516        thread: Entity<acp_thread::AcpThread>,
 517        event: &AcpThreadEvent,
 518        cx: &mut Context<Self>,
 519    ) {
 520        // Don't track subagent threads in the sidebar.
 521        if thread.read(cx).parent_session_id().is_some() {
 522            return;
 523        }
 524
 525        match event {
 526            AcpThreadEvent::NewEntry
 527            | AcpThreadEvent::TitleUpdated
 528            | AcpThreadEvent::EntryUpdated(_)
 529            | AcpThreadEvent::EntriesRemoved(_)
 530            | AcpThreadEvent::ToolAuthorizationRequested(_)
 531            | AcpThreadEvent::ToolAuthorizationReceived(_)
 532            | AcpThreadEvent::Retry(_)
 533            | AcpThreadEvent::Stopped(_)
 534            | AcpThreadEvent::Error
 535            | AcpThreadEvent::LoadError(_)
 536            | AcpThreadEvent::Refusal
 537            | AcpThreadEvent::WorkingDirectoriesUpdated => {
 538                let thread_ref = thread.read(cx);
 539                let existing_thread = self.threads.get(thread_ref.session_id());
 540                let session_id = thread_ref.session_id().clone();
 541                let title = thread_ref
 542                    .title()
 543                    .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 544
 545                let updated_at = Utc::now();
 546
 547                let created_at = existing_thread
 548                    .and_then(|t| t.created_at)
 549                    .unwrap_or_else(|| updated_at);
 550
 551                let agent_id = thread_ref.connection().agent_id();
 552
 553                let folder_paths = {
 554                    let project = thread_ref.project().read(cx);
 555                    let paths: Vec<Arc<Path>> = project
 556                        .visible_worktrees(cx)
 557                        .map(|worktree| worktree.read(cx).abs_path())
 558                        .collect();
 559                    PathList::new(&paths)
 560                };
 561
 562                let archived = existing_thread.map(|t| t.archived).unwrap_or(false);
 563
 564                let metadata = ThreadMetadata {
 565                    session_id,
 566                    agent_id,
 567                    title,
 568                    created_at: Some(created_at),
 569                    updated_at,
 570                    folder_paths,
 571                    archived,
 572                };
 573
 574                self.save(metadata, cx);
 575            }
 576            AcpThreadEvent::TokenUsageUpdated
 577            | AcpThreadEvent::SubagentSpawned(_)
 578            | AcpThreadEvent::PromptCapabilitiesUpdated
 579            | AcpThreadEvent::AvailableCommandsUpdated(_)
 580            | AcpThreadEvent::ModeUpdated(_)
 581            | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
 582        }
 583    }
 584}
 585
 586impl Global for ThreadMetadataStore {}
 587
 588struct ThreadMetadataDb(ThreadSafeConnection);
 589
 590impl Domain for ThreadMetadataDb {
 591    const NAME: &str = stringify!(ThreadMetadataDb);
 592
 593    const MIGRATIONS: &[&str] = &[
 594        sql!(
 595            CREATE TABLE IF NOT EXISTS sidebar_threads(
 596                session_id TEXT PRIMARY KEY,
 597                agent_id TEXT,
 598                title TEXT NOT NULL,
 599                updated_at TEXT NOT NULL,
 600                created_at TEXT,
 601                folder_paths TEXT,
 602                folder_paths_order TEXT
 603            ) STRICT;
 604        ),
 605        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
 606        sql!(
 607            CREATE TABLE IF NOT EXISTS archived_git_worktrees(
 608                id INTEGER PRIMARY KEY,
 609                worktree_path TEXT NOT NULL,
 610                main_repo_path TEXT NOT NULL,
 611                branch_name TEXT,
 612                commit_hash TEXT NOT NULL,
 613                restored INTEGER NOT NULL DEFAULT 0
 614            ) STRICT;
 615
 616            CREATE UNIQUE INDEX IF NOT EXISTS idx_archived_worktrees_path
 617                ON archived_git_worktrees(worktree_path);
 618        ),
 619    ];
 620}
 621
 622db::static_connection!(ThreadMetadataDb, []);
 623
 624impl ThreadMetadataDb {
 625    pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
 626        self.select::<Arc<str>>(
 627            "SELECT session_id FROM sidebar_threads \
 628             ORDER BY updated_at DESC",
 629        )?()
 630    }
 631
 632    /// List all sidebar thread metadata, ordered by updated_at descending.
 633    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 634        self.select::<ThreadMetadata>(
 635            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived \
 636             FROM sidebar_threads \
 637             ORDER BY updated_at DESC"
 638        )?()
 639    }
 640
 641    /// Upsert metadata for a thread.
 642    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 643        let id = row.session_id.0.clone();
 644        let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 645            None
 646        } else {
 647            Some(row.agent_id.to_string())
 648        };
 649        let title = row.title.to_string();
 650        let updated_at = row.updated_at.to_rfc3339();
 651        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 652        let serialized = row.folder_paths.serialize();
 653        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 654            (None, None)
 655        } else {
 656            (Some(serialized.paths), Some(serialized.order))
 657        };
 658        let archived = row.archived;
 659
 660        self.write(move |conn| {
 661            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived) \
 662                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
 663                       ON CONFLICT(session_id) DO UPDATE SET \
 664                           agent_id = excluded.agent_id, \
 665                           title = excluded.title, \
 666                           updated_at = excluded.updated_at, \
 667                           created_at = excluded.created_at, \
 668                           folder_paths = excluded.folder_paths, \
 669                           folder_paths_order = excluded.folder_paths_order, \
 670                           archived = excluded.archived";
 671            let mut stmt = Statement::prepare(conn, sql)?;
 672            let mut i = stmt.bind(&id, 1)?;
 673            i = stmt.bind(&agent_id, i)?;
 674            i = stmt.bind(&title, i)?;
 675            i = stmt.bind(&updated_at, i)?;
 676            i = stmt.bind(&created_at, i)?;
 677            i = stmt.bind(&folder_paths, i)?;
 678            i = stmt.bind(&folder_paths_order, i)?;
 679            stmt.bind(&archived, i)?;
 680            stmt.exec()
 681        })
 682        .await
 683    }
 684
 685    /// Delete metadata for a single thread.
 686    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 687        let id = session_id.0.clone();
 688        self.write(move |conn| {
 689            let mut stmt =
 690                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 691            stmt.bind(&id, 1)?;
 692            stmt.exec()
 693        })
 694        .await
 695    }
 696
 697    pub async fn create_archived_worktree(
 698        &self,
 699        worktree_path: &str,
 700        main_repo_path: &str,
 701        branch_name: Option<&str>,
 702        commit_hash: &str,
 703    ) -> anyhow::Result<i64> {
 704        let worktree_path = worktree_path.to_string();
 705        let main_repo_path = main_repo_path.to_string();
 706        let branch_name = branch_name.map(|s| s.to_string());
 707        let commit_hash = commit_hash.to_string();
 708        self.write(move |conn| {
 709            let mut stmt = Statement::prepare(
 710                conn,
 711                "INSERT INTO archived_git_worktrees(\
 712                     worktree_path, main_repo_path, branch_name, commit_hash\
 713                 ) VALUES (?, ?, ?, ?)",
 714            )?;
 715            let mut i = stmt.bind(&worktree_path, 1)?;
 716            i = stmt.bind(&main_repo_path, i)?;
 717            i = stmt.bind(&branch_name, i)?;
 718            stmt.bind(&commit_hash, i)?;
 719            stmt.exec()?;
 720
 721            let mut id_stmt = Statement::prepare(conn, "SELECT last_insert_rowid()")?;
 722            let id = id_stmt
 723                .maybe_row::<i64>()?
 724                .ok_or_else(|| anyhow::anyhow!("No row ID returned after INSERT"))?;
 725
 726            Ok(id)
 727        })
 728        .await
 729    }
 730
 731    pub async fn get_archived_worktree_by_path(
 732        &self,
 733        worktree_path: &str,
 734    ) -> anyhow::Result<Option<ArchivedGitWorktree>> {
 735        let worktree_path = worktree_path.to_string();
 736        self.select_row_bound::<String, ArchivedGitWorktree>(
 737            "SELECT id, worktree_path, main_repo_path, branch_name, commit_hash, restored \
 738             FROM archived_git_worktrees WHERE worktree_path = ?",
 739        )?(worktree_path)
 740    }
 741
 742    pub async fn delete_archived_worktree(&self, id: i64) -> anyhow::Result<()> {
 743        self.write(move |conn| {
 744            let mut stmt =
 745                Statement::prepare(conn, "DELETE FROM archived_git_worktrees WHERE id = ?")?;
 746            stmt.bind(&id, 1)?;
 747            stmt.exec()
 748        })
 749        .await
 750    }
 751
 752    pub async fn update_archived_worktree_restored(
 753        &self,
 754        id: i64,
 755        worktree_path: &str,
 756        branch_name: Option<&str>,
 757    ) -> anyhow::Result<()> {
 758        let worktree_path = worktree_path.to_string();
 759        let branch_name = branch_name.map(|s| s.to_string());
 760        self.write(move |conn| {
 761            let mut stmt = Statement::prepare(
 762                conn,
 763                "UPDATE archived_git_worktrees \
 764                 SET restored = 1, worktree_path = ?, branch_name = ? \
 765                 WHERE id = ?",
 766            )?;
 767            let mut i = stmt.bind(&worktree_path, 1)?;
 768            i = stmt.bind(&branch_name, i)?;
 769            stmt.bind(&id, i)?;
 770            stmt.exec()
 771        })
 772        .await
 773    }
 774}
 775
 776impl Column for ThreadMetadata {
 777    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 778        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 779        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 780        let (title, next): (String, i32) = Column::column(statement, next)?;
 781        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 782        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 783        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 784        let (folder_paths_order_str, next): (Option<String>, i32) =
 785            Column::column(statement, next)?;
 786        let (archived, next): (bool, i32) = Column::column(statement, next)?;
 787
 788        let agent_id = agent_id
 789            .map(|id| AgentId::new(id))
 790            .unwrap_or(ZED_AGENT_ID.clone());
 791
 792        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 793        let created_at = created_at_str
 794            .as_deref()
 795            .map(DateTime::parse_from_rfc3339)
 796            .transpose()?
 797            .map(|dt| dt.with_timezone(&Utc));
 798
 799        let folder_paths = folder_paths_str
 800            .map(|paths| {
 801                PathList::deserialize(&util::path_list::SerializedPathList {
 802                    paths,
 803                    order: folder_paths_order_str.unwrap_or_default(),
 804                })
 805            })
 806            .unwrap_or_default();
 807
 808        Ok((
 809            ThreadMetadata {
 810                session_id: acp::SessionId::new(id),
 811                agent_id,
 812                title: title.into(),
 813                updated_at,
 814                created_at,
 815                folder_paths,
 816                archived,
 817            },
 818            next,
 819        ))
 820    }
 821}
 822
 823impl Column for ArchivedGitWorktree {
 824    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 825        let (id, next): (i64, i32) = Column::column(statement, start_index)?;
 826        let (worktree_path_str, next): (String, i32) = Column::column(statement, next)?;
 827        let (main_repo_path_str, next): (String, i32) = Column::column(statement, next)?;
 828        let (branch_name, next): (Option<String>, i32) = Column::column(statement, next)?;
 829        let (commit_hash, next): (String, i32) = Column::column(statement, next)?;
 830        let (restored_int, next): (i64, i32) = Column::column(statement, next)?;
 831        Ok((
 832            ArchivedGitWorktree {
 833                id,
 834                worktree_path: PathBuf::from(worktree_path_str),
 835                main_repo_path: PathBuf::from(main_repo_path_str),
 836                branch_name,
 837                commit_hash,
 838                restored: restored_int != 0,
 839            },
 840            next,
 841        ))
 842    }
 843}
 844
 845#[cfg(test)]
 846mod tests {
 847    use super::*;
 848    use acp_thread::{AgentConnection, StubAgentConnection};
 849    use action_log::ActionLog;
 850    use agent::DbThread;
 851    use agent_client_protocol as acp;
 852    use feature_flags::FeatureFlagAppExt;
 853    use gpui::TestAppContext;
 854    use project::FakeFs;
 855    use project::Project;
 856    use std::path::Path;
 857    use std::rc::Rc;
 858
 859    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 860        DbThread {
 861            title: title.to_string().into(),
 862            messages: Vec::new(),
 863            updated_at,
 864            detailed_summary: None,
 865            initial_project_snapshot: None,
 866            cumulative_token_usage: Default::default(),
 867            request_token_usage: Default::default(),
 868            model: None,
 869            profile: None,
 870            imported: false,
 871            subagent_context: None,
 872            speed: None,
 873            thinking_enabled: false,
 874            thinking_effort: None,
 875            draft_prompt: None,
 876            ui_scroll_position: None,
 877        }
 878    }
 879
 880    fn make_metadata(
 881        session_id: &str,
 882        title: &str,
 883        updated_at: DateTime<Utc>,
 884        folder_paths: PathList,
 885    ) -> ThreadMetadata {
 886        ThreadMetadata {
 887            archived: false,
 888            session_id: acp::SessionId::new(session_id),
 889            agent_id: agent::ZED_AGENT_ID.clone(),
 890            title: title.to_string().into(),
 891            updated_at,
 892            created_at: Some(updated_at),
 893            folder_paths,
 894        }
 895    }
 896
 897    fn init_test(cx: &mut TestAppContext) {
 898        cx.update(|cx| {
 899            let settings_store = settings::SettingsStore::test(cx);
 900            cx.set_global(settings_store);
 901            cx.update_flags(true, vec!["agent-v2".to_string()]);
 902            ThreadMetadataStore::init_global(cx);
 903            ThreadStore::init_global(cx);
 904        });
 905        cx.run_until_parked();
 906    }
 907
 908    #[gpui::test]
 909    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 910        let first_paths = PathList::new(&[Path::new("/project-a")]);
 911        let second_paths = PathList::new(&[Path::new("/project-b")]);
 912        let now = Utc::now();
 913        let older = now - chrono::Duration::seconds(1);
 914
 915        let thread = std::thread::current();
 916        let test_name = thread.name().unwrap_or("unknown_test");
 917        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 918        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 919            &db_name,
 920        )));
 921
 922        db.save(make_metadata(
 923            "session-1",
 924            "First Thread",
 925            now,
 926            first_paths.clone(),
 927        ))
 928        .await
 929        .unwrap();
 930        db.save(make_metadata(
 931            "session-2",
 932            "Second Thread",
 933            older,
 934            second_paths.clone(),
 935        ))
 936        .await
 937        .unwrap();
 938
 939        cx.update(|cx| {
 940            let settings_store = settings::SettingsStore::test(cx);
 941            cx.set_global(settings_store);
 942            cx.update_flags(true, vec!["agent-v2".to_string()]);
 943            ThreadMetadataStore::init_global(cx);
 944        });
 945
 946        cx.run_until_parked();
 947
 948        cx.update(|cx| {
 949            let store = ThreadMetadataStore::global(cx);
 950            let store = store.read(cx);
 951
 952            let entry_ids = store
 953                .entry_ids()
 954                .map(|session_id| session_id.0.to_string())
 955                .collect::<Vec<_>>();
 956            assert_eq!(entry_ids.len(), 2);
 957            assert!(entry_ids.contains(&"session-1".to_string()));
 958            assert!(entry_ids.contains(&"session-2".to_string()));
 959
 960            let first_path_entries = store
 961                .entries_for_path(&first_paths)
 962                .map(|entry| entry.session_id.0.to_string())
 963                .collect::<Vec<_>>();
 964            assert_eq!(first_path_entries, vec!["session-1"]);
 965
 966            let second_path_entries = store
 967                .entries_for_path(&second_paths)
 968                .map(|entry| entry.session_id.0.to_string())
 969                .collect::<Vec<_>>();
 970            assert_eq!(second_path_entries, vec!["session-2"]);
 971        });
 972    }
 973
 974    #[gpui::test]
 975    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 976        init_test(cx);
 977
 978        let first_paths = PathList::new(&[Path::new("/project-a")]);
 979        let second_paths = PathList::new(&[Path::new("/project-b")]);
 980        let initial_time = Utc::now();
 981        let updated_time = initial_time + chrono::Duration::seconds(1);
 982
 983        let initial_metadata = make_metadata(
 984            "session-1",
 985            "First Thread",
 986            initial_time,
 987            first_paths.clone(),
 988        );
 989
 990        let second_metadata = make_metadata(
 991            "session-2",
 992            "Second Thread",
 993            initial_time,
 994            second_paths.clone(),
 995        );
 996
 997        cx.update(|cx| {
 998            let store = ThreadMetadataStore::global(cx);
 999            store.update(cx, |store, cx| {
1000                store.save(initial_metadata, cx);
1001                store.save(second_metadata, cx);
1002            });
1003        });
1004
1005        cx.run_until_parked();
1006
1007        cx.update(|cx| {
1008            let store = ThreadMetadataStore::global(cx);
1009            let store = store.read(cx);
1010
1011            let first_path_entries = store
1012                .entries_for_path(&first_paths)
1013                .map(|entry| entry.session_id.0.to_string())
1014                .collect::<Vec<_>>();
1015            assert_eq!(first_path_entries, vec!["session-1"]);
1016
1017            let second_path_entries = store
1018                .entries_for_path(&second_paths)
1019                .map(|entry| entry.session_id.0.to_string())
1020                .collect::<Vec<_>>();
1021            assert_eq!(second_path_entries, vec!["session-2"]);
1022        });
1023
1024        let moved_metadata = make_metadata(
1025            "session-1",
1026            "First Thread",
1027            updated_time,
1028            second_paths.clone(),
1029        );
1030
1031        cx.update(|cx| {
1032            let store = ThreadMetadataStore::global(cx);
1033            store.update(cx, |store, cx| {
1034                store.save(moved_metadata, cx);
1035            });
1036        });
1037
1038        cx.run_until_parked();
1039
1040        cx.update(|cx| {
1041            let store = ThreadMetadataStore::global(cx);
1042            let store = store.read(cx);
1043
1044            let entry_ids = store
1045                .entry_ids()
1046                .map(|session_id| session_id.0.to_string())
1047                .collect::<Vec<_>>();
1048            assert_eq!(entry_ids.len(), 2);
1049            assert!(entry_ids.contains(&"session-1".to_string()));
1050            assert!(entry_ids.contains(&"session-2".to_string()));
1051
1052            let first_path_entries = store
1053                .entries_for_path(&first_paths)
1054                .map(|entry| entry.session_id.0.to_string())
1055                .collect::<Vec<_>>();
1056            assert!(first_path_entries.is_empty());
1057
1058            let second_path_entries = store
1059                .entries_for_path(&second_paths)
1060                .map(|entry| entry.session_id.0.to_string())
1061                .collect::<Vec<_>>();
1062            assert_eq!(second_path_entries.len(), 2);
1063            assert!(second_path_entries.contains(&"session-1".to_string()));
1064            assert!(second_path_entries.contains(&"session-2".to_string()));
1065        });
1066
1067        cx.update(|cx| {
1068            let store = ThreadMetadataStore::global(cx);
1069            store.update(cx, |store, cx| {
1070                store.delete(acp::SessionId::new("session-2"), cx);
1071            });
1072        });
1073
1074        cx.run_until_parked();
1075
1076        cx.update(|cx| {
1077            let store = ThreadMetadataStore::global(cx);
1078            let store = store.read(cx);
1079
1080            let entry_ids = store
1081                .entry_ids()
1082                .map(|session_id| session_id.0.to_string())
1083                .collect::<Vec<_>>();
1084            assert_eq!(entry_ids, vec!["session-1"]);
1085
1086            let second_path_entries = store
1087                .entries_for_path(&second_paths)
1088                .map(|entry| entry.session_id.0.to_string())
1089                .collect::<Vec<_>>();
1090            assert_eq!(second_path_entries, vec!["session-1"]);
1091        });
1092    }
1093
1094    #[gpui::test]
1095    async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
1096        init_test(cx);
1097
1098        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1099        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1100        let now = Utc::now();
1101
1102        let existing_metadata = ThreadMetadata {
1103            session_id: acp::SessionId::new("a-session-0"),
1104            agent_id: agent::ZED_AGENT_ID.clone(),
1105            title: "Existing Metadata".into(),
1106            updated_at: now - chrono::Duration::seconds(10),
1107            created_at: Some(now - chrono::Duration::seconds(10)),
1108            folder_paths: project_a_paths.clone(),
1109            archived: false,
1110        };
1111
1112        cx.update(|cx| {
1113            let store = ThreadMetadataStore::global(cx);
1114            store.update(cx, |store, cx| {
1115                store.save(existing_metadata, cx);
1116            });
1117        });
1118        cx.run_until_parked();
1119
1120        let threads_to_save = vec![
1121            (
1122                "a-session-0",
1123                "Thread A0 From Native Store",
1124                project_a_paths.clone(),
1125                now,
1126            ),
1127            (
1128                "a-session-1",
1129                "Thread A1",
1130                project_a_paths.clone(),
1131                now + chrono::Duration::seconds(1),
1132            ),
1133            (
1134                "b-session-0",
1135                "Thread B0",
1136                project_b_paths.clone(),
1137                now + chrono::Duration::seconds(2),
1138            ),
1139            (
1140                "projectless",
1141                "Projectless",
1142                PathList::default(),
1143                now + chrono::Duration::seconds(3),
1144            ),
1145        ];
1146
1147        for (session_id, title, paths, updated_at) in &threads_to_save {
1148            let save_task = cx.update(|cx| {
1149                let thread_store = ThreadStore::global(cx);
1150                let session_id = session_id.to_string();
1151                let title = title.to_string();
1152                let paths = paths.clone();
1153                thread_store.update(cx, |store, cx| {
1154                    store.save_thread(
1155                        acp::SessionId::new(session_id),
1156                        make_db_thread(&title, *updated_at),
1157                        paths,
1158                        cx,
1159                    )
1160                })
1161            });
1162            save_task.await.unwrap();
1163            cx.run_until_parked();
1164        }
1165
1166        cx.update(|cx| migrate_thread_metadata(cx));
1167        cx.run_until_parked();
1168
1169        let list = cx.update(|cx| {
1170            let store = ThreadMetadataStore::global(cx);
1171            store.read(cx).entries().cloned().collect::<Vec<_>>()
1172        });
1173
1174        assert_eq!(list.len(), 3);
1175        assert!(
1176            list.iter()
1177                .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1178        );
1179
1180        let existing_metadata = list
1181            .iter()
1182            .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1183            .unwrap();
1184        assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1185        assert!(!existing_metadata.archived);
1186
1187        let migrated_session_ids = list
1188            .iter()
1189            .map(|metadata| metadata.session_id.0.as_ref())
1190            .collect::<Vec<_>>();
1191        assert!(migrated_session_ids.contains(&"a-session-1"));
1192        assert!(migrated_session_ids.contains(&"b-session-0"));
1193        assert!(!migrated_session_ids.contains(&"projectless"));
1194
1195        let migrated_entries = list
1196            .iter()
1197            .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1198            .collect::<Vec<_>>();
1199        assert!(
1200            migrated_entries
1201                .iter()
1202                .all(|metadata| !metadata.folder_paths.is_empty())
1203        );
1204        assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1205    }
1206
1207    #[gpui::test]
1208    async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1209        cx: &mut TestAppContext,
1210    ) {
1211        init_test(cx);
1212
1213        let project_paths = PathList::new(&[Path::new("/project-a")]);
1214        let existing_updated_at = Utc::now();
1215
1216        let existing_metadata = ThreadMetadata {
1217            session_id: acp::SessionId::new("existing-session"),
1218            agent_id: agent::ZED_AGENT_ID.clone(),
1219            title: "Existing Metadata".into(),
1220            updated_at: existing_updated_at,
1221            created_at: Some(existing_updated_at),
1222            folder_paths: project_paths.clone(),
1223            archived: false,
1224        };
1225
1226        cx.update(|cx| {
1227            let store = ThreadMetadataStore::global(cx);
1228            store.update(cx, |store, cx| {
1229                store.save(existing_metadata, cx);
1230            });
1231        });
1232        cx.run_until_parked();
1233
1234        let save_task = cx.update(|cx| {
1235            let thread_store = ThreadStore::global(cx);
1236            thread_store.update(cx, |store, cx| {
1237                store.save_thread(
1238                    acp::SessionId::new("existing-session"),
1239                    make_db_thread(
1240                        "Updated Native Thread Title",
1241                        existing_updated_at + chrono::Duration::seconds(1),
1242                    ),
1243                    project_paths.clone(),
1244                    cx,
1245                )
1246            })
1247        });
1248        save_task.await.unwrap();
1249        cx.run_until_parked();
1250
1251        cx.update(|cx| migrate_thread_metadata(cx));
1252        cx.run_until_parked();
1253
1254        let list = cx.update(|cx| {
1255            let store = ThreadMetadataStore::global(cx);
1256            store.read(cx).entries().cloned().collect::<Vec<_>>()
1257        });
1258
1259        assert_eq!(list.len(), 1);
1260        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1261    }
1262
1263    #[gpui::test]
1264    async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1265        cx: &mut TestAppContext,
1266    ) {
1267        init_test(cx);
1268
1269        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1270        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1271        let now = Utc::now();
1272
1273        // Create 7 threads for project A and 3 for project B
1274        let mut threads_to_save = Vec::new();
1275        for i in 0..7 {
1276            threads_to_save.push((
1277                format!("a-session-{i}"),
1278                format!("Thread A{i}"),
1279                project_a_paths.clone(),
1280                now + chrono::Duration::seconds(i as i64),
1281            ));
1282        }
1283        for i in 0..3 {
1284            threads_to_save.push((
1285                format!("b-session-{i}"),
1286                format!("Thread B{i}"),
1287                project_b_paths.clone(),
1288                now + chrono::Duration::seconds(i as i64),
1289            ));
1290        }
1291
1292        for (session_id, title, paths, updated_at) in &threads_to_save {
1293            let save_task = cx.update(|cx| {
1294                let thread_store = ThreadStore::global(cx);
1295                let session_id = session_id.to_string();
1296                let title = title.to_string();
1297                let paths = paths.clone();
1298                thread_store.update(cx, |store, cx| {
1299                    store.save_thread(
1300                        acp::SessionId::new(session_id),
1301                        make_db_thread(&title, *updated_at),
1302                        paths,
1303                        cx,
1304                    )
1305                })
1306            });
1307            save_task.await.unwrap();
1308            cx.run_until_parked();
1309        }
1310
1311        cx.update(|cx| migrate_thread_metadata(cx));
1312        cx.run_until_parked();
1313
1314        let list = cx.update(|cx| {
1315            let store = ThreadMetadataStore::global(cx);
1316            store.read(cx).entries().cloned().collect::<Vec<_>>()
1317        });
1318
1319        assert_eq!(list.len(), 10);
1320
1321        // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1322        let mut project_a_entries: Vec<_> = list
1323            .iter()
1324            .filter(|m| m.folder_paths == project_a_paths)
1325            .collect();
1326        assert_eq!(project_a_entries.len(), 7);
1327        project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1328
1329        for entry in &project_a_entries[..5] {
1330            assert!(
1331                !entry.archived,
1332                "Expected {} to be unarchived (top 5 most recent)",
1333                entry.session_id.0
1334            );
1335        }
1336        for entry in &project_a_entries[5..] {
1337            assert!(
1338                entry.archived,
1339                "Expected {} to be archived (older than top 5)",
1340                entry.session_id.0
1341            );
1342        }
1343
1344        // Project B: all 3 should be unarchived (under the limit)
1345        let project_b_entries: Vec<_> = list
1346            .iter()
1347            .filter(|m| m.folder_paths == project_b_paths)
1348            .collect();
1349        assert_eq!(project_b_entries.len(), 3);
1350        assert!(project_b_entries.iter().all(|m| !m.archived));
1351    }
1352
1353    #[gpui::test]
1354    async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1355        init_test(cx);
1356
1357        let fs = FakeFs::new(cx.executor());
1358        let project = Project::test(fs, None::<&Path>, cx).await;
1359        let connection = Rc::new(StubAgentConnection::new());
1360
1361        let thread = cx
1362            .update(|cx| {
1363                connection
1364                    .clone()
1365                    .new_session(project.clone(), PathList::default(), cx)
1366            })
1367            .await
1368            .unwrap();
1369        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1370
1371        cx.update(|cx| {
1372            thread.update(cx, |thread, cx| {
1373                thread.set_title("Draft Thread".into(), cx).detach();
1374            });
1375        });
1376        cx.run_until_parked();
1377
1378        let metadata_ids = cx.update(|cx| {
1379            ThreadMetadataStore::global(cx)
1380                .read(cx)
1381                .entry_ids()
1382                .collect::<Vec<_>>()
1383        });
1384        assert_eq!(metadata_ids, vec![session_id]);
1385
1386        drop(thread);
1387        cx.update(|_| {});
1388        cx.run_until_parked();
1389        cx.run_until_parked();
1390
1391        let metadata_ids = cx.update(|cx| {
1392            ThreadMetadataStore::global(cx)
1393                .read(cx)
1394                .entry_ids()
1395                .collect::<Vec<_>>()
1396        });
1397        assert!(
1398            metadata_ids.is_empty(),
1399            "expected empty draft thread metadata to be deleted on release"
1400        );
1401    }
1402
1403    #[gpui::test]
1404    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1405        init_test(cx);
1406
1407        let fs = FakeFs::new(cx.executor());
1408        let project = Project::test(fs, None::<&Path>, cx).await;
1409        let connection = Rc::new(StubAgentConnection::new());
1410
1411        let thread = cx
1412            .update(|cx| {
1413                connection
1414                    .clone()
1415                    .new_session(project.clone(), PathList::default(), cx)
1416            })
1417            .await
1418            .unwrap();
1419        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1420
1421        cx.update(|cx| {
1422            thread.update(cx, |thread, cx| {
1423                thread.push_user_content_block(None, "Hello".into(), cx);
1424            });
1425        });
1426        cx.run_until_parked();
1427
1428        let metadata_ids = cx.update(|cx| {
1429            ThreadMetadataStore::global(cx)
1430                .read(cx)
1431                .entry_ids()
1432                .collect::<Vec<_>>()
1433        });
1434        assert_eq!(metadata_ids, vec![session_id.clone()]);
1435
1436        drop(thread);
1437        cx.update(|_| {});
1438        cx.run_until_parked();
1439
1440        let metadata_ids = cx.update(|cx| {
1441            ThreadMetadataStore::global(cx)
1442                .read(cx)
1443                .entry_ids()
1444                .collect::<Vec<_>>()
1445        });
1446        assert_eq!(metadata_ids, vec![session_id]);
1447    }
1448
1449    #[gpui::test]
1450    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1451        init_test(cx);
1452
1453        let fs = FakeFs::new(cx.executor());
1454        let project = Project::test(fs, None::<&Path>, cx).await;
1455        let connection = Rc::new(StubAgentConnection::new());
1456
1457        // Create a regular (non-subagent) AcpThread.
1458        let regular_thread = cx
1459            .update(|cx| {
1460                connection
1461                    .clone()
1462                    .new_session(project.clone(), PathList::default(), cx)
1463            })
1464            .await
1465            .unwrap();
1466
1467        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1468
1469        // Set a title on the regular thread to trigger a save via handle_thread_update.
1470        cx.update(|cx| {
1471            regular_thread.update(cx, |thread, cx| {
1472                thread.set_title("Regular Thread".into(), cx).detach();
1473            });
1474        });
1475        cx.run_until_parked();
1476
1477        // Create a subagent AcpThread
1478        let subagent_session_id = acp::SessionId::new("subagent-session");
1479        let subagent_thread = cx.update(|cx| {
1480            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1481            cx.new(|cx| {
1482                acp_thread::AcpThread::new(
1483                    Some(regular_session_id.clone()),
1484                    Some("Subagent Thread".into()),
1485                    None,
1486                    connection.clone(),
1487                    project.clone(),
1488                    action_log,
1489                    subagent_session_id.clone(),
1490                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1491                    cx,
1492                )
1493            })
1494        });
1495
1496        // Set a title on the subagent thread to trigger handle_thread_update.
1497        cx.update(|cx| {
1498            subagent_thread.update(cx, |thread, cx| {
1499                thread
1500                    .set_title("Subagent Thread Title".into(), cx)
1501                    .detach();
1502            });
1503        });
1504        cx.run_until_parked();
1505
1506        // List all metadata from the store cache.
1507        let list = cx.update(|cx| {
1508            let store = ThreadMetadataStore::global(cx);
1509            store.read(cx).entries().cloned().collect::<Vec<_>>()
1510        });
1511
1512        // The subagent thread should NOT appear in the sidebar metadata.
1513        // Only the regular thread should be listed.
1514        assert_eq!(
1515            list.len(),
1516            1,
1517            "Expected only the regular thread in sidebar metadata, \
1518             but found {} entries (subagent threads are leaking into the sidebar)",
1519            list.len(),
1520        );
1521        assert_eq!(list[0].session_id, regular_session_id);
1522        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1523    }
1524
1525    #[test]
1526    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1527        let now = Utc::now();
1528
1529        let operations = vec![
1530            DbOperation::Upsert(make_metadata(
1531                "session-1",
1532                "First Thread",
1533                now,
1534                PathList::default(),
1535            )),
1536            DbOperation::Delete(acp::SessionId::new("session-1")),
1537        ];
1538
1539        let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1540
1541        assert_eq!(deduped.len(), 1);
1542        assert_eq!(
1543            deduped[0],
1544            DbOperation::Delete(acp::SessionId::new("session-1"))
1545        );
1546    }
1547
1548    #[test]
1549    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1550        let now = Utc::now();
1551        let later = now + chrono::Duration::seconds(1);
1552
1553        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1554        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1555
1556        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1557            DbOperation::Upsert(old_metadata),
1558            DbOperation::Upsert(new_metadata.clone()),
1559        ]);
1560
1561        assert_eq!(deduped.len(), 1);
1562        assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1563    }
1564
1565    #[test]
1566    fn test_dedup_db_operations_preserves_distinct_sessions() {
1567        let now = Utc::now();
1568
1569        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1570        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1571        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1572            DbOperation::Upsert(metadata1.clone()),
1573            DbOperation::Upsert(metadata2.clone()),
1574        ]);
1575
1576        assert_eq!(deduped.len(), 2);
1577        assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1578        assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1579    }
1580
1581    #[gpui::test]
1582    async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1583        init_test(cx);
1584
1585        let paths = PathList::new(&[Path::new("/project-a")]);
1586        let now = Utc::now();
1587        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1588
1589        cx.update(|cx| {
1590            let store = ThreadMetadataStore::global(cx);
1591            store.update(cx, |store, cx| {
1592                store.save(metadata, cx);
1593            });
1594        });
1595
1596        cx.run_until_parked();
1597
1598        cx.update(|cx| {
1599            let store = ThreadMetadataStore::global(cx);
1600            let store = store.read(cx);
1601
1602            let path_entries = store
1603                .entries_for_path(&paths)
1604                .map(|e| e.session_id.0.to_string())
1605                .collect::<Vec<_>>();
1606            assert_eq!(path_entries, vec!["session-1"]);
1607
1608            let archived = store
1609                .archived_entries()
1610                .map(|e| e.session_id.0.to_string())
1611                .collect::<Vec<_>>();
1612            assert!(archived.is_empty());
1613        });
1614
1615        cx.update(|cx| {
1616            let store = ThreadMetadataStore::global(cx);
1617            store.update(cx, |store, cx| {
1618                store.archive(&acp::SessionId::new("session-1"), cx);
1619            });
1620        });
1621
1622        cx.run_until_parked();
1623
1624        cx.update(|cx| {
1625            let store = ThreadMetadataStore::global(cx);
1626            let store = store.read(cx);
1627
1628            let path_entries = store
1629                .entries_for_path(&paths)
1630                .map(|e| e.session_id.0.to_string())
1631                .collect::<Vec<_>>();
1632            assert!(path_entries.is_empty());
1633
1634            let archived = store.archived_entries().collect::<Vec<_>>();
1635            assert_eq!(archived.len(), 1);
1636            assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1637            assert!(archived[0].archived);
1638        });
1639
1640        cx.update(|cx| {
1641            let store = ThreadMetadataStore::global(cx);
1642            store.update(cx, |store, cx| {
1643                store.unarchive(&acp::SessionId::new("session-1"), cx);
1644            });
1645        });
1646
1647        cx.run_until_parked();
1648
1649        cx.update(|cx| {
1650            let store = ThreadMetadataStore::global(cx);
1651            let store = store.read(cx);
1652
1653            let path_entries = store
1654                .entries_for_path(&paths)
1655                .map(|e| e.session_id.0.to_string())
1656                .collect::<Vec<_>>();
1657            assert_eq!(path_entries, vec!["session-1"]);
1658
1659            let archived = store
1660                .archived_entries()
1661                .map(|e| e.session_id.0.to_string())
1662                .collect::<Vec<_>>();
1663            assert!(archived.is_empty());
1664        });
1665    }
1666
1667    #[gpui::test]
1668    async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1669        init_test(cx);
1670
1671        let paths = PathList::new(&[Path::new("/project-a")]);
1672        let now = Utc::now();
1673
1674        let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1675        let metadata2 = make_metadata(
1676            "session-2",
1677            "Archived Thread",
1678            now - chrono::Duration::seconds(1),
1679            paths.clone(),
1680        );
1681
1682        cx.update(|cx| {
1683            let store = ThreadMetadataStore::global(cx);
1684            store.update(cx, |store, cx| {
1685                store.save(metadata1, cx);
1686                store.save(metadata2, cx);
1687            });
1688        });
1689
1690        cx.run_until_parked();
1691
1692        cx.update(|cx| {
1693            let store = ThreadMetadataStore::global(cx);
1694            store.update(cx, |store, cx| {
1695                store.archive(&acp::SessionId::new("session-2"), cx);
1696            });
1697        });
1698
1699        cx.run_until_parked();
1700
1701        cx.update(|cx| {
1702            let store = ThreadMetadataStore::global(cx);
1703            let store = store.read(cx);
1704
1705            let path_entries = store
1706                .entries_for_path(&paths)
1707                .map(|e| e.session_id.0.to_string())
1708                .collect::<Vec<_>>();
1709            assert_eq!(path_entries, vec!["session-1"]);
1710
1711            let all_entries = store
1712                .entries()
1713                .map(|e| e.session_id.0.to_string())
1714                .collect::<Vec<_>>();
1715            assert_eq!(all_entries.len(), 2);
1716            assert!(all_entries.contains(&"session-1".to_string()));
1717            assert!(all_entries.contains(&"session-2".to_string()));
1718
1719            let archived = store
1720                .archived_entries()
1721                .map(|e| e.session_id.0.to_string())
1722                .collect::<Vec<_>>();
1723            assert_eq!(archived, vec!["session-2"]);
1724        });
1725    }
1726
1727    #[gpui::test]
1728    async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1729        init_test(cx);
1730
1731        let paths = PathList::new(&[Path::new("/project-a")]);
1732        let now = Utc::now();
1733
1734        let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1735        let m2 = make_metadata(
1736            "session-2",
1737            "Thread Two",
1738            now - chrono::Duration::seconds(1),
1739            paths.clone(),
1740        );
1741        let m3 = make_metadata(
1742            "session-3",
1743            "Thread Three",
1744            now - chrono::Duration::seconds(2),
1745            paths,
1746        );
1747
1748        cx.update(|cx| {
1749            let store = ThreadMetadataStore::global(cx);
1750            store.update(cx, |store, cx| {
1751                store.save_all(vec![m1, m2, m3], cx);
1752            });
1753        });
1754
1755        cx.run_until_parked();
1756
1757        cx.update(|cx| {
1758            let store = ThreadMetadataStore::global(cx);
1759            let store = store.read(cx);
1760
1761            let all_entries = store
1762                .entries()
1763                .map(|e| e.session_id.0.to_string())
1764                .collect::<Vec<_>>();
1765            assert_eq!(all_entries.len(), 3);
1766            assert!(all_entries.contains(&"session-1".to_string()));
1767            assert!(all_entries.contains(&"session-2".to_string()));
1768            assert!(all_entries.contains(&"session-3".to_string()));
1769
1770            let entry_ids = store.entry_ids().collect::<Vec<_>>();
1771            assert_eq!(entry_ids.len(), 3);
1772        });
1773    }
1774
1775    #[gpui::test]
1776    async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1777        init_test(cx);
1778
1779        let paths = PathList::new(&[Path::new("/project-a")]);
1780        let now = Utc::now();
1781        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1782
1783        cx.update(|cx| {
1784            let store = ThreadMetadataStore::global(cx);
1785            store.update(cx, |store, cx| {
1786                store.save(metadata, cx);
1787            });
1788        });
1789
1790        cx.run_until_parked();
1791
1792        cx.update(|cx| {
1793            let store = ThreadMetadataStore::global(cx);
1794            store.update(cx, |store, cx| {
1795                store.archive(&acp::SessionId::new("session-1"), cx);
1796            });
1797        });
1798
1799        cx.run_until_parked();
1800
1801        cx.update(|cx| {
1802            let store = ThreadMetadataStore::global(cx);
1803            store.update(cx, |store, cx| {
1804                let _ = store.reload(cx);
1805            });
1806        });
1807
1808        cx.run_until_parked();
1809
1810        cx.update(|cx| {
1811            let store = ThreadMetadataStore::global(cx);
1812            let store = store.read(cx);
1813
1814            let thread = store
1815                .entries()
1816                .find(|e| e.session_id.0.as_ref() == "session-1")
1817                .expect("thread should exist after reload");
1818            assert!(thread.archived);
1819
1820            let path_entries = store
1821                .entries_for_path(&paths)
1822                .map(|e| e.session_id.0.to_string())
1823                .collect::<Vec<_>>();
1824            assert!(path_entries.is_empty());
1825
1826            let archived = store
1827                .archived_entries()
1828                .map(|e| e.session_id.0.to_string())
1829                .collect::<Vec<_>>();
1830            assert_eq!(archived, vec!["session-1"]);
1831        });
1832    }
1833
1834    #[gpui::test]
1835    async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1836        init_test(cx);
1837
1838        cx.run_until_parked();
1839
1840        cx.update(|cx| {
1841            let store = ThreadMetadataStore::global(cx);
1842            store.update(cx, |store, cx| {
1843                store.archive(&acp::SessionId::new("nonexistent"), cx);
1844            });
1845        });
1846
1847        cx.run_until_parked();
1848
1849        cx.update(|cx| {
1850            let store = ThreadMetadataStore::global(cx);
1851            let store = store.read(cx);
1852
1853            assert!(store.is_empty());
1854            assert_eq!(store.entries().count(), 0);
1855            assert_eq!(store.archived_entries().count(), 0);
1856        });
1857    }
1858
1859    #[gpui::test]
1860    async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1861        init_test(cx);
1862
1863        let paths = PathList::new(&[Path::new("/project-a")]);
1864        let now = Utc::now();
1865        let metadata = make_metadata("session-1", "Thread 1", now, paths);
1866        let session_id = metadata.session_id.clone();
1867
1868        cx.update(|cx| {
1869            let store = ThreadMetadataStore::global(cx);
1870            store.update(cx, |store, cx| {
1871                store.save(metadata.clone(), cx);
1872                store.archive(&session_id, cx);
1873            });
1874        });
1875
1876        cx.run_until_parked();
1877
1878        cx.update(|cx| {
1879            let store = ThreadMetadataStore::global(cx);
1880            let store = store.read(cx);
1881
1882            let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1883            pretty_assertions::assert_eq!(
1884                entries,
1885                vec![ThreadMetadata {
1886                    archived: true,
1887                    ..metadata
1888                }]
1889            );
1890        });
1891    }
1892}