thread_metadata_store.rs

   1use std::{path::Path, sync::Arc};
   2
   3use acp_thread::AgentSessionInfo;
   4use agent::{ThreadStore, ZED_AGENT_ID};
   5use agent_client_protocol as acp;
   6use anyhow::{Context as _, Result};
   7use chrono::{DateTime, Utc};
   8use collections::HashMap;
   9use db::{
  10    sqlez::{
  11        bindable::Column, domain::Domain, statement::Statement,
  12        thread_safe_connection::ThreadSafeConnection,
  13    },
  14    sqlez_macros::sql,
  15};
  16use feature_flags::{AgentV2FeatureFlag, FeatureFlagAppExt};
  17use futures::{FutureExt as _, future::Shared};
  18use gpui::{AppContext as _, Entity, Global, Subscription, Task};
  19use project::AgentId;
  20use ui::{App, Context, SharedString};
  21use util::ResultExt as _;
  22use workspace::PathList;
  23
  24use crate::DEFAULT_THREAD_TITLE;
  25
  26pub fn init(cx: &mut App) {
  27    SidebarThreadMetadataStore::init_global(cx);
  28
  29    if cx.has_flag::<AgentV2FeatureFlag>() {
  30        migrate_thread_metadata(cx);
  31    }
  32    cx.observe_flag::<AgentV2FeatureFlag, _>(|has_flag, cx| {
  33        if has_flag {
  34            migrate_thread_metadata(cx);
  35        }
  36    })
  37    .detach();
  38}
  39
  40/// Migrate existing thread metadata from native agent thread store to the new metadata storage.
  41/// We migrate the last 10 threads per project and skip threads that do not have a project.
  42///
  43/// TODO: Remove this after N weeks of shipping the sidebar
  44fn migrate_thread_metadata(cx: &mut App) {
  45    const MAX_MIGRATED_THREADS_PER_PROJECT: usize = 10;
  46
  47    let store = SidebarThreadMetadataStore::global(cx);
  48    let db = store.read(cx).db.clone();
  49
  50    cx.spawn(async move |cx| {
  51        if !db.is_empty()? {
  52            return Ok::<(), anyhow::Error>(());
  53        }
  54
  55        let metadata = store.read_with(cx, |_store, app| {
  56            let mut migrated_threads_per_project = HashMap::default();
  57
  58            ThreadStore::global(app)
  59                .read(app)
  60                .entries()
  61                .filter_map(|entry| {
  62                    if entry.folder_paths.is_empty() {
  63                        return None;
  64                    }
  65
  66                    let migrated_thread_count = migrated_threads_per_project
  67                        .entry(entry.folder_paths.clone())
  68                        .or_insert(0);
  69                    if *migrated_thread_count >= MAX_MIGRATED_THREADS_PER_PROJECT {
  70                        return None;
  71                    }
  72                    *migrated_thread_count += 1;
  73
  74                    Some(ThreadMetadata {
  75                        session_id: entry.id,
  76                        agent_id: None,
  77                        title: entry.title,
  78                        updated_at: entry.updated_at,
  79                        created_at: entry.created_at,
  80                        folder_paths: entry.folder_paths,
  81                    })
  82                })
  83                .collect::<Vec<_>>()
  84        });
  85
  86        // Manually save each entry to the database and call reload, otherwise
  87        // we'll end up triggering lots of reloads after each save
  88        for entry in metadata {
  89            db.save(entry).await?;
  90        }
  91
  92        let _ = store.update(cx, |store, cx| store.reload(cx));
  93        Ok(())
  94    })
  95    .detach_and_log_err(cx);
  96}
  97
/// Newtype wrapper under which the `SidebarThreadMetadataStore` entity is
/// registered as a gpui global.
struct GlobalThreadMetadataStore(Entity<SidebarThreadMetadataStore>);
impl Global for GlobalThreadMetadataStore {}
 100
