thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AgentSessionInfo;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::Context as _;
   7use chrono::{DateTime, Utc};
   8use collections::HashMap;
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    SidebarThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We migrate the last 10 threads per project and skip threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
  46
  47    let store = SidebarThreadMetadataStore::global(cx);
  48    let db = store.read(cx).db.clone();
  49
  50    cx.spawn(async move |cx| {
  51        if !db.is_empty()? {
  52            return Ok::<(), anyhow::Error>(());
  53        }
  54
  55        let metadata = store.read_with(cx, |_store, app| {
  56            let mut migrated_threads_per_project = HashMap::default();
  57
  58            ThreadStore::global(app)
  59                .read(app)
  60                .entries()
  61                .filter_map(|entry| {
  62                    if entry.folder_paths.is_empty() {
  63                        return None;
  64                    }
  65
  66                    let migrated_thread_count = migrated_threads_per_project
  67                        .entry(entry.folder_paths.clone())
  68                        .or_insert(0);
  69                    if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
  70                        return None;
  71                    }
  72                    *migrated_thread_count += 1;
  73
  74                    Some(ThreadMetadata {
  75                        session_id: entry.id,
  76                        agent_id: None,
  77                        title: entry.title,
  78                        updated_at: entry.updated_at,
  79                        created_at: entry.created_at,
  80                        folder_paths: entry.folder_paths,
  81                    })
  82                })
  83                .collect::<Vec<_>>()
  84        });
  85
  86        log::info!("Migrating {} thread store entries", metadata.len());
  87
  88        // Manually save each entry to the database and call reload, otherwise
  89        // we'll end up triggering lots of reloads after each save
  90        for entry in metadata {
  91            db.save(entry).await?;
  92        }
  93
  94        log::info!("Finished migrating thread store entries");
  95
  96        let _ = store.update(cx, |store, cx| store.reload(cx));
  97        Ok(())
  98    })
  99    .detach_and_log_err(cx);
 100}
 101
/// Wrapper under which the shared `SidebarThreadMetadataStore` entity is
/// registered as a gpui global (see `init_global`, `global` and `try_global`).
struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 104
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadMetadata {
    /// Session identifier; the primary key of the `sidebar_threads` table.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title of the thread.
    pub title: SharedString,
    /// Time of the last update; `ThreadMetadataDb::list` orders rows by this
    /// column, descending.
    pub updated_at: DateTime<Utc>,
    /// Creation time, if known. The upsert in `ThreadMetadataDb::save` does not
    /// overwrite this column for an existing row.
    pub created_at: Option<DateTime<Utc>>,
    /// Folder paths the thread is associated with; the store groups its
    /// cached threads by this value (see `entries_for_path`).
    pub folder_paths: PathList,
}
 117
 118impl ThreadMetadata {
 119    pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
 120        let session_id = session.session_id.clone();
 121        let title = session.title.clone().unwrap_or_default();
 122        let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
 123        let created_at = session.created_at.unwrap_or(updated_at);
 124        let folder_paths = session.work_dirs.clone().unwrap_or_default();
 125        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 126            None
 127        } else {
 128            Some(agent_id)
 129        };
 130        Self {
 131            session_id,
 132            agent_id,
 133            title,
 134            updated_at,
 135            created_at: Some(created_at),
 136            folder_paths,
 137        }
 138    }
 139
 140    pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
 141        let thread_ref = thread.read(cx);
 142        let session_id = thread_ref.session_id().clone();
 143        let title = thread_ref
 144            .title()
 145            .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 146        let updated_at = Utc::now();
 147
 148        let agent_id = thread_ref.connection().agent_id();
 149
 150        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 151            None
 152        } else {
 153            Some(agent_id)
 154        };
 155
 156        let folder_paths = {
 157            let project = thread_ref.project().read(cx);
 158            let paths: Vec<Arc<Path>> = project
 159                .visible_worktrees(cx)
 160                .map(|worktree| worktree.read(cx).abs_path())
 161                .collect();
 162            PathList::new(&paths)
 163        };
 164
 165        Self {
 166            session_id,
 167            agent_id,
 168            title,
 169            created_at: Some(updated_at), // handled by db `ON CONFLICT`
 170            updated_at,
 171            folder_paths,
 172        }
 173    }
 174}
 175
