thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AcpThreadEvent;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::Context as _;
   7use chrono::{DateTime, Utc};
   8use collections::{HashMap, HashSet};
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    ThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We skip migrating threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    let store = ThreadMetadataStore::global(cx);
  46    let db = store.read(cx).db.clone();
  47
  48    cx.spawn(async move |cx| {
  49        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
  50
  51        let is_first_migration = existing_entries.is_empty();
  52
  53        let mut to_migrate = store.read_with(cx, |_store, cx| {
  54            ThreadStore::global(cx)
  55                .read(cx)
  56                .entries()
  57                .filter_map(|entry| {
  58                    if existing_entries.contains(&entry.id.0) || entry.folder_paths.is_empty() {
  59                        return None;
  60                    }
  61
  62                    Some(ThreadMetadata {
  63                        session_id: entry.id,
  64                        agent_id: ZED_AGENT_ID.clone(),
  65                        title: entry.title,
  66                        updated_at: entry.updated_at,
  67                        created_at: entry.created_at,
  68                        folder_paths: entry.folder_paths,
  69                        archived: true,
  70                    })
  71                })
  72                .collect::<Vec<_>>()
  73        });
  74
  75        if to_migrate.is_empty() {
  76            return anyhow::Ok(());
  77        }
  78
  79        // On the first migration (no entries in DB yet), keep the 5 most
  80        // recent threads per project unarchived.
  81        if is_first_migration {
  82            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
  83            for entry in &mut to_migrate {
  84                per_project
  85                    .entry(entry.folder_paths.clone())
  86                    .or_default()
  87                    .push(entry);
  88            }
  89            for entries in per_project.values_mut() {
  90                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
  91                for entry in entries.iter_mut().take(5) {
  92                    entry.archived = false;
  93                }
  94            }
  95        }
  96
  97        log::info!("Migrating {} thread store entries", to_migrate.len());
  98
  99        // Manually save each entry to the database and call reload, otherwise
 100        // we'll end up triggering lots of reloads after each save
 101        for entry in to_migrate {
 102            db.save(entry).await?;
 103        }
 104
 105        log::info!("Finished migrating thread store entries");
 106
 107        let _ = store.update(cx, |store, cx| store.reload(cx));
 108        anyhow::Ok(())
 109    })
 110    .detach_and_log_err(cx);
 111}
 112
/// Newtype around the [`ThreadMetadataStore`] entity handle so it can be
/// registered as a global; read back by `ThreadMetadataStore::global` and
/// `ThreadMetadataStore::try_global`.
struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 115
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Identifier of the session; primary key of the `sidebar_threads` table.
    pub session_id: acp::SessionId,
    /// Agent that owns the thread. Stored as NULL in the database when it
    /// equals `ZED_AGENT_ID`.
    pub agent_id: AgentId,
    /// Title of the thread.
    pub title: SharedString,
    /// Time of the last update; persisted as an RFC 3339 string.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known; persisted as an RFC 3339 string.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths the thread belongs to; used as the key for per-project
    /// lookups in the store.
    pub folder_paths: PathList,
    /// Archived threads are excluded from `ThreadMetadataStore::entries_for_path`.
    pub archived: bool,
}
 128
 129impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
 130    fn from(meta: &ThreadMetadata) -> Self {
 131        Self {
 132            session_id: meta.session_id.clone(),
 133            work_dirs: Some(meta.folder_paths.clone()),
 134            title: Some(meta.title.clone()),
 135            updated_at: Some(meta.updated_at),
 136            created_at: meta.created_at,
 137            meta: None,
 138        }
 139    }
 140}
 141
/// The store holds all metadata needed to show threads in the sidebar/the archive.
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct ThreadMetadataStore {
    /// Database handle: read directly by `reload`, and cloned into the
    /// background task that applies queued writes.
    db: ThreadMetadataDb,
    /// In-memory cache of all known thread metadata, keyed by session id.
    threads: HashMap<acp::SessionId, ThreadMetadata>,
    /// Index from folder paths to the sessions saved under them.
    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
    /// The most recently started reload of the cache from the database.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for observed `AcpThread`s, keyed by session id and
    /// removed again when the thread is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Sending half of the unbounded queue of pending database writes.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Background task that drains the queue and applies the writes; held
    /// here, presumably so it keeps running for as long as the store exists.
    _db_operations_task: Task<()>,
}
 154