/// Lightweight metadata for any thread (native or ACP), enough to populate
/// the sidebar list and route to the correct load path when clicked.
#[derive(Debug, Clone)]
pub struct ThreadMetadata {
    /// Session identifier; primary key of the thread's row in `sidebar_threads`.
    pub session_id: acp::SessionId,
    /// `None` for native Zed threads, `Some("claude-code")` etc. for ACP agents.
    pub agent_id: Option<AgentId>,
    /// Title of the thread.
    pub title: SharedString,
    /// Time of the last update; the database lists rows ordered by this, newest first.
    pub updated_at: DateTime<Utc>,
    /// Creation time, when known.
    pub created_at: Option<DateTime<Utc>>,
    /// Folders the thread belongs to; also the key used to group threads per project.
    pub folder_paths: PathList,
}
 113
 114impl ThreadMetadata {
 115    pub fn from_session_info(agent_id: AgentId, session: &AgentSessionInfo) -> Self {
 116        let session_id = session.session_id.clone();
 117        let title = session.title.clone().unwrap_or_default();
 118        let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
 119        let created_at = session.created_at.unwrap_or(updated_at);
 120        let folder_paths = session.work_dirs.clone().unwrap_or_default();
 121        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 122            None
 123        } else {
 124            Some(agent_id)
 125        };
 126        Self {
 127            session_id,
 128            agent_id,
 129            title,
 130            updated_at,
 131            created_at: Some(created_at),
 132            folder_paths,
 133        }
 134    }
 135
 136    pub fn from_thread(thread: &Entity<acp_thread::AcpThread>, cx: &App) -> Self {
 137        let thread_ref = thread.read(cx);
 138        let session_id = thread_ref.session_id().clone();
 139        let title = thread_ref
 140            .title()
 141            .unwrap_or_else(|| DEFAULT_THREAD_TITLE.into());
 142        let updated_at = Utc::now();
 143
 144        let agent_id = thread_ref.connection().agent_id();
 145
 146        let agent_id = if agent_id.as_ref() == ZED_AGENT_ID.as_ref() {
 147            None
 148        } else {
 149            Some(agent_id)
 150        };
 151
 152        let folder_paths = {
 153            let project = thread_ref.project().read(cx);
 154            let paths: Vec<Arc<Path>> = project
 155                .visible_worktrees(cx)
 156                .map(|worktree| worktree.read(cx).abs_path())
 157                .collect();
 158            PathList::new(&paths)
 159        };
 160
 161        Self {
 162            session_id,
 163            agent_id,
 164            title,
 165            created_at: Some(updated_at), // handled by db `ON CONFLICT`
 166            updated_at,
 167            folder_paths,
 168        }
 169    }
 170}
 171