/// The store holds all metadata needed to show threads in the sidebar.
/// Effectively, all threads stored in here are "non-archived".
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct SidebarThreadMetadataStore {
    /// Database handle used for listing, saving and deleting metadata rows.
    db: ThreadMetadataDb,
    /// In-memory copy of all rows, in the order returned by `ThreadMetadataDb::list`.
    threads: Vec<ThreadMetadata>,
    /// The same rows as `threads`, grouped by their folder paths.
    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
    /// Handle to the most recently started reload, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for tracked threads, keyed by session id.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
    /// Sending half of the queue of pending database writes.
    pending_thread_ops_tx: smol::channel::Sender<DbOperation>,
    /// Task that drains the queue and applies the writes; held for the
    /// lifetime of the store.
    _db_operations_task: Task<()>,
}
 189
 190#[derive(Debug, PartialEq)]
 191enum DbOperation {
 192    Insert(ThreadMetadata),
 193    Delete(acp::SessionId),
 194}
 195
 196impl DbOperation {
 197    fn id(&self) -> &acp::SessionId {
 198        match self {
 199            DbOperation::Insert(thread) => &thread.session_id,
 200            DbOperation::Delete(session_id) => session_id,
 201        }
 202    }
 203}
 204
 205impl SidebarThreadMetadataStore {
 206    #[cfg(not(any(test, feature = "test-support")))]
 207    pub fn init_global(cx: &mut App) {
 208        if cx.has_global::<Self>() {
 209            return;
 210        }
 211
 212        let db = ThreadMetadataDb::global(cx);
 213        let thread_store = cx.new(|cx| Self::new(db, cx));
 214        cx.set_global(GlobalThreadMetadataStore(thread_store));
 215    }
 216
 217    #[cfg(any(test, feature = "test-support"))]
 218    pub fn init_global(cx: &mut App) {
 219        let thread = std::thread::current();
 220        let test_name = thread.name().unwrap_or("unknown_test");
 221        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 222        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 223        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 224        cx.set_global(GlobalThreadMetadataStore(thread_store));
 225    }
 226
 227    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 228        cx.try_global::<GlobalThreadMetadataStore>()
 229            .map(|store| store.0.clone())
 230    }
 231
 232    pub fn global(cx: &App) -> Entity<Self> {
 233        cx.global::<GlobalThreadMetadataStore>().0.clone()
 234    }
 235
 236    pub fn is_empty(&self) -> bool {
 237        self.threads.is_empty()
 238    }
 239
 240    pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
 241        self.threads.iter().cloned()
 242    }
 243
 244    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 245        self.threads.iter().map(|thread| thread.session_id.clone())
 246    }
 247
 248    pub fn entries_for_path(
 249        &self,
 250        path_list: &PathList,
 251    ) -> impl Iterator<Item = ThreadMetadata> + '_ {
 252        self.threads_by_paths
 253            .get(path_list)
 254            .into_iter()
 255            .flatten()
 256            .cloned()
 257    }
 258
 259    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 260        let db = self.db.clone();
 261        self.reload_task.take();
 262
 263        let list_task = cx
 264            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 265
 266        let reload_task = cx
 267            .spawn(async move |this, cx| {
 268                let Some(rows) = list_task.await.log_err() else {
 269                    return;
 270                };
 271
 272                this.update(cx, |this, cx| {
 273                    this.threads.clear();
 274                    this.threads_by_paths.clear();
 275
 276                    for row in rows {
 277                        this.threads_by_paths
 278                            .entry(row.folder_paths.clone())
 279                            .or_default()
 280                            .push(row.clone());
 281                        this.threads.push(row);
 282                    }
 283
 284                    cx.notify();
 285                })
 286                .ok();
 287            })
 288            .shared();
 289        self.reload_task = Some(reload_task.clone());
 290        reload_task
 291    }
 292
 293    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) {
 294        if !cx.has_flag::<AgentV2FeatureFlag>() {
 295            return;
 296        }
 297
 298        self.pending_thread_ops_tx
 299            .try_send(DbOperation::Insert(metadata))
 300            .log_err();
 301    }
 302
 303    pub fn delete(&mut self, session_id: acp::SessionId, cx: &mut Context<Self>) {
 304        if !cx.has_flag::<AgentV2FeatureFlag>() {
 305            return;
 306        }
 307
 308        self.pending_thread_ops_tx
 309            .try_send(DbOperation::Delete(session_id))
 310            .log_err();
 311    }
 312
    /// Builds the store on top of `db`.
    ///
    /// Besides constructing the struct this:
    /// - observes every newly created `AcpThread` (except subagent threads),
    ///   subscribing to its events and registering a release hook,
    /// - spawns the task that applies queued `DbOperation`s to the database,
    /// - kicks off an initial `reload` to fill the in-memory cache.
    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
        let weak_store = cx.weak_entity();

        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
            // Don't track subagent threads in the sidebar.
            if thread.parent_session_id().is_some() {
                return;
            }

            let thread_entity = cx.entity();

            // When the thread is released, drop its subscription and, if it
            // never got any entries, remove its metadata as well.
            cx.on_release({
                let weak_store = weak_store.clone();
                move |thread, cx| {
                    weak_store
                        .update(cx, |store, cx| {
                            let session_id = thread.session_id().clone();
                            store.session_subscriptions.remove(&session_id);
                            if thread.entries().is_empty() {
                                // Empty threads can be unloaded without ever being
                                // durably persisted by the underlying agent.
                                store.delete(session_id, cx);
                            }
                        })
                        .ok();
                }
            })
            .detach();

            // Route the thread's events to `handle_thread_update` and keep the
            // subscription in the store, keyed by session id.
            weak_store
                .update(cx, |this, cx| {
                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
                    this.session_subscriptions
                        .insert(thread.session_id().clone(), subscription);
                })
                .ok();
        })
        .detach();

        // Queue of pending database writes; `save` and `delete` push onto `tx`.
        let (tx, rx) = smol::channel::unbounded();
        let _db_operations_task = cx.spawn({
            let db = db.clone();
            async move |this, cx| {
                // Wait for one operation, then drain everything that is already
                // queued so the whole batch is followed by a single reload.
                while let Ok(first_update) = rx.recv().await {
                    let mut updates = vec![first_update];
                    while let Ok(update) = rx.try_recv() {
                        updates.push(update);
                    }
                    // Keep only the most recent operation per session.
                    let updates = Self::dedup_db_operations(updates);
                    for operation in updates {
                        // Failures are logged and do not stop the batch.
                        match operation {
                            DbOperation::Insert(metadata) => {
                                db.save(metadata).await.log_err();
                            }
                            DbOperation::Delete(session_id) => {
                                db.delete(session_id).await.log_err();
                            }
                        }
                    }

                    // Refresh the in-memory cache from the database.
                    this.update(cx, |this, cx| this.reload(cx)).ok();
                }
            }
        });

        let mut this = Self {
            db,
            threads: Vec::new(),
            threads_by_paths: HashMap::default(),
            reload_task: None,
            session_subscriptions: HashMap::default(),
            pending_thread_ops_tx: tx,
            _db_operations_task,
        };
        // Start the initial load; the returned handle is not needed here
        // because `reload` also stores it in `reload_task`.
        let _ = this.reload(cx);
        this
    }
 390
 391    fn dedup_db_operations(operations: Vec<DbOperation>) -> Vec<DbOperation> {
 392        let mut ops = HashMap::default();
 393        for operation in operations.into_iter().rev() {
 394            if ops.contains_key(operation.id()) {
 395                continue;
 396            }
 397            ops.insert(operation.id().clone(), operation);
 398        }
 399        ops.into_values().collect()
 400    }
 401
 402    fn handle_thread_update(
 403        &mut self,
 404        thread: Entity<acp_thread::AcpThread>,
 405        event: &acp_thread::AcpThreadEvent,
 406        cx: &mut Context<Self>,
 407    ) {
 408        // Don't track subagent threads in the sidebar.
 409        if thread.read(cx).parent_session_id().is_some() {
 410            return;
 411        }
 412
 413        match event {
 414            acp_thread::AcpThreadEvent::NewEntry
 415            | acp_thread::AcpThreadEvent::TitleUpdated
 416            | acp_thread::AcpThreadEvent::EntryUpdated(_)
 417            | acp_thread::AcpThreadEvent::EntriesRemoved(_)
 418            | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
 419            | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
 420            | acp_thread::AcpThreadEvent::Retry(_)
 421            | acp_thread::AcpThreadEvent::Stopped(_)
 422            | acp_thread::AcpThreadEvent::Error
 423            | acp_thread::AcpThreadEvent::LoadError(_)
 424            | acp_thread::AcpThreadEvent::Refusal => {
 425                let metadata = ThreadMetadata::from_thread(&thread, cx);
 426                self.save(metadata, cx);
 427            }
 428            _ => {}
 429        }
 430    }
 431}
 432