/// A pending write against the `sidebar_threads` table, queued by the store
/// and applied by its background task.
#[derive(Debug, PartialEq)]
enum DbOperation {
    /// Insert the row, or overwrite the existing row with the same session id.
    Upsert(ThreadMetadata),
    /// Remove the row with this session id.
    Delete(acp::SessionId),
}
 160
 161impl DbOperation {
 162    fn id(&self) -> &acp::SessionId {
 163        match self {
 164            DbOperation::Upsert(thread) => &thread.session_id,
 165            DbOperation::Delete(session_id) => session_id,
 166        }
 167    }
 168}
 169
 170impl ThreadMetadataStore {
 171    #[cfg(not(any(test, feature = "test-support")))]
 172    pub fn init_global(cx: &mut App) {
 173        if cx.has_global::<Self>() {
 174            return;
 175        }
 176
 177        let db = ThreadMetadataDb::global(cx);
 178        let thread_store = cx.new(|cx| Self::new(db, cx));
 179        cx.set_global(GlobalThreadMetadataStore(thread_store));
 180    }
 181
 182    #[cfg(any(test, feature = "test-support"))]
 183    pub fn init_global(cx: &mut App) {
 184        let thread = std::thread::current();
 185        let test_name = thread.name().unwrap_or("unknown_test");
 186        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 187        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 188        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 189        cx.set_global(GlobalThreadMetadataStore(thread_store));
 190    }
 191
 192    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 193        cx.try_global::<GlobalThreadMetadataStore>()
 194            .map(|store| store.0.clone())
 195    }
 196
 197    pub fn global(cx: &App) -> Entity<Self> {
 198        cx.global::<GlobalThreadMetadataStore>().0.clone()
 199    }
 200
 201    pub fn is_empty(&self) -> bool {
 202        self.threads.is_empty()
 203    }
 204
 205    /// Returns all thread IDs.
 206    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 207        self.threads.keys().cloned()
 208    }
 209
 210    /// Returns the metadata for a specific thread, if it exists.
 211    pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
 212        self.threads.get(session_id)
 213    }
 214
 215    /// Returns all threads.
 216    pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 217        self.threads.values()
 218    }
 219
 220    /// Returns all archived threads.
 221    pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 222        self.entries().filter(|t| t.archived)
 223    }
 224
 225    /// Returns all threads for the given path list, excluding archived threads.
 226    pub fn entries_for_path(
 227        &self,
 228        path_list: &PathList,
 229    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 230        self.threads_by_paths
 231            .get(path_list)
 232            .into_iter()
 233            .flatten()
 234            .filter_map(|s| self.threads.get(s))
 235            .filter(|s| !s.archived)
 236    }
 237
 238    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 239        let db = self.db.clone();
 240        self.reload_task.take();
 241
 242        let list_task = cx
 243            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 244
 245        let reload_task = cx
 246            .spawn(async move |this, cx| {
 247                let Some(rows) = list_task.await.log_err() else {
 248                    return;
 249                };
 250
 251                this.update(cx, |this, cx| {
 252                    this.threads.clear();
 253                    this.threads_by_paths.clear();
 254
 255                    for row in rows {
 256                        this.threads_by_paths
 257                            .entry(row.folder_paths.clone())
 258                            .or_default()
 259                            .insert(row.session_id.clone());
 260                        this.threads.insert(row.session_id.clone(), row);
 261                    }
 262
 263                    cx.notify();
 264                })
 265                .ok();
 266            })
 267            .shared();
 268        self.reload_task = Some(reload_task.clone());
 269        reload_task
 270    }
 271
 272    pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
 273        if !cx.has_flag::<AgentV2FeatureFlag>() {
 274            return;
 275        }
 276
 277        for metadata in metadata {
 278            self.save_internal(metadata);
 279        }
 280        cx.notify();
 281    }
 282
 283    #[cfg(any(test, feature = "test-support"))]
 284    pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 285        self.save(metadata, cx)
 286    }
 287
 288    fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 289        if !cx.has_flag::<AgentV2FeatureFlag>() {
 290            return;
 291        }
 292
 293        self.save_internal(metadata);
 294        cx.notify();
 295    }
 296
 297    fn save_internal(&mut self, metadata: ThreadMetadata) {
 298        // If the folder paths have changed, we need to clear the old entry
 299        if let Some(thread) = self.threads.get(&metadata.session_id)
 300            && thread.folder_paths != metadata.folder_paths
 301            && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
 302        {
 303            session_ids.remove(&metadata.session_id);
 304        }
 305
 306        self.threads
 307            .insert(metadata.session_id.clone(), metadata.clone());
 308
 309        self.threads_by_paths
 310            .entry(metadata.folder_paths.clone())
 311            .or_default()
 312            .insert(metadata.session_id.clone());
 313
 314        self.pending_thread_ops_tx
 315            .try_send(DbOperation::Upsert(metadata))
 316            .log_err();
 317    }
 318
 319    pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 320        self.update_archived(session_id, true, cx);
 321    }
 322
 323    pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 324        self.update_archived(session_id, false, cx);
 325    }
 326
 327    fn update_archived(
 328        &mut self,
 329        session_id: &acp::SessionId,
 330        archived: bool,
 331        cx: &mut Context<Self>,
 332    ) {
 333        if !cx.has_flag::<AgentV2FeatureFlag>() {
 334            return;
 335        }
 336
 337        if let Some(thread) = self.threads.get(session_id) {
 338            self.save_internal(ThreadMetadata {
 339                archived,
 340                ..thread.clone()
 341            });
 342            cx.notify();
 343        }
 344    }
 345
 346    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 347        if !cx.has_flag::<AgentV2FeatureFlag>() {
 348            return;
 349        }
 350
 351        if let Some(thread) = self.threads.get(&session_id)
 352            && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
 353        {
 354            session_ids.remove(&session_id);
 355        }
 356        self.threads.remove(&session_id);
 357        self.pending_thread_ops_tx
 358            .try_send(DbOperation::Delete(session_id))
 359            .log_err();
 360        cx.notify();
 361    }
 362
 363    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 364        let weak_store = cx.weak_entity();
 365
 366        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 367            // Don't track subagent threads in the sidebar.
 368            if thread.parent_session_id().is_some() {
 369                return;
 370            }
 371
 372            let thread_entity = cx.entity();
 373
 374            cx.on_release({
 375                let weak_store = weak_store.clone();
 376                move |thread, cx| {
 377                    weak_store
 378                        .update(cx, |store, cx| {
 379                            let session_id = thread.session_id().clone();
 380                            store.session_subscriptions.remove(&session_id);
 381                            if thread.entries().is_empty() {
 382                                // Empty threads can be unloaded without ever being
 383                                // durably persisted by the underlying agent.
 384                                store.delete(session_id, cx);
 385                            }
 386                        })
 387                        .ok();
 388                }
 389            })
 390            .detach();
 391
 392            weak_store
 393                .update(cx, |this, cx| {
 394                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
 395                    this.session_subscriptions
 396                        .insert(thread.session_id().clone(), subscription);
 397                })
 398                .ok();
 399        })
 400        .detach();
 401
 402        let (tx, rx) = smol::channel::unbounded();
 403        let _db_operations_task = cx.background_spawn({
 404            let db = db.clone();
 405            async move {
 406                while let Ok(first_update) = rx.recv().await {
 407                    let mut updates = vec![first_update];
 408                    while let Ok(update) = rx.try_recv() {
 409                        updates.push(update);
 410                    }
 411                    let updates = Self::dedup_db_operations(updates);
 412                    for operation in updates {
 413                        match operation {
 414                            DbOperation::Upsert(metadata) => {
 415                                db.save(metadata).await.log_err();
 416                            }
 417                            DbOperation::Delete(session_id) => {
 418                                db.delete(session_id).await.log_err();
 419                            }
 420                        }
 421                    }
 422                }
 423            }
 424        });
 425
 426        let mut this = Self {
 427            db,
 428            threads: HashMap::default(),
 429            threads_by_paths: HashMap::default(),
 430            reload_task: None,
 431            session_subscriptions: HashMap::default(),
 432            pending_thread_ops_tx: tx,
 433            _db_operations_task,
 434        };
 435        let _ = this.reload(cx);
 436        this
 437    }
 438
 439    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 440        let mut ops = HashMap::default();
 441        for operation in operations.into_iter().rev() {
 442            if ops.contains_key(operation.id()) {
 443                continue;
 444            }
 445            ops.insert(operation.id().clone(), operation);
 446        }
 447        ops.into_values().collect()
 448    }
 449
 450    fn handle_thread_event(
 451        &mut self,
 452        thread: Entity<acp_thread::AcpThread>,
 453        event: &AcpThreadEvent,
 454        cx: &mut Context<Self>,
 455    ) {
 456        // Don't track subagent threads in the sidebar.
 457        if thread.read(cx).parent_session_id().is_some() {
 458            return;
 459        }
 460
 461        match event {
 462            AcpThreadEvent::NewEntry
 463            | AcpThreadEvent::TitleUpdated
 464            | AcpThreadEvent::EntryUpdated(_)
 465            | AcpThreadEvent::EntriesRemoved(_)
 466            | AcpThreadEvent::ToolAuthorizationRequested(_)
 467            | AcpThreadEvent::ToolAuthorizationReceived(_)
 468            | AcpThreadEvent::Retry(_)
 469            | AcpThreadEvent::Stopped(_)
 470            | AcpThreadEvent::Error
 471            | AcpThreadEvent::LoadError(_)
 472            | AcpThreadEvent::Refusal
 473            | AcpThreadEvent::WorkingDirectoriesUpdated => {
 474                let thread_ref = thread.read(cx);
 475                let existing_thread = self.threads.get(thread_ref.session_id());
 476                let session_id = thread_ref.session_id().clone();
 477                let title = thread_ref
 478                    .title()
 479                    .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 480
 481                let updated_at = Utc::now();
 482
 483                let created_at = existing_thread
 484                    .and_then(|t| t.created_at)
 485                    .unwrap_or_else(|| updated_at);
 486
 487                let agent_id = thread_ref.connection().agent_id();
 488
 489                let folder_paths = {
 490                    let project = thread_ref.project().read(cx);
 491                    let paths: Vec<Arc<Path>> = project
 492                        .visible_worktrees(cx)
 493                        .map(|worktree| worktree.read(cx).abs_path())
 494                        .collect();
 495                    PathList::new(&paths)
 496                };
 497
 498                let archived = existing_thread.map(|t| t.archived).unwrap_or(false);
 499
 500                let metadata = ThreadMetadata {
 501                    session_id,
 502                    agent_id,
 503                    title,
 504                    created_at: Some(created_at),
 505                    updated_at,
 506                    folder_paths,
 507                    archived,
 508                };
 509
 510                self.save(metadata, cx);
 511            }
 512            AcpThreadEvent::TokenUsageUpdated
 513            | AcpThreadEvent::SubagentSpawned(_)
 514            | AcpThreadEvent::PromptCapabilitiesUpdated
 515            | AcpThreadEvent::AvailableCommandsUpdated(_)
 516            | AcpThreadEvent::ModeUpdated(_)
 517            | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
 518        }
 519    }
 520}
 521