/// The store holds all metadata needed to show threads in the sidebar.
/// Effectively, all threads stored in here are "non-archived".
///
/// Automatically listens to AcpThread events and updates metadata if it has changed.
pub struct SidebarThreadMetadataStore {
    /// Database the metadata is persisted in.
    db: ThreadMetadataDb,
    /// In-memory copy of every row, in the order the database lists them;
    /// rebuilt from scratch on every reload.
    threads: Vec<ThreadMetadata>,
    /// The same rows as `threads`, grouped by their folder paths.
    threads_by_paths: HashMap<PathList, Vec<ThreadMetadata>>,
    /// Handle to the most recently started reload, if any.
    reload_task: Option<Shared<Task<()>>>,
    /// Event subscriptions for tracked threads, keyed by session id and
    /// removed again when the thread entity is released.
    session_subscriptions: HashMap<acp::SessionId, Subscription>,
}
 183
 184impl SidebarThreadMetadataStore {
 185    #[cfg(not(any(test, feature = "test-support")))]
 186    pub fn init_global(cx: &mut App) {
 187        if cx.has_global::<Self>() {
 188            return;
 189        }
 190
 191        let db = ThreadMetadataDb::global(cx);
 192        let thread_store = cx.new(|cx| Self::new(db, cx));
 193        cx.set_global(GlobalThreadMetadataStore(thread_store));
 194    }
 195
 196    #[cfg(any(test, feature = "test-support"))]
 197    pub fn init_global(cx: &mut App) {
 198        let thread = std::thread::current();
 199        let test_name = thread.name().unwrap_or("unknown_test");
 200        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 201        let db = smol::block_on(db::open_test_db::<ThreadMetadataDb>(&db_name));
 202        let thread_store = cx.new(|cx| Self::new(ThreadMetadataDb(db), cx));
 203        cx.set_global(GlobalThreadMetadataStore(thread_store));
 204    }
 205
 206    pub fn try_global(cx: &App) -> Option<Entity<Self>> {
 207        cx.try_global::<GlobalThreadMetadataStore>()
 208            .map(|store| store.0.clone())
 209    }
 210
 211    pub fn global(cx: &App) -> Entity<Self> {
 212        cx.global::<GlobalThreadMetadataStore>().0.clone()
 213    }
 214
 215    pub fn is_empty(&self) -> bool {
 216        self.threads.is_empty()
 217    }
 218
 219    pub fn entries(&self) -> impl Iterator<Item = ThreadMetadata> + '_ {
 220        self.threads.iter().cloned()
 221    }
 222
 223    pub fn entry_ids(&self) -> impl Iterator<Item = acp::SessionId> + '_ {
 224        self.threads.iter().map(|thread| thread.session_id.clone())
 225    }
 226
 227    pub fn entries_for_path(
 228        &self,
 229        path_list: &PathList,
 230    ) -> impl Iterator<Item = ThreadMetadata> + '_ {
 231        self.threads_by_paths
 232            .get(path_list)
 233            .into_iter()
 234            .flatten()
 235            .cloned()
 236    }
 237
 238    fn reload(&mut self, cx: &mut Context<Self>) -> Shared<Task<()>> {
 239        let db = self.db.clone();
 240        self.reload_task.take();
 241
 242        let list_task = cx
 243            .background_spawn(async move { db.list().context("Failed to fetch sidebar metadata") });
 244
 245        let reload_task = cx
 246            .spawn(async move |this, cx| {
 247                let Some(rows) = list_task.await.log_err() else {
 248                    return;
 249                };
 250
 251                this.update(cx, |this, cx| {
 252                    this.threads.clear();
 253                    this.threads_by_paths.clear();
 254
 255                    for row in rows {
 256                        this.threads_by_paths
 257                            .entry(row.folder_paths.clone())
 258                            .or_default()
 259                            .push(row.clone());
 260                        this.threads.push(row);
 261                    }
 262
 263                    cx.notify();
 264                })
 265                .ok();
 266            })
 267            .shared();
 268        self.reload_task = Some(reload_task.clone());
 269        reload_task
 270    }
 271
 272    pub fn save(&mut self, metadata: ThreadMetadata, cx: &mut Context<Self>) -> Task<Result<()>> {
 273        if !cx.has_flag::<AgentV2FeatureFlag>() {
 274            return Task::ready(Ok(()));
 275        }
 276
 277        let db = self.db.clone();
 278        cx.spawn(async move |this, cx| {
 279            db.save(metadata).await?;
 280            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 281            reload_task.await;
 282            Ok(())
 283        })
 284    }
 285
 286    pub fn delete(
 287        &mut self,
 288        session_id: acp::SessionId,
 289        cx: &mut Context<Self>,
 290    ) -> Task<Result<()>> {
 291        if !cx.has_flag::<AgentV2FeatureFlag>() {
 292            return Task::ready(Ok(()));
 293        }
 294
 295        let db = self.db.clone();
 296        cx.spawn(async move |this, cx| {
 297            db.delete(session_id).await?;
 298            let reload_task = this.update(cx, |this, cx| this.reload(cx))?;
 299            reload_task.await;
 300            Ok(())
 301        })
 302    }
 303
 304    fn new(db: ThreadMetadataDb, cx: &mut Context<Self>) -> Self {
 305        let weak_store = cx.weak_entity();
 306
 307        cx.observe_new::<acp_thread::AcpThread>(move |thread, _window, cx| {
 308            // Don't track subagent threads in the sidebar.
 309            if thread.parent_session_id().is_some() {
 310                return;
 311            }
 312
 313            let thread_entity = cx.entity();
 314
 315            cx.on_release({
 316                let weak_store = weak_store.clone();
 317                move |thread, cx| {
 318                    weak_store
 319                        .update(cx, |store, _cx| {
 320                            store.session_subscriptions.remove(thread.session_id());
 321                        })
 322                        .ok();
 323                }
 324            })
 325            .detach();
 326
 327            weak_store
 328                .update(cx, |this, cx| {
 329                    let subscription = cx.subscribe(&thread_entity, Self::handle_thread_update);
 330                    this.session_subscriptions
 331                        .insert(thread.session_id().clone(), subscription);
 332                })
 333                .ok();
 334        })
 335        .detach();
 336
 337        let mut this = Self {
 338            db,
 339            threads: Vec::new(),
 340            threads_by_paths: HashMap::default(),
 341            reload_task: None,
 342            session_subscriptions: HashMap::default(),
 343        };
 344        let _ = this.reload(cx);
 345        this
 346    }
 347
 348    fn handle_thread_update(
 349        &mut self,
 350        thread: Entity<acp_thread::AcpThread>,
 351        event: &acp_thread::AcpThreadEvent,
 352        cx: &mut Context<Self>,
 353    ) {
 354        // Don't track subagent threads in the sidebar.
 355        if thread.read(cx).parent_session_id().is_some() {
 356            return;
 357        }
 358
 359        match event {
 360            acp_thread::AcpThreadEvent::NewEntry
 361            | acp_thread::AcpThreadEvent::TitleUpdated
 362            | acp_thread::AcpThreadEvent::EntryUpdated(_)
 363            | acp_thread::AcpThreadEvent::EntriesRemoved(_)
 364            | acp_thread::AcpThreadEvent::ToolAuthorizationRequested(_)
 365            | acp_thread::AcpThreadEvent::ToolAuthorizationReceived(_)
 366            | acp_thread::AcpThreadEvent::Retry(_)
 367            | acp_thread::AcpThreadEvent::Stopped(_)
 368            | acp_thread::AcpThreadEvent::Error
 369            | acp_thread::AcpThreadEvent::LoadError(_)
 370            | acp_thread::AcpThreadEvent::Refusal => {
 371                let metadata = ThreadMetadata::from_thread(&thread, cx);
 372                self.save(metadata, cx).detach_and_log_err(cx);
 373            }
 374            _ => {}
 375        }
 376    }
 377}
 378
