thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use agent::{ThreadStore, ZED_AGENT_ID};
   4use agent_client_protocol as acp;
   5use anyhow::Context as _;
   6use chrono::{DateTime, Utc};
   7use collections::{HashMap, HashSet};
   8use db::{
   9    sqlez::{
  10        bindable::Column, domain::Domain, statement::Statement,
  11        thread_safe_connection::ThreadSafeConnection,
  12    },
  13    sqlez_macros::sql,
  14};
  15use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  16use futures::{FutureExt as _, future::Shared};
  17use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  18use project::AgentId;
  19use ui::{App, Context, SharedString};
  20use util::ResultExt as _;
  21use workspace::PathList;
  22
  23use crate::DEFAULT_THREAD_TITLE;
  24
  25pub fn init(cx: &mut App) {
  26    ThreadMetadataStore::init_global(cx);
  27
  28    if cx.has_flag::<AgentV2FeatureFlag>() {
  29        migrate_thread_metadata(cx);
  30    }
  31    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  32        if has_flag {
  33            migrate_thread_metadata(cx);
  34        }
  35    })
  36    .detach();
  37}
  38
  39/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  40/// We skip migrating threads that do not have a project.
  41///
  42/// TODO: Remove this after N weeks of shipping the sidebar
  43fn migrate_thread_metadata(cx: &mut App) {
  44    let store = ThreadMetadataStore::global(cx);
  45    let db = store.read(cx).db.clone();
  46
  47    cx.spawn(async move |cx| {
  48        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
  49
  50        let is_first_migration = existing_entries.is_empty();
  51
  52        let mut to_migrate = store.read_with(cx, |_store, cx| {
  53            ThreadStore::global(cx)
  54                .read(cx)
  55                .entries()
  56                .filter_map(|entry| {
  57                    if existing_entries.contains(&entry.id.0) || entry.folder_paths.is_empty() {
  58                        return None;
  59                    }
  60
  61                    Some(ThreadMetadata {
  62                        session_id: entry.id,
  63                        agent_id: ZED_AGENT_ID.clone(),
  64                        title: entry.title,
  65                        updated_at: entry.updated_at,
  66                        created_at: entry.created_at,
  67                        folder_paths: entry.folder_paths,
  68                        archived: true,
  69                    })
  70                })
  71                .collect::<Vec<_>>()
  72        });
  73
  74        if to_migrate.is_empty() {
  75            return anyhow::Ok(());
  76        }
  77
  78        // On the first migration (no entries in DB yet), keep the 5 most
  79        // recent threads per project unarchived.
  80        if is_first_migration {
  81            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
  82            for entry in &mut to_migrate {
  83                per_project
  84                    .entry(entry.folder_paths.clone())
  85                    .or_default()
  86                    .push(entry);
  87            }
  88            for entries in per_project.values_mut() {
  89                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
  90                for entry in entries.iter_mut().take(5) {
  91                    entry.archived = false;
  92                }
  93            }
  94        }
  95
  96        log::info!("Migrating {} thread store entries", to_migrate.len());
  97
  98        // Manually save each entry to the database and call reload, otherwise
  99        // we'll end up triggering lots of reloads after each save
 100        for entry in to_migrate {
 101            db.save(entry).await?;
 102        }
 103
 104        log::info!("Finished migrating thread store entries");
 105
 106        let _ = store.update(cx, |store, cx| store.reload(cx));
 107        anyhow::Ok(())
 108    })
 109    .detach_and_log_err(cx);
 110}
 111
 112struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
 113impl Global for GlobalThreadMetadataStore {}
 114
 115/// Lightweight metadata for any thread (native or ACP), enough to populate
 116/// the sidebar list and route to the correct load path when clicked.
 117#[derive(Debug, Clone, PartialEq)]
 118pub struct ThreadMetadata {
 119    pub session_id: acp::SessionId,
 120    pub agent_id: AgentId,
 121    pub title: SharedString,
 122    pub updated_at: DateTime<Utc>,
 123    pub created_at: Option<DateTime<Utc>>,
 124    pub folder_paths: PathList,
 125    pub archived: bool,
 126}
 127
 128impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
 129    fn from(meta: &ThreadMetadata) -> Self {
 130        Self {
 131            session_id: meta.session_id.clone(),
 132            work_dirs: Some(meta.folder_paths.clone()),
 133            title: Some(meta.title.clone()),
 134            updated_at: Some(meta.updated_at),
 135            created_at: meta.created_at,
 136            meta: None,
 137        }
 138    }
 139}
 140
 141impl ThreadMetadata {
 142    pub fn from_thread(
 143        is_archived: bool,
 144        thread: &Entity<acp_thread::AcpThread>,
 145        cx: &App,
 146    ) -> Self {
 147        let thread_ref = thread.read(cx);
 148        let session_id = thread_ref.session_id().clone();
 149        let title = thread_ref
 150            .title()
 151            .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 152        let updated_at = Utc::now();
 153
 154        let agent_id = thread_ref.connection().agent_id();
 155
 156        let folder_paths = {
 157            let project = thread_ref.project().read(cx);
 158            let paths: Vec<Arc<Path>> = project
 159                .visible_worktrees(cx)
 160                .map(|worktree| worktree.read(cx).abs_path())
 161                .collect();
 162            PathList::new(&paths)
 163        };
 164
 165        Self {
 166            session_id,
 167            agent_id,
 168            title,
 169            created_at: Some(updated_at), // handled by db `ON CONFLICT`
 170            updated_at,
 171            folder_paths,
 172            archived: is_archived,
 173        }
 174    }
 175}
 176
 177/// The store holds all metadata needed to show threads in the sidebar/the archive.
 178///
 179/// Automatically listens to AcpThread events and updates metadata if it has changed.
 180pub struct ThreadMetadataStore {
 181    db: ThreadMetadataDb,
 182    threads: HashMap<acp::SessionId, ThreadMetadata>,
 183    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
 184    reload_task: Option<Shared<Task<()>>>,
 185    session_subscriptions: HashMap<acp::SessionId, Subscription>,
 186    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
 187    _db_operations_task: Task<()>,
 188}
 189
 190#[derive(Debug, PartialEq)]
 191enum DbOperation {
 192    Insert(ThreadMetadata),
 193    Delete(acp::SessionId),
 194}
 195
 196impl DbOperation {
 197    fn id(&self) -> &acp::SessionId {
 198        match self {
 199            DbOperation::Insert(thread) => &thread.session_id,
 200            DbOperation::Delete(session_id) => session_id,
 201        }
 202    }
 203}
 204
 205impl ThreadMetadataStore {
 206    #[cfg(not(any(test, feature = "test-support")))]
 207    pub fn init_global(cx: &mut App) {
 208        if cx.has_global::<Self>() {
 209            return;
 210        }
 211
 212        let db = ThreadMetadataDb::global(cx);
 213        let thread_store = cx.new(|cx| Self::new(db, cx));
 214        cx.set_global(GlobalThreadMetadataStore(thread_store));
 215    }
 216
 217    #[cfg(any(test, feature = "test-support"))]
 218    pub fn init_global(cx: &mut App) {
 219        let thread = std::thread::current();
 220        let test_name = thread.name().unwrap_or("unknown_test");
 221        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 222        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 223        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 224        cx.set_global(GlobalThreadMetadataStore(thread_store));
 225    }
 226
 227    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 228        cx.try_global::<GlobalThreadMetadataStore>()
 229            .map(|store| store.0.clone())
 230    }
 231
 232    pub fn global(cx: &App) -> Entity<Self> {
 233        cx.global::<GlobalThreadMetadataStore>().0.clone()
 234    }
 235
 236    pub fn is_empty(&self) -> bool {
 237        self.threads.is_empty()
 238    }
 239
 240    /// Returns all thread IDs.
 241    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 242        self.threads.keys().cloned()
 243    }
 244
 245    /// Returns the metadata for a specific thread, if it exists.
 246    pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
 247        self.threads.get(session_id)
 248    }
 249
 250    /// Returns all threads.
 251    pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
 252        self.threads.values().cloned()
 253    }
 254
 255    /// Returns all archived threads.
 256    pub fn archived_entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
 257        self.entries().filter(|t| t.archived)
 258    }
 259
 260    /// Returns all threads for the given path list, excluding archived threads.
 261    pub fn entries_for_path(
 262        &self,
 263        path_list: &PathList,
 264    ) -> impl Iterator<Item = ThreadMetadata> + '_ {
 265        self.threads_by_paths
 266            .get(path_list)
 267            .into_iter()
 268            .flatten()
 269            .filter(|s| !s.archived)
 270            .cloned()
 271    }
 272
 273    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 274        let db = self.db.clone();
 275        self.reload_task.take();
 276
 277        let list_task = cx
 278            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 279
 280        let reload_task = cx
 281            .spawn(async move |this, cx| {
 282                let Some(rows) = list_task.await.log_err() else {
 283                    return;
 284                };
 285
 286                this.update(cx, |this, cx| {
 287                    this.threads.clear();
 288                    this.threads_by_paths.clear();
 289
 290                    for row in rows {
 291                        this.threads_by_paths
 292                            .entry(row.folder_paths.clone())
 293                            .or_default()
 294                            .push(row.clone());
 295                        this.threads.insert(row.session_id.clone(), row);
 296                    }
 297
 298                    cx.notify();
 299                })
 300                .ok();
 301            })
 302            .shared();
 303        self.reload_task = Some(reload_task.clone());
 304        reload_task
 305    }
 306
 307    pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
 308        if !cx.has_flag::<AgentV2FeatureFlag>() {
 309            return;
 310        }
 311
 312        for metadata in metadata {
 313            self.pending_thread_ops_tx
 314                .try_send(DbOperation::Insert(metadata))
 315                .log_err();
 316        }
 317    }
 318
 319    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 320        if !cx.has_flag::<AgentV2FeatureFlag>() {
 321            return;
 322        }
 323
 324        self.pending_thread_ops_tx
 325            .try_send(DbOperation::Insert(metadata))
 326            .log_err();
 327    }
 328
 329    pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 330        self.update_archived(session_id, true, cx);
 331    }
 332
 333    pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 334        self.update_archived(session_id, false, cx);
 335    }
 336
 337    fn update_archived(
 338        &mut self,
 339        session_id: &acp::SessionId,
 340        archived: bool,
 341        cx: &mut Context<Self>,
 342    ) {
 343        if !cx.has_flag::<AgentV2FeatureFlag>() {
 344            return;
 345        }
 346
 347        if let Some(thread) = self.threads.get(session_id) {
 348            self.save(
 349                ThreadMetadata {
 350                    archived,
 351                    ..thread.clone()
 352                },
 353                cx,
 354            );
 355            cx.notify();
 356        }
 357    }
 358
 359    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 360        if !cx.has_flag::<AgentV2FeatureFlag>() {
 361            return;
 362        }
 363
 364        self.pending_thread_ops_tx
 365            .try_send(DbOperation::Delete(session_id))
 366            .log_err();
 367    }
 368
 369    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 370        let weak_store = cx.weak_entity();
 371
 372        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 373            // Don't track subagent threads in the sidebar.
 374            if thread.parent_session_id().is_some() {
 375                return;
 376            }
 377
 378            let thread_entity = cx.entity();
 379
 380            cx.on_release({
 381                let weak_store = weak_store.clone();
 382                move |thread, cx| {
 383                    weak_store
 384                        .update(cx, |store, cx| {
 385                            let session_id = thread.session_id().clone();
 386                            store.session_subscriptions.remove(&session_id);
 387                            if thread.entries().is_empty() {
 388                                // Empty threads can be unloaded without ever being
 389                                // durably persisted by the underlying agent.
 390                                store.delete(session_id, cx);
 391                            }
 392                        })
 393                        .ok();
 394                }
 395            })
 396            .detach();
 397
 398            weak_store
 399                .update(cx, |this, cx| {
 400                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
 401                    this.session_subscriptions
 402                        .insert(thread.session_id().clone(), subscription);
 403                })
 404                .ok();
 405        })
 406        .detach();
 407
 408        let (tx, rx) = smol::channel::unbounded();
 409        let _db_operations_task = cx.spawn({
 410            let db = db.clone();
 411            async move |this, cx| {
 412                while let Ok(first_update) = rx.recv().await {
 413                    let mut updates = vec![first_update];
 414                    while let Ok(update) = rx.try_recv() {
 415                        updates.push(update);
 416                    }
 417                    let updates = Self::dedup_db_operations(updates);
 418                    for operation in updates {
 419                        match operation {
 420                            DbOperation::Insert(metadata) => {
 421                                db.save(metadata).await.log_err();
 422                            }
 423                            DbOperation::Delete(session_id) => {
 424                                db.delete(session_id).await.log_err();
 425                            }
 426                        }
 427                    }
 428
 429                    this.update(cx, |this, cx| this.reload(cx)).ok();
 430                }
 431            }
 432        });
 433
 434        let mut this = Self {
 435            db,
 436            threads: HashMap::default(),
 437            threads_by_paths: HashMap::default(),
 438            reload_task: None,
 439            session_subscriptions: HashMap::default(),
 440            pending_thread_ops_tx: tx,
 441            _db_operations_task,
 442        };
 443        let _ = this.reload(cx);
 444        this
 445    }
 446
 447    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 448        let mut ops = HashMap::default();
 449        for operation in operations.into_iter().rev() {
 450            if ops.contains_key(operation.id()) {
 451                continue;
 452            }
 453            ops.insert(operation.id().clone(), operation);
 454        }
 455        ops.into_values().collect()
 456    }
 457
 458    fn handle_thread_update(
 459        &mut self,
 460        thread: Entity<acp_thread::AcpThread>,
 461        event: &acp_thread::AcpThreadEvent,
 462        cx: &mut Context<Self>,
 463    ) {
 464        // Don't track subagent threads in the sidebar.
 465        if thread.read(cx).parent_session_id().is_some() {
 466            return;
 467        }
 468
 469        match event {
 470            acp_thread::AcpThreadEvent::NewEntry
 471            | acp_thread::AcpThreadEvent::TitleUpdated
 472            | acp_thread::AcpThreadEvent::EntryUpdated(_)
 473            | acp_thread::AcpThreadEvent::EntriesRemoved(_)
 474            | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
 475            | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
 476            | acp_thread::AcpThreadEvent::Retry(_)
 477            | acp_thread::AcpThreadEvent::Stopped(_)
 478            | acp_thread::AcpThreadEvent::Error
 479            | acp_thread::AcpThreadEvent::LoadError(_)
 480            | acp_thread::AcpThreadEvent::Refusal => {
 481                let is_archived = self
 482                    .threads
 483                    .get(thread.read(cx).session_id())
 484                    .map(|t| t.archived)
 485                    .unwrap_or(false);
 486                let metadata = ThreadMetadata::from_thread(is_archived, &thread, cx);
 487                self.save(metadata, cx);
 488            }
 489            _ => {}
 490        }
 491    }
 492}
 493
 494impl Global for ThreadMetadataStore {}
 495
 496struct ThreadMetadataDb(ThreadSafeConnection);
 497
 498impl Domain for ThreadMetadataDb {
 499    const NAME: &str = stringify!(ThreadMetadataDb);
 500
 501    const MIGRATIONS: &[&str] = &[
 502        sql!(
 503            CREATE TABLE IF NOT EXISTS sidebar_threads(
 504                session_id TEXT PRIMARY KEY,
 505                agent_id TEXT,
 506                title TEXT NOT NULL,
 507                updated_at TEXT NOT NULL,
 508                created_at TEXT,
 509                folder_paths TEXT,
 510                folder_paths_order TEXT
 511            ) STRICT;
 512        ),
 513        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
 514    ];
 515}
 516
 517db::static_connection!(ThreadMetadataDb, []);
 518
 519impl ThreadMetadataDb {
 520    pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
 521        self.select::<Arc<str>>(
 522            "SELECT session_id FROM sidebar_threads \
 523             ORDER BY updated_at DESC",
 524        )?()
 525    }
 526
 527    /// List all sidebar thread metadata, ordered by updated_at descending.
 528    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 529        self.select::<ThreadMetadata>(
 530            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived \
 531             FROM sidebar_threads \
 532             ORDER BY updated_at DESC"
 533        )?()
 534    }
 535
 536    /// Upsert metadata for a thread.
 537    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 538        let id = row.session_id.0.clone();
 539        let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 540            None
 541        } else {
 542            Some(row.agent_id.to_string())
 543        };
 544        let title = row.title.to_string();
 545        let updated_at = row.updated_at.to_rfc3339();
 546        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 547        let serialized = row.folder_paths.serialize();
 548        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 549            (None, None)
 550        } else {
 551            (Some(serialized.paths), Some(serialized.order))
 552        };
 553        let archived = row.archived;
 554
 555        self.write(move |conn| {
 556            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived) \
 557                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
 558                       ON CONFLICT(session_id) DO UPDATE SET \
 559                           agent_id = excluded.agent_id, \
 560                           title = excluded.title, \
 561                           updated_at = excluded.updated_at, \
 562                           folder_paths = excluded.folder_paths, \
 563                           folder_paths_order = excluded.folder_paths_order, \
 564                           archived = excluded.archived";
 565            let mut stmt = Statement::prepare(conn, sql)?;
 566            let mut i = stmt.bind(&id, 1)?;
 567            i = stmt.bind(&agent_id, i)?;
 568            i = stmt.bind(&title, i)?;
 569            i = stmt.bind(&updated_at, i)?;
 570            i = stmt.bind(&created_at, i)?;
 571            i = stmt.bind(&folder_paths, i)?;
 572            i = stmt.bind(&folder_paths_order, i)?;
 573            stmt.bind(&archived, i)?;
 574            stmt.exec()
 575        })
 576        .await
 577    }
 578
 579    /// Delete metadata for a single thread.
 580    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 581        let id = session_id.0.clone();
 582        self.write(move |conn| {
 583            let mut stmt =
 584                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 585            stmt.bind(&id, 1)?;
 586            stmt.exec()
 587        })
 588        .await
 589    }
 590}
 591
 592impl Column for ThreadMetadata {
 593    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 594        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 595        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 596        let (title, next): (String, i32) = Column::column(statement, next)?;
 597        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 598        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 599        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 600        let (folder_paths_order_str, next): (Option<String>, i32) =
 601            Column::column(statement, next)?;
 602        let (archived, next): (bool, i32) = Column::column(statement, next)?;
 603
 604        let agent_id = agent_id
 605            .map(|id| AgentId::new(id))
 606            .unwrap_or(ZED_AGENT_ID.clone());
 607
 608        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 609        let created_at = created_at_str
 610            .as_deref()
 611            .map(DateTime::parse_from_rfc3339)
 612            .transpose()?
 613            .map(|dt| dt.with_timezone(&Utc));
 614
 615        let folder_paths = folder_paths_str
 616            .map(|paths| {
 617                PathList::deserialize(&util::path_list::SerializedPathList {
 618                    paths,
 619                    order: folder_paths_order_str.unwrap_or_default(),
 620                })
 621            })
 622            .unwrap_or_default();
 623
 624        Ok((
 625            ThreadMetadata {
 626                session_id: acp::SessionId::new(id),
 627                agent_id,
 628                title: title.into(),
 629                updated_at,
 630                created_at,
 631                folder_paths,
 632                archived,
 633            },
 634            next,
 635        ))
 636    }
 637}
 638
 639#[cfg(test)]
 640mod tests {
 641    use super::*;
 642    use acp_thread::{AgentConnection, StubAgentConnection};
 643    use action_log::ActionLog;
 644    use agent::DbThread;
 645    use agent_client_protocol as acp;
 646    use feature_flags::FeatureFlagAppExt;
 647    use gpui::TestAppContext;
 648    use project::FakeFs;
 649    use project::Project;
 650    use std::path::Path;
 651    use std::rc::Rc;
 652
 653    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 654        DbThread {
 655            title: title.to_string().into(),
 656            messages: Vec::new(),
 657            updated_at,
 658            detailed_summary: None,
 659            initial_project_snapshot: None,
 660            cumulative_token_usage: Default::default(),
 661            request_token_usage: Default::default(),
 662            model: None,
 663            profile: None,
 664            imported: false,
 665            subagent_context: None,
 666            speed: None,
 667            thinking_enabled: false,
 668            thinking_effort: None,
 669            draft_prompt: None,
 670            ui_scroll_position: None,
 671        }
 672    }
 673
 674    fn make_metadata(
 675        session_id: &str,
 676        title: &str,
 677        updated_at: DateTime<Utc>,
 678        folder_paths: PathList,
 679    ) -> ThreadMetadata {
 680        ThreadMetadata {
 681            archived: false,
 682            session_id: acp::SessionId::new(session_id),
 683            agent_id: agent::ZED_AGENT_ID.clone(),
 684            title: title.to_string().into(),
 685            updated_at,
 686            created_at: Some(updated_at),
 687            folder_paths,
 688        }
 689    }
 690
 691    #[gpui::test]
 692    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 693        let first_paths = PathList::new(&[Path::new("/project-a")]);
 694        let second_paths = PathList::new(&[Path::new("/project-b")]);
 695        let now = Utc::now();
 696        let older = now - chrono::Duration::seconds(1);
 697
 698        let thread = std::thread::current();
 699        let test_name = thread.name().unwrap_or("unknown_test");
 700        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 701        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 702            &db_name,
 703        )));
 704
 705        db.save(make_metadata(
 706            "session-1",
 707            "First Thread",
 708            now,
 709            first_paths.clone(),
 710        ))
 711        .await
 712        .unwrap();
 713        db.save(make_metadata(
 714            "session-2",
 715            "Second Thread",
 716            older,
 717            second_paths.clone(),
 718        ))
 719        .await
 720        .unwrap();
 721
 722        cx.update(|cx| {
 723            let settings_store = settings::SettingsStore::test(cx);
 724            cx.set_global(settings_store);
 725            cx.update_flags(true, vec!["agent-v2".to_string()]);
 726            ThreadMetadataStore::init_global(cx);
 727        });
 728
 729        cx.run_until_parked();
 730
 731        cx.update(|cx| {
 732            let store = ThreadMetadataStore::global(cx);
 733            let store = store.read(cx);
 734
 735            let entry_ids = store
 736                .entry_ids()
 737                .map(|session_id| session_id.0.to_string())
 738                .collect::<Vec<_>>();
 739            assert_eq!(entry_ids.len(), 2);
 740            assert!(entry_ids.contains(&"session-1".to_string()));
 741            assert!(entry_ids.contains(&"session-2".to_string()));
 742
 743            let first_path_entries = store
 744                .entries_for_path(&first_paths)
 745                .map(|entry| entry.session_id.0.to_string())
 746                .collect::<Vec<_>>();
 747            assert_eq!(first_path_entries, vec!["session-1"]);
 748
 749            let second_path_entries = store
 750                .entries_for_path(&second_paths)
 751                .map(|entry| entry.session_id.0.to_string())
 752                .collect::<Vec<_>>();
 753            assert_eq!(second_path_entries, vec!["session-2"]);
 754        });
 755    }
 756
 757    #[gpui::test]
 758    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 759        cx.update(|cx| {
 760            let settings_store = settings::SettingsStore::test(cx);
 761            cx.set_global(settings_store);
 762            cx.update_flags(true, vec!["agent-v2".to_string()]);
 763            ThreadMetadataStore::init_global(cx);
 764        });
 765
 766        let first_paths = PathList::new(&[Path::new("/project-a")]);
 767        let second_paths = PathList::new(&[Path::new("/project-b")]);
 768        let initial_time = Utc::now();
 769        let updated_time = initial_time + chrono::Duration::seconds(1);
 770
 771        let initial_metadata = make_metadata(
 772            "session-1",
 773            "First Thread",
 774            initial_time,
 775            first_paths.clone(),
 776        );
 777
 778        let second_metadata = make_metadata(
 779            "session-2",
 780            "Second Thread",
 781            initial_time,
 782            second_paths.clone(),
 783        );
 784
 785        cx.update(|cx| {
 786            let store = ThreadMetadataStore::global(cx);
 787            store.update(cx, |store, cx| {
 788                store.save(initial_metadata, cx);
 789                store.save(second_metadata, cx);
 790            });
 791        });
 792
 793        cx.run_until_parked();
 794
 795        cx.update(|cx| {
 796            let store = ThreadMetadataStore::global(cx);
 797            let store = store.read(cx);
 798
 799            let first_path_entries = store
 800                .entries_for_path(&first_paths)
 801                .map(|entry| entry.session_id.0.to_string())
 802                .collect::<Vec<_>>();
 803            assert_eq!(first_path_entries, vec!["session-1"]);
 804
 805            let second_path_entries = store
 806                .entries_for_path(&second_paths)
 807                .map(|entry| entry.session_id.0.to_string())
 808                .collect::<Vec<_>>();
 809            assert_eq!(second_path_entries, vec!["session-2"]);
 810        });
 811
 812        let moved_metadata = make_metadata(
 813            "session-1",
 814            "First Thread",
 815            updated_time,
 816            second_paths.clone(),
 817        );
 818
 819        cx.update(|cx| {
 820            let store = ThreadMetadataStore::global(cx);
 821            store.update(cx, |store, cx| {
 822                store.save(moved_metadata, cx);
 823            });
 824        });
 825
 826        cx.run_until_parked();
 827
 828        cx.update(|cx| {
 829            let store = ThreadMetadataStore::global(cx);
 830            let store = store.read(cx);
 831
 832            let entry_ids = store
 833                .entry_ids()
 834                .map(|session_id| session_id.0.to_string())
 835                .collect::<Vec<_>>();
 836            assert_eq!(entry_ids.len(), 2);
 837            assert!(entry_ids.contains(&"session-1".to_string()));
 838            assert!(entry_ids.contains(&"session-2".to_string()));
 839
 840            let first_path_entries = store
 841                .entries_for_path(&first_paths)
 842                .map(|entry| entry.session_id.0.to_string())
 843                .collect::<Vec<_>>();
 844            assert!(first_path_entries.is_empty());
 845
 846            let second_path_entries = store
 847                .entries_for_path(&second_paths)
 848                .map(|entry| entry.session_id.0.to_string())
 849                .collect::<Vec<_>>();
 850            assert_eq!(second_path_entries.len(), 2);
 851            assert!(second_path_entries.contains(&"session-1".to_string()));
 852            assert!(second_path_entries.contains(&"session-2".to_string()));
 853        });
 854
 855        cx.update(|cx| {
 856            let store = ThreadMetadataStore::global(cx);
 857            store.update(cx, |store, cx| {
 858                store.delete(acp::SessionId::new("session-2"), cx);
 859            });
 860        });
 861
 862        cx.run_until_parked();
 863
 864        cx.update(|cx| {
 865            let store = ThreadMetadataStore::global(cx);
 866            let store = store.read(cx);
 867
 868            let entry_ids = store
 869                .entry_ids()
 870                .map(|session_id| session_id.0.to_string())
 871                .collect::<Vec<_>>();
 872            assert_eq!(entry_ids, vec!["session-1"]);
 873
 874            let second_path_entries = store
 875                .entries_for_path(&second_paths)
 876                .map(|entry| entry.session_id.0.to_string())
 877                .collect::<Vec<_>>();
 878            assert_eq!(second_path_entries, vec!["session-1"]);
 879        });
 880    }
 881
 882    #[gpui::test]
 883    async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
 884        cx.update(|cx| {
 885            ThreadStore::init_global(cx);
 886            ThreadMetadataStore::init_global(cx);
 887        });
 888
 889        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 890        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 891        let now = Utc::now();
 892
 893        let existing_metadata = ThreadMetadata {
 894            session_id: acp::SessionId::new("a-session-0"),
 895            agent_id: agent::ZED_AGENT_ID.clone(),
 896            title: "Existing Metadata".into(),
 897            updated_at: now - chrono::Duration::seconds(10),
 898            created_at: Some(now - chrono::Duration::seconds(10)),
 899            folder_paths: project_a_paths.clone(),
 900            archived: false,
 901        };
 902
 903        cx.update(|cx| {
 904            let store = ThreadMetadataStore::global(cx);
 905            store.update(cx, |store, cx| {
 906                store.save(existing_metadata, cx);
 907            });
 908        });
 909        cx.run_until_parked();
 910
 911        let threads_to_save = vec![
 912            (
 913                "a-session-0",
 914                "Thread A0 From Native Store",
 915                project_a_paths.clone(),
 916                now,
 917            ),
 918            (
 919                "a-session-1",
 920                "Thread A1",
 921                project_a_paths.clone(),
 922                now + chrono::Duration::seconds(1),
 923            ),
 924            (
 925                "b-session-0",
 926                "Thread B0",
 927                project_b_paths.clone(),
 928                now + chrono::Duration::seconds(2),
 929            ),
 930            (
 931                "projectless",
 932                "Projectless",
 933                PathList::default(),
 934                now + chrono::Duration::seconds(3),
 935            ),
 936        ];
 937
 938        for (session_id, title, paths, updated_at) in &threads_to_save {
 939            let save_task = cx.update(|cx| {
 940                let thread_store = ThreadStore::global(cx);
 941                let session_id = session_id.to_string();
 942                let title = title.to_string();
 943                let paths = paths.clone();
 944                thread_store.update(cx, |store, cx| {
 945                    store.save_thread(
 946                        acp::SessionId::new(session_id),
 947                        make_db_thread(&title, *updated_at),
 948                        paths,
 949                        cx,
 950                    )
 951                })
 952            });
 953            save_task.await.unwrap();
 954            cx.run_until_parked();
 955        }
 956
 957        cx.update(|cx| migrate_thread_metadata(cx));
 958        cx.run_until_parked();
 959
 960        let list = cx.update(|cx| {
 961            let store = ThreadMetadataStore::global(cx);
 962            store.read(cx).entries().collect::<Vec<_>>()
 963        });
 964
 965        assert_eq!(list.len(), 3);
 966        assert!(
 967            list.iter()
 968                .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
 969        );
 970
 971        let existing_metadata = list
 972            .iter()
 973            .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
 974            .unwrap();
 975        assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
 976        assert!(!existing_metadata.archived);
 977
 978        let migrated_session_ids = list
 979            .iter()
 980            .map(|metadata| metadata.session_id.0.as_ref())
 981            .collect::<Vec<_>>();
 982        assert!(migrated_session_ids.contains(&"a-session-1"));
 983        assert!(migrated_session_ids.contains(&"b-session-0"));
 984        assert!(!migrated_session_ids.contains(&"projectless"));
 985
 986        let migrated_entries = list
 987            .iter()
 988            .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
 989            .collect::<Vec<_>>();
 990        assert!(
 991            migrated_entries
 992                .iter()
 993                .all(|metadata| !metadata.folder_paths.is_empty())
 994        );
 995        assert!(migrated_entries.iter().all(|metadata| metadata.archived));
 996    }
 997
 998    #[gpui::test]
 999    async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1000        cx: &mut TestAppContext,