// Lets `ThreadMetadataStore` itself be named as a global type. Note that the
// entity handle is published through `GlobalThreadMetadataStore`; nothing in
// this file stores a `ThreadMetadataStore` value as a global.
impl Global for ThreadMetadataStore {}
 523
/// Database domain wrapping the connection that holds the `sidebar_threads`
/// table.
struct ThreadMetadataDb(ThreadSafeConnection);
 525
impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    // Schema history, oldest first: the initial `sidebar_threads` table, then
    // the `archived` column (defaulting to 0).
    // NOTE(review): presumably already-shipped entries must stay unchanged and
    // new schema changes be appended — confirm against the `db` crate.
    const MIGRATIONS: &[&str] = &[
        sql!(
            CREATE TABLE IF NOT EXISTS sidebar_threads(
                session_id TEXT PRIMARY KEY,
                agent_id TEXT,
                title TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                created_at TEXT,
                folder_paths TEXT,
                folder_paths_order TEXT
            ) STRICT;
        ),
        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
    ];
}
 544
// NOTE(review): presumably generates the connection plumbing for
// `ThreadMetadataDb` (e.g. the `ThreadMetadataDb::global` accessor used by
// `init_global`) — confirm against the `db` crate's macro definition.
db::static_connection!(ThreadMetadataDb, []);
 546
 547impl ThreadMetadataDb {
 548    pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
 549        self.select::<Arc<str>>(
 550            "SELECT session_id FROM sidebar_threads \
 551             ORDER BY updated_at DESC",
 552        )?()
 553    }
 554
 555    /// List all sidebar thread metadata, ordered by updated_at descending.
 556    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 557        self.select::<ThreadMetadata>(
 558            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived \
 559             FROM sidebar_threads \
 560             ORDER BY updated_at DESC"
 561        )?()
 562    }
 563
 564    /// Upsert metadata for a thread.
 565    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 566        let id = row.session_id.0.clone();
 567        let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 568            None
 569        } else {
 570            Some(row.agent_id.to_string())
 571        };
 572        let title = row.title.to_string();
 573        let updated_at = row.updated_at.to_rfc3339();
 574        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 575        let serialized = row.folder_paths.serialize();
 576        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 577            (None, None)
 578        } else {
 579            (Some(serialized.paths), Some(serialized.order))
 580        };
 581        let archived = row.archived;
 582
 583        self.write(move |conn| {
 584            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived) \
 585                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
 586                       ON CONFLICT(session_id) DO UPDATE SET \
 587                           agent_id = excluded.agent_id, \
 588                           title = excluded.title, \
 589                           updated_at = excluded.updated_at, \
 590                           created_at = excluded.created_at, \
 591                           folder_paths = excluded.folder_paths, \
 592                           folder_paths_order = excluded.folder_paths_order, \
 593                           archived = excluded.archived";
 594            let mut stmt = Statement::prepare(conn, sql)?;
 595            let mut i = stmt.bind(&id, 1)?;
 596            i = stmt.bind(&agent_id, i)?;
 597            i = stmt.bind(&title, i)?;
 598            i = stmt.bind(&updated_at, i)?;
 599            i = stmt.bind(&created_at, i)?;
 600            i = stmt.bind(&folder_paths, i)?;
 601            i = stmt.bind(&folder_paths_order, i)?;
 602            stmt.bind(&archived, i)?;
 603            stmt.exec()
 604        })
 605        .await
 606    }
 607
 608    /// Delete metadata for a single thread.
 609    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 610        let id = session_id.0.clone();
 611        self.write(move |conn| {
 612            let mut stmt =
 613                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 614            stmt.bind(&id, 1)?;
 615            stmt.exec()
 616        })
 617        .await
 618    }
 619}
 620
 621impl Column for ThreadMetadata {
 622    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 623        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 624        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 625        let (title, next): (String, i32) = Column::column(statement, next)?;
 626        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 627        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 628        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 629        let (folder_paths_order_str, next): (Option<String>, i32) =
 630            Column::column(statement, next)?;
 631        let (archived, next): (bool, i32) = Column::column(statement, next)?;
 632
 633        let agent_id = agent_id
 634            .map(|id| AgentId::new(id))
 635            .unwrap_or(ZED_AGENT_ID.clone());
 636
 637        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 638        let created_at = created_at_str
 639            .as_deref()
 640            .map(DateTime::parse_from_rfc3339)
 641            .transpose()?
 642            .map(|dt| dt.with_timezone(&Utc));
 643
 644        let folder_paths = folder_paths_str
 645            .map(|paths| {
 646                PathList::deserialize(&util::path_list::SerializedPathList {
 647                    paths,
 648                    order: folder_paths_order_str.unwrap_or_default(),
 649                })
 650            })
 651            .unwrap_or_default();
 652
 653        Ok((
 654            ThreadMetadata {
 655                session_id: acp::SessionId::new(id),
 656                agent_id,
 657                title: title.into(),
 658                updated_at,
 659                created_at,
 660                folder_paths,
 661                archived,
 662            },
 663            next,
 664        ))
 665    }
 666}
 667
 668#[cfg(test)]
 669mod tests {
 670    use super::*;
 671    use acp_thread::{AgentConnection, StubAgentConnection};
 672    use action_log::ActionLog;
 673    use agent::DbThread;
 674    use agent_client_protocol as acp;
 675    use feature_flags::FeatureFlagAppExt;
 676    use gpui::TestAppContext;
 677    use project::FakeFs;
 678    use project::Project;
 679    use std::path::Path;
 680    use std::rc::Rc;
 681
 682    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 683        DbThread {
 684            title: title.to_string().into(),
 685            messages: Vec::new(),
 686            updated_at,
 687            detailed_summary: None,
 688            initial_project_snapshot: None,
 689            cumulative_token_usage: Default::default(),
 690            request_token_usage: Default::default(),
 691            model: None,
 692            profile: None,
 693            imported: false,
 694            subagent_context: None,
 695            speed: None,
 696            thinking_enabled: false,
 697            thinking_effort: None,
 698            draft_prompt: None,
 699            ui_scroll_position: None,
 700        }
 701    }
 702
 703    fn make_metadata(
 704        session_id: &str,
 705        title: &str,
 706        updated_at: DateTime<Utc>,
 707        folder_paths: PathList,
 708    ) -> ThreadMetadata {
 709        ThreadMetadata {
 710            archived: false,
 711            session_id: acp::SessionId::new(session_id),
 712            agent_id: agent::ZED_AGENT_ID.clone(),
 713            title: title.to_string().into(),
 714            updated_at,
 715            created_at: Some(updated_at),
 716            folder_paths,
 717        }
 718    }
 719
 720    fn init_test(cx: &mut TestAppContext) {
 721        cx.update(|cx| {
 722            let settings_store = settings::SettingsStore::test(cx);
 723            cx.set_global(settings_store);
 724            cx.update_flags(true, vec!["agent-v2".to_string()]);
 725            ThreadMetadataStore::init_global(cx);
 726            ThreadStore::init_global(cx);
 727        });
 728        cx.run_until_parked();
 729    }
 730
 731    #[gpui::test]
 732    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 733        let first_paths = PathList::new(&[Path::new("/project-a")]);
 734        let second_paths = PathList::new(&[Path::new("/project-b")]);
 735        let now = Utc::now();
 736        let older = now - chrono::Duration::seconds(1);
 737
 738        let thread = std::thread::current();
 739        let test_name = thread.name().unwrap_or("unknown_test");
 740        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 741        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 742            &db_name,
 743        )));
 744
 745        db.save(make_metadata(
 746            "session-1",
 747            "First Thread",
 748            now,
 749            first_paths.clone(),
 750        ))
 751        .await
 752        .unwrap();
 753        db.save(make_metadata(
 754            "session-2",
 755            "Second Thread",
 756            older,
 757            second_paths.clone(),
 758        ))
 759        .await
 760        .unwrap();
 761
 762        cx.update(|cx| {
 763            let settings_store = settings::SettingsStore::test(cx);
 764            cx.set_global(settings_store);
 765            cx.update_flags(true, vec!["agent-v2".to_string()]);
 766            ThreadMetadataStore::init_global(cx);
 767        });
 768
 769        cx.run_until_parked();
 770
 771        cx.update(|cx| {
 772            let store = ThreadMetadataStore::global(cx);
 773            let store = store.read(cx);
 774
 775            let entry_ids = store
 776                .entry_ids()
 777                .map(|session_id| session_id.0.to_string())
 778                .collect::<Vec<_>>();
 779            assert_eq!(entry_ids.len(), 2);
 780            assert!(entry_ids.contains(&"session-1".to_string()));
 781            assert!(entry_ids.contains(&"session-2".to_string()));
 782
 783            let first_path_entries = store
 784                .entries_for_path(&first_paths)
 785                .map(|entry| entry.session_id.0.to_string())
 786                .collect::<Vec<_>>();
 787            assert_eq!(first_path_entries, vec!["session-1"]);
 788
 789            let second_path_entries = store
 790                .entries_for_path(&second_paths)
 791                .map(|entry| entry.session_id.0.to_string())
 792                .collect::<Vec<_>>();
 793            assert_eq!(second_path_entries, vec!["session-2"]);
 794        });
 795    }
 796
 797    #[gpui::test]
 798    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 799        init_test(cx);
 800
 801        let first_paths = PathList::new(&[Path::new("/project-a")]);
 802        let second_paths = PathList::new(&[Path::new("/project-b")]);
 803        let initial_time = Utc::now();
 804        let updated_time = initial_time + chrono::Duration::seconds(1);
 805
 806        let initial_metadata = make_metadata(
 807            "session-1",
 808            "First Thread",
 809            initial_time,
 810            first_paths.clone(),
 811        );
 812
 813        let second_metadata = make_metadata(
 814            "session-2",
 815            "Second Thread",
 816            initial_time,
 817            second_paths.clone(),
 818        );
 819
 820        cx.update(|cx| {
 821            let store = ThreadMetadataStore::global(cx);
 822            store.update(cx, |store, cx| {
 823                store.save(initial_metadata, cx);
 824                store.save(second_metadata, cx);
 825            });
 826        });
 827
 828        cx.run_until_parked();
 829
 830        cx.update(|cx| {
 831            let store = ThreadMetadataStore::global(cx);
 832            let store = store.read(cx);
 833
 834            let first_path_entries = store
 835                .entries_for_path(&first_paths)
 836                .map(|entry| entry.session_id.0.to_string())
 837                .collect::<Vec<_>>();
 838            assert_eq!(first_path_entries, vec!["session-1"]);
 839
 840            let second_path_entries = store
 841                .entries_for_path(&second_paths)
 842                .map(|entry| entry.session_id.0.to_string())
 843                .collect::<Vec<_>>();
 844            assert_eq!(second_path_entries, vec!["session-2"]);
 845        });
 846
 847        let moved_metadata = make_metadata(
 848            "session-1",
 849            "First Thread",
 850            updated_time,
 851            second_paths.clone(),
 852        );
 853
 854        cx.update(|cx| {
 855            let store = ThreadMetadataStore::global(cx);
 856            store.update(cx, |store, cx| {
 857                store.save(moved_metadata, cx);
 858            });
 859        });
 860
 861        cx.run_until_parked();
 862
 863        cx.update(|cx| {
 864            let store = ThreadMetadataStore::global(cx);
 865            let store = store.read(cx);
 866
 867            let entry_ids = store
 868                .entry_ids()
 869                .map(|session_id| session_id.0.to_string())
 870                .collect::<Vec<_>>();
 871            assert_eq!(entry_ids.len(), 2);
 872            assert!(entry_ids.contains(&"session-1".to_string()));
 873            assert!(entry_ids.contains(&"session-2".to_string()));
 874
 875            let first_path_entries = store
 876                .entries_for_path(&first_paths)
 877                .map(|entry| entry.session_id.0.to_string())
 878                .collect::<Vec<_>>();
 879            assert!(first_path_entries.is_empty());
 880
 881            let second_path_entries = store
 882                .entries_for_path(&second_paths)
 883                .map(|entry| entry.session_id.0.to_string())
 884                .collect::<Vec<_>>();
 885            assert_eq!(second_path_entries.len(), 2);
 886            assert!(second_path_entries.contains(&"session-1".to_string()));
 887            assert!(second_path_entries.contains(&"session-2".to_string()));
 888        });
 889
 890        cx.update(|cx| {
 891            let store = ThreadMetadataStore::global(cx);
 892            store.update(cx, |store, cx| {
 893                store.delete(acp::SessionId::new("session-2"), cx);
 894            });
 895        });
 896
 897        cx.run_until_parked();
 898
 899        cx.update(|cx| {
 900            let store = ThreadMetadataStore::global(cx);
 901            let store = store.read(cx);
 902
 903            let entry_ids = store
 904                .entry_ids()
 905                .map(|session_id| session_id.0.to_string())
 906                .collect::<Vec<_>>();
 907            assert_eq!(entry_ids, vec!["session-1"]);
 908
 909            let second_path_entries = store
 910                .entries_for_path(&second_paths)
 911                .map(|entry| entry.session_id.0.to_string())
 912                .collect::<Vec<_>>();
 913            assert_eq!(second_path_entries, vec!["session-1"]);
 914        });
 915    }
 916
 917    #[gpui::test]
 918    async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
 919        init_test(cx);
 920
 921        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 922        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 923        let now = Utc::now();
 924
 925        let existing_metadata = ThreadMetadata {
 926            session_id: acp::SessionId::new("a-session-0"),
 927            agent_id: agent::ZED_AGENT_ID.clone(),
 928            title: "Existing Metadata".into(),
 929            updated_at: now - chrono::Duration::seconds(10),
 930            created_at: Some(now - chrono::Duration::seconds(10)),
 931            folder_paths: project_a_paths.clone(),
 932            archived: false,
 933        };
 934
 935        cx.update(|cx| {
 936            let store = ThreadMetadataStore::global(cx);
 937            store.update(cx, |store, cx| {
 938                store.save(existing_metadata, cx);
 939            });
 940        });
 941        cx.run_until_parked();
 942
 943        let threads_to_save = vec![
 944            (
 945                "a-session-0",
 946                "Thread A0 From Native Store",
 947                project_a_paths.clone(),
 948                now,
 949            ),
 950            (
 951                "a-session-1",
 952                "Thread A1",
 953                project_a_paths.clone(),
 954                now + chrono::Duration::seconds(1),
 955            ),
 956            (
 957                "b-session-0",
 958                "Thread B0",
 959                project_b_paths.clone(),
 960                now + chrono::Duration::seconds(2),
 961            ),
 962            (
 963                "projectless",
 964                "Projectless",
 965                PathList::default(),
 966                now + chrono::Duration::seconds(3),
 967            ),
 968        ];
 969
 970        for (session_id, title, paths, updated_at) in &threads_to_save {
 971            let save_task = cx.update(|cx| {
 972                let thread_store = ThreadStore::global(cx);
 973                let session_id = session_id.to_string();
 974                let title = title.to_string();
 975                let paths = paths.clone();
 976                thread_store.update(cx, |store, cx| {
 977                    store.save_thread(
 978                        acp::SessionId::new(session_id),
 979                        make_db_thread(&title, *updated_at),
 980                        paths,
 981                        cx,
 982                    )
 983                })
 984            });
 985            save_task.await.unwrap();
 986            cx.run_until_parked();
 987        }
 988
 989        cx.update(|cx| migrate_thread_metadata(cx));
 990        cx.run_until_parked();
 991
 992        let list = cx.update(|cx| {
 993            let store = ThreadMetadataStore::global(cx);
 994            store.read(cx).entries().cloned().collect::<Vec<_>>()
 995        });
 996
 997        assert_eq!(list.len(), 3);
 998        assert!(
 999            list.iter()
1000                .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1001        );
1002
1003        let existing_metadata = list
1004            .iter()
1005            .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1006            .unwrap();
1007        assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1008        assert!(!existing_metadata.archived);
1009
1010        let migrated_session_ids = list
1011            .iter()
1012            .map(|metadata| metadata.session_id.0.as_ref())
1013            .collect::<Vec<_>>();
1014        assert!(migrated_session_ids.contains(&"a-session-1"));
1015        assert!(migrated_session_ids.contains(&"b-session-0"));
1016        assert!(!migrated_session_ids.contains(&"projectless"));
1017
1018        let migrated_entries = list
1019            .iter()
1020            .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1021            .collect::<Vec<_>>();
1022        assert!(
1023            migrated_entries
1024                .iter()
1025                .all(|metadata| !metadata.folder_paths.is_empty())
1026        );
1027        assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1028    }
1029
1030    #[gpui::test]
1031    async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1032        cx: &mut TestAppContext,
1033    ) {
1034        init_test(cx);
1035
1036        let project_paths = PathList::new(&[Path::new("/project-a")]);
1037        let existing_updated_at = Utc::now();
1038
1039        let existing_metadata = ThreadMetadata {
1040            session_id: acp::SessionId::new("existing-session"),
1041            agent_id: agent::ZED_AGENT_ID.clone(),
1042            title: "Existing Metadata".into(),
1043            updated_at: existing_updated_at,
1044            created_at: Some(existing_updated_at),
1045            folder_paths: project_paths.clone(),
1046            archived: false,
1047        };
1048
1049        cx.update(|cx| {
1050            let store = ThreadMetadataStore::global(cx);
1051            store.update(cx, |store, cx| {
1052                store.save(existing_metadata, cx);
1053            });
1054        });
1055        cx.run_until_parked();
1056
1057        let save_task = cx.update(|cx| {
1058            let thread_store = ThreadStore::global(cx);
1059            thread_store.update(cx, |store, cx| {
1060                store.save_thread(
1061                    acp::SessionId::new("existing-session"),
1062                    make_db_thread(
1063                        "Updated Native Thread Title",
1064                        existing_updated_at + chrono::Duration::seconds(1),
1065                    ),
1066                    project_paths.clone(),
1067                    cx,
1068                )
1069            })
1070        });
1071        save_task.await.unwrap();
1072        cx.run_until_parked();
1073
1074        cx.update(|cx| migrate_thread_metadata(cx));
1075        cx.run_until_parked();
1076
1077        let list = cx.update(|cx| {
1078            let store = ThreadMetadataStore::global(cx);
1079            store.read(cx).entries().cloned().collect::<Vec<_>>()
1080        });
1081
1082        assert_eq!(list.len(), 1);
1083        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1084    }
1085
1086    #[gpui::test]
1087    async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1088        cx: &mut TestAppContext,
1089    ) {
1090        init_test(cx);
1091
1092        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1093        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1094        let now = Utc::now();
1095
1096        // Create 7 threads for project A and 3 for project B
1097        let mut threads_to_save = Vec::new();
1098        for i in 0..7 {
1099            threads_to_save.push((
1100                format!("a-session-{i}"),
1101                format!("Thread A{i}"),
1102                project_a_paths.clone(),
1103                now + chrono::Duration::seconds(i as i64),
1104            ));
1105        }
1106        for i in 0..3 {
1107            threads_to_save.push((
1108                format!("b-session-{i}"),
1109                format!("Thread B{i}"),
1110                project_b_paths.clone(),
1111                now + chrono::Duration::seconds(i as i64),
1112            ));
1113        }
1114
1115        for (session_id, title, paths, updated_at) in &threads_to_save {
1116            let save_task = cx.update(|cx| {
1117                let thread_store = ThreadStore::global(cx);
1118                let session_id = session_id.to_string();
1119                let title = title.to_string();
1120                let paths = paths.clone();
1121                thread_store.update(cx, |store, cx| {
1122                    store.save_thread(
1123                        acp::SessionId::new(session_id),
1124                        make_db_thread(&title, *updated_at),
1125                        paths,
1126                        cx,
1127                    )
1128                })
1129            });
1130            save_task.await.unwrap();
1131            cx.run_until_parked();
1132        }
1133
1134        cx.update(|cx| migrate_thread_metadata(cx));
1135        cx.run_until_parked();
1136
1137        let list = cx.update(|cx| {
1138            let store = ThreadMetadataStore::global(cx);
1139            store.read(cx).entries().cloned().collect::<Vec<_>>()
1140        });
1141
1142        assert_eq!(list.len(), 10);
1143
1144        // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1145        let mut project_a_entries: Vec<_> = list
1146            .iter()
1147            .filter(|m| m.folder_paths == project_a_paths)
1148            .collect();
1149        assert_eq!(project_a_entries.len(), 7);
1150        project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1151
1152        for entry in &project_a_entries[..5] {
1153            assert!(
1154                !entry.archived,
1155                "Expected {} to be unarchived (top 5 most recent)",
1156                entry.session_id.0
1157            );
1158        }
1159        for entry in &project_a_entries[5..] {
1160            assert!(
1161                entry.archived,
1162                "Expected {} to be archived (older than top 5)",
1163                entry.session_id.0
1164            );
1165        }
1166
1167        // Project B: all 3 should be unarchived (under the limit)
1168        let project_b_entries: Vec<_> = list
1169            .iter()
1170            .filter(|m| m.folder_paths == project_b_paths)
1171            .collect();
1172        assert_eq!(project_b_entries.len(), 3);
1173        assert!(project_b_entries.iter().all(|m| !m.archived));
1174    }
1175
1176    #[gpui::test]
1177    async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1178        init_test(cx);
1179
1180        let fs = FakeFs::new(cx.executor());
1181        let project = Project::test(fs, None::<&Path>, cx).await;
1182        let connection = Rc::new(StubAgentConnection::new());
1183
1184        let thread = cx
1185            .update(|cx| {
1186                connection
1187                    .clone()
1188                    .new_session(project.clone(), PathList::default(), cx)
1189            })
1190            .await
1191            .unwrap();
1192        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1193
1194        cx.update(|cx| {
1195            thread.update(cx, |thread, cx| {
1196                thread.set_title("Draft Thread".into(), cx).detach();
1197            });
1198        });
1199        cx.run_until_parked();
1200
1201        let metadata_ids = cx.update(|cx| {
1202            ThreadMetadataStore::global(cx)
1203                .read(cx)
1204                .entry_ids()
1205                .collect::<Vec<_>>()
1206        });
1207        assert_eq!(metadata_ids, vec![session_id]);
1208
1209        drop(thread);
1210        cx.update(|_| {});
1211        cx.run_until_parked();
1212        cx.run_until_parked();
1213
1214        let metadata_ids = cx.update(|cx| {
1215            ThreadMetadataStore::global(cx)
1216                .read(cx)
1217                .entry_ids()
1218                .collect::<Vec<_>>()
1219        });
1220        assert!(
1221            metadata_ids.is_empty(),
1222            "expected empty draft thread metadata to be deleted on release"
1223        );
1224    }
1225
1226    #[gpui::test]
1227    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1228        init_test(cx);
1229
1230        let fs = FakeFs::new(cx.executor());
1231        let project = Project::test(fs, None::<&Path>, cx).await;
1232        let connection = Rc::new(StubAgentConnection::new());
1233
1234        let thread = cx
1235            .update(|cx| {
1236                connection
1237                    .clone()
1238                    .new_session(project.clone(), PathList::default(), cx)
1239            })
1240            .await
1241            .unwrap();
1242        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1243
1244        cx.update(|cx| {
1245            thread.update(cx, |thread, cx| {
1246                thread.push_user_content_block(None, "Hello".into(), cx);
1247            });
1248        });
1249        cx.run_until_parked();
1250
1251        let metadata_ids = cx.update(|cx| {
1252            ThreadMetadataStore::global(cx)
1253                .read(cx)
1254                .entry_ids()
1255                .collect::<Vec<_>>()
1256        });
1257        assert_eq!(metadata_ids, vec![session_id.clone()]);
1258
1259        drop(thread);
1260        cx.update(|_| {});
1261        cx.run_until_parked();
1262
1263        let metadata_ids = cx.update(|cx| {
1264            ThreadMetadataStore::global(cx)
1265                .read(cx)
1266                .entry_ids()
1267                .collect::<Vec<_>>()
1268        });
1269        assert_eq!(metadata_ids, vec![session_id]);
1270    }
1271
1272    #[gpui::test]
1273    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1274        init_test(cx);
1275
1276        let fs = FakeFs::new(cx.executor());
1277        let project = Project::test(fs, None::<&Path>, cx).await;
1278        let connection = Rc::new(StubAgentConnection::new());
1279
1280        // Create a regular (non-subagent) AcpThread.
1281        let regular_thread = cx
1282            .update(|cx| {
1283                connection
1284                    .clone()
1285                    .new_session(project.clone(), PathList::default(), cx)
1286            })
1287            .await
1288            .unwrap();
1289
1290        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1291
1292        // Set a title on the regular thread to trigger a save via handle_thread_update.
1293        cx.update(|cx| {
1294            regular_thread.update(cx, |thread, cx| {
1295                thread.set_title("Regular Thread".into(), cx).detach();
1296            });
1297        });
1298        cx.run_until_parked();
1299
1300        // Create a subagent AcpThread
1301        let subagent_session_id = acp::SessionId::new("subagent-session");
1302        let subagent_thread = cx.update(|cx| {
1303            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1304            cx.new(|cx| {
1305                acp_thread::AcpThread::new(
1306                    Some(regular_session_id.clone()),
1307                    Some("Subagent Thread".into()),
1308                    None,
1309                    connection.clone(),
1310                    project.clone(),
1311                    action_log,
1312                    subagent_session_id.clone(),
1313                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1314                    cx,
1315                )
1316            })
1317        });
1318
1319        // Set a title on the subagent thread to trigger handle_thread_update.
1320        cx.update(|cx| {
1321            subagent_thread.update(cx, |thread, cx| {
1322                thread
1323                    .set_title("Subagent Thread Title".into(), cx)
1324                    .detach();
1325            });
1326        });
1327        cx.run_until_parked();
1328
1329        // List all metadata from the store cache.
1330        let list = cx.update(|cx| {
1331            let store = ThreadMetadataStore::global(cx);
1332            store.read(cx).entries().cloned().collect::<Vec<_>>()
1333        });
1334
1335        // The subagent thread should NOT appear in the sidebar metadata.
1336        // Only the regular thread should be listed.
1337        assert_eq!(
1338            list.len(),
1339            1,
1340            "Expected only the regular thread in sidebar metadata, \
1341             but found {} entries (subagent threads are leaking into the sidebar)",
1342            list.len(),
1343        );
1344        assert_eq!(list[0].session_id, regular_session_id);
1345        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1346    }
1347
1348    #[test]
1349    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1350        let now = Utc::now();
1351
1352        let operations = vec![
1353            DbOperation::Upsert(make_metadata(
1354                "session-1",
1355                "First Thread",
1356                now,
1357                PathList::default(),
1358            )),
1359            DbOperation::Delete(acp::SessionId::new("session-1")),
1360        ];
1361
1362        let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1363
1364        assert_eq!(deduped.len(), 1);
1365        assert_eq!(
1366            deduped[0],
1367            DbOperation::Delete(acp::SessionId::new("session-1"))
1368        );
1369    }
1370
1371    #[test]
1372    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1373        let now = Utc::now();
1374        let later = now + chrono::Duration::seconds(1);
1375
1376        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1377        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1378
1379        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1380            DbOperation::Upsert(old_metadata),
1381            DbOperation::Upsert(new_metadata.clone()),
1382        ]);
1383
1384        assert_eq!(deduped.len(), 1);
1385        assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1386    }
1387
1388    #[test]
1389    fn test_dedup_db_operations_preserves_distinct_sessions() {
1390        let now = Utc::now();
1391
1392        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1393        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1394        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1395            DbOperation::Upsert(metadata1.clone()),
1396            DbOperation::Upsert(metadata2.clone()),
1397        ]);
1398
1399        assert_eq!(deduped.len(), 2);
1400        assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1401        assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1402    }
1403
1404    #[gpui::test]
1405    async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1406        init_test(cx);
1407
1408        let paths = PathList::new(&[Path::new("/project-a")]);
1409        let now = Utc::now();
1410        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1411
1412        cx.update(|cx| {
1413            let store = ThreadMetadataStore::global(cx);
1414            store.update(cx, |store, cx| {
1415                store.save(metadata, cx);
1416            });
1417        });
1418
1419        cx.run_until_parked();
1420
1421        cx.update(|cx| {
1422            let store = ThreadMetadataStore::global(cx);
1423            let store = store.read(cx);
1424
1425            let path_entries = store
1426                .entries_for_path(&paths)
1427                .map(|e| e.session_id.0.to_string())
1428                .collect::<Vec<_>>();
1429            assert_eq!(path_entries, vec!["session-1"]);
1430
1431            let archived = store
1432                .archived_entries()
1433                .map(|e| e.session_id.0.to_string())
1434                .collect::<Vec<_>>();
1435            assert!(archived.is_empty());
1436        });
1437
1438        cx.update(|cx| {
1439            let store = ThreadMetadataStore::global(cx);
1440            store.update(cx, |store, cx| {
1441                store.archive(&acp::SessionId::new("session-1"), cx);
1442            });
1443        });
1444
1445        cx.run_until_parked();
1446
1447        cx.update(|cx| {
1448            let store = ThreadMetadataStore::global(cx);
1449            let store = store.read(cx);
1450
1451            let path_entries = store
1452                .entries_for_path(&paths)
1453                .map(|e| e.session_id.0.to_string())
1454                .collect::<Vec<_>>();
1455            assert!(path_entries.is_empty());
1456
1457            let archived = store.archived_entries().collect::<Vec<_>>();
1458            assert_eq!(archived.len(), 1);
1459            assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1460            assert!(archived[0].archived);
1461        });
1462
1463        cx.update(|cx| {
1464            let store = ThreadMetadataStore::global(cx);
1465            store.update(cx, |store, cx| {
1466                store.unarchive(&acp::SessionId::new("session-1"), cx);
1467            });
1468        });
1469
1470        cx.run_until_parked();
1471
1472        cx.update(|cx| {
1473            let store = ThreadMetadataStore::global(cx);
1474            let store = store.read(cx);
1475
1476            let path_entries = store
1477                .entries_for_path(&paths)
1478                .map(|e| e.session_id.0.to_string())
1479                .collect::<Vec<_>>();
1480            assert_eq!(path_entries, vec!["session-1"]);
1481
1482            let archived = store
1483                .archived_entries()
1484                .map(|e| e.session_id.0.to_string())
1485                .collect::<Vec<_>>();
1486            assert!(archived.is_empty());
1487        });
1488    }
1489
1490    #[gpui::test]
1491    async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1492        init_test(cx);
1493
1494        let paths = PathList::new(&[Path::new("/project-a")]);
1495        let now = Utc::now();
1496
1497        let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1498        let metadata2 = make_metadata(
1499            "session-2",
1500            "Archived Thread",
1501            now - chrono::Duration::seconds(1),
1502            paths.clone(),
1503        );
1504
1505        cx.update(|cx| {
1506            let store = ThreadMetadataStore::global(cx);
1507            store.update(cx, |store, cx| {
1508                store.save(metadata1, cx);
1509                store.save(metadata2, cx);
1510            });
1511        });
1512
1513        cx.run_until_parked();
1514
1515        cx.update(|cx| {
1516            let store = ThreadMetadataStore::global(cx);
1517            store.update(cx, |store, cx| {
1518                store.archive(&acp::SessionId::new("session-2"), cx);
1519            });
1520        });
1521
1522        cx.run_until_parked();
1523
1524        cx.update(|cx| {
1525            let store = ThreadMetadataStore::global(cx);
1526            let store = store.read(cx);
1527
1528            let path_entries = store
1529                .entries_for_path(&paths)
1530                .map(|e| e.session_id.0.to_string())
1531                .collect::<Vec<_>>();
1532            assert_eq!(path_entries, vec!["session-1"]);
1533
1534            let all_entries = store
1535                .entries()
1536                .map(|e| e.session_id.0.to_string())
1537                .collect::<Vec<_>>();
1538            assert_eq!(all_entries.len(), 2);
1539            assert!(all_entries.contains(&"session-1".to_string()));
1540            assert!(all_entries.contains(&"session-2".to_string()));
1541
1542            let archived = store
1543                .archived_entries()
1544                .map(|e| e.session_id.0.to_string())
1545                .collect::<Vec<_>>();
1546            assert_eq!(archived, vec!["session-2"]);
1547        });
1548    }
1549
1550    #[gpui::test]
1551    async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1552        init_test(cx);
1553
1554        let paths = PathList::new(&[Path::new("/project-a")]);
1555        let now = Utc::now();
1556
1557        let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1558        let m2 = make_metadata(
1559            "session-2",
1560            "Thread Two",
1561            now - chrono::Duration::seconds(1),
1562            paths.clone(),
1563        );
1564        let m3 = make_metadata(
1565            "session-3",
1566            "Thread Three",
1567            now - chrono::Duration::seconds(2),
1568            paths,
1569        );
1570
1571        cx.update(|cx| {
1572            let store = ThreadMetadataStore::global(cx);
1573            store.update(cx, |store, cx| {
1574                store.save_all(vec![m1, m2, m3], cx);
1575            });
1576        });
1577
1578        cx.run_until_parked();
1579
1580        cx.update(|cx| {
1581            let store = ThreadMetadataStore::global(cx);
1582            let store = store.read(cx);
1583
1584            let all_entries = store
1585                .entries()
1586                .map(|e| e.session_id.0.to_string())
1587                .collect::<Vec<_>>();
1588            assert_eq!(all_entries.len(), 3);
1589            assert!(all_entries.contains(&"session-1".to_string()));
1590            assert!(all_entries.contains(&"session-2".to_string()));
1591            assert!(all_entries.contains(&"session-3".to_string()));
1592
1593            let entry_ids = store.entry_ids().collect::<Vec<_>>();
1594            assert_eq!(entry_ids.len(), 3);
1595        });
1596    }
1597
1598    #[gpui::test]
1599    async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1600        init_test(cx);
1601
1602        let paths = PathList::new(&[Path::new("/project-a")]);
1603        let now = Utc::now();
1604        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1605
1606        cx.update(|cx| {
1607            let store = ThreadMetadataStore::global(cx);
1608            store.update(cx, |store, cx| {
1609                store.save(metadata, cx);
1610            });
1611        });
1612
1613        cx.run_until_parked();
1614
1615        cx.update(|cx| {
1616            let store = ThreadMetadataStore::global(cx);
1617            store.update(cx, |store, cx| {
1618                store.archive(&acp::SessionId::new("session-1"), cx);
1619            });
1620        });
1621
1622        cx.run_until_parked();
1623
1624        cx.update(|cx| {
1625            let store = ThreadMetadataStore::global(cx);
1626            store.update(cx, |store, cx| {
1627                let _ = store.reload(cx);
1628            });
1629        });
1630
1631        cx.run_until_parked();
1632
1633        cx.update(|cx| {
1634            let store = ThreadMetadataStore::global(cx);
1635            let store = store.read(cx);
1636
1637            let thread = store
1638                .entries()
1639                .find(|e| e.session_id.0.as_ref() == "session-1")
1640                .expect("thread should exist after reload");
1641            assert!(thread.archived);
1642
1643            let path_entries = store
1644                .entries_for_path(&paths)
1645                .map(|e| e.session_id.0.to_string())
1646                .collect::<Vec<_>>();
1647            assert!(path_entries.is_empty());
1648
1649            let archived = store
1650                .archived_entries()
1651                .map(|e| e.session_id.0.to_string())
1652                .collect::<Vec<_>>();
1653            assert_eq!(archived, vec!["session-1"]);
1654        });
1655    }
1656
1657    #[gpui::test]
1658    async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1659        init_test(cx);
1660
1661        cx.run_until_parked();
1662
1663        cx.update(|cx| {
1664            let store = ThreadMetadataStore::global(cx);
1665            store.update(cx, |store, cx| {
1666                store.archive(&acp::SessionId::new("nonexistent"), cx);
1667            });
1668        });
1669
1670        cx.run_until_parked();
1671
1672        cx.update(|cx| {
1673            let store = ThreadMetadataStore::global(cx);
1674            let store = store.read(cx);
1675
1676            assert!(store.is_empty());
1677            assert_eq!(store.entries().count(), 0);
1678            assert_eq!(store.archived_entries().count(), 0);
1679        });
1680    }
1681
1682    #[gpui::test]
1683    async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1684        init_test(cx);
1685
1686        let paths = PathList::new(&[Path::new("/project-a")]);
1687        let now = Utc::now();
1688        let metadata = make_metadata("session-1", "Thread 1", now, paths);
1689        let session_id = metadata.session_id.clone();
1690
1691        cx.update(|cx| {
1692            let store = ThreadMetadataStore::global(cx);
1693            store.update(cx, |store, cx| {
1694                store.save(metadata.clone(), cx);
1695                store.archive(&session_id, cx);
1696            });
1697        });
1698
1699        cx.run_until_parked();
1700
1701        cx.update(|cx| {
1702            let store = ThreadMetadataStore::global(cx);
1703            let store = store.read(cx);
1704
1705            let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1706            pretty_assertions::assert_eq!(
1707                entries,
1708                vec![ThreadMetadata {
1709                    archived: true,
1710                    ..metadata
1711                }]
1712            );
1713        });
1714    }
1715}