// NOTE(review): the store entity is registered through the
// `GlobalThreadMetadataStore` wrapper; nothing in this file stores `Self`
// directly as a global. This impl only makes `Self` usable as a global type.
impl Global for SidebarThreadMetadataStore {}
 380
/// Connection wrapper for the database domain that persists sidebar thread metadata.
struct ThreadMetadataDb(ThreadSafeConnection);

impl Domain for ThreadMetadataDb {
    const NAME: &str = stringify!(ThreadMetadataDb);

    // One row per thread, keyed by session id. Timestamps are written as
    // RFC 3339 strings; `folder_paths` / `folder_paths_order` hold the
    // serialized `PathList` and are NULL for threads without folders.
    const MIGRATIONS: &[&str] = &[sql!(
        CREATE TABLE IF NOT EXISTS sidebar_threads(
            session_id TEXT PRIMARY KEY,
            agent_id TEXT,
            title TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            created_at TEXT,
            folder_paths TEXT,
            folder_paths_order TEXT
        ) STRICT;
    )];
}

db::static_connection!(ThreadMetadataDb, []);
 400
 401impl ThreadMetadataDb {
 402    pub fn is_empty(&self) -> anyhow::Result<bool> {
 403        self.select::<i64>("SELECT COUNT(*) FROM sidebar_threads")?()
 404            .map(|counts| counts.into_iter().next().unwrap_or_default() == 0)
 405    }
 406
 407    /// List all sidebar thread metadata, ordered by updated_at descending.
 408    pub fn list(&self) -> anyhow::Result<Vec<ThreadMetadata>> {
 409        self.select::<ThreadMetadata>(
 410            "SELECT session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order \
 411             FROM sidebar_threads \
 412             ORDER BY updated_at DESC"
 413        )?()
 414    }
 415
 416    /// Upsert metadata for a thread.
 417    pub async fn save(&self, row: ThreadMetadata) -> anyhow::Result<()> {
 418        let id = row.session_id.0.clone();
 419        let agent_id = row.agent_id.as_ref().map(|id| id.0.to_string());
 420        let title = row.title.to_string();
 421        let updated_at = row.updated_at.to_rfc3339();
 422        let created_at = row.created_at.map(|dt| dt.to_rfc3339());
 423        let serialized = row.folder_paths.serialize();
 424        let (folder_paths, folder_paths_order) = if row.folder_paths.is_empty() {
 425            (None, None)
 426        } else {
 427            (Some(serialized.paths), Some(serialized.order))
 428        };
 429
 430        self.write(move |conn| {
 431            let sql = "INSERT INTO sidebar_threads(session_id, agent_id, title, updated_at, created_at, folder_paths, folder_paths_order) \
 432                       VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7) \
 433                       ON CONFLICT(session_id) DO UPDATE SET \
 434                           agent_id = excluded.agent_id, \
 435                           title = excluded.title, \
 436                           updated_at = excluded.updated_at, \
 437                           folder_paths = excluded.folder_paths, \
 438                           folder_paths_order = excluded.folder_paths_order";
 439            let mut stmt = Statement::prepare(conn, sql)?;
 440            let mut i = stmt.bind(&id, 1)?;
 441            i = stmt.bind(&agent_id, i)?;
 442            i = stmt.bind(&title, i)?;
 443            i = stmt.bind(&updated_at, i)?;
 444            i = stmt.bind(&created_at, i)?;
 445            i = stmt.bind(&folder_paths, i)?;
 446            stmt.bind(&folder_paths_order, i)?;
 447            stmt.exec()
 448        })
 449        .await
 450    }
 451
 452    /// Delete metadata for a single thread.
 453    pub async fn delete(&self, session_id: acp::SessionId) -> anyhow::Result<()> {
 454        let id = session_id.0.clone();
 455        self.write(move |conn| {
 456            let mut stmt =
 457                Statement::prepare(conn, "DELETE FROM sidebar_threads WHERE session_id = ?")?;
 458            stmt.bind(&id, 1)?;
 459            stmt.exec()
 460        })
 461        .await
 462    }
 463}
 464
 465impl Column for ThreadMetadata {
 466    fn column(statement: &mut Statement, start_index: i32) -> anyhow::Result<(Self, i32)> {
 467        let (id, next): (Arc<str>, i32) = Column::column(statement, start_index)?;
 468        let (agent_id, next): (Option<String>, i32) = Column::column(statement, next)?;
 469        let (title, next): (String, i32) = Column::column(statement, next)?;
 470        let (updated_at_str, next): (String, i32) = Column::column(statement, next)?;
 471        let (created_at_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 472        let (folder_paths_str, next): (Option<String>, i32) = Column::column(statement, next)?;
 473        let (folder_paths_order_str, next): (Option<String>, i32) =
 474            Column::column(statement, next)?;
 475
 476        let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)?.with_timezone(&Utc);
 477        let created_at = created_at_str
 478            .as_deref()
 479            .map(DateTime::parse_from_rfc3339)
 480            .transpose()?
 481            .map(|dt| dt.with_timezone(&Utc));
 482
 483        let folder_paths = folder_paths_str
 484            .map(|paths| {
 485                PathList::deserialize(&util::path_list::SerializedPathList {
 486                    paths,
 487                    order: folder_paths_order_str.unwrap_or_default(),
 488                })
 489            })
 490            .unwrap_or_default();
 491
 492        Ok((
 493            ThreadMetadata {
 494                session_id: acp::SessionId::new(id),
 495                agent_id: agent_id.map(|id| AgentId::new(id)),
 496                title: title.into(),
 497                updated_at,
 498                created_at,
 499                folder_paths,
 500            },
 501            next,
 502        ))
 503    }
 504}
 505
 506#[cfg(test)]
 507mod tests {
 508    use super::*;
 509    use acp_thread::{AgentConnection, StubAgentConnection};
 510    use action_log::ActionLog;
 511    use agent::DbThread;
 512    use agent_client_protocol as acp;
 513    use feature_flags::FeatureFlagAppExt;
 514    use gpui::TestAppContext;
 515    use project::FakeFs;
 516    use project::Project;
 517    use std::path::Path;
 518    use std::rc::Rc;
 519
    /// Builds a `DbThread` with the given title and update time; it has no
    /// messages and every other field is set to an empty or default value.
    fn make_db_thread(title: &str, updated_at: DateTime<Utc>) -> DbThread {
        DbThread {
            title: title.to_string().into(),
            messages: Vec::new(),
            updated_at,
            detailed_summary: None,
            initial_project_snapshot: None,
            cumulative_token_usage: Default::default(),
            request_token_usage: Default::default(),
            model: None,
            profile: None,
            imported: false,
            subagent_context: None,
            speed: None,
            thinking_enabled: false,
            thinking_effort: None,
            draft_prompt: None,
            ui_scroll_position: None,
        }
    }
 540
 541    fn make_metadata(
 542        session_id: &str,
 543        title: &str,
 544        updated_at: DateTime<Utc>,
 545        folder_paths: PathList,
 546    ) -> ThreadMetadata {
 547        ThreadMetadata {
 548            session_id: acp::SessionId::new(session_id),
 549            agent_id: None,
 550            title: title.to_string().into(),
 551            updated_at,
 552            created_at: Some(updated_at),
 553            folder_paths,
 554        }
 555    }
 556
 557    #[gpui::test]
 558    async fn test_store_initializes_cache_from_database(cx: &mut TestAppContext) {
 559        let first_paths = PathList::new(&[Path::new("/project-a")]);
 560        let second_paths = PathList::new(&[Path::new("/project-b")]);
 561        let now = Utc::now();
 562        let older = now - chrono::Duration::seconds(1);
 563
 564        let thread = std::thread::current();
 565        let test_name = thread.name().unwrap_or("unknown_test");
 566        let db_name = format!("THREAD_METADATA_DB_{}", test_name);
 567        let db = ThreadMetadataDb(smol::block_on(db::open_test_db::<ThreadMetadataDb>(
 568            &db_name,
 569        )));
 570
 571        db.save(make_metadata(
 572            "session-1",
 573            "First Thread",
 574            now,
 575            first_paths.clone(),
 576        ))
 577        .await
 578        .unwrap();
 579        db.save(make_metadata(
 580            "session-2",
 581            "Second Thread",
 582            older,
 583            second_paths.clone(),
 584        ))
 585        .await
 586        .unwrap();
 587
 588        cx.update(|cx| {
 589            let settings_store = settings::SettingsStore::test(cx);
 590            cx.set_global(settings_store);
 591            cx.update_flags(true, vec!["agent-v2".to_string()]);
 592            SidebarThreadMetadataStore::init_global(cx);
 593        });
 594
 595        cx.run_until_parked();
 596
 597        cx.update(|cx| {
 598            let store = SidebarThreadMetadataStore::global(cx);
 599            let store = store.read(cx);
 600
 601            let entry_ids = store
 602                .entry_ids()
 603                .map(|session_id| session_id.0.to_string())
 604                .collect::<Vec<_>>();
 605            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 606
 607            let first_path_entries = store
 608                .entries_for_path(&first_paths)
 609                .map(|entry| entry.session_id.0.to_string())
 610                .collect::<Vec<_>>();
 611            assert_eq!(first_path_entries, vec!["session-1"]);
 612
 613            let second_path_entries = store
 614                .entries_for_path(&second_paths)
 615                .map(|entry| entry.session_id.0.to_string())
 616                .collect::<Vec<_>>();
 617            assert_eq!(second_path_entries, vec!["session-2"]);
 618        });
 619    }
 620
 621    #[gpui::test]
 622    async fn test_store_cache_updates_after_save_and_delete(cx: &mut TestAppContext) {
 623        cx.update(|cx| {
 624            let settings_store = settings::SettingsStore::test(cx);
 625            cx.set_global(settings_store);
 626            cx.update_flags(true, vec!["agent-v2".to_string()]);
 627            SidebarThreadMetadataStore::init_global(cx);
 628        });
 629
 630        let first_paths = PathList::new(&[Path::new("/project-a")]);
 631        let second_paths = PathList::new(&[Path::new("/project-b")]);
 632        let initial_time = Utc::now();
 633        let updated_time = initial_time + chrono::Duration::seconds(1);
 634
 635        let initial_metadata = make_metadata(
 636            "session-1",
 637            "First Thread",
 638            initial_time,
 639            first_paths.clone(),
 640        );
 641
 642        let second_metadata = make_metadata(
 643            "session-2",
 644            "Second Thread",
 645            initial_time,
 646            second_paths.clone(),
 647        );
 648
 649        cx.update(|cx| {
 650            let store = SidebarThreadMetadataStore::global(cx);
 651            store.update(cx, |store, cx| {
 652                store.save(initial_metadata, cx).detach();
 653                store.save(second_metadata, cx).detach();
 654            });
 655        });
 656
 657        cx.run_until_parked();
 658
 659        cx.update(|cx| {
 660            let store = SidebarThreadMetadataStore::global(cx);
 661            let store = store.read(cx);
 662
 663            let first_path_entries = store
 664                .entries_for_path(&first_paths)
 665                .map(|entry| entry.session_id.0.to_string())
 666                .collect::<Vec<_>>();
 667            assert_eq!(first_path_entries, vec!["session-1"]);
 668
 669            let second_path_entries = store
 670                .entries_for_path(&second_paths)
 671                .map(|entry| entry.session_id.0.to_string())
 672                .collect::<Vec<_>>();
 673            assert_eq!(second_path_entries, vec!["session-2"]);
 674        });
 675
 676        let moved_metadata = make_metadata(
 677            "session-1",
 678            "First Thread",
 679            updated_time,
 680            second_paths.clone(),
 681        );
 682
 683        cx.update(|cx| {
 684            let store = SidebarThreadMetadataStore::global(cx);
 685            store.update(cx, |store, cx| {
 686                store.save(moved_metadata, cx).detach();
 687            });
 688        });
 689
 690        cx.run_until_parked();
 691
 692        cx.update(|cx| {
 693            let store = SidebarThreadMetadataStore::global(cx);
 694            let store = store.read(cx);
 695
 696            let entry_ids = store
 697                .entry_ids()
 698                .map(|session_id| session_id.0.to_string())
 699                .collect::<Vec<_>>();
 700            assert_eq!(entry_ids, vec!["session-1", "session-2"]);
 701
 702            let first_path_entries = store
 703                .entries_for_path(&first_paths)
 704                .map(|entry| entry.session_id.0.to_string())
 705                .collect::<Vec<_>>();
 706            assert!(first_path_entries.is_empty());
 707
 708            let second_path_entries = store
 709                .entries_for_path(&second_paths)
 710                .map(|entry| entry.session_id.0.to_string())
 711                .collect::<Vec<_>>();
 712            assert_eq!(second_path_entries, vec!["session-1", "session-2"]);
 713        });
 714
 715        cx.update(|cx| {
 716            let store = SidebarThreadMetadataStore::global(cx);
 717            store.update(cx, |store, cx| {
 718                store.delete(acp::SessionId::new("session-2"), cx).detach();
 719            });
 720        });
 721
 722        cx.run_until_parked();
 723
 724        cx.update(|cx| {
 725            let store = SidebarThreadMetadataStore::global(cx);
 726            let store = store.read(cx);
 727
 728            let entry_ids = store
 729                .entry_ids()
 730                .map(|session_id| session_id.0.to_string())
 731                .collect::<Vec<_>>();
 732            assert_eq!(entry_ids, vec!["session-1"]);
 733
 734            let second_path_entries = store
 735                .entries_for_path(&second_paths)
 736                .map(|entry| entry.session_id.0.to_string())
 737                .collect::<Vec<_>>();
 738            assert_eq!(second_path_entries, vec!["session-1"]);
 739        });
 740    }
 741
 742    #[gpui::test]
 743    async fn test_migrate_thread_metadata(cx: &mut TestAppContext) {
 744        cx.update(|cx| {
 745            ThreadStore::init_global(cx);
 746            SidebarThreadMetadataStore::init_global(cx);
 747        });
 748
 749        // Verify the cache is empty before migration
 750        let list = cx.update(|cx| {
 751            let store = SidebarThreadMetadataStore::global(cx);
 752            store.read(cx).entries().collect::<Vec<_>>()
 753        });
 754        assert_eq!(list.len(), 0);
 755
 756        let project_a_paths = PathList::new(&[Path::new("/project-a")]);
 757        let project_b_paths = PathList::new(&[Path::new("/project-b")]);
 758        let now = Utc::now();
 759
 760        for index in 0..12 {
 761            let updated_at = now + chrono::Duration::seconds(index as i64);
 762            let session_id = format!("project-a-session-{index}");
 763            let title = format!("Project A Thread {index}");
 764
 765            let save_task = cx.update(|cx| {
 766                let thread_store = ThreadStore::global(cx);
 767                let session_id = session_id.clone();
 768                let title = title.clone();
 769                let project_a_paths = project_a_paths.clone();
 770                thread_store.update(cx, |store, cx| {
 771                    store.save_thread(
 772                        acp::SessionId::new(session_id),
 773                        make_db_thread(&title, updated_at),
 774                        project_a_paths,
 775                        cx,
 776                    )
 777                })
 778            });
 779            save_task.await.unwrap();
 780            cx.run_until_parked();
 781        }
 782
 783        for index in 0..3 {
 784            let updated_at = now + chrono::Duration::seconds(100 + index as i64);
 785            let session_id = format!("project-b-session-{index}");
 786            let title = format!("Project B Thread {index}");
 787
 788            let save_task = cx.update(|cx| {
 789                let thread_store = ThreadStore::global(cx);
 790                let session_id = session_id.clone();
 791                let title = title.clone();
 792                let project_b_paths = project_b_paths.clone();
 793                thread_store.update(cx, |store, cx| {
 794                    store.save_thread(
 795                        acp::SessionId::new(session_id),
 796                        make_db_thread(&title, updated_at),
 797                        project_b_paths,
 798                        cx,
 799                    )
 800                })
 801            });
 802            save_task.await.unwrap();
 803            cx.run_until_parked();
 804        }
 805
 806        let save_projectless = cx.update(|cx| {
 807            let thread_store = ThreadStore::global(cx);
 808            thread_store.update(cx, |store, cx| {
 809                store.save_thread(
 810                    acp::SessionId::new("projectless-session"),
 811                    make_db_thread("Projectless Thread", now + chrono::Duration::seconds(200)),
 812                    PathList::default(),
 813                    cx,
 814                )
 815            })
 816        });
 817        save_projectless.await.unwrap();
 818        cx.run_until_parked();
 819
 820        // Run migration
 821        cx.update(|cx| {
 822            migrate_thread_metadata(cx);
 823        });
 824
 825        cx.run_until_parked();
 826
 827        // Verify the metadata was migrated, limited to 10 per project, and
 828        // projectless threads were skipped.
 829        let list = cx.update(|cx| {
 830            let store = SidebarThreadMetadataStore::global(cx);
 831            store.read(cx).entries().collect::<Vec<_>>()
 832        });
 833        assert_eq!(list.len(), 13);
 834
 835        assert!(
 836            list.iter()
 837                .all(|metadata| !metadata.folder_paths.is_empty())
 838        );
 839        assert!(
 840            list.iter()
 841                .all(|metadata| metadata.session_id.0.as_ref() != "projectless-session")
 842        );
 843
 844        let project_a_entries = list
 845            .iter()
 846            .filter(|metadata| metadata.folder_paths == project_a_paths)
 847            .collect::<Vec<_>>();
 848        assert_eq!(project_a_entries.len(), 10);
 849        assert_eq!(
 850            project_a_entries
 851                .iter()
 852                .map(|metadata| metadata.session_id.0.as_ref())
 853                .collect::<Vec<_>>(),
 854            vec![
 855                "project-a-session-11",
 856                "project-a-session-10",
 857                "project-a-session-9",
 858                "project-a-session-8",
 859                "project-a-session-7",
 860                "project-a-session-6",
 861                "project-a-session-5",
 862                "project-a-session-4",
 863                "project-a-session-3",
 864                "project-a-session-2",
 865            ]
 866        );
 867        assert!(
 868            project_a_entries
 869                .iter()
 870                .all(|metadata| metadata.agent_id.is_none())
 871        );
 872
 873        let project_b_entries = list
 874            .iter()
 875            .filter(|metadata| metadata.folder_paths == project_b_paths)
 876            .collect::<Vec<_>>();
 877        assert_eq!(project_b_entries.len(), 3);
 878        assert_eq!(
 879            project_b_entries
 880                .iter()
 881                .map(|metadata| metadata.session_id.0.as_ref())
 882                .collect::<Vec<_>>(),
 883            vec![
 884                "project-b-session-2",
 885                "project-b-session-1",
 886                "project-b-session-0",
 887            ]
 888        );
 889        assert!(
 890            project_b_entries
 891                .iter()
 892                .all(|metadata| metadata.agent_id.is_none())
 893        );
 894    }
 895
 896    #[gpui::test]
 897    async fn test_migrate_thread_metadata_skips_when_data_exists(cx: &mut TestAppContext) {
 898        cx.update(|cx| {
 899            ThreadStore::init_global(cx);
 900            SidebarThreadMetadataStore::init_global(cx);
 901        });
 902
 903        // Pre-populate the metadata store with existing data
 904        let existing_metadata = ThreadMetadata {
 905            session_id: acp::SessionId::new("existing-session"),
 906            agent_id: None,
 907            title: "Existing Thread".into(),
 908            updated_at: Utc::now(),
 909            created_at: Some(Utc::now()),
 910            folder_paths: PathList::default(),
 911        };
 912
 913        cx.update(|cx| {
 914            let store = SidebarThreadMetadataStore::global(cx);
 915            store.update(cx, |store, cx| {
 916                store.save(existing_metadata, cx).detach();
 917            });
 918        });
 919
 920        cx.run_until_parked();
 921
 922        // Add an entry to native thread store that should NOT be migrated
 923        let save_task = cx.update(|cx| {
 924            let thread_store = ThreadStore::global(cx);
 925            thread_store.update(cx, |store, cx| {
 926                store.save_thread(
 927                    acp::SessionId::new("native-session"),
 928                    make_db_thread("Native Thread", Utc::now()),
 929                    PathList::default(),
 930                    cx,
 931                )
 932            })
 933        });
 934        save_task.await.unwrap();
 935        cx.run_until_parked();
 936
 937        // Run migration - should skip because metadata store is not empty
 938        cx.update(|cx| {
 939            migrate_thread_metadata(cx);
 940        });
 941
 942        cx.run_until_parked();
 943
 944        // Verify only the existing metadata is present (migration was skipped)
 945        let list = cx.update(|cx| {
 946            let store = SidebarThreadMetadataStore::global(cx);
 947            store.read(cx).entries().collect::<Vec<_>>()
 948        });
 949        assert_eq!(list.len(), 1);
 950        assert_eq!(list[0].session_id.0.as_ref(), "existing-session");
 951    }
 952
 953    #[gpui::test]
 954    async fn test_subagent_threads_excluded_from_sidebar_metadata(cx: &mut TestAppContext) {
 955        cx.update(|cx| {
 956            let settings_store = settings::SettingsStore::test(cx);
 957            cx.set_global(settings_store);
 958            cx.update_flags(true, vec!["agent-v2".to_string()]);
 959            ThreadStore::init_global(cx);
 960            SidebarThreadMetadataStore::init_global(cx);
 961        });
 962
 963        let fs = FakeFs::new(cx.executor());
 964        let project = Project::test(fs, None::<&Path>, cx).await;
 965        let connection = Rc::new(StubAgentConnection::new());
 966
 967        // Create a regular (non-subagent) AcpThread.
 968        let regular_thread = cx
 969            .update(|cx| {
 970                connection
 971                    .clone()
 972                    .new_session(project.clone(), PathList::default(), cx)
 973            })
 974            .await
 975            .unwrap();
 976
 977        let regular_session_id = cx.read(|cx| regular_thread.read(cx).session_id().clone());
 978
 979        // Set a title on the regular thread to trigger a save via handle_thread_update.
 980        cx.update(|cx| {
 981            regular_thread.update(cx, |thread, cx| {
 982                thread.set_title("Regular Thread".into(), cx).detach();
 983            });
 984        });
 985        cx.run_until_parked();
 986
 987        // Create a subagent AcpThread
 988        let subagent_session_id = acp::SessionId::new("subagent-session");
 989        let subagent_thread = cx.update(|cx| {
 990            let action_log = cx.new(|_| ActionLog::new(project.clone()));
 991            cx.new(|cx| {
 992                acp_thread::AcpThread::new(
 993                    Some(regular_session_id.clone()),
 994                    Some("Subagent Thread".into()),
 995                    None,
 996                    connection.clone(),
 997                    project.clone(),
 998                    action_log,
 999                    subagent_session_id.clone(),
1000                    watch::Receiver::constant(acp::PromptCapabilities::new()),
1001                    cx,
1002                )
1003            })
1004        });
1005
1006        // Set a title on the subagent thread to trigger handle_thread_update.
1007        cx.update(|cx| {
1008            subagent_thread.update(cx, |thread, cx| {
1009                thread
1010                    .set_title("Subagent Thread Title".into(), cx)
1011                    .detach();
1012            });
1013        });
1014        cx.run_until_parked();
1015
1016        // List all metadata from the store cache.
1017        let list = cx.update(|cx| {
1018            let store = SidebarThreadMetadataStore::global(cx);
1019            store.read(cx).entries().collect::<Vec<_>>()
1020        });
1021
1022        // The subagent thread should NOT appear in the sidebar metadata.
1023        // Only the regular thread should be listed.
1024        assert_eq!(
1025            list.len(),
1026            1,
1027            "Expected only the regular thread in sidebar metadata, \
1028             but found {} entries (subagent threads are leaking into the sidebar)",
1029            list.len(),
1030        );
1031        assert_eq!(list[0].session_id, regular_session_id);
1032        assert_eq!(list[0].title.as_ref(), "Regular Thread");
1033    }
1034}