// Marks the store type itself as usable as a gpui global. Note that
// `init_global` registers the store entity through the
// `GlobalThreadMetadataStore` wrapper rather than as a value of this type.
impl Global for SidebarThreadMetadataStore {}
 434
/// Database handle for the sidebar thread metadata, wrapping a
/// `ThreadSafeConnection`.
struct ThreadMetadataDb(ThreadSafeConnection);

impl Domain for ThreadMetadataDb {
    // Domain name: the type name as a string.
    const NAME: &str = stringify!(ThreadMetadataDb);

    // Single migration creating the `sidebar_threads` table: one row per
    // session, keyed by `session_id`. `title` and `updated_at` are required;
    // the remaining columns are nullable.
    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}

// NOTE(review): presumably generates the static connection and the
// `ThreadMetadataDb::global` accessor used by `init_global` — confirm against
// the `db` crate.
db::static_connection!(ThreadMetadataDb, []);
 454
 455impl ThreadMetadataDb {
 456    pub fn is_empty(&self) -> anyhow::Result<bool> {
 457        self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
 458            .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
 459    }
 460
 461    /// List all sidebar thread metadata, ordered by updated_at descending.
 462    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 463        self.select::<ThreadMetadata>(
 464            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
 465             FROM sidebar_threads \
 466             ORDER BY updated_at DESC"
 467        )?()
 468    }
 469
 470    /// Upsert metadata for a thread.
 471    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 472        let id = row.session_id.0.clone();
 473        let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
 474        let title = row.title.to_string();
 475        let updated_at = row.updated_at.to_rfc3339();
 476        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 477        let serialized = row.folder_paths.serialize();
 478        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 479            (None, None)
 480        } else {
 481            (Some(serialized.paths), Some(serialized.order))
 482        };
 483
 484        self.write(move |conn| {
 485            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
 486                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
 487                       ON CONFLICT(session_id) DO UPDATE SET \
 488                           agent_id = excluded.agent_id, \
 489                           title = excluded.title, \
 490                           updated_at = excluded.updated_at, \
 491                           folder_paths = excluded.folder_paths, \
 492                           folder_paths_order = excluded.folder_paths_order";
 493            let mut stmt = Statement::prepare(conn, sql)?;
 494            let mut i = stmt.bind(&id, 1)?;
 495            i = stmt.bind(&agent_id, i)?;
 496            i = stmt.bind(&title, i)?;
 497            i = stmt.bind(&updated_at, i)?;
 498            i = stmt.bind(&created_at, i)?;
 499            i = stmt.bind(&folder_paths, i)?;
 500            stmt.bind(&folder_paths_order, i)?;
 501            stmt.exec()
 502        })
 503        .await
 504    }
 505
 506    /// Delete metadata for a single thread.
 507    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 508        let id = session_id.0.clone();
 509        self.write(move |conn| {
 510            let mut stmt =
 511                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 512            stmt.bind(&id, 1)?;
 513            stmt.exec()
 514        })
 515        .await
 516    }
 517}
 518
 519impl Column for ThreadMetadata {
 520    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 521        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 522        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 523        let (title, next): (String, i32) = Column::column(statement, next)?;
 524        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 525        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 526        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 527        let (folder_paths_order_str, next): (Option<String>, i32) =
 528            Column::column(statement, next)?;
 529
 530        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 531        let created_at = created_at_str
 532            .as_deref()
 533            .map(DateTime::parse_from_rfc3339)
 534            .transpose()?
 535            .map(|dt| dt.with_timezone(&Utc));
 536
 537        let folder_paths = folder_paths_str
 538            .map(|paths| {
 539                PathList::deserialize(&util::path_list::SerializedPathList {
 540                    paths,
 541                    order: folder_paths_order_str.unwrap_or_default(),
 542                })
 543            })
 544            .unwrap_or_default();
 545
 546        Ok((
 547            ThreadMetadata {
 548                session_id: acp::SessionId::new(id),
 549                agent_id: agent_id.map(|id| AgentId::new(id)),
 550                title: title.into(),
 551                updated_at,
 552                created_at,
 553                folder_paths,
 554            },
 555            next,
 556        ))
 557    }
 558}
 559
 560#[cfg(test)]
 561mod tests {
 562    use super::*;
 563    use acp_thread::{AgentConnection, StubAgentConnection};
 564    use action_log::ActionLog;
 565    use agent::DbThread;
 566    use agent_client_protocol as acp;
 567    use feature_flags::FeatureFlagAppExt;
 568    use gpui::TestAppContext;
 569    use project::FakeFs;
 570    use project::Project;
 571    use std::path::Path;
 572    use std::rc::Rc;
 573
 574    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
 575        DbThread {
 576            title: title.to_string().into(),
 577            messages: Vec::new(),
 578            updated_at,
 579            detailed_summary: None,
 580            initial_project_snapshot: None,
 581            cumulative_token_usage: Default::default(),
 582            request_token_usage: Default::default(),
 583            model: None,
 584            profile: None,
 585            imported: false,
 586            subagent_context: None,
 587            speed: None,
 588            thinking_enabled: false,
 589            thinking_effort: None,
 590            draft_prompt: None,
 591            ui_scroll_position: None,
 592        }
 593    }
 594
 595    fn make_metadata(
 596        session_id: &str,
 597        title: &str,
 598        updated_at: DateTime<Utc>,
 599        folder_paths: PathList,
 600    ) -> ThreadMetadata {
 601        ThreadMetadata {
 602            session_id: acp::SessionId::new(session_id),
 603            agent_id: None,
 604            title: title.to_string().into(),
 605            updated_at,
 606            created_at: Some(updated_at),
 607            folder_paths,
 608        }
 609    }
 610
 611    #[gpui::test]
 612    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 613        let first_paths = PathList::new(&[Path::new("/project-a")]);
 614        let second_paths = PathList::new(&[Path::new("/project-b")]);
 615        let now = Utc::now();
 616        let older = now - chrono::Duration::seconds(1);
 617
 618        let thread = std::thread::current();
 619        let test_name = thread.name().unwrap_or("unknown_test");
 620        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 621        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 622            &db_name,
 623        )));
 624
 625        db.save(make_metadata(
 626            "session-1",
 627            "First Thread",
 628            now,
 629            first_paths.clone(),
 630        ))
 631        .await
 632        .unwrap();
 633        db.save(make_metadata(
 634            "session-2",
 635            "Second Thread",
 636            older,
 637            second_paths.clone(),
 638        ))
 639        .await
 640        .unwrap();
 641
 642        cx.update(|cx| {
 643            let settings_store = settings::SettingsStore::test(cx);
 644            cx.set_global(settings_store);
 645            cx.update_flags(true, vec!["agent-v2".to_string()]);
 646            SidebarThreadMetadataStore::init_global(cx);
 647        });
 648
 649        cx.run_until_parked();
 650
 651        cx.update(|cx| {
 652            let store = SidebarThreadMetadataStore::global(cx);
 653            let store = store.read(cx);
 654
 655            let entry_ids = store
 656                .entry_ids()
 657                .map(|session_id| session_id.0.to_string())
 658                .collect::<Vec<_>>();
 659            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 660
 661            let first_path_entries = store
 662                .entries_for_path(&first_paths)
 663                .map(|entry| entry.session_id.0.to_string())
 664                .collect::<Vec<_>>();
 665            assert_eq!(first_path_entries, vec!["session-1"]);
 666
 667            let second_path_entries = store
 668                .entries_for_path(&second_paths)
 669                .map(|entry| entry.session_id.0.to_string())
 670                .collect::<Vec<_>>();
 671            assert_eq!(second_path_entries, vec!["session-2"]);
 672        });
 673    }
 674
 675    #[gpui::test]
 676    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 677        cx.update(|cx| {
 678            let settings_store = settings::SettingsStore::test(cx);
 679            cx.set_global(settings_store);
 680            cx.update_flags(true, vec!["agent-v2".to_string()]);
 681            SidebarThreadMetadataStore::init_global(cx);
 682        });
 683
 684        let first_paths = PathList::new(&[Path::new("/project-a")]);
 685        let second_paths = PathList::new(&[Path::new("/project-b")]);
 686        let initial_time = Utc::now();
 687        let updated_time = initial_time + chrono::Duration::seconds(1);
 688
 689        let initial_metadata = make_metadata(
 690            "session-1",
 691            "First Thread",
 692            initial_time,
 693            first_paths.clone(),
 694        );
 695
 696        let second_metadata = make_metadata(
 697            "session-2",
 698            "Second Thread",
 699            initial_time,
 700            second_paths.clone(),
 701        );
 702
 703        cx.update(|cx| {
 704            let store = SidebarThreadMetadataStore::global(cx);
 705            store.update(cx, |store, cx| {
 706                store.save(initial_metadata, cx);
 707                store.save(second_metadata, cx);
 708            });
 709        });
 710
 711        cx.run_until_parked();
 712
 713        cx.update(|cx| {
 714            let store = SidebarThreadMetadataStore::global(cx);
 715            let store = store.read(cx);
 716
 717            let first_path_entries = store
 718                .entries_for_path(&first_paths)
 719                .map(|entry| entry.session_id.0.to_string())
 720                .collect::<Vec<_>>();
 721            assert_eq!(first_path_entries, vec!["session-1"]);
 722
 723            let second_path_entries = store
 724                .entries_for_path(&second_paths)
 725                .map(|entry| entry.session_id.0.to_string())
 726                .collect::<Vec<_>>();
 727            assert_eq!(second_path_entries, vec!["session-2"]);
 728        });
 729
 730        let moved_metadata = make_metadata(
 731            "session-1",
 732            "First Thread",
 733            updated_time,
 734            second_paths.clone(),
 735        );
 736
 737        cx.update(|cx| {
 738            let store = SidebarThreadMetadataStore::global(cx);
 739            store.update(cx, |store, cx| {
 740                store.save(moved_metadata, cx);
 741            });
 742        });
 743
 744        cx.run_until_parked();
 745
 746        cx.update(|cx| {
 747            let store = SidebarThreadMetadataStore::global(cx);
 748            let store = store.read(cx);
 749
 750            let entry_ids = store
 751                .entry_ids()
 752                .map(|session_id| session_id.0.to_string())
 753                .collect::<Vec<_>>();
 754            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 755
 756            let first_path_entries = store
 757                .entries_for_path(&first_paths)
 758                .map(|entry| entry.session_id.0.to_string())
 759                .collect::<Vec<_>>();
 760            assert!(first_path_entries.is_empty());
 761
 762            let second_path_entries = store
 763                .entries_for_path(&second_paths)
 764                .map(|entry| entry.session_id.0.to_string())
 765                .collect::<Vec<_>>();
 766            assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
 767        });
 768
 769        cx.update(|cx| {
 770            let store = SidebarThreadMetadataStore::global(cx);
 771            store.update(cx, |store, cx| {
 772                store.delete(acp::SessionId::new("session-2"), cx);
 773            });
 774        });
 775
 776        cx.run_until_parked();
 777
 778        cx.update(|cx| {
 779            let store = SidebarThreadMetadataStore::global(cx);
 780            let store = store.read(cx);
 781
 782            let entry_ids = store
 783                .entry_ids()
 784                .map(|session_id| session_id.0.to_string())
 785                .collect::<Vec<_>>();
 786            assert_eq!(entry_ids, vec!["session-1"]);
 787
 788            let second_path_entries = store
 789                .entries_for_path(&second_paths)
 790                .map(|entry| entry.session_id.0.to_string())
 791                .collect::<Vec<_>>();
 792            assert_eq!(second_path_entries, vec!["session-1"]);
 793        });
 794    }
 795
 796    #[gpui::test]
 797    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
 798        cx.update(|cx| {
 799            ThreadStore::init_global(cx);
 800            SidebarThreadMetadataStore::init_global(cx);
 801        });
 802
 803        // Verify the cache is empty before migration
 804        let list = cx.update(|cx| {
 805            let store = SidebarThreadMetadataStore::global(cx);
 806            store.read(cx).entries().collect::<Vec<_>>()
 807        });
 808        assert_eq!(list.len(), 0);
 809
 810        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 811        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 812        let now = Utc::now();
 813
 814        for index in 0..12 {
 815            let updated_at = now + chrono::Duration::seconds(index as i64);
 816            let session_id = format!("project-a-session-{index}");
 817            let title = format!("Project A Thread {index}");
 818
 819            let save_task = cx.update(|cx| {
 820                let thread_store = ThreadStore::global(cx);
 821                let session_id = session_id.clone();
 822                let title = title.clone();
 823                let project_a_paths = project_a_paths.clone();
 824                thread_store.update(cx, |store, cx| {
 825                    store.save_thread(
 826                        acp::SessionId::new(session_id),
 827                        make_db_thread(&title, updated_at),
 828                        project_a_paths,
 829                        cx,
 830                    )
 831                })
 832            });
 833            save_task.await.unwrap();
 834            cx.run_until_parked();
 835        }
 836
 837        for index in 0..3 {
 838            let updated_at = now + chrono::Duration::seconds(100 + index as i64);
 839            let session_id = format!("project-b-session-{index}");
 840            let title = format!("Project B Thread {index}");
 841
 842            let save_task = cx.update(|cx| {
 843                let thread_store = ThreadStore::global(cx);
 844                let session_id = session_id.clone();
 845                let title = title.clone();
 846                let project_b_paths = project_b_paths.clone();
 847                thread_store.update(cx, |store, cx| {
 848                    store.save_thread(
 849                        acp::SessionId::new(session_id),
 850                        make_db_thread(&title, updated_at),
 851                        project_b_paths,
 852                        cx,
 853                    )
 854                })
 855            });
 856            save_task.await.unwrap();
 857            cx.run_until_parked();
 858        }
 859
 860        let save_projectless = cx.update(|cx| {
 861            let thread_store = ThreadStore::global(cx);
 862            thread_store.update(cx, |store, cx| {
 863                store.save_thread(
 864                    acp::SessionId::new("projectless-session"),
 865                    make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
 866                    PathList::default(),
 867                    cx,
 868                )
 869            })
 870        });
 871        save_projectless.await.unwrap();
 872        cx.run_until_parked();
 873
 874        // Run migration
 875        cx.update(|cx| {
 876            migrate_thread_metadata(cx);
 877        });
 878
 879        cx.run_until_parked();
 880
 881        // Verify the metadata was migrated, limited to 10 per project, and
 882        // projectless threads were skipped.
 883        let list = cx.update(|cx| {
 884            let store = SidebarThreadMetadataStore::global(cx);
 885            store.read(cx).entries().collect::<Vec<_>>()
 886        });
 887        assert_eq!(list.len(), 13);
 888
 889        assert!(
 890            list.iter()
 891                .all(|metadata| !metadata.folder_paths.is_empty())
 892        );
 893        assert!(
 894            list.iter()
 895                .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
 896        );
 897
 898        let project_a_entries = list
 899            .iter()
 900            .filter(|metadata| metadata.folder_paths == project_a_paths)
 901            .collect::<Vec<_>>();
 902        assert_eq!(project_a_entries.len(), 10);
 903        assert_eq!(
 904            project_a_entries
 905                .iter()
 906                .map(|metadata| metadata.session_id.0.as_ref())
 907                .collect::<Vec<_>>(),
 908            vec![
 909                "project-a-session-11",
 910                "project-a-session-10",
 911                "project-a-session-9",
 912                "project-a-session-8",
 913                "project-a-session-7",
 914                "project-a-session-6",
 915                "project-a-session-5",
 916                "project-a-session-4",
 917                "project-a-session-3",
 918                "project-a-session-2",
 919            ]
 920        );
 921        assert!(
 922            project_a_entries
 923                .iter()
 924                .all(|metadata| metadata.agent_id.is_none())
 925        );
 926
 927        let project_b_entries = list
 928            .iter()
 929            .filter(|metadata| metadata.folder_paths == project_b_paths)
 930            .collect::<Vec<_>>();
 931        assert_eq!(project_b_entries.len(), 3);
 932        assert_eq!(
 933            project_b_entries
 934                .iter()
 935                .map(|metadata| metadata.session_id.0.as_ref())
 936                .collect::<Vec<_>>(),
 937            vec![
 938                "project-b-session-2",
 939                "project-b-session-1",
 940                "project-b-session-0",
 941            ]
 942        );
 943        assert!(
 944            project_b_entries
 945                .iter()
 946                .all(|metadata| metadata.agent_id.is_none())
 947        );
 948    }
 949
 950    #[gpui::test]
 951    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
 952        cx.update(|cx| {
 953            ThreadStore::init_global(cx);
 954            SidebarThreadMetadataStore::init_global(cx);
 955        });
 956
 957        // Pre-populate the metadata store with existing data
 958        let existing_metadata = ThreadMetadata {
 959            session_id: acp::SessionId::new("existing-session"),
 960            agent_id: None,
 961            title: "Existing Thread".into(),
 962            updated_at: Utc::now(),
 963            created_at: Some(Utc::now()),
 964            folder_paths: PathList::default(),
 965        };
 966
 967        cx.update(|cx| {
 968            let store = SidebarThreadMetadataStore::global(cx);
 969            store.update(cx, |store, cx| {
 970                store.save(existing_metadata, cx);
 971            });
 972        });
 973
 974        cx.run_until_parked();
 975
 976        // Add an entry to native thread store that should NOT be migrated
 977        let save_task = cx.update(|cx| {
 978            let thread_store = ThreadStore::global(cx);
 979            thread_store.update(cx, |store, cx| {
 980                store.save_thread(
 981                    acp::SessionId::new("native-session"),
 982                    make_db_thread("Native Thread", Utc::now()),
 983                    PathList::default(),
 984                    cx,
 985                )
 986            })
 987        });
 988        save_task.await.unwrap();
 989        cx.run_until_parked();
 990
 991        // Run migration - should skip because metadata store is not empty
 992        cx.update(|cx| {
 993            migrate_thread_metadata(cx);
 994        });
 995
 996        cx.run_until_parked();
 997
 998        // Verify only the existing metadata is present (migration was skipped)
 999        let list = cx.update(|cx| {
1000            let store = SidebarThreadMetadataStore::global(cx);
1001            store.read(cx).entries().collect::<Vec<_>>()
1002        });
1003        assert_eq!(list.len(), 1);
1004        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
1005    }
1006
1007    #[gpui::test]
1008    async fn test_empty_thread_metadata_deleted_when_thread_released(cx: &mut TestAppContext) {
1009        cx.update(|cx| {
1010            let settings_store = settings::SettingsStore::test(cx);
1011            cx.set_global(settings_store);
1012            cx.update_flags(true, vec!["agent-v2".to_string()]);
1013            ThreadStore::init_global(cx);
1014            SidebarThreadMetadataStore::init_global(cx);
1015        });
1016
1017        let fs = FakeFs::new(cx.executor());
1018        let project = Project::test(fs, None::<&Path>, cx).await;
1019        let connection = Rc::new(StubAgentConnection::new());
1020
1021        let thread = cx
1022            .update(|cx| {
1023                connection
1024                    .clone()
1025                    .new_session(project.clone(), PathList::default(), cx)
1026            })
1027            .await
1028            .unwrap();
1029        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1030
1031        cx.update(|cx| {
1032            thread.update(cx, |thread, cx| {
1033                thread.set_title("Draft Thread".into(), cx).detach();
1034            });
1035        });
1036        cx.run_until_parked();
1037
1038        let metadata_ids = cx.update(|cx| {
1039            SidebarThreadMetadataStore::global(cx)
1040                .read(cx)
1041                .entry_ids()
1042                .collect::<Vec<_>>()
1043        });
1044        assert_eq!(metadata_ids, vec![session_id]);
1045
1046        drop(thread);
1047        cx.update(|_| {});
1048        cx.run_until_parked();
1049        cx.run_until_parked();
1050
1051        let metadata_ids = cx.update(|cx| {
1052            SidebarThreadMetadataStore::global(cx)
1053                .read(cx)
1054                .entry_ids()
1055                .collect::<Vec<_>>()
1056        });
1057        assert!(
1058            metadata_ids.is_empty(),
1059            "expected empty draft thread metadata to be deleted on release"
1060        );
1061    }
1062
1063    #[gpui::test]
1064    async fn test_nonempty_thread_metadata_preserved_when_thread_released(cx: &mut TestAppContext) {
1065        cx.update(|cx| {
1066            let settings_store = settings::SettingsStore::test(cx);
1067            cx.set_global(settings_store);
1068            cx.update_flags(true, vec!["agent-v2".to_string()]);
1069            ThreadStore::init_global(cx);
1070            SidebarThreadMetadataStore::init_global(cx);
1071        });
1072
1073        let fs = FakeFs::new(cx.executor());
1074        let project = Project::test(fs, None::<&Path>, cx).await;
1075        let connection = Rc::new(StubAgentConnection::new());
1076
1077        let thread = cx
1078            .update(|cx| {
1079                connection
1080                    .clone()
1081                    .new_session(project.clone(), PathList::default(), cx)
1082            })
1083            .await
1084            .unwrap();
1085        let session_id = cx.read(|cx| thread.read(cx).session_id().clone());
1086
1087        cx.update(|cx| {
1088            thread.update(cx, |thread, cx| {
1089                thread.push_user_content_block(None, "Hello".into(), cx);
1090            });
1091        });
1092        cx.run_until_parked();
1093
1094        let metadata_ids = cx.update(|cx| {
1095            SidebarThreadMetadataStore::global(cx)
1096                .read(cx)
1097                .entry_ids()
1098                .collect::<Vec<_>>()
1099        });
1100        assert_eq!(metadata_ids, vec![session_id.clone()]);
1101
1102        drop(thread);
1103        cx.update(|_| {});
1104        cx.run_until_parked();
1105
1106        let metadata_ids = cx.update(|cx| {
1107            SidebarThreadMetadataStore::global(cx)
1108                .read(cx)
1109                .entry_ids()
1110                .collect::<Vec<_>>()
1111        });
1112        assert_eq!(metadata_ids, vec![session_id]);
1113    }
1114
1115    #[gpui::test]
1116    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
1117        cx.update(|cx| {
1118            let settings_store = settings::SettingsStore::test(cx);
1119            cx.set_global(settings_store);
1120            cx.update_flags(true, vec!["agent-v2".to_string()]);
1121            ThreadStore::init_global(cx);
1122            SidebarThreadMetadataStore::init_global(cx);
1123        });
1124
1125        let fs = FakeFs::new(cx.executor());
1126        let project = Project::test(fs, None::<&Path>, cx).await;
1127        let connection = Rc::new(StubAgentConnection::new());
1128
1129        // Create a regular (non-subagent) AcpThread.
1130        let regular_thread = cx
1131            .update(|cx| {
1132                connection
1133                    .clone()
1134                    .new_session(project.clone(), PathList::default(), cx)
1135            })
1136            .await
1137            .unwrap();
1138
1139        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
1140
1141        // Set a title on the regular thread to trigger a save via handle_thread_update.
1142        cx.update(|cx| {
1143            regular_thread.update(cx, |thread, cx| {
1144                thread.set_title("Regular Thread".into(), cx).detach();
1145            });
1146        });
1147        cx.run_until_parked();
1148
1149        // Create a subagent AcpThread
1150        let subagent_session_id = acp::SessionId::new("subagent-session");
1151        let subagent_thread = cx.update(|cx| {
1152            let action_log = cx.new(|_| ActionLog::new(project.clone()));
1153            cx.new(|cx| {
1154                acp_thread::AcpThread::new(
1155                    Some(regular_session_id.clone()),
1156                    Some("Subagent Thread".into()),
1157                    None,
1158                    connection.clone(),
1159                    project.clone(),
1160                    action_log,
1161                    subagent_session_id.clone(),
1162                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1163                    cx,
1164                )
1165            })
1166        });
1167
1168        // Set a title on the subagent thread to trigger handle_thread_update.
1169        cx.update(|cx| {
1170            subagent_thread.update(cx, |thread, cx| {
1171                thread
1172                    .set_title("Subagent Thread Title".into(), cx)
1173                    .detach();
1174            });
1175        });
1176        cx.run_until_parked();
1177
1178        // List all metadata from the store cache.
1179        let list = cx.update(|cx| {
1180            let store = SidebarThreadMetadataStore::global(cx);
1181            store.read(cx).entries().collect::<Vec<_>>()
1182        });
1183
1184        // The subagent thread should NOT appear in the sidebar metadata.
1185        // Only the regular thread should be listed.
1186        assert_eq!(
1187            list.len(),
1188            1,
1189            "Expected only the regular thread in sidebar metadata, \
1190             but found {} entries (subagent threads are leaking into the sidebar)",
1191            list.len(),
1192        );
1193        assert_eq!(list[0].session_id, regular_session_id);
1194        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1195    }
1196
1197    #[test]
1198    fn test_dedup_db_operations_keeps_latest_operation_for_session() {
1199        let now = Utc::now();
1200
1201        let operations = vec![
1202            DbOperation::Insert(make_metadata(
1203                "session-1",
1204                "First Thread",
1205                now,
1206                PathList::default(),
1207            )),
1208            DbOperation::Delete(acp::SessionId::new("session-1")),
1209        ];
1210
1211        let deduped = SidebarThreadMetadataStore::dedup_db_operations(operations);
1212
1213        assert_eq!(deduped.len(), 1);
1214        assert_eq!(
1215            deduped[0],
1216            DbOperation::Delete(acp::SessionId::new("session-1"))
1217        );
1218    }
1219
1220    #[test]
1221    fn test_dedup_db_operations_keeps_latest_insert_for_same_session() {
1222        let now = Utc::now();
1223        let later = now + chrono::Duration::seconds(1);
1224
1225        let old_metadata = make_metadata("session-1", "Old Title", now, PathList::default());
1226        let new_metadata = make_metadata("session-1", "New Title", later, PathList::default());
1227
1228        let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![
1229            DbOperation::Insert(old_metadata),
1230            DbOperation::Insert(new_metadata.clone()),
1231        ]);
1232
1233        assert_eq!(deduped.len(), 1);
1234        assert_eq!(deduped[0], DbOperation::Insert(new_metadata));
1235    }
1236
1237    #[test]
1238    fn test_dedup_db_operations_preserves_distinct_sessions() {
1239        let now = Utc::now();
1240
1241        let metadata1 = make_metadata("session-1", "First Thread", now, PathList::default());
1242        let metadata2 = make_metadata("session-2", "Second Thread", now, PathList::default());
1243        let deduped = SidebarThreadMetadataStore::dedup_db_operations(vec![
1244            DbOperation::Insert(metadata1.clone()),
1245            DbOperation::Insert(metadata2.clone()),
1246        ]);
1247
1248        assert_eq!(deduped.len(), 2);
1249        assert!(deduped.contains(&DbOperation::Insert(metadata1)));
1250        assert!(deduped.contains(&DbOperation::Insert(metadata2)));
1251    }
1252}