1001    ) {
1002        cx.update(|cx| {
1003            ThreadStore::init_global(cx);
1004            ThreadMetadataStore::init_global(cx);
1005        });
1006
1007        let project_paths = PathList::new(&[Path::new("/project-a")]);
1008        let existing_updated_at = Utc::now();
1009
1010        let existing_metadata = ThreadMetadata {
1011            session_id: acp::SessionId::new("existing-session"),
1012            agent_id: agent::ZED_AGENT_ID.clone(),
1013            title: "Existing Metadata".into(),
1014            updated_at: existing_updated_at,
1015            created_at: Some(existing_updated_at),
1016            folder_paths: project_paths.clone(),
1017            archived: false,
1018        };
1019
1020        cx.update(|cx| {
1021            let store = ThreadMetadataStore::global(cx);
1022            store.update(cx, |store, cx| {
1023                store.save(existing_metadata, cx);
1024            });
1025        });
1026        cx.run_until_parked();
1027
1028        let save_task = cx.update(|cx| {
1029            let thread_store = ThreadStore::global(cx);
1030            thread_store.update(cx, |store, cx| {
1031                store.save_thread(
1032                    acp::SessionId::new("existing-session"),
1033                    make_db_thread(
1034                        "Updated Native Thread Title",
1035                        existing_updated_at + chrono::Duration::seconds(1),
1036                    ),
1037                    project_paths.clone(),
1038                    cx,
1039                )
1040            })
1041        });
1042        save_task.await.unwrap();
1043        cx.run_until_parked();
1044
1045        cx.update(|cx| migrate_thread_metadata(cx));
1046        cx.run_until_parked();
1047
1048        let list = cx.update(|cx| {
1049            let store = ThreadMetadataStore::global(cx);
1050            store.read(cx).entries().collect::<Vec<_>>()
1051        });
1052
1053        assert_eq!(list.len(), 1);
1054        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1055    }
1056
1057    #[gpui::test]
1058    async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1059        cx: &mut TestAppContext,
1060    ) {
1061        cx.update(|cx| {
1062            ThreadStore::init_global(cx);
1063            ThreadMetadataStore::init_global(cx);
1064        });
1065
1066        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1067        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1068        let now = Utc::now();
1069
1070        // Create 7 threads for project A and 3 for project B
1071        let mut threads_to_save = Vec::new();
1072        for i in 0..7 {
1073            threads_to_save.push((
1074                format!("a-session-{i}"),
1075                format!("Thread A{i}"),
1076                project_a_paths.clone(),
1077                now + chrono::Duration::seconds(i as i64),
1078            ));
1079        }
1080        for i in 0..3 {
1081            threads_to_save.push((
1082                format!("b-session-{i}"),
1083                format!("Thread B{i}"),
1084                project_b_paths.clone(),
1085                now + chrono::Duration::seconds(i as i64),
1086            ));
1087        }
1088
1089        for (session_id, title, paths, updated_at) in &threads_to_save {
1090            let save_task = cx.update(|cx| {
1091                let thread_store = ThreadStore::global(cx);
1092                let session_id = session_id.to_string();
1093                let title = title.to_string();
1094                let paths = paths.clone();
1095                thread_store.update(cx, |store, cx| {
1096                    store.save_thread(
1097                        acp::SessionId::new(session_id),
1098                        make_db_thread(&title, *updated_at),
1099                        paths,
1100                        cx,
1101                    )
1102                })
1103            });
1104            save_task.await.unwrap();
1105            cx.run_until_parked();
1106        }
1107
1108        cx.update(|cx| migrate_thread_metadata(cx));
1109        cx.run_until_parked();
1110
1111        let list = cx.update(|cx| {
1112            let store = ThreadMetadataStore::global(cx);
1113            store.read(cx).entries().collect::<Vec<_>>()
1114        });
1115
1116        assert_eq!(list.len(), 10);
1117
1118        // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1119        let mut project_a_entries: Vec<_> = list
1120            .iter()
1121            .filter(|m| m.folder_paths == project_a_paths)
1122            .collect();
1123        assert_eq!(project_a_entries.len(), 7);
1124        project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1125
1126        for entry in &project_a_entries[..5] {
1127            assert!(
1128                !entry.archived,
1129                "Expected {} to be unarchived (top 5 most recent)",
1130                entry.session_id.0
1131            );
1132        }
1133        for entry in &project_a_entries[5..] {
1134            assert!(
1135                entry.archived,
1136                "Expected {} to be archived (older than top 5)",
1137                entry.session_id.0
1138            );
1139        }
1140
1141        // Project B: all 3 should be unarchived (under the limit)
1142        let project_b_entries: Vec<_> = list
1143            .iter()
1144            .filter(|m| m.folder_paths == project_b_paths)
1145            .collect();
1146        assert_eq!(project_b_entries.len(), 3);
1147        assert!(project_b_entries.iter().all(|m| !m.archived));
1148    }
1149
1150    #[gpui::test]
1151    async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1152        cx.update(|cx| {
1153            let settings_store = settings::SettingsStore::test(cx);
1154            cx.set_global(settings_store);
1155            cx.update_flags(true, vec!["agent-v2".to_string()]);
1156            ThreadStore::init_global(cx);
1157            ThreadMetadataStore::init_global(cx);
1158        });
1159
1160        let fs = FakeFs::new(cx.executor());
1161        let project = Project::test(fs, None::<&Path>, cx).await;
1162        let connection = Rc::new(StubAgentConnection::new());
1163
1164        let thread = cx
1165            .update(|cx| {
1166                connection
1167                    .clone()
1168                    .new_session(project.clone(), PathList::default(), cx)
1169            })
1170            .await
1171            .unwrap();
1172        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1173
1174        cx.update(|cx| {
1175            thread.update(cx, |thread, cx| {
1176                thread.set_title("Draft Thread".into(), cx).detach();
1177            });
1178        });
1179        cx.run_until_parked();
1180
1181        let metadata_ids = cx.update(|cx| {
1182            ThreadMetadataStore::global(cx)
1183                .read(cx)
1184                .entry_ids()
1185                .collect::<Vec<_>>()
1186        });
1187        assert_eq!(metadata_ids, vec![session_id]);
1188
1189        drop(thread);
1190        cx.update(|_| {});
1191        cx.run_until_parked();
1192        cx.run_until_parked();
1193
1194        let metadata_ids = cx.update(|cx| {
1195            ThreadMetadataStore::global(cx)
1196                .read(cx)
1197                .entry_ids()
1198                .collect::<Vec<_>>()
1199        });
1200        assert!(
1201            metadata_ids.is_empty(),
1202            "expected empty draft thread metadata to be deleted on release"
1203        );
1204    }
1205
1206    #[gpui::test]
1207    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1208        cx.update(|cx| {
1209            let settings_store = settings::SettingsStore::test(cx);
1210            cx.set_global(settings_store);
1211            cx.update_flags(true, vec!["agent-v2".to_string()]);
1212            ThreadStore::init_global(cx);
1213            ThreadMetadataStore::init_global(cx);
1214        });
1215
1216        let fs = FakeFs::new(cx.executor());
1217        let project = Project::test(fs, None::<&Path>, cx).await;
1218        let connection = Rc::new(StubAgentConnection::new());
1219
1220        let thread = cx
1221            .update(|cx| {
1222                connection
1223                    .clone()
1224                    .new_session(project.clone(), PathList::default(), cx)
1225            })
1226            .await
1227            .unwrap();
1228        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1229
1230        cx.update(|cx| {
1231            thread.update(cx, |thread, cx| {
1232                thread.push_user_content_block(None, "Hello".into(), cx);
1233            });
1234        });
1235        cx.run_until_parked();
1236
1237        let metadata_ids = cx.update(|cx| {
1238            ThreadMetadataStore::global(cx)
1239                .read(cx)
1240                .entry_ids()
1241                .collect::<Vec<_>>()
1242        });
1243        assert_eq!(metadata_ids, vec![session_id.clone()]);
1244
1245        drop(thread);
1246        cx.update(|_| {});
1247        cx.run_until_parked();
1248
1249        let metadata_ids = cx.update(|cx| {
1250            ThreadMetadataStore::global(cx)
1251                .read(cx)
1252                .entry_ids()
1253                .collect::<Vec<_>>()
1254        });
1255        assert_eq!(metadata_ids, vec![session_id]);
1256    }
1257
1258    #[gpui::test]
1259    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1260        cx.update(|cx| {
1261            let settings_store = settings::SettingsStore::test(cx);
1262            cx.set_global(settings_store);
1263            cx.update_flags(true, vec!["agent-v2".to_string()]);
1264            ThreadStore::init_global(cx);
1265            ThreadMetadataStore::init_global(cx);
1266        });
1267
1268        let fs = FakeFs::new(cx.executor());
1269        let project = Project::test(fs, None::<&Path>, cx).await;
1270        let connection = Rc::new(StubAgentConnection::new());
1271
1272        // Create a regular (non-subagent) AcpThread.
1273        let regular_thread = cx
1274            .update(|cx| {
1275                connection
1276                    .clone()
1277                    .new_session(project.clone(), PathList::default(), cx)
1278            })
1279            .await
1280            .unwrap();
1281
1282        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1283
1284        // Set a title on the regular thread to trigger a save via handle_thread_update.
1285        cx.update(|cx| {
1286            regular_thread.update(cx, |thread, cx| {
1287                thread.set_title("Regular Thread".into(), cx).detach();
1288            });
1289        });
1290        cx.run_until_parked();
1291
1292        // Create a subagent AcpThread
1293        let subagent_session_id = acp::SessionId::new("subagent-session");
1294        let subagent_thread = cx.update(|cx| {
1295            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1296            cx.new(|cx| {
1297                acp_thread::AcpThread::new(
1298                    Some(regular_session_id.clone()),
1299                    Some("Subagent Thread".into()),
1300                    None,
1301                    connection.clone(),
1302                    project.clone(),
1303                    action_log,
1304                    subagent_session_id.clone(),
1305                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1306                    cx,
1307                )
1308            })
1309        });
1310
1311        // Set a title on the subagent thread to trigger handle_thread_update.
1312        cx.update(|cx| {
1313            subagent_thread.update(cx, |thread, cx| {
1314                thread
1315                    .set_title("Subagent Thread Title".into(), cx)
1316                    .detach();
1317            });
1318        });
1319        cx.run_until_parked();
1320
1321        // List all metadata from the store cache.
1322        let list = cx.update(|cx| {
1323            let store = ThreadMetadataStore::global(cx);
1324            store.read(cx).entries().collect::<Vec<_>>()
1325        });
1326
1327        // The subagent thread should NOT appear in the sidebar metadata.
1328        // Only the regular thread should be listed.
1329        assert_eq!(
1330            list.len(),
1331            1,
1332            "Expected only the regular thread in sidebar metadata, \
1333             but found {} entries (subagent threads are leaking into the sidebar)",
1334            list.len(),
1335        );
1336        assert_eq!(list[0].session_id, regular_session_id);
1337        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1338    }
1339
1340    #[test]
1341    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1342        let now = Utc::now();
1343
1344        let operations = vec![
1345            DbOperation::Insert(make_metadata(
1346                "session-1",
1347                "First Thread",
1348                now,
1349                PathList::default(),
1350            )),
1351            DbOperation::Delete(acp::SessionId::new("session-1")),
1352        ];
1353
1354        let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1355
1356        assert_eq!(deduped.len(), 1);
1357        assert_eq!(
1358            deduped[0],
1359            DbOperation::Delete(acp::SessionId::new("session-1"))
1360        );
1361    }
1362
1363    #[test]
1364    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1365        let now = Utc::now();
1366        let later = now + chrono::Duration::seconds(1);
1367
1368        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1369        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1370
1371        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1372            DbOperation::Insert(old_metadata),
1373            DbOperation::Insert(new_metadata.clone()),
1374        ]);
1375
1376        assert_eq!(deduped.len(), 1);
1377        assert_eq!(deduped[0], DbOperation::Insert(new_metadata));
1378    }
1379
1380    #[test]
1381    fn test_dedup_db_operations_preserves_distinct_sessions() {
1382        let now = Utc::now();
1383
1384        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1385        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1386        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1387            DbOperation::Insert(metadata1.clone()),
1388            DbOperation::Insert(metadata2.clone()),
1389        ]);
1390
1391        assert_eq!(deduped.len(), 2);
1392        assert!(deduped.contains(&DbOperation::Insert(metadata1)));
1393        assert!(deduped.contains(&DbOperation::Insert(metadata2)));
1394    }
1395
1396    #[gpui::test]
1397    async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1398        cx.update(|cx| {
1399            let settings_store = settings::SettingsStore::test(cx);
1400            cx.set_global(settings_store);
1401            cx.update_flags(true, vec!["agent-v2".to_string()]);
1402            ThreadMetadataStore::init_global(cx);
1403        });
1404
1405        let paths = PathList::new(&[Path::new("/project-a")]);
1406        let now = Utc::now();
1407        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1408
1409        cx.update(|cx| {
1410            let store = ThreadMetadataStore::global(cx);
1411            store.update(cx, |store, cx| {
1412                store.save(metadata, cx);
1413            });
1414        });
1415
1416        cx.run_until_parked();
1417
1418        cx.update(|cx| {
1419            let store = ThreadMetadataStore::global(cx);
1420            let store = store.read(cx);
1421
1422            let path_entries = store
1423                .entries_for_path(&paths)
1424                .map(|e| e.session_id.0.to_string())
1425                .collect::<Vec<_>>();
1426            assert_eq!(path_entries, vec!["session-1"]);
1427
1428            let archived = store
1429                .archived_entries()
1430                .map(|e| e.session_id.0.to_string())
1431                .collect::<Vec<_>>();
1432            assert!(archived.is_empty());
1433        });
1434
1435        cx.update(|cx| {
1436            let store = ThreadMetadataStore::global(cx);
1437            store.update(cx, |store, cx| {
1438                store.archive(&acp::SessionId::new("session-1"), cx);
1439            });
1440        });
1441
1442        cx.run_until_parked();
1443
1444        cx.update(|cx| {
1445            let store = ThreadMetadataStore::global(cx);
1446            let store = store.read(cx);
1447
1448            let path_entries = store
1449                .entries_for_path(&paths)
1450                .map(|e| e.session_id.0.to_string())
1451                .collect::<Vec<_>>();
1452            assert!(path_entries.is_empty());
1453
1454            let archived = store.archived_entries().collect::<Vec<_>>();
1455            assert_eq!(archived.len(), 1);
1456            assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1457            assert!(archived[0].archived);
1458        });
1459
1460        cx.update(|cx| {
1461            let store = ThreadMetadataStore::global(cx);
1462            store.update(cx, |store, cx| {
1463                store.unarchive(&acp::SessionId::new("session-1"), cx);
1464            });
1465        });
1466
1467        cx.run_until_parked();
1468
1469        cx.update(|cx| {
1470            let store = ThreadMetadataStore::global(cx);
1471            let store = store.read(cx);
1472
1473            let path_entries = store
1474                .entries_for_path(&paths)
1475                .map(|e| e.session_id.0.to_string())
1476                .collect::<Vec<_>>();
1477            assert_eq!(path_entries, vec!["session-1"]);
1478
1479            let archived = store
1480                .archived_entries()
1481                .map(|e| e.session_id.0.to_string())
1482                .collect::<Vec<_>>();
1483            assert!(archived.is_empty());
1484        });
1485    }
1486
1487    #[gpui::test]
1488    async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1489        cx.update(|cx| {
1490            let settings_store = settings::SettingsStore::test(cx);
1491            cx.set_global(settings_store);
1492            cx.update_flags(true, vec!["agent-v2".to_string()]);
1493            ThreadMetadataStore::init_global(cx);
1494        });
1495
1496        let paths = PathList::new(&[Path::new("/project-a")]);
1497        let now = Utc::now();
1498
1499        let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1500        let metadata2 = make_metadata(
1501            "session-2",
1502            "Archived Thread",
1503            now - chrono::Duration::seconds(1),
1504            paths.clone(),
1505        );
1506
1507        cx.update(|cx| {
1508            let store = ThreadMetadataStore::global(cx);
1509            store.update(cx, |store, cx| {
1510                store.save(metadata1, cx);
1511                store.save(metadata2, cx);
1512            });
1513        });
1514
1515        cx.run_until_parked();
1516
1517        cx.update(|cx| {
1518            let store = ThreadMetadataStore::global(cx);
1519            store.update(cx, |store, cx| {
1520                store.archive(&acp::SessionId::new("session-2"), cx);
1521            });
1522        });
1523
1524        cx.run_until_parked();
1525
1526        cx.update(|cx| {
1527            let store = ThreadMetadataStore::global(cx);
1528            let store = store.read(cx);
1529
1530            let path_entries = store
1531                .entries_for_path(&paths)
1532                .map(|e| e.session_id.0.to_string())
1533                .collect::<Vec<_>>();
1534            assert_eq!(path_entries, vec!["session-1"]);
1535
1536            let all_entries = store
1537                .entries()
1538                .map(|e| e.session_id.0.to_string())
1539                .collect::<Vec<_>>();
1540            assert_eq!(all_entries.len(), 2);
1541            assert!(all_entries.contains(&"session-1".to_string()));
1542            assert!(all_entries.contains(&"session-2".to_string()));
1543
1544            let archived = store
1545                .archived_entries()
1546                .map(|e| e.session_id.0.to_string())
1547                .collect::<Vec<_>>();
1548            assert_eq!(archived, vec!["session-2"]);
1549        });
1550    }
1551
1552    #[gpui::test]
1553    async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1554        cx.update(|cx| {
1555            let settings_store = settings::SettingsStore::test(cx);
1556            cx.set_global(settings_store);
1557            cx.update_flags(true, vec!["agent-v2".to_string()]);
1558            ThreadMetadataStore::init_global(cx);
1559        });
1560
1561        let paths = PathList::new(&[Path::new("/project-a")]);
1562        let now = Utc::now();
1563
1564        let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1565        let m2 = make_metadata(
1566            "session-2",
1567            "Thread Two",
1568            now - chrono::Duration::seconds(1),
1569            paths.clone(),
1570        );
1571        let m3 = make_metadata(
1572            "session-3",
1573            "Thread Three",
1574            now - chrono::Duration::seconds(2),
1575            paths,
1576        );
1577
1578        cx.update(|cx| {
1579            let store = ThreadMetadataStore::global(cx);
1580            store.update(cx, |store, cx| {
1581                store.save_all(vec![m1, m2, m3], cx);
1582            });
1583        });
1584
1585        cx.run_until_parked();
1586
1587        cx.update(|cx| {
1588            let store = ThreadMetadataStore::global(cx);
1589            let store = store.read(cx);
1590
1591            let all_entries = store
1592                .entries()
1593                .map(|e| e.session_id.0.to_string())
1594                .collect::<Vec<_>>();
1595            assert_eq!(all_entries.len(), 3);
1596            assert!(all_entries.contains(&"session-1".to_string()));
1597            assert!(all_entries.contains(&"session-2".to_string()));
1598            assert!(all_entries.contains(&"session-3".to_string()));
1599
1600            let entry_ids = store.entry_ids().collect::<Vec<_>>();
1601            assert_eq!(entry_ids.len(), 3);
1602        });
1603    }
1604
1605    #[gpui::test]
1606    async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1607        cx.update(|cx| {
1608            let settings_store = settings::SettingsStore::test(cx);
1609            cx.set_global(settings_store);
1610            cx.update_flags(true, vec!["agent-v2".to_string()]);
1611            ThreadMetadataStore::init_global(cx);
1612        });
1613
1614        let paths = PathList::new(&[Path::new("/project-a")]);
1615        let now = Utc::now();
1616        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1617
1618        cx.update(|cx| {
1619            let store = ThreadMetadataStore::global(cx);
1620            store.update(cx, |store, cx| {
1621                store.save(metadata, cx);
1622            });
1623        });
1624
1625        cx.run_until_parked();
1626
1627        cx.update(|cx| {
1628            let store = ThreadMetadataStore::global(cx);
1629            store.update(cx, |store, cx| {
1630                store.archive(&acp::SessionId::new("session-1"), cx);
1631            });
1632        });
1633
1634        cx.run_until_parked();
1635
1636        cx.update(|cx| {
1637            let store = ThreadMetadataStore::global(cx);
1638            store.update(cx, |store, cx| {
1639                let _ = store.reload(cx);
1640            });
1641        });
1642
1643        cx.run_until_parked();
1644
1645        cx.update(|cx| {
1646            let store = ThreadMetadataStore::global(cx);
1647            let store = store.read(cx);
1648
1649            let thread = store
1650                .entries()
1651                .find(|e| e.session_id.0.as_ref() == "session-1")
1652                .expect("thread should exist after reload");
1653            assert!(thread.archived);
1654
1655            let path_entries = store
1656                .entries_for_path(&paths)
1657                .map(|e| e.session_id.0.to_string())
1658                .collect::<Vec<_>>();
1659            assert!(path_entries.is_empty());
1660
1661            let archived = store
1662                .archived_entries()
1663                .map(|e| e.session_id.0.to_string())
1664                .collect::<Vec<_>>();
1665            assert_eq!(archived, vec!["session-1"]);
1666        });
1667    }
1668
1669    #[gpui::test]
1670    async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1671        cx.update(|cx| {
1672            let settings_store = settings::SettingsStore::test(cx);
1673            cx.set_global(settings_store);
1674            cx.update_flags(true, vec!["agent-v2".to_string()]);
1675            ThreadMetadataStore::init_global(cx);
1676        });
1677
1678        cx.run_until_parked();
1679
1680        cx.update(|cx| {
1681            let store = ThreadMetadataStore::global(cx);
1682            store.update(cx, |store, cx| {
1683                store.archive(&acp::SessionId::new("nonexistent"), cx);
1684            });
1685        });
1686
1687        cx.run_until_parked();
1688
1689        cx.update(|cx| {
1690            let store = ThreadMetadataStore::global(cx);
1691            let store = store.read(cx);
1692
1693            assert!(store.is_empty());
1694            assert_eq!(store.entries().count(), 0);
1695            assert_eq!(store.archived_entries().count(), 0);
1696        });
1697    }
1698}