thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AcpThreadEvent;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::Context as _;
   7use chrono::{DateTime, Utc};
   8use collections::{HashMap, HashSet};
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    ThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We skip migrating threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    let store = ThreadMetadataStore::global(cx);
  46    let db = store.read(cx).db.clone();
  47
  48    cx.spawn(async move |cx| {
  49        let existing_entries = db.list_ids()?.into_iter().collect::<HashSet<_>>();
  50
  51        let is_first_migration = existing_entries.is_empty();
  52
  53        let mut to_migrate = store.read_with(cx, |_store, cx| {
  54            ThreadStore::global(cx)
  55                .read(cx)
  56                .entries()
  57                .filter_map(|entry| {
  58                    if existing_entries.contains(&entry.id.0) {
  59                        return None;
  60                    }
  61
  62                    Some(ThreadMetadata {
  63                        session_id: entry.id,
  64                        agent_id: ZED_AGENT_ID.clone(),
  65                        title: entry.title,
  66                        updated_at: entry.updated_at,
  67                        created_at: entry.created_at,
  68                        folder_paths: entry.folder_paths,
  69                        archived: true,
  70                    })
  71                })
  72                .collect::<Vec<_>>()
  73        });
  74
  75        if to_migrate.is_empty() {
  76            return anyhow::Ok(());
  77        }
  78
  79        // On the first migration (no entries in DB yet), keep the 5 most
  80        // recent threads per project unarchived.
  81        if is_first_migration {
  82            let mut per_project: HashMap<PathList, Vec<&mut ThreadMetadata>> = HashMap::default();
  83            for entry in &mut to_migrate {
  84                if entry.folder_paths.is_empty() {
  85                    continue;
  86                }
  87                per_project
  88                    .entry(entry.folder_paths.clone())
  89                    .or_default()
  90                    .push(entry);
  91            }
  92            for entries in per_project.values_mut() {
  93                entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
  94                for entry in entries.iter_mut().take(5) {
  95                    entry.archived = false;
  96                }
  97            }
  98        }
  99
 100        log::info!("Migrating {} thread store entries", to_migrate.len());
 101
 102        // Manually save each entry to the database and call reload, otherwise
 103        // we'll end up triggering lots of reloads after each save
 104        for entry in to_migrate {
 105            db.save(entry).await?;
 106        }
 107
 108        log::info!("Finished migrating thread store entries");
 109
 110        let _ = store.update(cx, |store, cx| store.reload(cx));
 111        anyhow::Ok(())
 112    })
 113    .detach_and_log_err(cx);
 114}
 115
 116struct GlobalThreadMetadataStore(Entity<ThreadMetadataStore>);
 117impl Global for GlobalThreadMetadataStore {}
 118
 119/// Lightweight metadata for any thread (native or ACP), enough to populate
 120/// the sidebar list and route to the correct load path when clicked.
 121#[derive(Debug, Clone, PartialEq)]
 122pub struct ThreadMetadata {
 123    pub session_id: acp::SessionId,
 124    pub agent_id: AgentId,
 125    pub title: SharedString,
 126    pub updated_at: DateTime<Utc>,
 127    pub created_at: Option<DateTime<Utc>>,
 128    pub folder_paths: PathList,
 129    pub archived: bool,
 130}
 131
 132impl From<&ThreadMetadata> for acp_thread::AgentSessionInfo {
 133    fn from(meta: &ThreadMetadata) -> Self {
 134        Self {
 135            session_id: meta.session_id.clone(),
 136            work_dirs: Some(meta.folder_paths.clone()),
 137            title: Some(meta.title.clone()),
 138            updated_at: Some(meta.updated_at),
 139            created_at: meta.created_at,
 140            meta: None,
 141        }
 142    }
 143}
 144
 145/// The store holds all metadata needed to show threads in the sidebar/the archive.
 146///
 147/// Automatically listens to AcpThread events and updates metadata if it has changed.
 148pub struct ThreadMetadataStore {
 149    db: ThreadMetadataDb,
 150    threads: HashMap<acp::SessionId, ThreadMetadata>,
 151    threads_by_paths: HashMap<PathList, HashSet<acp::SessionId>>,
 152    reload_task: Option<Shared<Task<()>>>,
 153    session_subscriptions: HashMap<acp::SessionId, Subscription>,
 154    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
 155    _db_operations_task: Task<()>,
 156}
 157
 158#[derive(Debug, PartialEq)]
 159enum DbOperation {
 160    Upsert(ThreadMetadata),
 161    Delete(acp::SessionId),
 162}
 163
 164impl DbOperation {
 165    fn id(&self) -> &acp::SessionId {
 166        match self {
 167            DbOperation::Upsert(thread) => &thread.session_id,
 168            DbOperation::Delete(session_id) => session_id,
 169        }
 170    }
 171}
 172
 173impl ThreadMetadataStore {
 174    #[cfg(not(any(test, feature = "test-support")))]
 175    pub fn init_global(cx: &mut App) {
 176        if cx.has_global::<Self>() {
 177            return;
 178        }
 179
 180        let db = ThreadMetadataDb::global(cx);
 181        let thread_store = cx.new(|cx| Self::new(db, cx));
 182        cx.set_global(GlobalThreadMetadataStore(thread_store));
 183    }
 184
 185    #[cfg(any(test, feature = "test-support"))]
 186    pub fn init_global(cx: &mut App) {
 187        let thread = std::thread::current();
 188        let test_name = thread.name().unwrap_or("unknown_test");
 189        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 190        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 191        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 192        cx.set_global(GlobalThreadMetadataStore(thread_store));
 193    }
 194
 195    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 196        cx.try_global::<GlobalThreadMetadataStore>()
 197            .map(|store| store.0.clone())
 198    }
 199
 200    pub fn global(cx: &App) -> Entity<Self> {
 201        cx.global::<GlobalThreadMetadataStore>().0.clone()
 202    }
 203
 204    pub fn is_empty(&self) -> bool {
 205        self.threads.is_empty()
 206    }
 207
 208    /// Returns all thread IDs.
 209    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 210        self.threads.keys().cloned()
 211    }
 212
 213    /// Returns the metadata for a specific thread, if it exists.
 214    pub fn entry(&self, session_id: &acp::SessionId) -> Option<&ThreadMetadata> {
 215        self.threads.get(session_id)
 216    }
 217
 218    /// Returns all threads.
 219    pub fn entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 220        self.threads.values()
 221    }
 222
 223    /// Returns all archived threads.
 224    pub fn archived_entries(&self) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 225        self.entries().filter(|t| t.archived)
 226    }
 227
 228    /// Returns all threads for the given path list, excluding archived threads.
 229    pub fn entries_for_path(
 230        &self,
 231        path_list: &PathList,
 232    ) -> impl Iterator<Item = &ThreadMetadata> + '_ {
 233        self.threads_by_paths
 234            .get(path_list)
 235            .into_iter()
 236            .flatten()
 237            .filter_map(|s| self.threads.get(s))
 238            .filter(|s| !s.archived)
 239    }
 240
 241    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 242        let db = self.db.clone();
 243        self.reload_task.take();
 244
 245        let list_task = cx
 246            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 247
 248        let reload_task = cx
 249            .spawn(async move |this, cx| {
 250                let Some(rows) = list_task.await.log_err() else {
 251                    return;
 252                };
 253
 254                this.update(cx, |this, cx| {
 255                    this.threads.clear();
 256                    this.threads_by_paths.clear();
 257
 258                    for row in rows {
 259                        this.threads_by_paths
 260                            .entry(row.folder_paths.clone())
 261                            .or_default()
 262                            .insert(row.session_id.clone());
 263                        this.threads.insert(row.session_id.clone(), row);
 264                    }
 265
 266                    cx.notify();
 267                })
 268                .ok();
 269            })
 270            .shared();
 271        self.reload_task = Some(reload_task.clone());
 272        reload_task
 273    }
 274
 275    pub fn save_all(&mut self, metadata: Vec<ThreadMetadata>, cx: &mut Context<Self>) {
 276        if !cx.has_flag::<AgentV2FeatureFlag>() {
 277            return;
 278        }
 279
 280        for metadata in metadata {
 281            self.save_internal(metadata);
 282        }
 283        cx.notify();
 284    }
 285
 286    #[cfg(any(test, feature = "test-support"))]
 287    pub fn save_manually(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 288        self.save(metadata, cx)
 289    }
 290
 291    fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 292        if !cx.has_flag::<AgentV2FeatureFlag>() {
 293            return;
 294        }
 295
 296        self.save_internal(metadata);
 297        cx.notify();
 298    }
 299
 300    fn save_internal(&mut self, metadata: ThreadMetadata) {
 301        // If the folder paths have changed, we need to clear the old entry
 302        if let Some(thread) = self.threads.get(&metadata.session_id)
 303            && thread.folder_paths != metadata.folder_paths
 304            && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
 305        {
 306            session_ids.remove(&metadata.session_id);
 307        }
 308
 309        self.threads
 310            .insert(metadata.session_id.clone(), metadata.clone());
 311
 312        self.threads_by_paths
 313            .entry(metadata.folder_paths.clone())
 314            .or_default()
 315            .insert(metadata.session_id.clone());
 316
 317        self.pending_thread_ops_tx
 318            .try_send(DbOperation::Upsert(metadata))
 319            .log_err();
 320    }
 321
 322    pub fn update_working_directories(
 323        &mut self,
 324        session_id: &acp::SessionId,
 325        work_dirs: PathList,
 326        cx: &mut Context<Self>,
 327    ) {
 328        if !cx.has_flag::<AgentV2FeatureFlag>() {
 329            return;
 330        }
 331
 332        if let Some(thread) = self.threads.get(session_id) {
 333            self.save_internal(ThreadMetadata {
 334                folder_paths: work_dirs,
 335                ..thread.clone()
 336            });
 337            cx.notify();
 338        }
 339    }
 340
 341    pub fn archive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 342        self.update_archived(session_id, true, cx);
 343    }
 344
 345    pub fn unarchive(&mut self, session_id: &acp::SessionId, cx: &mut Context<Self>) {
 346        self.update_archived(session_id, false, cx);
 347    }
 348
 349    fn update_archived(
 350        &mut self,
 351        session_id: &acp::SessionId,
 352        archived: bool,
 353        cx: &mut Context<Self>,
 354    ) {
 355        if !cx.has_flag::<AgentV2FeatureFlag>() {
 356            return;
 357        }
 358
 359        if let Some(thread) = self.threads.get(session_id) {
 360            self.save_internal(ThreadMetadata {
 361                archived,
 362                ..thread.clone()
 363            });
 364            cx.notify();
 365        }
 366    }
 367
 368    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 369        if !cx.has_flag::<AgentV2FeatureFlag>() {
 370            return;
 371        }
 372
 373        if let Some(thread) = self.threads.get(&session_id)
 374            && let Some(session_ids) = self.threads_by_paths.get_mut(&thread.folder_paths)
 375        {
 376            session_ids.remove(&session_id);
 377        }
 378        self.threads.remove(&session_id);
 379        self.pending_thread_ops_tx
 380            .try_send(DbOperation::Delete(session_id))
 381            .log_err();
 382        cx.notify();
 383    }
 384
 385    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 386        let weak_store = cx.weak_entity();
 387
 388        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 389            // Don't track subagent threads in the sidebar.
 390            if thread.parent_session_id().is_some() {
 391                return;
 392            }
 393
 394            let thread_entity = cx.entity();
 395
 396            cx.on_release({
 397                let weak_store = weak_store.clone();
 398                move |thread, cx| {
 399                    weak_store
 400                        .update(cx, |store, cx| {
 401                            let session_id = thread.session_id().clone();
 402                            store.session_subscriptions.remove(&session_id);
 403                            if thread.entries().is_empty() {
 404                                // Empty threads can be unloaded without ever being
 405                                // durably persisted by the underlying agent.
 406                                store.delete(session_id, cx);
 407                            }
 408                        })
 409                        .ok();
 410                }
 411            })
 412            .detach();
 413
 414            weak_store
 415                .update(cx, |this, cx| {
 416                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_event);
 417                    this.session_subscriptions
 418                        .insert(thread.session_id().clone(), subscription);
 419                })
 420                .ok();
 421        })
 422        .detach();
 423
 424        let (tx, rx) = smol::channel::unbounded();
 425        let _db_operations_task = cx.background_spawn({
 426            let db = db.clone();
 427            async move {
 428                while let Ok(first_update) = rx.recv().await {
 429                    let mut updates = vec![first_update];
 430                    while let Ok(update) = rx.try_recv() {
 431                        updates.push(update);
 432                    }
 433                    let updates = Self::dedup_db_operations(updates);
 434                    for operation in updates {
 435                        match operation {
 436                            DbOperation::Upsert(metadata) => {
 437                                db.save(metadata).await.log_err();
 438                            }
 439                            DbOperation::Delete(session_id) => {
 440                                db.delete(session_id).await.log_err();
 441                            }
 442                        }
 443                    }
 444                }
 445            }
 446        });
 447
 448        let mut this = Self {
 449            db,
 450            threads: HashMap::default(),
 451            threads_by_paths: HashMap::default(),
 452            reload_task: None,
 453            session_subscriptions: HashMap::default(),
 454            pending_thread_ops_tx: tx,
 455            _db_operations_task,
 456        };
 457        let _ = this.reload(cx);
 458        this
 459    }
 460
 461    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 462        let mut ops = HashMap::default();
 463        for operation in operations.into_iter().rev() {
 464            if ops.contains_key(operation.id()) {
 465                continue;
 466            }
 467            ops.insert(operation.id().clone(), operation);
 468        }
 469        ops.into_values().collect()
 470    }
 471
 472    fn handle_thread_event(
 473        &mut self,
 474        thread: Entity<acp_thread::AcpThread>,
 475        event: &AcpThreadEvent,
 476        cx: &mut Context<Self>,
 477    ) {
 478        // Don't track subagent threads in the sidebar.
 479        if thread.read(cx).parent_session_id().is_some() {
 480            return;
 481        }
 482
 483        match event {
 484            AcpThreadEvent::NewEntry
 485            | AcpThreadEvent::TitleUpdated
 486            | AcpThreadEvent::EntryUpdated(_)
 487            | AcpThreadEvent::EntriesRemoved(_)
 488            | AcpThreadEvent::ToolAuthorizationRequested(_)
 489            | AcpThreadEvent::ToolAuthorizationReceived(_)
 490            | AcpThreadEvent::Retry(_)
 491            | AcpThreadEvent::Stopped(_)
 492            | AcpThreadEvent::Error
 493            | AcpThreadEvent::LoadError(_)
 494            | AcpThreadEvent::Refusal
 495            | AcpThreadEvent::WorkingDirectoriesUpdated => {
 496                let thread_ref = thread.read(cx);
 497                let existing_thread = self.threads.get(thread_ref.session_id());
 498                let session_id = thread_ref.session_id().clone();
 499                let title = thread_ref
 500                    .title()
 501                    .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 502
 503                let updated_at = Utc::now();
 504
 505                let created_at = existing_thread
 506                    .and_then(|t| t.created_at)
 507                    .unwrap_or_else(|| updated_at);
 508
 509                let agent_id = thread_ref.connection().agent_id();
 510
 511                let folder_paths = {
 512                    let project = thread_ref.project().read(cx);
 513                    let paths: Vec<Arc<Path>> = project
 514                        .visible_worktrees(cx)
 515                        .map(|worktree| worktree.read(cx).abs_path())
 516                        .collect();
 517                    PathList::new(&paths)
 518                };
 519
 520                // Threads without a folder path (e.g. started in an empty
 521                // window) are archived by default so they don't get lost,
 522                // because they won't show up in the sidebar. Users can reload
 523                // them from the archive.
 524                let archived = existing_thread
 525                    .map(|t| t.archived)
 526                    .unwrap_or(folder_paths.is_empty());
 527
 528                let metadata = ThreadMetadata {
 529                    session_id,
 530                    agent_id,
 531                    title,
 532                    created_at: Some(created_at),
 533                    updated_at,
 534                    folder_paths,
 535                    archived,
 536                };
 537
 538                self.save(metadata, cx);
 539            }
 540            AcpThreadEvent::TokenUsageUpdated
 541            | AcpThreadEvent::SubagentSpawned(_)
 542            | AcpThreadEvent::PromptCapabilitiesUpdated
 543            | AcpThreadEvent::AvailableCommandsUpdated(_)
 544            | AcpThreadEvent::ModeUpdated(_)
 545            | AcpThreadEvent::ConfigOptionsUpdated(_) => {}
 546        }
 547    }
 548}
 549
 550impl Global for ThreadMetadataStore {}
 551
 552struct ThreadMetadataDb(ThreadSafeConnection);
 553
 554impl Domain for ThreadMetadataDb {
 555    const NAME: &str = stringify!(ThreadMetadataDb);
 556
 557    const MIGRATIONS: &[&str] = &[
 558        sql!(
 559            CREATE TABLE IF NOT EXISTS sidebar_threads(
 560                session_id TEXT PRIMARY KEY,
 561                agent_id TEXT,
 562                title TEXT NOT NULL,
 563                updated_at TEXT NOT NULL,
 564                created_at TEXT,
 565                folder_paths TEXT,
 566                folder_paths_order TEXT
 567            ) STRICT;
 568        ),
 569        sql!(ALTER TABLE sidebar_threads ADD COLUMN archived INTEGER DEFAULT 0),
 570    ];
 571}
 572
 573db::static_connection!(ThreadMetadataDb, []);
 574
 575impl ThreadMetadataDb {
 576    pub fn list_ids(&self) -> anyhow::Result<Vec<Arc<str>>> {
 577        self.select::<Arc<str>>(
 578            "SELECT session_id FROM sidebar_threads \
 579             ORDER BY updated_at DESC",
 580        )?()
 581    }
 582
 583    /// List all sidebar thread metadata, ordered by updated_at descending.
 584    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 585        self.select::<ThreadMetadata>(
 586            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived \
 587             FROM sidebar_threads \
 588             ORDER BY updated_at DESC"
 589        )?()
 590    }
 591
 592    /// Upsert metadata for a thread.
 593    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 594        let id = row.session_id.0.clone();
 595        let agent_id = if row.agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 596            None
 597        } else {
 598            Some(row.agent_id.to_string())
 599        };
 600        let title = row.title.to_string();
 601        let updated_at = row.updated_at.to_rfc3339();
 602        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 603        let serialized = row.folder_paths.serialize();
 604        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 605            (None, None)
 606        } else {
 607            (Some(serialized.paths), Some(serialized.order))
 608        };
 609        let archived = row.archived;
 610
 611        self.write(move |conn| {
 612            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order, archived) \
 613                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8) \
 614                       ON CONFLICT(session_id) DO UPDATE SET \
 615                           agent_id = excluded.agent_id, \
 616                           title = excluded.title, \
 617                           updated_at = excluded.updated_at, \
 618                           created_at = excluded.created_at, \
 619                           folder_paths = excluded.folder_paths, \
 620                           folder_paths_order = excluded.folder_paths_order, \
 621                           archived = excluded.archived";
 622            let mut stmt = Statement::prepare(conn, sql)?;
 623            let mut i = stmt.bind(&id, 1)?;
 624            i = stmt.bind(&agent_id, i)?;
 625            i = stmt.bind(&title, i)?;
 626            i = stmt.bind(&updated_at, i)?;
 627            i = stmt.bind(&created_at, i)?;
 628            i = stmt.bind(&folder_paths, i)?;
 629            i = stmt.bind(&folder_paths_order, i)?;
 630            stmt.bind(&archived, i)?;
 631            stmt.exec()
 632        })
 633        .await
 634    }
 635
 636    /// Delete metadata for a single thread.
 637    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 638        let id = session_id.0.clone();
 639        self.write(move |conn| {
 640            let mut stmt =
 641                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 642            stmt.bind(&id, 1)?;
 643            stmt.exec()
 644        })
 645        .await
 646    }
 647}
 648
 649impl Column for ThreadMetadata {
 650    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 651        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 652        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 653        let (title, next): (String, i32) = Column::column(statement, next)?;
 654        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 655        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 656        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 657        let (folder_paths_order_str, next): (Option<String>, i32) =
 658            Column::column(statement, next)?;
 659        let (archived, next): (bool, i32) = Column::column(statement, next)?;
 660
 661        let agent_id = agent_id
 662            .map(|id| AgentId::new(id))
 663            .unwrap_or(ZED_AGENT_ID.clone());
 664
 665        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 666        let created_at = created_at_str
 667            .as_deref()
 668            .map(DateTime::parse_from_rfc3339)
 669            .transpose()?
 670            .map(|dt| dt.with_timezone(&Utc));
 671
 672        let folder_paths = folder_paths_str
 673            .map(|paths| {
 674                PathList::deserialize(&util::path_list::SerializedPathList {
 675                    paths,
 676                    order: folder_paths_order_str.unwrap_or_default(),
 677                })
 678            })
 679            .unwrap_or_default();
 680
 681        Ok((
 682            ThreadMetadata {
 683                session_id: acp::SessionId::new(id),
 684                agent_id,
 685                title: title.into(),
 686                updated_at,
 687                created_at,
 688                folder_paths,
 689                archived,
 690            },
 691            next,
 692        ))
 693    }
 694}
 695
 696#[cfg(test)]
 697mod tests {
 698    use super::*;
 699    use acp_thread::{AgentConnection, StubAgentConnection};
 700    use action_log::ActionLog;
 701    use agent::DbThread;
 702    use agent_client_protocol as acp;
 703    use feature_flags::FeatureFlagAppExt;
 704    use gpui::TestAppContext;
 705    use project::FakeFs;
 706    use project::Project;
 707    use std::path::Path;
 708    use std::rc::Rc;
 709
 710    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 711        DbThread {
 712            title: title.to_string().into(),
 713            messages: Vec::new(),
 714            updated_at,
 715            detailed_summary: None,
 716            initial_project_snapshot: None,
 717            cumulative_token_usage: Default::default(),
 718            request_token_usage: Default::default(),
 719            model: None,
 720            profile: None,
 721            imported: false,
 722            subagent_context: None,
 723            speed: None,
 724            thinking_enabled: false,
 725            thinking_effort: None,
 726            draft_prompt: None,
 727            ui_scroll_position: None,
 728        }
 729    }
 730
 731    fn make_metadata(
 732        session_id: &str,
 733        title: &str,
 734        updated_at: DateTime<Utc>,
 735        folder_paths: PathList,
 736    ) -> ThreadMetadata {
 737        ThreadMetadata {
 738            archived: false,
 739            session_id: acp::SessionId::new(session_id),
 740            agent_id: agent::ZED_AGENT_ID.clone(),
 741            title: title.to_string().into(),
 742            updated_at,
 743            created_at: Some(updated_at),
 744            folder_paths,
 745        }
 746    }
 747
 748    fn init_test(cx: &mut TestAppContext) {
 749        cx.update(|cx| {
 750            let settings_store = settings::SettingsStore::test(cx);
 751            cx.set_global(settings_store);
 752            cx.update_flags(true, vec!["agent-v2".to_string()]);
 753            ThreadMetadataStore::init_global(cx);
 754            ThreadStore::init_global(cx);
 755        });
 756        cx.run_until_parked();
 757    }
 758
 759    #[gpui::test]
 760    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 761        let first_paths = PathList::new(&[Path::new("/project-a")]);
 762        let second_paths = PathList::new(&[Path::new("/project-b")]);
 763        let now = Utc::now();
 764        let older = now - chrono::Duration::seconds(1);
 765
 766        let thread = std::thread::current();
 767        let test_name = thread.name().unwrap_or("unknown_test");
 768        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 769        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 770            &db_name,
 771        )));
 772
 773        db.save(make_metadata(
 774            "session-1",
 775            "First Thread",
 776            now,
 777            first_paths.clone(),
 778        ))
 779        .await
 780        .unwrap();
 781        db.save(make_metadata(
 782            "session-2",
 783            "Second Thread",
 784            older,
 785            second_paths.clone(),
 786        ))
 787        .await
 788        .unwrap();
 789
 790        cx.update(|cx| {
 791            let settings_store = settings::SettingsStore::test(cx);
 792            cx.set_global(settings_store);
 793            cx.update_flags(true, vec!["agent-v2".to_string()]);
 794            ThreadMetadataStore::init_global(cx);
 795        });
 796
 797        cx.run_until_parked();
 798
 799        cx.update(|cx| {
 800            let store = ThreadMetadataStore::global(cx);
 801            let store = store.read(cx);
 802
 803            let entry_ids = store
 804                .entry_ids()
 805                .map(|session_id| session_id.0.to_string())
 806                .collect::<Vec<_>>();
 807            assert_eq!(entry_ids.len(), 2);
 808            assert!(entry_ids.contains(&"session-1".to_string()));
 809            assert!(entry_ids.contains(&"session-2".to_string()));
 810
 811            let first_path_entries = store
 812                .entries_for_path(&first_paths)
 813                .map(|entry| entry.session_id.0.to_string())
 814                .collect::<Vec<_>>();
 815            assert_eq!(first_path_entries, vec!["session-1"]);
 816
 817            let second_path_entries = store
 818                .entries_for_path(&second_paths)
 819                .map(|entry| entry.session_id.0.to_string())
 820                .collect::<Vec<_>>();
 821            assert_eq!(second_path_entries, vec!["session-2"]);
 822        });
 823    }
 824
 825    #[gpui::test]
 826    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 827        init_test(cx);
 828
 829        let first_paths = PathList::new(&[Path::new("/project-a")]);
 830        let second_paths = PathList::new(&[Path::new("/project-b")]);
 831        let initial_time = Utc::now();
 832        let updated_time = initial_time + chrono::Duration::seconds(1);
 833
 834        let initial_metadata = make_metadata(
 835            "session-1",
 836            "First Thread",
 837            initial_time,
 838            first_paths.clone(),
 839        );
 840
 841        let second_metadata = make_metadata(
 842            "session-2",
 843            "Second Thread",
 844            initial_time,
 845            second_paths.clone(),
 846        );
 847
 848        cx.update(|cx| {
 849            let store = ThreadMetadataStore::global(cx);
 850            store.update(cx, |store, cx| {
 851                store.save(initial_metadata, cx);
 852                store.save(second_metadata, cx);
 853            });
 854        });
 855
 856        cx.run_until_parked();
 857
 858        cx.update(|cx| {
 859            let store = ThreadMetadataStore::global(cx);
 860            let store = store.read(cx);
 861
 862            let first_path_entries = store
 863                .entries_for_path(&first_paths)
 864                .map(|entry| entry.session_id.0.to_string())
 865                .collect::<Vec<_>>();
 866            assert_eq!(first_path_entries, vec!["session-1"]);
 867
 868            let second_path_entries = store
 869                .entries_for_path(&second_paths)
 870                .map(|entry| entry.session_id.0.to_string())
 871                .collect::<Vec<_>>();
 872            assert_eq!(second_path_entries, vec!["session-2"]);
 873        });
 874
 875        let moved_metadata = make_metadata(
 876            "session-1",
 877            "First Thread",
 878            updated_time,
 879            second_paths.clone(),
 880        );
 881
 882        cx.update(|cx| {
 883            let store = ThreadMetadataStore::global(cx);
 884            store.update(cx, |store, cx| {
 885                store.save(moved_metadata, cx);
 886            });
 887        });
 888
 889        cx.run_until_parked();
 890
 891        cx.update(|cx| {
 892            let store = ThreadMetadataStore::global(cx);
 893            let store = store.read(cx);
 894
 895            let entry_ids = store
 896                .entry_ids()
 897                .map(|session_id| session_id.0.to_string())
 898                .collect::<Vec<_>>();
 899            assert_eq!(entry_ids.len(), 2);
 900            assert!(entry_ids.contains(&"session-1".to_string()));
 901            assert!(entry_ids.contains(&"session-2".to_string()));
 902
 903            let first_path_entries = store
 904                .entries_for_path(&first_paths)
 905                .map(|entry| entry.session_id.0.to_string())
 906                .collect::<Vec<_>>();
 907            assert!(first_path_entries.is_empty());
 908
 909            let second_path_entries = store
 910                .entries_for_path(&second_paths)
 911                .map(|entry| entry.session_id.0.to_string())
 912                .collect::<Vec<_>>();
 913            assert_eq!(second_path_entries.len(), 2);
 914            assert!(second_path_entries.contains(&"session-1".to_string()));
 915            assert!(second_path_entries.contains(&"session-2".to_string()));
 916        });
 917
 918        cx.update(|cx| {
 919            let store = ThreadMetadataStore::global(cx);
 920            store.update(cx, |store, cx| {
 921                store.delete(acp::SessionId::new("session-2"), cx);
 922            });
 923        });
 924
 925        cx.run_until_parked();
 926
 927        cx.update(|cx| {
 928            let store = ThreadMetadataStore::global(cx);
 929            let store = store.read(cx);
 930
 931            let entry_ids = store
 932                .entry_ids()
 933                .map(|session_id| session_id.0.to_string())
 934                .collect::<Vec<_>>();
 935            assert_eq!(entry_ids, vec!["session-1"]);
 936
 937            let second_path_entries = store
 938                .entries_for_path(&second_paths)
 939                .map(|entry| entry.session_id.0.to_string())
 940                .collect::<Vec<_>>();
 941            assert_eq!(second_path_entries, vec!["session-1"]);
 942        });
 943    }
 944
 945    #[gpui::test]
 946    async fn test_migrate_thread_metadata_migrates_only_missing_threads(cx: &mut TestAppContext) {
 947        init_test(cx);
 948
 949        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 950        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 951        let now = Utc::now();
 952
 953        let existing_metadata = ThreadMetadata {
 954            session_id: acp::SessionId::new("a-session-0"),
 955            agent_id: agent::ZED_AGENT_ID.clone(),
 956            title: "Existing Metadata".into(),
 957            updated_at: now - chrono::Duration::seconds(10),
 958            created_at: Some(now - chrono::Duration::seconds(10)),
 959            folder_paths: project_a_paths.clone(),
 960            archived: false,
 961        };
 962
 963        cx.update(|cx| {
 964            let store = ThreadMetadataStore::global(cx);
 965            store.update(cx, |store, cx| {
 966                store.save(existing_metadata, cx);
 967            });
 968        });
 969        cx.run_until_parked();
 970
 971        let threads_to_save = vec![
 972            (
 973                "a-session-0",
 974                "Thread A0 From Native Store",
 975                project_a_paths.clone(),
 976                now,
 977            ),
 978            (
 979                "a-session-1",
 980                "Thread A1",
 981                project_a_paths.clone(),
 982                now + chrono::Duration::seconds(1),
 983            ),
 984            (
 985                "b-session-0",
 986                "Thread B0",
 987                project_b_paths.clone(),
 988                now + chrono::Duration::seconds(2),
 989            ),
 990            (
 991                "projectless",
 992                "Projectless",
 993                PathList::default(),
 994                now + chrono::Duration::seconds(3),
 995            ),
 996        ];
 997
 998        for (session_id, title, paths, updated_at) in &threads_to_save {
 999            let save_task = cx.update(|cx| {
1000                let thread_store = ThreadStore::global(cx);
1001                let session_id = session_id.to_string();
1002                let title = title.to_string();
1003                let paths = paths.clone();
1004                thread_store.update(cx, |store, cx| {
1005                    store.save_thread(
1006                        acp::SessionId::new(session_id),
1007                        make_db_thread(&title, *updated_at),
1008                        paths,
1009                        cx,
1010                    )
1011                })
1012            });
1013            save_task.await.unwrap();
1014            cx.run_until_parked();
1015        }
1016
1017        cx.update(|cx| migrate_thread_metadata(cx));
1018        cx.run_until_parked();
1019
1020        let list = cx.update(|cx| {
1021            let store = ThreadMetadataStore::global(cx);
1022            store.read(cx).entries().cloned().collect::<Vec<_>>()
1023        });
1024
1025        assert_eq!(list.len(), 4);
1026        assert!(
1027            list.iter()
1028                .all(|metadata| metadata.agent_id.as_ref() == agent::ZED_AGENT_ID.as_ref())
1029        );
1030
1031        let existing_metadata = list
1032            .iter()
1033            .find(|metadata| metadata.session_id.0.as_ref() == "a-session-0")
1034            .unwrap();
1035        assert_eq!(existing_metadata.title.as_ref(), "Existing Metadata");
1036        assert!(!existing_metadata.archived);
1037
1038        let migrated_session_ids = list
1039            .iter()
1040            .map(|metadata| metadata.session_id.0.as_ref())
1041            .collect::<Vec<_>>();
1042        assert!(migrated_session_ids.contains(&"a-session-1"));
1043        assert!(migrated_session_ids.contains(&"b-session-0"));
1044        assert!(migrated_session_ids.contains(&"projectless"));
1045
1046        let migrated_entries = list
1047            .iter()
1048            .filter(|metadata| metadata.session_id.0.as_ref() != "a-session-0")
1049            .collect::<Vec<_>>();
1050        assert!(migrated_entries.iter().all(|metadata| metadata.archived));
1051    }
1052
1053    #[gpui::test]
1054    async fn test_migrate_thread_metadata_noops_when_all_threads_already_exist(
1055        cx: &mut TestAppContext,
1056    ) {
1057        init_test(cx);
1058
1059        let project_paths = PathList::new(&[Path::new("/project-a")]);
1060        let existing_updated_at = Utc::now();
1061
1062        let existing_metadata = ThreadMetadata {
1063            session_id: acp::SessionId::new("existing-session"),
1064            agent_id: agent::ZED_AGENT_ID.clone(),
1065            title: "Existing Metadata".into(),
1066            updated_at: existing_updated_at,
1067            created_at: Some(existing_updated_at),
1068            folder_paths: project_paths.clone(),
1069            archived: false,
1070        };
1071
1072        cx.update(|cx| {
1073            let store = ThreadMetadataStore::global(cx);
1074            store.update(cx, |store, cx| {
1075                store.save(existing_metadata, cx);
1076            });
1077        });
1078        cx.run_until_parked();
1079
1080        let save_task = cx.update(|cx| {
1081            let thread_store = ThreadStore::global(cx);
1082            thread_store.update(cx, |store, cx| {
1083                store.save_thread(
1084                    acp::SessionId::new("existing-session"),
1085                    make_db_thread(
1086                        "Updated Native Thread Title",
1087                        existing_updated_at + chrono::Duration::seconds(1),
1088                    ),
1089                    project_paths.clone(),
1090                    cx,
1091                )
1092            })
1093        });
1094        save_task.await.unwrap();
1095        cx.run_until_parked();
1096
1097        cx.update(|cx| migrate_thread_metadata(cx));
1098        cx.run_until_parked();
1099
1100        let list = cx.update(|cx| {
1101            let store = ThreadMetadataStore::global(cx);
1102            store.read(cx).entries().cloned().collect::<Vec<_>>()
1103        });
1104
1105        assert_eq!(list.len(), 1);
1106        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1107    }
1108
1109    #[gpui::test]
1110    async fn test_migrate_thread_metadata_archives_beyond_five_most_recent_per_project(
1111        cx: &mut TestAppContext,
1112    ) {
1113        init_test(cx);
1114
1115        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
1116        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
1117        let now = Utc::now();
1118
1119        // Create 7 threads for project A and 3 for project B
1120        let mut threads_to_save = Vec::new();
1121        for i in 0..7 {
1122            threads_to_save.push((
1123                format!("a-session-{i}"),
1124                format!("Thread A{i}"),
1125                project_a_paths.clone(),
1126                now + chrono::Duration::seconds(i as i64),
1127            ));
1128        }
1129        for i in 0..3 {
1130            threads_to_save.push((
1131                format!("b-session-{i}"),
1132                format!("Thread B{i}"),
1133                project_b_paths.clone(),
1134                now + chrono::Duration::seconds(i as i64),
1135            ));
1136        }
1137
1138        for (session_id, title, paths, updated_at) in &threads_to_save {
1139            let save_task = cx.update(|cx| {
1140                let thread_store = ThreadStore::global(cx);
1141                let session_id = session_id.to_string();
1142                let title = title.to_string();
1143                let paths = paths.clone();
1144                thread_store.update(cx, |store, cx| {
1145                    store.save_thread(
1146                        acp::SessionId::new(session_id),
1147                        make_db_thread(&title, *updated_at),
1148                        paths,
1149                        cx,
1150                    )
1151                })
1152            });
1153            save_task.await.unwrap();
1154            cx.run_until_parked();
1155        }
1156
1157        cx.update(|cx| migrate_thread_metadata(cx));
1158        cx.run_until_parked();
1159
1160        let list = cx.update(|cx| {
1161            let store = ThreadMetadataStore::global(cx);
1162            store.read(cx).entries().cloned().collect::<Vec<_>>()
1163        });
1164
1165        assert_eq!(list.len(), 10);
1166
1167        // Project A: 5 most recent should be unarchived, 2 oldest should be archived
1168        let mut project_a_entries: Vec<_> = list
1169            .iter()
1170            .filter(|m| m.folder_paths == project_a_paths)
1171            .collect();
1172        assert_eq!(project_a_entries.len(), 7);
1173        project_a_entries.sort_by(|a, b| b.updated_at.cmp(&a.updated_at));
1174
1175        for entry in &project_a_entries[..5] {
1176            assert!(
1177                !entry.archived,
1178                "Expected {} to be unarchived (top 5 most recent)",
1179                entry.session_id.0
1180            );
1181        }
1182        for entry in &project_a_entries[5..] {
1183            assert!(
1184                entry.archived,
1185                "Expected {} to be archived (older than top 5)",
1186                entry.session_id.0
1187            );
1188        }
1189
1190        // Project B: all 3 should be unarchived (under the limit)
1191        let project_b_entries: Vec<_> = list
1192            .iter()
1193            .filter(|m| m.folder_paths == project_b_paths)
1194            .collect();
1195        assert_eq!(project_b_entries.len(), 3);
1196        assert!(project_b_entries.iter().all(|m| !m.archived));
1197    }
1198
1199    #[gpui::test]
1200    async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1201        init_test(cx);
1202
1203        let fs = FakeFs::new(cx.executor());
1204        let project = Project::test(fs, None::<&Path>, cx).await;
1205        let connection = Rc::new(StubAgentConnection::new());
1206
1207        let thread = cx
1208            .update(|cx| {
1209                connection
1210                    .clone()
1211                    .new_session(project.clone(), PathList::default(), cx)
1212            })
1213            .await
1214            .unwrap();
1215        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1216
1217        cx.update(|cx| {
1218            thread.update(cx, |thread, cx| {
1219                thread.set_title("Draft Thread".into(), cx).detach();
1220            });
1221        });
1222        cx.run_until_parked();
1223
1224        let metadata_ids = cx.update(|cx| {
1225            ThreadMetadataStore::global(cx)
1226                .read(cx)
1227                .entry_ids()
1228                .collect::<Vec<_>>()
1229        });
1230        assert_eq!(metadata_ids, vec![session_id]);
1231
1232        drop(thread);
1233        cx.update(|_| {});
1234        cx.run_until_parked();
1235        cx.run_until_parked();
1236
1237        let metadata_ids = cx.update(|cx| {
1238            ThreadMetadataStore::global(cx)
1239                .read(cx)
1240                .entry_ids()
1241                .collect::<Vec<_>>()
1242        });
1243        assert!(
1244            metadata_ids.is_empty(),
1245            "expected empty draft thread metadata to be deleted on release"
1246        );
1247    }
1248
1249    #[gpui::test]
1250    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1251        init_test(cx);
1252
1253        let fs = FakeFs::new(cx.executor());
1254        let project = Project::test(fs, None::<&Path>, cx).await;
1255        let connection = Rc::new(StubAgentConnection::new());
1256
1257        let thread = cx
1258            .update(|cx| {
1259                connection
1260                    .clone()
1261                    .new_session(project.clone(), PathList::default(), cx)
1262            })
1263            .await
1264            .unwrap();
1265        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1266
1267        cx.update(|cx| {
1268            thread.update(cx, |thread, cx| {
1269                thread.push_user_content_block(None, "Hello".into(), cx);
1270            });
1271        });
1272        cx.run_until_parked();
1273
1274        let metadata_ids = cx.update(|cx| {
1275            ThreadMetadataStore::global(cx)
1276                .read(cx)
1277                .entry_ids()
1278                .collect::<Vec<_>>()
1279        });
1280        assert_eq!(metadata_ids, vec![session_id.clone()]);
1281
1282        drop(thread);
1283        cx.update(|_| {});
1284        cx.run_until_parked();
1285
1286        let metadata_ids = cx.update(|cx| {
1287            ThreadMetadataStore::global(cx)
1288                .read(cx)
1289                .entry_ids()
1290                .collect::<Vec<_>>()
1291        });
1292        assert_eq!(metadata_ids, vec![session_id]);
1293    }
1294
1295    #[gpui::test]
1296    async fn test_threads_without_project_association_are_archived_by_default(
1297        cx: &mut TestAppContext,
1298    ) {
1299        init_test(cx);
1300
1301        let fs = FakeFs::new(cx.executor());
1302        let project_without_worktree = Project::test(fs.clone(), None::<&Path>, cx).await;
1303        let project_with_worktree = Project::test(fs, [Path::new("/project-a")], cx).await;
1304        let connection = Rc::new(StubAgentConnection::new());
1305
1306        let thread_without_worktree = cx
1307            .update(|cx| {
1308                connection.clone().new_session(
1309                    project_without_worktree.clone(),
1310                    PathList::default(),
1311                    cx,
1312                )
1313            })
1314            .await
1315            .unwrap();
1316        let session_without_worktree =
1317            cx.read(|cx| thread_without_worktree.read(cx).session_id().clone());
1318
1319        cx.update(|cx| {
1320            thread_without_worktree.update(cx, |thread, cx| {
1321                thread.set_title("No Project Thread".into(), cx).detach();
1322            });
1323        });
1324        cx.run_until_parked();
1325
1326        let thread_with_worktree = cx
1327            .update(|cx| {
1328                connection.clone().new_session(
1329                    project_with_worktree.clone(),
1330                    PathList::default(),
1331                    cx,
1332                )
1333            })
1334            .await
1335            .unwrap();
1336        let session_with_worktree =
1337            cx.read(|cx| thread_with_worktree.read(cx).session_id().clone());
1338
1339        cx.update(|cx| {
1340            thread_with_worktree.update(cx, |thread, cx| {
1341                thread.set_title("Project Thread".into(), cx).detach();
1342            });
1343        });
1344        cx.run_until_parked();
1345
1346        cx.update(|cx| {
1347            let store = ThreadMetadataStore::global(cx);
1348            let store = store.read(cx);
1349
1350            let without_worktree = store
1351                .entry(&session_without_worktree)
1352                .expect("missing metadata for thread without project association");
1353            assert!(without_worktree.folder_paths.is_empty());
1354            assert!(
1355                without_worktree.archived,
1356                "expected thread without project association to be archived"
1357            );
1358
1359            let with_worktree = store
1360                .entry(&session_with_worktree)
1361                .expect("missing metadata for thread with project association");
1362            assert_eq!(
1363                with_worktree.folder_paths,
1364                PathList::new(&[Path::new("/project-a")])
1365            );
1366            assert!(
1367                !with_worktree.archived,
1368                "expected thread with project association to remain unarchived"
1369            );
1370        });
1371    }
1372
1373    #[gpui::test]
1374    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1375        init_test(cx);
1376
1377        let fs = FakeFs::new(cx.executor());
1378        let project = Project::test(fs, None::<&Path>, cx).await;
1379        let connection = Rc::new(StubAgentConnection::new());
1380
1381        // Create a regular (non-subagent) AcpThread.
1382        let regular_thread = cx
1383            .update(|cx| {
1384                connection
1385                    .clone()
1386                    .new_session(project.clone(), PathList::default(), cx)
1387            })
1388            .await
1389            .unwrap();
1390
1391        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1392
1393        // Set a title on the regular thread to trigger a save via handle_thread_update.
1394        cx.update(|cx| {
1395            regular_thread.update(cx, |thread, cx| {
1396                thread.set_title("Regular Thread".into(), cx).detach();
1397            });
1398        });
1399        cx.run_until_parked();
1400
1401        // Create a subagent AcpThread
1402        let subagent_session_id = acp::SessionId::new("subagent-session");
1403        let subagent_thread = cx.update(|cx| {
1404            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1405            cx.new(|cx| {
1406                acp_thread::AcpThread::new(
1407                    Some(regular_session_id.clone()),
1408                    Some("Subagent Thread".into()),
1409                    None,
1410                    connection.clone(),
1411                    project.clone(),
1412                    action_log,
1413                    subagent_session_id.clone(),
1414                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1415                    cx,
1416                )
1417            })
1418        });
1419
1420        // Set a title on the subagent thread to trigger handle_thread_update.
1421        cx.update(|cx| {
1422            subagent_thread.update(cx, |thread, cx| {
1423                thread
1424                    .set_title("Subagent Thread Title".into(), cx)
1425                    .detach();
1426            });
1427        });
1428        cx.run_until_parked();
1429
1430        // List all metadata from the store cache.
1431        let list = cx.update(|cx| {
1432            let store = ThreadMetadataStore::global(cx);
1433            store.read(cx).entries().cloned().collect::<Vec<_>>()
1434        });
1435
1436        // The subagent thread should NOT appear in the sidebar metadata.
1437        // Only the regular thread should be listed.
1438        assert_eq!(
1439            list.len(),
1440            1,
1441            "Expected only the regular thread in sidebar metadata, \
1442             but found {} entries (subagent threads are leaking into the sidebar)",
1443            list.len(),
1444        );
1445        assert_eq!(list[0].session_id, regular_session_id);
1446        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1447    }
1448
1449    #[test]
1450    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1451        let now = Utc::now();
1452
1453        let operations = vec![
1454            DbOperation::Upsert(make_metadata(
1455                "session-1",
1456                "First Thread",
1457                now,
1458                PathList::default(),
1459            )),
1460            DbOperation::Delete(acp::SessionId::new("session-1")),
1461        ];
1462
1463        let deduped = ThreadMetadataStore::dedup_db_operations(operations);
1464
1465        assert_eq!(deduped.len(), 1);
1466        assert_eq!(
1467            deduped[0],
1468            DbOperation::Delete(acp::SessionId::new("session-1"))
1469        );
1470    }
1471
1472    #[test]
1473    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1474        let now = Utc::now();
1475        let later = now + chrono::Duration::seconds(1);
1476
1477        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1478        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1479
1480        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1481            DbOperation::Upsert(old_metadata),
1482            DbOperation::Upsert(new_metadata.clone()),
1483        ]);
1484
1485        assert_eq!(deduped.len(), 1);
1486        assert_eq!(deduped[0], DbOperation::Upsert(new_metadata));
1487    }
1488
1489    #[test]
1490    fn test_dedup_db_operations_preserves_distinct_sessions() {
1491        let now = Utc::now();
1492
1493        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1494        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1495        let deduped = ThreadMetadataStore::dedup_db_operations(vec![
1496            DbOperation::Upsert(metadata1.clone()),
1497            DbOperation::Upsert(metadata2.clone()),
1498        ]);
1499
1500        assert_eq!(deduped.len(), 2);
1501        assert!(deduped.contains(&DbOperation::Upsert(metadata1)));
1502        assert!(deduped.contains(&DbOperation::Upsert(metadata2)));
1503    }
1504
1505    #[gpui::test]
1506    async fn test_archive_and_unarchive_thread(cx: &mut TestAppContext) {
1507        init_test(cx);
1508
1509        let paths = PathList::new(&[Path::new("/project-a")]);
1510        let now = Utc::now();
1511        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1512
1513        cx.update(|cx| {
1514            let store = ThreadMetadataStore::global(cx);
1515            store.update(cx, |store, cx| {
1516                store.save(metadata, cx);
1517            });
1518        });
1519
1520        cx.run_until_parked();
1521
1522        cx.update(|cx| {
1523            let store = ThreadMetadataStore::global(cx);
1524            let store = store.read(cx);
1525
1526            let path_entries = store
1527                .entries_for_path(&paths)
1528                .map(|e| e.session_id.0.to_string())
1529                .collect::<Vec<_>>();
1530            assert_eq!(path_entries, vec!["session-1"]);
1531
1532            let archived = store
1533                .archived_entries()
1534                .map(|e| e.session_id.0.to_string())
1535                .collect::<Vec<_>>();
1536            assert!(archived.is_empty());
1537        });
1538
1539        cx.update(|cx| {
1540            let store = ThreadMetadataStore::global(cx);
1541            store.update(cx, |store, cx| {
1542                store.archive(&acp::SessionId::new("session-1"), cx);
1543            });
1544        });
1545
1546        cx.run_until_parked();
1547
1548        cx.update(|cx| {
1549            let store = ThreadMetadataStore::global(cx);
1550            let store = store.read(cx);
1551
1552            let path_entries = store
1553                .entries_for_path(&paths)
1554                .map(|e| e.session_id.0.to_string())
1555                .collect::<Vec<_>>();
1556            assert!(path_entries.is_empty());
1557
1558            let archived = store.archived_entries().collect::<Vec<_>>();
1559            assert_eq!(archived.len(), 1);
1560            assert_eq!(archived[0].session_id.0.as_ref(), "session-1");
1561            assert!(archived[0].archived);
1562        });
1563
1564        cx.update(|cx| {
1565            let store = ThreadMetadataStore::global(cx);
1566            store.update(cx, |store, cx| {
1567                store.unarchive(&acp::SessionId::new("session-1"), cx);
1568            });
1569        });
1570
1571        cx.run_until_parked();
1572
1573        cx.update(|cx| {
1574            let store = ThreadMetadataStore::global(cx);
1575            let store = store.read(cx);
1576
1577            let path_entries = store
1578                .entries_for_path(&paths)
1579                .map(|e| e.session_id.0.to_string())
1580                .collect::<Vec<_>>();
1581            assert_eq!(path_entries, vec!["session-1"]);
1582
1583            let archived = store
1584                .archived_entries()
1585                .map(|e| e.session_id.0.to_string())
1586                .collect::<Vec<_>>();
1587            assert!(archived.is_empty());
1588        });
1589    }
1590
1591    #[gpui::test]
1592    async fn test_entries_for_path_excludes_archived(cx: &mut TestAppContext) {
1593        init_test(cx);
1594
1595        let paths = PathList::new(&[Path::new("/project-a")]);
1596        let now = Utc::now();
1597
1598        let metadata1 = make_metadata("session-1", "Active Thread", now, paths.clone());
1599        let metadata2 = make_metadata(
1600            "session-2",
1601            "Archived Thread",
1602            now - chrono::Duration::seconds(1),
1603            paths.clone(),
1604        );
1605
1606        cx.update(|cx| {
1607            let store = ThreadMetadataStore::global(cx);
1608            store.update(cx, |store, cx| {
1609                store.save(metadata1, cx);
1610                store.save(metadata2, cx);
1611            });
1612        });
1613
1614        cx.run_until_parked();
1615
1616        cx.update(|cx| {
1617            let store = ThreadMetadataStore::global(cx);
1618            store.update(cx, |store, cx| {
1619                store.archive(&acp::SessionId::new("session-2"), cx);
1620            });
1621        });
1622
1623        cx.run_until_parked();
1624
1625        cx.update(|cx| {
1626            let store = ThreadMetadataStore::global(cx);
1627            let store = store.read(cx);
1628
1629            let path_entries = store
1630                .entries_for_path(&paths)
1631                .map(|e| e.session_id.0.to_string())
1632                .collect::<Vec<_>>();
1633            assert_eq!(path_entries, vec!["session-1"]);
1634
1635            let all_entries = store
1636                .entries()
1637                .map(|e| e.session_id.0.to_string())
1638                .collect::<Vec<_>>();
1639            assert_eq!(all_entries.len(), 2);
1640            assert!(all_entries.contains(&"session-1".to_string()));
1641            assert!(all_entries.contains(&"session-2".to_string()));
1642
1643            let archived = store
1644                .archived_entries()
1645                .map(|e| e.session_id.0.to_string())
1646                .collect::<Vec<_>>();
1647            assert_eq!(archived, vec!["session-2"]);
1648        });
1649    }
1650
1651    #[gpui::test]
1652    async fn test_save_all_persists_multiple_threads(cx: &mut TestAppContext) {
1653        init_test(cx);
1654
1655        let paths = PathList::new(&[Path::new("/project-a")]);
1656        let now = Utc::now();
1657
1658        let m1 = make_metadata("session-1", "Thread One", now, paths.clone());
1659        let m2 = make_metadata(
1660            "session-2",
1661            "Thread Two",
1662            now - chrono::Duration::seconds(1),
1663            paths.clone(),
1664        );
1665        let m3 = make_metadata(
1666            "session-3",
1667            "Thread Three",
1668            now - chrono::Duration::seconds(2),
1669            paths,
1670        );
1671
1672        cx.update(|cx| {
1673            let store = ThreadMetadataStore::global(cx);
1674            store.update(cx, |store, cx| {
1675                store.save_all(vec![m1, m2, m3], cx);
1676            });
1677        });
1678
1679        cx.run_until_parked();
1680
1681        cx.update(|cx| {
1682            let store = ThreadMetadataStore::global(cx);
1683            let store = store.read(cx);
1684
1685            let all_entries = store
1686                .entries()
1687                .map(|e| e.session_id.0.to_string())
1688                .collect::<Vec<_>>();
1689            assert_eq!(all_entries.len(), 3);
1690            assert!(all_entries.contains(&"session-1".to_string()));
1691            assert!(all_entries.contains(&"session-2".to_string()));
1692            assert!(all_entries.contains(&"session-3".to_string()));
1693
1694            let entry_ids = store.entry_ids().collect::<Vec<_>>();
1695            assert_eq!(entry_ids.len(), 3);
1696        });
1697    }
1698
1699    #[gpui::test]
1700    async fn test_archived_flag_persists_across_reload(cx: &mut TestAppContext) {
1701        init_test(cx);
1702
1703        let paths = PathList::new(&[Path::new("/project-a")]);
1704        let now = Utc::now();
1705        let metadata = make_metadata("session-1", "Thread 1", now, paths.clone());
1706
1707        cx.update(|cx| {
1708            let store = ThreadMetadataStore::global(cx);
1709            store.update(cx, |store, cx| {
1710                store.save(metadata, cx);
1711            });
1712        });
1713
1714        cx.run_until_parked();
1715
1716        cx.update(|cx| {
1717            let store = ThreadMetadataStore::global(cx);
1718            store.update(cx, |store, cx| {
1719                store.archive(&acp::SessionId::new("session-1"), cx);
1720            });
1721        });
1722
1723        cx.run_until_parked();
1724
1725        cx.update(|cx| {
1726            let store = ThreadMetadataStore::global(cx);
1727            store.update(cx, |store, cx| {
1728                let _ = store.reload(cx);
1729            });
1730        });
1731
1732        cx.run_until_parked();
1733
1734        cx.update(|cx| {
1735            let store = ThreadMetadataStore::global(cx);
1736            let store = store.read(cx);
1737
1738            let thread = store
1739                .entries()
1740                .find(|e| e.session_id.0.as_ref() == "session-1")
1741                .expect("thread should exist after reload");
1742            assert!(thread.archived);
1743
1744            let path_entries = store
1745                .entries_for_path(&paths)
1746                .map(|e| e.session_id.0.to_string())
1747                .collect::<Vec<_>>();
1748            assert!(path_entries.is_empty());
1749
1750            let archived = store
1751                .archived_entries()
1752                .map(|e| e.session_id.0.to_string())
1753                .collect::<Vec<_>>();
1754            assert_eq!(archived, vec!["session-1"]);
1755        });
1756    }
1757
1758    #[gpui::test]
1759    async fn test_archive_nonexistent_thread_is_noop(cx: &mut TestAppContext) {
1760        init_test(cx);
1761
1762        cx.run_until_parked();
1763
1764        cx.update(|cx| {
1765            let store = ThreadMetadataStore::global(cx);
1766            store.update(cx, |store, cx| {
1767                store.archive(&acp::SessionId::new("nonexistent"), cx);
1768            });
1769        });
1770
1771        cx.run_until_parked();
1772
1773        cx.update(|cx| {
1774            let store = ThreadMetadataStore::global(cx);
1775            let store = store.read(cx);
1776
1777            assert!(store.is_empty());
1778            assert_eq!(store.entries().count(), 0);
1779            assert_eq!(store.archived_entries().count(), 0);
1780        });
1781    }
1782
1783    #[gpui::test]
1784    async fn test_save_followed_by_archiving_without_parking(cx: &mut TestAppContext) {
1785        init_test(cx);
1786
1787        let paths = PathList::new(&[Path::new("/project-a")]);
1788        let now = Utc::now();
1789        let metadata = make_metadata("session-1", "Thread 1", now, paths);
1790        let session_id = metadata.session_id.clone();
1791
1792        cx.update(|cx| {
1793            let store = ThreadMetadataStore::global(cx);
1794            store.update(cx, |store, cx| {
1795                store.save(metadata.clone(), cx);
1796                store.archive(&session_id, cx);
1797            });
1798        });
1799
1800        cx.run_until_parked();
1801
1802        cx.update(|cx| {
1803            let store = ThreadMetadataStore::global(cx);
1804            let store = store.read(cx);
1805
1806            let entries: Vec<ThreadMetadata> = store.entries().cloned().collect();
1807            pretty_assertions::assert_eq!(
1808                entries,
1809                vec![ThreadMetadata {
1810                    archived: true,
1811                    ..metadata
1812                }]
1813            );
1814        });
1815    }
1816}