thread_import.rs

   1use acp_thread::AgentSessionListRequest;
   2use agent::ThreadStore;
   3use agent_client_protocol as acp;
   4use chrono::Utc;
   5use collections::HashSet;
   6use db::kvp::Dismissable;
   7use db::sqlez;
   8use fs::Fs;
   9use futures::FutureExt as _;
  10use gpui::{
  11    App, Context, DismissEvent, Entity, EventEmitter, FocusHandle, Focusable, MouseDownEvent,
  12    Render, SharedString, Task, WeakEntity, Window,
  13};
  14use notifications::status_toast::StatusToast;
  15use project::{AgentId, AgentRegistryStore, AgentServerStore};
  16use release_channel::ReleaseChannel;
  17use remote::RemoteConnectionOptions;
  18use ui::{
  19    Checkbox, KeyBinding, ListItem, ListItemSpacing, Modal, ModalFooter, ModalHeader, Section,
  20    prelude::*,
  21};
  22use util::ResultExt;
  23use workspace::{ModalView, MultiWorkspace, Workspace};
  24
  25use crate::{
  26    Agent, AgentPanel,
  27    agent_connection_store::AgentConnectionStore,
  28    thread_metadata_store::{ThreadId, ThreadMetadata, ThreadMetadataStore, WorktreePaths},
  29};
  30
  31pub struct AcpThreadImportOnboarding;
  32pub struct CrossChannelImportOnboarding;
  33
  34impl AcpThreadImportOnboarding {
  35    pub fn dismissed(cx: &App) -> bool {
  36        <Self as Dismissable>::dismissed(cx)
  37    }
  38
  39    pub fn dismiss(cx: &mut App) {
  40        <Self as Dismissable>::set_dismissed(true, cx);
  41    }
  42}
  43
  44impl Dismissable for AcpThreadImportOnboarding {
  45    const KEY: &'static str = "dismissed-acp-thread-import";
  46}
  47
  48impl CrossChannelImportOnboarding {
  49    pub fn dismissed(cx: &App) -> bool {
  50        <Self as Dismissable>::dismissed(cx)
  51    }
  52
  53    pub fn dismiss(cx: &mut App) {
  54        <Self as Dismissable>::set_dismissed(true, cx);
  55    }
  56}
  57
  58impl Dismissable for CrossChannelImportOnboarding {
  59    const KEY: &'static str = "dismissed-cross-channel-thread-import";
  60}
  61
  62/// Returns the list of non-Dev, non-current release channels that have
  63/// at least one thread in their database.  The result is suitable for
  64/// building a user-facing message ("from Zed Preview and Nightly").
  65pub fn channels_with_threads(cx: &App) -> Vec<ReleaseChannel> {
  66    let Some(current_channel) = ReleaseChannel::try_global(cx) else {
  67        return Vec::new();
  68    };
  69    let database_dir = paths::database_dir();
  70
  71    ReleaseChannel::ALL
  72        .iter()
  73        .copied()
  74        .filter(|channel| {
  75            *channel != current_channel
  76                && *channel != ReleaseChannel::Dev
  77                && channel_has_threads(database_dir, *channel)
  78        })
  79        .collect()
  80}
  81
  82#[derive(Clone)]
  83struct AgentEntry {
  84    agent_id: AgentId,
  85    display_name: SharedString,
  86    icon_path: Option<SharedString>,
  87}
  88
  89pub struct ThreadImportModal {
  90    focus_handle: FocusHandle,
  91    workspace: WeakEntity<Workspace>,
  92    multi_workspace: WeakEntity<MultiWorkspace>,
  93    agent_entries: Vec<AgentEntry>,
  94    unchecked_agents: HashSet<AgentId>,
  95    selected_index: Option<usize>,
  96    is_importing: bool,
  97    last_error: Option<SharedString>,
  98}
  99
 100impl ThreadImportModal {
 101    pub fn new(
 102        agent_server_store: Entity<AgentServerStore>,
 103        agent_registry_store: Entity<AgentRegistryStore>,
 104        workspace: WeakEntity<Workspace>,
 105        multi_workspace: WeakEntity<MultiWorkspace>,
 106        _window: &mut Window,
 107        cx: &mut Context<Self>,
 108    ) -> Self {
 109        AcpThreadImportOnboarding::dismiss(cx);
 110
 111        let agent_entries = agent_server_store
 112            .read(cx)
 113            .external_agents()
 114            .map(|agent_id| {
 115                let display_name = agent_server_store
 116                    .read(cx)
 117                    .agent_display_name(agent_id)
 118                    .or_else(|| {
 119                        agent_registry_store
 120                            .read(cx)
 121                            .agent(agent_id)
 122                            .map(|agent| agent.name().clone())
 123                    })
 124                    .unwrap_or_else(|| agent_id.0.clone());
 125                let icon_path = agent_server_store
 126                    .read(cx)
 127                    .agent_icon(agent_id)
 128                    .or_else(|| {
 129                        agent_registry_store
 130                            .read(cx)
 131                            .agent(agent_id)
 132                            .and_then(|agent| agent.icon_path().cloned())
 133                    });
 134
 135                AgentEntry {
 136                    agent_id: agent_id.clone(),
 137                    display_name,
 138                    icon_path,
 139                }
 140            })
 141            .collect::<Vec<_>>();
 142
 143        Self {
 144            focus_handle: cx.focus_handle(),
 145            workspace,
 146            multi_workspace,
 147            agent_entries,
 148            unchecked_agents: HashSet::default(),
 149            selected_index: None,
 150            is_importing: false,
 151            last_error: None,
 152        }
 153    }
 154
 155    fn agent_ids(&self) -> Vec<AgentId> {
 156        self.agent_entries
 157            .iter()
 158            .map(|entry| entry.agent_id.clone())
 159            .collect()
 160    }
 161
 162    fn toggle_agent_checked(&mut self, agent_id: AgentId, cx: &mut Context<Self>) {
 163        if self.unchecked_agents.contains(&agent_id) {
 164            self.unchecked_agents.remove(&agent_id);
 165        } else {
 166            self.unchecked_agents.insert(agent_id);
 167        }
 168        cx.notify();
 169    }
 170
 171    fn select_next(&mut self, _: &menu::SelectNext, _window: &mut Window, cx: &mut Context<Self>) {
 172        if self.agent_entries.is_empty() {
 173            return;
 174        }
 175        self.selected_index = Some(match self.selected_index {
 176            Some(ix) if ix + 1 >= self.agent_entries.len() => 0,
 177            Some(ix) => ix + 1,
 178            None => 0,
 179        });
 180        cx.notify();
 181    }
 182
 183    fn select_previous(
 184        &mut self,
 185        _: &menu::SelectPrevious,
 186        _window: &mut Window,
 187        cx: &mut Context<Self>,
 188    ) {
 189        if self.agent_entries.is_empty() {
 190            return;
 191        }
 192        self.selected_index = Some(match self.selected_index {
 193            Some(0) => self.agent_entries.len() - 1,
 194            Some(ix) => ix - 1,
 195            None => self.agent_entries.len() - 1,
 196        });
 197        cx.notify();
 198    }
 199
 200    fn confirm(&mut self, _: &menu::Confirm, _window: &mut Window, cx: &mut Context<Self>) {
 201        if let Some(ix) = self.selected_index {
 202            if let Some(entry) = self.agent_entries.get(ix) {
 203                self.toggle_agent_checked(entry.agent_id.clone(), cx);
 204            }
 205        }
 206    }
 207
 208    fn cancel(&mut self, _: &menu::Cancel, _: &mut Window, cx: &mut Context<Self>) {
 209        cx.emit(DismissEvent);
 210    }
 211
 212    fn import_threads(
 213        &mut self,
 214        _: &menu::SecondaryConfirm,
 215        _: &mut Window,
 216        cx: &mut Context<Self>,
 217    ) {
 218        if self.is_importing {
 219            return;
 220        }
 221
 222        let Some(multi_workspace) = self.multi_workspace.upgrade() else {
 223            self.is_importing = false;
 224            cx.notify();
 225            return;
 226        };
 227
 228        let stores = resolve_agent_connection_stores(&multi_workspace, cx);
 229        if stores.is_empty() {
 230            log::error!("Did not find any workspaces to import from");
 231            self.is_importing = false;
 232            cx.notify();
 233            return;
 234        }
 235
 236        self.is_importing = true;
 237        self.last_error = None;
 238        cx.notify();
 239
 240        let agent_ids = self
 241            .agent_ids()
 242            .into_iter()
 243            .filter(|agent_id| !self.unchecked_agents.contains(agent_id))
 244            .collect::<Vec<_>>();
 245
 246        let existing_sessions: HashSet<acp::SessionId> = ThreadMetadataStore::global(cx)
 247            .read(cx)
 248            .entries()
 249            .filter_map(|m| m.session_id.clone())
 250            .collect();
 251
 252        let task = find_threads_to_import(agent_ids, existing_sessions, stores, cx);
 253        cx.spawn(async move |this, cx| {
 254            let result = task.await;
 255            this.update(cx, |this, cx| match result {
 256                Ok(threads) => {
 257                    let imported_count = threads.len();
 258                    ThreadMetadataStore::global(cx)
 259                        .update(cx, |store, cx| store.save_all(threads, cx));
 260                    this.is_importing = false;
 261                    this.last_error = None;
 262                    this.show_imported_threads_toast(imported_count, cx);
 263                    cx.emit(DismissEvent);
 264                }
 265                Err(error) => {
 266                    this.is_importing = false;
 267                    this.last_error = Some(error.to_string().into());
 268                    cx.notify();
 269                }
 270            })
 271        })
 272        .detach_and_log_err(cx);
 273    }
 274
 275    fn show_imported_threads_toast(&self, imported_count: usize, cx: &mut App) {
 276        let status_toast = if imported_count == 0 {
 277            StatusToast::new("No threads found to import.", cx, |this, _cx| {
 278                this.icon(
 279                    Icon::new(IconName::Info)
 280                        .size(IconSize::Small)
 281                        .color(Color::Muted),
 282                )
 283                .dismiss_button(true)
 284            })
 285        } else {
 286            let message = if imported_count == 1 {
 287                "Imported 1 thread.".to_string()
 288            } else {
 289                format!("Imported {imported_count} threads.")
 290            };
 291            StatusToast::new(message, cx, |this, _cx| {
 292                this.icon(
 293                    Icon::new(IconName::Check)
 294                        .size(IconSize::Small)
 295                        .color(Color::Success),
 296                )
 297                .dismiss_button(true)
 298            })
 299        };
 300
 301        self.workspace
 302            .update(cx, |workspace, cx| {
 303                workspace.toggle_status_toast(status_toast, cx);
 304            })
 305            .log_err();
 306    }
 307}
 308
 309impl EventEmitter<DismissEvent> for ThreadImportModal {}
 310
 311impl Focusable for ThreadImportModal {
 312    fn focus_handle(&self, _cx: &App) -> FocusHandle {
 313        self.focus_handle.clone()
 314    }
 315}
 316
 317impl ModalView for ThreadImportModal {}
 318
 319impl Render for ThreadImportModal {
 320    fn render(&mut self, _window: &mut Window, cx: &mut Context<Self>) -> impl IntoElement {
 321        let has_agents = !self.agent_entries.is_empty();
 322        let disabled_import_thread = self.is_importing
 323            || !has_agents
 324            || self.unchecked_agents.len() == self.agent_entries.len();
 325
 326        let agent_rows = self
 327            .agent_entries
 328            .iter()
 329            .enumerate()
 330            .map(|(ix, entry)| {
 331                let is_checked = !self.unchecked_agents.contains(&entry.agent_id);
 332                let is_focused = self.selected_index == Some(ix);
 333
 334                ListItem::new(("thread-import-agent", ix))
 335                    .rounded()
 336                    .spacing(ListItemSpacing::Sparse)
 337                    .focused(is_focused)
 338                    .disabled(self.is_importing)
 339                    .child(
 340                        h_flex()
 341                            .w_full()
 342                            .gap_2()
 343                            .when(!is_checked, |this| this.opacity(0.6))
 344                            .child(if let Some(icon_path) = entry.icon_path.clone() {
 345                                Icon::from_external_svg(icon_path)
 346                                    .color(Color::Muted)
 347                                    .size(IconSize::Small)
 348                            } else {
 349                                Icon::new(IconName::Sparkle)
 350                                    .color(Color::Muted)
 351                                    .size(IconSize::Small)
 352                            })
 353                            .child(Label::new(entry.display_name.clone())),
 354                    )
 355                    .end_slot(Checkbox::new(
 356                        ("thread-import-agent-checkbox", ix),
 357                        if is_checked {
 358                            ToggleState::Selected
 359                        } else {
 360                            ToggleState::Unselected
 361                        },
 362                    ))
 363                    .on_click({
 364                        let agent_id = entry.agent_id.clone();
 365                        cx.listener(move |this, _event, _window, cx| {
 366                            this.toggle_agent_checked(agent_id.clone(), cx);
 367                        })
 368                    })
 369            })
 370            .collect::<Vec<_>>();
 371
 372        v_flex()
 373            .id("thread-import-modal")
 374            .key_context("ThreadImportModal")
 375            .w(rems(34.))
 376            .elevation_3(cx)
 377            .overflow_hidden()
 378            .track_focus(&self.focus_handle)
 379            .on_action(cx.listener(Self::cancel))
 380            .on_action(cx.listener(Self::confirm))
 381            .on_action(cx.listener(Self::select_next))
 382            .on_action(cx.listener(Self::select_previous))
 383            .on_action(cx.listener(Self::import_threads))
 384            .on_any_mouse_down(cx.listener(|this, _: &MouseDownEvent, window, cx| {
 385                this.focus_handle.focus(window, cx);
 386            }))
 387            .child(
 388                Modal::new("import-threads", None)
 389                    .header(
 390                        ModalHeader::new()
 391                            .headline("Import External Agent Threads")
 392                            .description(
 393                                "Import threads from agents like Claude Agent, Codex, and more, whether started in Zed or another client. \
 394                                Choose which agents to include, and their threads will appear in your list."
 395                            )
 396                            .show_dismiss_button(true),
 397
 398                    )
 399                    .section(
 400                        Section::new().child(
 401                            v_flex()
 402                                .id("thread-import-agent-list")
 403                                .max_h(rems_from_px(320.))
 404                                .pb_1()
 405                                .overflow_y_scroll()
 406                                .when(has_agents, |this| this.children(agent_rows))
 407                                .when(!has_agents, |this| {
 408                                    this.child(
 409                                        Label::new("No ACP agents available.")
 410                                            .color(Color::Muted)
 411                                            .size(LabelSize::Small),
 412                                    )
 413                                }),
 414                        ),
 415                    )
 416                    .footer(
 417                        ModalFooter::new()
 418                            .when_some(self.last_error.clone(), |this, error| {
 419                                this.start_slot(
 420                                    Label::new(error)
 421                                        .size(LabelSize::Small)
 422                                        .color(Color::Error)
 423                                        .truncate(),
 424                                )
 425                            })
 426                            .end_slot(
 427                                Button::new("import-threads", "Import Threads")
 428                                    .loading(self.is_importing)
 429                                    .disabled(disabled_import_thread)
 430                                    .key_binding(
 431                                        KeyBinding::for_action(&menu::SecondaryConfirm, cx)
 432                                            .map(|kb| kb.size(rems_from_px(12.))),
 433                                    )
 434                                    .on_click(cx.listener(|this, _, window, cx| {
 435                                        this.import_threads(&menu::SecondaryConfirm, window, cx);
 436                                    })),
 437                            ),
 438                    ),
 439            )
 440    }
 441}
 442
 443fn resolve_agent_connection_stores(
 444    multi_workspace: &Entity<MultiWorkspace>,
 445    cx: &App,
 446) -> Vec<Entity<AgentConnectionStore>> {
 447    let mut stores = Vec::new();
 448    let mut included_local_store = false;
 449
 450    for workspace in multi_workspace.read(cx).workspaces() {
 451        let workspace = workspace.read(cx);
 452        let project = workspace.project().read(cx);
 453
 454        // We only want to include scores from one local workspace, since we
 455        // know that they live on the same machine
 456        let include_store = if project.is_remote() {
 457            true
 458        } else if project.is_local() && !included_local_store {
 459            included_local_store = true;
 460            true
 461        } else {
 462            false
 463        };
 464
 465        if !include_store {
 466            continue;
 467        }
 468
 469        if let Some(panel) = workspace.panel::<AgentPanel>(cx) {
 470            stores.push(panel.read(cx).connection_store().clone());
 471        }
 472    }
 473
 474    stores
 475}
 476
 477fn find_threads_to_import(
 478    agent_ids: Vec<AgentId>,
 479    existing_sessions: HashSet<acp::SessionId>,
 480    stores: Vec<Entity<AgentConnectionStore>>,
 481    cx: &mut App,
 482) -> Task<anyhow::Result<Vec<ThreadMetadata>>> {
 483    let mut wait_for_connection_tasks = Vec::new();
 484
 485    for store in stores {
 486        let remote_connection = store
 487            .read(cx)
 488            .project()
 489            .read(cx)
 490            .remote_connection_options(cx);
 491
 492        for agent_id in agent_ids.clone() {
 493            let agent = Agent::from(agent_id.clone());
 494            let server = agent.server(<dyn Fs>::global(cx), ThreadStore::global(cx));
 495            let entry = store.update(cx, |store, cx| store.request_connection(agent, server, cx));
 496
 497            wait_for_connection_tasks.push(entry.read(cx).wait_for_connection().map({
 498                let remote_connection = remote_connection.clone();
 499                move |state| (agent_id, remote_connection, state)
 500            }));
 501        }
 502    }
 503
 504    let mut session_list_tasks = Vec::new();
 505    cx.spawn(async move |cx| {
 506        let results = futures::future::join_all(wait_for_connection_tasks).await;
 507        for (agent_id, remote_connection, result) in results {
 508            let Some(state) = result.log_err() else {
 509                continue;
 510            };
 511            let Some(list) = cx.update(|cx| state.connection.session_list(cx)) else {
 512                continue;
 513            };
 514            let task = cx.update(|cx| {
 515                list.list_sessions(AgentSessionListRequest::default(), cx)
 516                    .map({
 517                        let remote_connection = remote_connection.clone();
 518                        move |response| (agent_id, remote_connection, response)
 519                    })
 520            });
 521            session_list_tasks.push(task);
 522        }
 523
 524        let mut sessions_by_agent = Vec::new();
 525        let results = futures::future::join_all(session_list_tasks).await;
 526        for (agent_id, remote_connection, result) in results {
 527            let Some(response) = result.log_err() else {
 528                continue;
 529            };
 530            sessions_by_agent.push(SessionByAgent {
 531                agent_id,
 532                remote_connection,
 533                sessions: response.sessions,
 534            });
 535        }
 536
 537        Ok(collect_importable_threads(
 538            sessions_by_agent,
 539            existing_sessions,
 540        ))
 541    })
 542}
 543
 544struct SessionByAgent {
 545    agent_id: AgentId,
 546    remote_connection: Option<RemoteConnectionOptions>,
 547    sessions: Vec<acp_thread::AgentSessionInfo>,
 548}
 549
 550fn collect_importable_threads(
 551    sessions_by_agent: Vec<SessionByAgent>,
 552    mut existing_sessions: HashSet<acp::SessionId>,
 553) -> Vec<ThreadMetadata> {
 554    let mut to_insert = Vec::new();
 555    for SessionByAgent {
 556        agent_id,
 557        remote_connection,
 558        sessions,
 559    } in sessions_by_agent
 560    {
 561        for session in sessions {
 562            if !existing_sessions.insert(session.session_id.clone()) {
 563                continue;
 564            }
 565            let Some(folder_paths) = session.work_dirs else {
 566                continue;
 567            };
 568            let updated_at = session.updated_at.unwrap_or_else(|| Utc::now());
 569            to_insert.push(ThreadMetadata {
 570                thread_id: ThreadId::new(),
 571                session_id: Some(session.session_id),
 572                agent_id: agent_id.clone(),
 573                title: session.title,
 574                updated_at,
 575                created_at: session.created_at,
 576                worktree_paths: WorktreePaths::from_folder_paths(&folder_paths),
 577                remote_connection: remote_connection.clone(),
 578                last_user_interaction: updated_at,
 579                archived: true,
 580            });
 581        }
 582    }
 583    to_insert
 584}
 585
 586pub fn import_threads_from_other_channels(_workspace: &mut Workspace, cx: &mut Context<Workspace>) {
 587    let database_dir = paths::database_dir().clone();
 588    import_threads_from_other_channels_in(database_dir, cx);
 589}
 590
 591fn import_threads_from_other_channels_in(
 592    database_dir: std::path::PathBuf,
 593    cx: &mut Context<Workspace>,
 594) {
 595    let current_channel = ReleaseChannel::global(cx);
 596
 597    let existing_thread_ids: HashSet<ThreadId> = ThreadMetadataStore::global(cx)
 598        .read(cx)
 599        .entries()
 600        .map(|metadata| metadata.thread_id)
 601        .collect();
 602
 603    let workspace_handle = cx.weak_entity();
 604    cx.spawn(async move |_this, cx| {
 605        let mut imported_threads = Vec::new();
 606
 607        for channel in &ReleaseChannel::ALL {
 608            if *channel == current_channel || *channel == ReleaseChannel::Dev {
 609                continue;
 610            }
 611
 612            match read_threads_from_channel(&database_dir, *channel) {
 613                Ok(threads) => {
 614                    let new_threads = threads
 615                        .into_iter()
 616                        .filter(|thread| !existing_thread_ids.contains(&thread.thread_id));
 617                    imported_threads.extend(new_threads);
 618                }
 619                Err(error) => {
 620                    log::warn!(
 621                        "Failed to read threads from {} channel database: {}",
 622                        channel.dev_name(),
 623                        error
 624                    );
 625                }
 626            }
 627        }
 628
 629        let imported_count = imported_threads.len();
 630
 631        cx.update(|cx| {
 632            ThreadMetadataStore::global(cx)
 633                .update(cx, |store, cx| store.save_all(imported_threads, cx));
 634
 635            show_cross_channel_import_toast(&workspace_handle, imported_count, cx);
 636        })
 637    })
 638    .detach();
 639}
 640
 641fn channel_has_threads(database_dir: &std::path::Path, channel: ReleaseChannel) -> bool {
 642    let db_path = db::db_path(database_dir, channel);
 643    if !db_path.exists() {
 644        return false;
 645    }
 646    let connection = sqlez::connection::Connection::open_file(&db_path.to_string_lossy());
 647    connection
 648        .select_row::<bool>("SELECT 1 FROM sidebar_threads LIMIT 1")
 649        .ok()
 650        .and_then(|mut query| query().ok().flatten())
 651        .unwrap_or(false)
 652}
 653
 654fn read_threads_from_channel(
 655    database_dir: &std::path::Path,
 656    channel: ReleaseChannel,
 657) -> anyhow::Result<Vec<ThreadMetadata>> {
 658    let db_path = db::db_path(database_dir, channel);
 659    if !db_path.exists() {
 660        return Ok(Vec::new());
 661    }
 662    let connection = sqlez::connection::Connection::open_file(&db_path.to_string_lossy());
 663    crate::thread_metadata_store::list_thread_metadata_from_connection(&connection)
 664}
 665
 666fn show_cross_channel_import_toast(
 667    workspace: &WeakEntity<Workspace>,
 668    imported_count: usize,
 669    cx: &mut App,
 670) {
 671    let status_toast = if imported_count == 0 {
 672        StatusToast::new("No new threads found to import.", cx, |this, _cx| {
 673            this.icon(Icon::new(IconName::Info).color(Color::Muted))
 674                .dismiss_button(true)
 675        })
 676    } else {
 677        let message = if imported_count == 1 {
 678            "Imported 1 thread from other channels.".to_string()
 679        } else {
 680            format!("Imported {imported_count} threads from other channels.")
 681        };
 682        StatusToast::new(message, cx, |this, _cx| {
 683            this.icon(Icon::new(IconName::Check).color(Color::Success))
 684                .dismiss_button(true)
 685        })
 686    };
 687
 688    workspace
 689        .update(cx, |workspace, cx| {
 690            workspace.toggle_status_toast(status_toast, cx);
 691        })
 692        .log_err();
 693}
 694
 695#[cfg(test)]
 696mod tests {
 697    use super::*;
 698    use acp_thread::AgentSessionInfo;
 699    use chrono::Utc;
 700    use gpui::TestAppContext;
 701    use std::path::Path;
 702    use workspace::PathList;
 703
 704    fn make_session(
 705        session_id: &str,
 706        title: Option<&str>,
 707        work_dirs: Option<PathList>,
 708        updated_at: Option<chrono::DateTime<Utc>>,
 709        created_at: Option<chrono::DateTime<Utc>>,
 710    ) -> AgentSessionInfo {
 711        AgentSessionInfo {
 712            session_id: acp::SessionId::new(session_id),
 713            title: title.map(|t| SharedString::from(t.to_string())),
 714            work_dirs,
 715            updated_at,
 716            created_at,
 717            meta: None,
 718        }
 719    }
 720
 721    #[test]
 722    fn test_collect_skips_sessions_already_in_existing_set() {
 723        let existing = HashSet::from_iter(vec![acp::SessionId::new("existing-1")]);
 724        let paths = PathList::new(&[Path::new("/project")]);
 725
 726        let sessions_by_agent = vec![SessionByAgent {
 727            agent_id: AgentId::new("agent-a"),
 728            remote_connection: None,
 729            sessions: vec![
 730                make_session(
 731                    "existing-1",
 732                    Some("Already There"),
 733                    Some(paths.clone()),
 734                    None,
 735                    None,
 736                ),
 737                make_session("new-1", Some("Brand New"), Some(paths), None, None),
 738            ],
 739        }];
 740
 741        let result = collect_importable_threads(sessions_by_agent, existing);
 742
 743        assert_eq!(result.len(), 1);
 744        assert_eq!(result[0].session_id.as_ref().unwrap().0.as_ref(), "new-1");
 745        assert_eq!(result[0].display_title(), "Brand New");
 746    }
 747
 748    #[test]
 749    fn test_collect_skips_sessions_without_work_dirs() {
 750        let existing = HashSet::default();
 751        let paths = PathList::new(&[Path::new("/project")]);
 752
 753        let sessions_by_agent = vec![SessionByAgent {
 754            agent_id: AgentId::new("agent-a"),
 755            remote_connection: None,
 756            sessions: vec![
 757                make_session("has-dirs", Some("With Dirs"), Some(paths), None, None),
 758                make_session("no-dirs", Some("No Dirs"), None, None, None),
 759            ],
 760        }];
 761
 762        let result = collect_importable_threads(sessions_by_agent, existing);
 763
 764        assert_eq!(result.len(), 1);
 765        assert_eq!(
 766            result[0].session_id.as_ref().unwrap().0.as_ref(),
 767            "has-dirs"
 768        );
 769    }
 770
 771    #[test]
 772    fn test_collect_marks_all_imported_threads_as_archived() {
 773        let existing = HashSet::default();
 774        let paths = PathList::new(&[Path::new("/project")]);
 775
 776        let sessions_by_agent = vec![SessionByAgent {
 777            agent_id: AgentId::new("agent-a"),
 778            remote_connection: None,
 779            sessions: vec![
 780                make_session("s1", Some("Thread 1"), Some(paths.clone()), None, None),
 781                make_session("s2", Some("Thread 2"), Some(paths), None, None),
 782            ],
 783        }];
 784
 785        let result = collect_importable_threads(sessions_by_agent, existing);
 786
 787        assert_eq!(result.len(), 2);
 788        assert!(result.iter().all(|t| t.archived));
 789    }
 790
 791    #[test]
 792    fn test_collect_assigns_correct_agent_id_per_session() {
 793        let existing = HashSet::default();
 794        let paths = PathList::new(&[Path::new("/project")]);
 795
 796        let sessions_by_agent = vec![
 797            SessionByAgent {
 798                agent_id: AgentId::new("agent-a"),
 799                remote_connection: None,
 800                sessions: vec![make_session(
 801                    "s1",
 802                    Some("From A"),
 803                    Some(paths.clone()),
 804                    None,
 805                    None,
 806                )],
 807            },
 808            SessionByAgent {
 809                agent_id: AgentId::new("agent-b"),
 810                remote_connection: None,
 811                sessions: vec![make_session("s2", Some("From B"), Some(paths), None, None)],
 812            },
 813        ];
 814
 815        let result = collect_importable_threads(sessions_by_agent, existing);
 816
 817        assert_eq!(result.len(), 2);
 818        let s1 = result
 819            .iter()
 820            .find(|t| t.session_id.as_ref().map(|s| s.0.as_ref()) == Some("s1"))
 821            .unwrap();
 822        let s2 = result
 823            .iter()
 824            .find(|t| t.session_id.as_ref().map(|s| s.0.as_ref()) == Some("s2"))
 825            .unwrap();
 826        assert_eq!(s1.agent_id.as_ref(), "agent-a");
 827        assert_eq!(s2.agent_id.as_ref(), "agent-b");
 828    }
 829
 830    #[test]
 831    fn test_collect_deduplicates_across_agents() {
 832        let existing = HashSet::default();
 833        let paths = PathList::new(&[Path::new("/project")]);
 834
 835        let sessions_by_agent = vec![
 836            SessionByAgent {
 837                agent_id: AgentId::new("agent-a"),
 838                remote_connection: None,
 839                sessions: vec![make_session(
 840                    "shared-session",
 841                    Some("From A"),
 842                    Some(paths.clone()),
 843                    None,
 844                    None,
 845                )],
 846            },
 847            SessionByAgent {
 848                agent_id: AgentId::new("agent-b"),
 849                remote_connection: None,
 850                sessions: vec![make_session(
 851                    "shared-session",
 852                    Some("From B"),
 853                    Some(paths),
 854                    None,
 855                    None,
 856                )],
 857            },
 858        ];
 859
 860        let result = collect_importable_threads(sessions_by_agent, existing);
 861
 862        assert_eq!(result.len(), 1);
 863        assert_eq!(
 864            result[0].session_id.as_ref().unwrap().0.as_ref(),
 865            "shared-session"
 866        );
 867        assert_eq!(
 868            result[0].agent_id.as_ref(),
 869            "agent-a",
 870            "first agent encountered should win"
 871        );
 872    }
 873
 874    #[test]
 875    fn test_collect_all_existing_returns_empty() {
 876        let paths = PathList::new(&[Path::new("/project")]);
 877        let existing =
 878            HashSet::from_iter(vec![acp::SessionId::new("s1"), acp::SessionId::new("s2")]);
 879
 880        let sessions_by_agent = vec![SessionByAgent {
 881            agent_id: AgentId::new("agent-a"),
 882            remote_connection: None,
 883            sessions: vec![
 884                make_session("s1", Some("T1"), Some(paths.clone()), None, None),
 885                make_session("s2", Some("T2"), Some(paths), None, None),
 886            ],
 887        }];
 888
 889        let result = collect_importable_threads(sessions_by_agent, existing);
 890        assert!(result.is_empty());
 891    }
 892
 893    fn create_channel_db(
 894        db_dir: &std::path::Path,
 895        channel: ReleaseChannel,
 896    ) -> db::sqlez::connection::Connection {
 897        let db_path = db::db_path(db_dir, channel);
 898        std::fs::create_dir_all(db_path.parent().unwrap()).unwrap();
 899        let connection = db::sqlez::connection::Connection::open_file(&db_path.to_string_lossy());
 900        crate::thread_metadata_store::run_thread_metadata_migrations(&connection);
 901        connection
 902    }
 903
 904    fn insert_thread(
 905        connection: &db::sqlez::connection::Connection,
 906        title: &str,
 907        updated_at: &str,
 908        archived: bool,
 909    ) {
 910        let thread_id = uuid::Uuid::new_v4();
 911        let session_id = uuid::Uuid::new_v4().to_string();
 912        connection
 913            .exec_bound::<(uuid::Uuid, &str, &str, &str, bool, &str)>(
 914                "INSERT INTO sidebar_threads \
 915                 (thread_id, session_id, title, updated_at, archived, last_user_interaction) \
 916                 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
 917            )
 918            .unwrap()((
 919            thread_id,
 920            session_id.as_str(),
 921            title,
 922            updated_at,
 923            archived,
 924            updated_at,
 925        ))
 926        .unwrap();
 927    }
 928
 929    #[test]
 930    fn test_returns_empty_when_channel_db_missing() {
 931        let dir = tempfile::tempdir().unwrap();
 932        let threads = read_threads_from_channel(dir.path(), ReleaseChannel::Nightly).unwrap();
 933        assert!(threads.is_empty());
 934    }
 935
 936    #[test]
 937    fn test_preserves_archived_state() {
 938        let dir = tempfile::tempdir().unwrap();
 939        let connection = create_channel_db(dir.path(), ReleaseChannel::Nightly);
 940
 941        insert_thread(&connection, "Active Thread", "2025-01-15T10:00:00Z", false);
 942        insert_thread(&connection, "Archived Thread", "2025-01-15T09:00:00Z", true);
 943        drop(connection);
 944
 945        let threads = read_threads_from_channel(dir.path(), ReleaseChannel::Nightly).unwrap();
 946        assert_eq!(threads.len(), 2);
 947
 948        let active = threads
 949            .iter()
 950            .find(|t| t.display_title().as_ref() == "Active Thread")
 951            .unwrap();
 952        assert!(!active.archived);
 953
 954        let archived = threads
 955            .iter()
 956            .find(|t| t.display_title().as_ref() == "Archived Thread")
 957            .unwrap();
 958        assert!(archived.archived);
 959    }
 960
 961    fn init_test(cx: &mut TestAppContext) {
 962        let fs = fs::FakeFs::new(cx.executor());
 963        cx.update(|cx| {
 964            let settings_store = settings::SettingsStore::test(cx);
 965            cx.set_global(settings_store);
 966            theme_settings::init(theme::LoadThemes::JustBase, cx);
 967            release_channel::init("0.0.0".parse().unwrap(), cx);
 968            <dyn fs::Fs>::set_global(fs, cx);
 969            ThreadMetadataStore::init_global(cx);
 970        });
 971        cx.run_until_parked();
 972    }
 973
 974    /// Returns two release channels that are not the current one and not Dev.
 975    /// This ensures tests work regardless of which release channel branch
 976    /// they run on.
 977    fn foreign_channels(cx: &TestAppContext) -> (ReleaseChannel, ReleaseChannel) {
 978        let current = cx.update(|cx| ReleaseChannel::global(cx));
 979        let mut channels = ReleaseChannel::ALL
 980            .iter()
 981            .copied()
 982            .filter(|ch| *ch != current && *ch != ReleaseChannel::Dev);
 983        (channels.next().unwrap(), channels.next().unwrap())
 984    }
 985
 986    #[gpui::test]
 987    async fn test_import_threads_from_other_channels(cx: &mut TestAppContext) {
 988        init_test(cx);
 989
 990        let dir = tempfile::tempdir().unwrap();
 991        let database_dir = dir.path().to_path_buf();
 992
 993        let (channel_a, channel_b) = foreign_channels(cx);
 994
 995        // Set up databases for two foreign channels.
 996        let db_a = create_channel_db(dir.path(), channel_a);
 997        insert_thread(&db_a, "Thread A1", "2025-01-15T10:00:00Z", false);
 998        insert_thread(&db_a, "Thread A2", "2025-01-15T11:00:00Z", true);
 999        drop(db_a);
1000
1001        let db_b = create_channel_db(dir.path(), channel_b);
1002        insert_thread(&db_b, "Thread B1", "2025-01-15T12:00:00Z", false);
1003        drop(db_b);
1004
1005        // Create a workspace and run the import.
1006        let fs = fs::FakeFs::new(cx.executor());
1007        let project = project::Project::test(fs, [], cx).await;
1008        let multi_workspace =
1009            cx.add_window(|window, cx| MultiWorkspace::test_new(project, window, cx));
1010        let workspace_entity = multi_workspace
1011            .read_with(cx, |mw, _cx| mw.workspace().clone())
1012            .unwrap();
1013        let mut vcx = gpui::VisualTestContext::from_window(multi_workspace.into(), cx);
1014
1015        workspace_entity.update_in(&mut vcx, |_workspace, _window, cx| {
1016            import_threads_from_other_channels_in(database_dir, cx);
1017        });
1018        cx.run_until_parked();
1019
1020        // Verify all three threads were imported into the store.
1021        cx.update(|cx| {
1022            let store = ThreadMetadataStore::global(cx);
1023            let store = store.read(cx);
1024            let titles: collections::HashSet<String> = store
1025                .entries()
1026                .map(|m| m.display_title().to_string())
1027                .collect();
1028
1029            assert_eq!(titles.len(), 3);
1030            assert!(titles.contains("Thread A1"));
1031            assert!(titles.contains("Thread A2"));
1032            assert!(titles.contains("Thread B1"));
1033
1034            // Verify archived state is preserved.
1035            let thread_a2 = store
1036                .entries()
1037                .find(|m| m.display_title().as_ref() == "Thread A2")
1038                .unwrap();
1039            assert!(thread_a2.archived);
1040
1041            let thread_b1 = store
1042                .entries()
1043                .find(|m| m.display_title().as_ref() == "Thread B1")
1044                .unwrap();
1045            assert!(!thread_b1.archived);
1046        });
1047    }
1048
1049    #[gpui::test]
1050    async fn test_import_skips_already_existing_threads(cx: &mut TestAppContext) {
1051        init_test(cx);
1052
1053        let dir = tempfile::tempdir().unwrap();
1054        let database_dir = dir.path().to_path_buf();
1055
1056        let (channel_a, _) = foreign_channels(cx);
1057
1058        // Set up a database for a foreign channel.
1059        let db_a = create_channel_db(dir.path(), channel_a);
1060        insert_thread(&db_a, "Thread A", "2025-01-15T10:00:00Z", false);
1061        insert_thread(&db_a, "Thread B", "2025-01-15T11:00:00Z", false);
1062        drop(db_a);
1063
1064        // Read the threads so we can pre-populate one into the store.
1065        let foreign_threads = read_threads_from_channel(dir.path(), channel_a).unwrap();
1066        let thread_a = foreign_threads
1067            .iter()
1068            .find(|t| t.display_title().as_ref() == "Thread A")
1069            .unwrap()
1070            .clone();
1071
1072        // Pre-populate Thread A into the store.
1073        cx.update(|cx| {
1074            ThreadMetadataStore::global(cx).update(cx, |store, cx| store.save(thread_a, cx));
1075        });
1076        cx.run_until_parked();
1077
1078        // Run the import.
1079        let fs = fs::FakeFs::new(cx.executor());
1080        let project = project::Project::test(fs, [], cx).await;
1081        let multi_workspace =
1082            cx.add_window(|window, cx| MultiWorkspace::test_new(project, window, cx));
1083        let workspace_entity = multi_workspace
1084            .read_with(cx, |mw, _cx| mw.workspace().clone())
1085            .unwrap();
1086        let mut vcx = gpui::VisualTestContext::from_window(multi_workspace.into(), cx);
1087
1088        workspace_entity.update_in(&mut vcx, |_workspace, _window, cx| {
1089            import_threads_from_other_channels_in(database_dir, cx);
1090        });
1091        cx.run_until_parked();
1092
1093        // Verify only Thread B was added (Thread A already existed).
1094        cx.update(|cx| {
1095            let store = ThreadMetadataStore::global(cx);
1096            let store = store.read(cx);
1097            assert_eq!(store.entries().count(), 2);
1098
1099            let titles: collections::HashSet<String> = store
1100                .entries()
1101                .map(|m| m.display_title().to_string())
1102                .collect();
1103            assert!(titles.contains("Thread A"));
1104            assert!(titles.contains("Thread B"));
1105        });
1106    }
1107}