session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DataBreakpointInfoCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetDataBreakpointsCommand, SetExceptionBreakpoints, SetVariableValueCommand,
   9    StackTraceCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  10    TerminateCommand, TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use crate::debugger::breakpoint_store::BreakpointSessionState;
  14use crate::debugger::dap_command::{DataBreakpointContext, ReadMemory};
  15use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
  16use anyhow::{Context as _, Result, anyhow, bail};
  17use base64::Engine;
  18use collections::{HashMap, HashSet, IndexMap};
  19use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  20use dap::messages::Response;
  21use dap::requests::{Request, RunInTerminal, StartDebugging};
  22use dap::transport::TcpTransport;
  23use dap::{
  24    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  25    SteppingGranularity, StoppedEvent, VariableReference,
  26    client::{DebugAdapterClient, SessionId},
  27    messages::{Events, Message},
  28};
  29use dap::{
  30    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  31    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  32    StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
  33};
  34use futures::channel::mpsc::UnboundedSender;
  35use futures::channel::{mpsc, oneshot};
  36use futures::io::BufReader;
  37use futures::{AsyncBufReadExt as _, SinkExt, StreamExt, TryStreamExt};
  38use futures::{FutureExt, future::Shared};
  39use gpui::{
  40    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  41    Task, WeakEntity,
  42};
  43use http_client::HttpClient;
  44use node_runtime::NodeRuntime;
  45use remote::RemoteClient;
  46use serde::{Deserialize, Serialize};
  47use serde_json::Value;
  48use smol::net::{TcpListener, TcpStream};
  49use std::any::TypeId;
  50use std::collections::{BTreeMap, VecDeque};
  51use std::net::Ipv4Addr;
  52use std::ops::RangeInclusive;
  53use std::path::PathBuf;
  54use std::time::Duration;
  55use std::u64;
  56use std::{
  57    any::Any,
  58    collections::hash_map::Entry,
  59    hash::{Hash, Hasher},
  60    path::Path,
  61    sync::Arc,
  62};
  63use task::SharedTaskContext;
  64use text::{PointUtf16, ToPointUtf16};
  65use url::Url;
  66use util::command::Stdio;
  67use util::command::new_command;
  68use util::{ResultExt, debug_panic, maybe};
  69use worktree::Worktree;
  70
  71const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
  72const DEBUG_HISTORY_LIMIT: usize = 10;
  73
  74#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  75#[repr(transparent)]
  76pub struct ThreadId(pub i64);
  77
  78impl From<i64> for ThreadId {
  79    fn from(id: i64) -> Self {
  80        Self(id)
  81    }
  82}
  83
  84#[derive(Clone, Debug)]
  85pub struct StackFrame {
  86    pub dap: dap::StackFrame,
  87    pub scopes: Vec<dap::Scope>,
  88}
  89
  90impl From<dap::StackFrame> for StackFrame {
  91    fn from(stack_frame: dap::StackFrame) -> Self {
  92        Self {
  93            scopes: vec![],
  94            dap: stack_frame,
  95        }
  96    }
  97}
  98
  99#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
 100pub enum ThreadStatus {
 101    #[default]
 102    Running,
 103    Stopped,
 104    Stepping,
 105    Exited,
 106    Ended,
 107}
 108
 109impl ThreadStatus {
 110    pub fn label(&self) -> &'static str {
 111        match self {
 112            ThreadStatus::Running => "Running",
 113            ThreadStatus::Stopped => "Stopped",
 114            ThreadStatus::Stepping => "Stepping",
 115            ThreadStatus::Exited => "Exited",
 116            ThreadStatus::Ended => "Ended",
 117        }
 118    }
 119}
 120
 121#[derive(Debug, Clone)]
 122pub struct Thread {
 123    dap: dap::Thread,
 124    stack_frames: Vec<StackFrame>,
 125    stack_frames_error: Option<SharedString>,
 126    _has_stopped: bool,
 127}
 128
 129impl From<dap::Thread> for Thread {
 130    fn from(dap: dap::Thread) -> Self {
 131        Self {
 132            dap,
 133            stack_frames: Default::default(),
 134            stack_frames_error: None,
 135            _has_stopped: false,
 136        }
 137    }
 138}
 139
 140#[derive(Debug, Clone, PartialEq)]
 141pub struct Watcher {
 142    pub expression: SharedString,
 143    pub value: SharedString,
 144    pub variables_reference: u64,
 145    pub presentation_hint: Option<VariablePresentationHint>,
 146}
 147
 148#[derive(Debug, Clone, PartialEq)]
 149pub struct DataBreakpointState {
 150    pub dap: dap::DataBreakpoint,
 151    pub is_enabled: bool,
 152    pub context: Arc<DataBreakpointContext>,
 153}
 154
 155pub enum SessionState {
 156    /// Represents a session that is building/initializing
 157    /// even if a session doesn't have a pre build task this state
 158    /// is used to run all the async tasks that are required to start the session
 159    Booting(Option<Task<Result<()>>>),
 160    Running(RunningMode),
 161}
 162
 163#[derive(Clone)]
 164pub struct RunningMode {
 165    client: Arc<DebugAdapterClient>,
 166    binary: DebugAdapterBinary,
 167    tmp_breakpoint: Option<SourceBreakpoint>,
 168    worktree: WeakEntity<Worktree>,
 169    executor: BackgroundExecutor,
 170    is_started: bool,
 171    has_ever_stopped: bool,
 172    messages_tx: UnboundedSender<Message>,
 173}
 174
 175#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
 176pub struct SessionQuirks {
 177    pub compact: bool,
 178    pub prefer_thread_name: bool,
 179}
 180
 181fn client_source(abs_path: &Path) -> dap::Source {
 182    dap::Source {
 183        name: abs_path
 184            .file_name()
 185            .map(|filename| filename.to_string_lossy().into_owned()),
 186        path: Some(abs_path.to_string_lossy().into_owned()),
 187        source_reference: None,
 188        presentation_hint: None,
 189        origin: None,
 190        sources: None,
 191        adapter_data: None,
 192        checksums: None,
 193    }
 194}
 195
 196impl RunningMode {
 197    async fn new(
 198        session_id: SessionId,
 199        parent_session: Option<Entity<Session>>,
 200        worktree: WeakEntity<Worktree>,
 201        binary: DebugAdapterBinary,
 202        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 203        cx: &mut AsyncApp,
 204    ) -> Result<Self> {
 205        let message_handler = Box::new({
 206            let messages_tx = messages_tx.clone();
 207            move |message| {
 208                messages_tx.unbounded_send(message).ok();
 209            }
 210        });
 211
 212        let client = if let Some(client) =
 213            parent_session.and_then(|session| cx.update(|cx| session.read(cx).adapter_client()))
 214        {
 215            client
 216                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 217                .await?
 218        } else {
 219            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 220        };
 221
 222        Ok(Self {
 223            client: Arc::new(client),
 224            worktree,
 225            tmp_breakpoint: None,
 226            binary,
 227            executor: cx.background_executor().clone(),
 228            is_started: false,
 229            has_ever_stopped: false,
 230            messages_tx,
 231        })
 232    }
 233
 234    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 235        &self.worktree
 236    }
 237
 238    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 239        let tasks: Vec<_> = paths
 240            .iter()
 241            .map(|path| {
 242                self.request(dap_command::SetBreakpoints {
 243                    source: client_source(path),
 244                    source_modified: None,
 245                    breakpoints: vec![],
 246                })
 247            })
 248            .collect();
 249
 250        cx.background_spawn(async move {
 251            futures::future::join_all(tasks)
 252                .await
 253                .iter()
 254                .for_each(|res| match res {
 255                    Ok(_) => {}
 256                    Err(err) => {
 257                        log::warn!("Set breakpoints request failed: {}", err);
 258                    }
 259                });
 260        })
 261    }
 262
 263    fn send_breakpoints_from_path(
 264        &self,
 265        abs_path: Arc<Path>,
 266        reason: BreakpointUpdatedReason,
 267        breakpoint_store: &Entity<BreakpointStore>,
 268        cx: &mut App,
 269    ) -> Task<()> {
 270        let breakpoints =
 271            breakpoint_store
 272                .read(cx)
 273                .source_breakpoints_from_path(&abs_path, cx)
 274                .into_iter()
 275                .filter(|bp| bp.state.is_enabled())
 276                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 277                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 278                }))
 279                .map(Into::into)
 280                .collect();
 281
 282        let raw_breakpoints = breakpoint_store
 283            .read(cx)
 284            .breakpoints_from_path(&abs_path)
 285            .into_iter()
 286            .filter(|bp| bp.bp.state.is_enabled())
 287            .collect::<Vec<_>>();
 288
 289        let task = self.request(dap_command::SetBreakpoints {
 290            source: client_source(&abs_path),
 291            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 292            breakpoints,
 293        });
 294        let session_id = self.client.id();
 295        let breakpoint_store = breakpoint_store.downgrade();
 296        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 297            Ok(breakpoints) => {
 298                let breakpoints =
 299                    breakpoints
 300                        .into_iter()
 301                        .zip(raw_breakpoints)
 302                        .filter_map(|(dap_bp, zed_bp)| {
 303                            Some((
 304                                zed_bp,
 305                                BreakpointSessionState {
 306                                    id: dap_bp.id?,
 307                                    verified: dap_bp.verified,
 308                                },
 309                            ))
 310                        });
 311                breakpoint_store
 312                    .update(cx, |this, _| {
 313                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 314                    })
 315                    .ok();
 316            }
 317            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 318        })
 319    }
 320
 321    fn send_exception_breakpoints(
 322        &self,
 323        filters: Vec<ExceptionBreakpointsFilter>,
 324        supports_filter_options: bool,
 325    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 326        let arg = if supports_filter_options {
 327            SetExceptionBreakpoints::WithOptions {
 328                filters: filters
 329                    .into_iter()
 330                    .map(|filter| ExceptionFilterOptions {
 331                        filter_id: filter.filter,
 332                        condition: None,
 333                        mode: None,
 334                    })
 335                    .collect(),
 336            }
 337        } else {
 338            SetExceptionBreakpoints::Plain {
 339                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 340            }
 341        };
 342        self.request(arg)
 343    }
 344
 345    fn send_source_breakpoints(
 346        &self,
 347        ignore_breakpoints: bool,
 348        breakpoint_store: &Entity<BreakpointStore>,
 349        cx: &App,
 350    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 351        let mut breakpoint_tasks = Vec::new();
 352        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 353        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 354        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 355        let session_id = self.client.id();
 356        for (path, breakpoints) in breakpoints {
 357            let breakpoints = if ignore_breakpoints {
 358                vec![]
 359            } else {
 360                breakpoints
 361                    .into_iter()
 362                    .filter(|bp| bp.state.is_enabled())
 363                    .map(Into::into)
 364                    .collect()
 365            };
 366
 367            let raw_breakpoints = raw_breakpoints
 368                .remove(&path)
 369                .unwrap_or_default()
 370                .into_iter()
 371                .filter(|bp| bp.bp.state.is_enabled());
 372            let error_path = path.clone();
 373            let send_request = self
 374                .request(dap_command::SetBreakpoints {
 375                    source: client_source(&path),
 376                    source_modified: Some(false),
 377                    breakpoints,
 378                })
 379                .map(|result| result.map_err(move |e| (error_path, e)));
 380
 381            let task = cx.spawn({
 382                let breakpoint_store = breakpoint_store.downgrade();
 383                async move |cx| {
 384                    let breakpoints = cx.background_spawn(send_request).await?;
 385
 386                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 387                        |(dap_bp, zed_bp)| {
 388                            Some((
 389                                zed_bp,
 390                                BreakpointSessionState {
 391                                    id: dap_bp.id?,
 392                                    verified: dap_bp.verified,
 393                                },
 394                            ))
 395                        },
 396                    );
 397                    breakpoint_store
 398                        .update(cx, |this, _| {
 399                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 400                        })
 401                        .ok();
 402
 403                    Ok(())
 404                }
 405            });
 406            breakpoint_tasks.push(task);
 407        }
 408
 409        cx.background_spawn(async move {
 410            futures::future::join_all(breakpoint_tasks)
 411                .await
 412                .into_iter()
 413                .filter_map(Result::err)
 414                .collect::<HashMap<_, _>>()
 415        })
 416    }
 417
 418    fn initialize_sequence(
 419        &self,
 420        capabilities: &Capabilities,
 421        initialized_rx: oneshot::Receiver<()>,
 422        dap_store: WeakEntity<DapStore>,
 423        cx: &mut Context<Session>,
 424    ) -> Task<Result<()>> {
 425        let raw = self.binary.request_args.clone();
 426
 427        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 428        let launch = match raw.request {
 429            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 430                raw: raw.configuration,
 431            }),
 432            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 433                raw: raw.configuration,
 434            }),
 435        };
 436
 437        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 438        // From spec (on initialization sequence):
 439        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 440        //
 441        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 442        let should_send_exception_breakpoints = capabilities
 443            .exception_breakpoint_filters
 444            .as_ref()
 445            .is_some_and(|filters| !filters.is_empty())
 446            || !configuration_done_supported;
 447        let supports_exception_filters = capabilities
 448            .supports_exception_filter_options
 449            .unwrap_or_default();
 450        let this = self.clone();
 451        let worktree = self.worktree().clone();
 452        let mut filters = capabilities
 453            .exception_breakpoint_filters
 454            .clone()
 455            .unwrap_or_default();
 456        let configuration_sequence = cx.spawn({
 457            async move |session, cx| {
 458                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 459                let (breakpoint_store, adapter_defaults) =
 460                    dap_store.read_with(cx, |dap_store, _| {
 461                        (
 462                            dap_store.breakpoint_store().clone(),
 463                            dap_store.adapter_options(&adapter_name),
 464                        )
 465                    })?;
 466                initialized_rx.await?;
 467                let errors_by_path = cx
 468                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))
 469                    .await;
 470
 471                dap_store.update(cx, |_, cx| {
 472                    let Some(worktree) = worktree.upgrade() else {
 473                        return;
 474                    };
 475
 476                    for (path, error) in &errors_by_path {
 477                        log::error!("failed to set breakpoints for {path:?}: {error}");
 478                    }
 479
 480                    if let Some(failed_path) = errors_by_path.keys().next() {
 481                        let failed_path = failed_path
 482                            .strip_prefix(worktree.read(cx).abs_path())
 483                            .unwrap_or(failed_path)
 484                            .display();
 485                        let message = format!(
 486                            "Failed to set breakpoints for {failed_path}{}",
 487                            match errors_by_path.len() {
 488                                0 => unreachable!(),
 489                                1 => "".into(),
 490                                2 => " and 1 other path".into(),
 491                                n => format!(" and {} other paths", n - 1),
 492                            }
 493                        );
 494                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 495                    }
 496                })?;
 497
 498                if should_send_exception_breakpoints {
 499                    _ = session.update(cx, |this, _| {
 500                        filters.retain(|filter| {
 501                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 502                                defaults
 503                                    .exception_breakpoints
 504                                    .get(&filter.filter)
 505                                    .map(|options| options.enabled)
 506                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 507                            } else {
 508                                filter.default.unwrap_or_default()
 509                            };
 510                            this.exception_breakpoints
 511                                .entry(filter.filter.clone())
 512                                .or_insert_with(|| (filter.clone(), is_enabled));
 513                            is_enabled
 514                        });
 515                    });
 516
 517                    this.send_exception_breakpoints(filters, supports_exception_filters)
 518                        .await
 519                        .ok();
 520                }
 521
 522                if configuration_done_supported {
 523                    this.request(ConfigurationDone {})
 524                } else {
 525                    Task::ready(Ok(()))
 526                }
 527                .await
 528            }
 529        });
 530
 531        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 532
 533        cx.spawn(async move |this, cx| {
 534            let result = task.await;
 535
 536            this.update(cx, |this, cx| {
 537                if let Some(this) = this.as_running_mut() {
 538                    this.is_started = true;
 539                    cx.notify();
 540                }
 541            })
 542            .ok();
 543
 544            result?;
 545            anyhow::Ok(())
 546        })
 547    }
 548
 549    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 550        let client = self.client.clone();
 551        let messages_tx = self.messages_tx.clone();
 552        let message_handler = Box::new(move |message| {
 553            messages_tx.unbounded_send(message).ok();
 554        });
 555        if client.should_reconnect_for_ssh() {
 556            Some(cx.spawn(async move |cx| {
 557                client.connect(message_handler, cx).await?;
 558                anyhow::Ok(())
 559            }))
 560        } else {
 561            None
 562        }
 563    }
 564
 565    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 566    where
 567        <R::DapRequest as dap::requests::Request>::Response: 'static,
 568        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 569    {
 570        let request = Arc::new(request);
 571
 572        let request_clone = request.clone();
 573        let connection = self.client.clone();
 574        self.executor.spawn(async move {
 575            let args = request_clone.to_dap();
 576            let response = connection.request::<R::DapRequest>(args).await?;
 577            request.response_from_dap(response)
 578        })
 579    }
 580}
 581
 582impl SessionState {
 583    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 584    where
 585        <R::DapRequest as dap::requests::Request>::Response: 'static,
 586        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 587    {
 588        match self {
 589            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 590            SessionState::Booting(_) => Task::ready(Err(anyhow!(
 591                "no adapter running to send request: {request:?}"
 592            ))),
 593        }
 594    }
 595
 596    /// Did this debug session stop at least once?
 597    pub(crate) fn has_ever_stopped(&self) -> bool {
 598        match self {
 599            SessionState::Booting(_) => false,
 600            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 601        }
 602    }
 603
 604    fn stopped(&mut self) {
 605        if let SessionState::Running(running) = self {
 606            running.has_ever_stopped = true;
 607        }
 608    }
 609}
 610
 611#[derive(Default)]
 612struct ThreadStates {
 613    global_state: Option<ThreadStatus>,
 614    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 615}
 616
 617impl ThreadStates {
 618    fn stop_all_threads(&mut self) {
 619        self.global_state = Some(ThreadStatus::Stopped);
 620        self.known_thread_states.clear();
 621    }
 622
 623    fn exit_all_threads(&mut self) {
 624        self.global_state = Some(ThreadStatus::Exited);
 625        self.known_thread_states.clear();
 626    }
 627
 628    fn continue_all_threads(&mut self) {
 629        self.global_state = Some(ThreadStatus::Running);
 630        self.known_thread_states.clear();
 631    }
 632
 633    fn stop_thread(&mut self, thread_id: ThreadId) {
 634        self.known_thread_states
 635            .insert(thread_id, ThreadStatus::Stopped);
 636    }
 637
 638    fn continue_thread(&mut self, thread_id: ThreadId) {
 639        self.known_thread_states
 640            .insert(thread_id, ThreadStatus::Running);
 641    }
 642
 643    fn process_step(&mut self, thread_id: ThreadId) {
 644        self.known_thread_states
 645            .insert(thread_id, ThreadStatus::Stepping);
 646    }
 647
 648    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 649        self.thread_state(thread_id)
 650            .unwrap_or(ThreadStatus::Running)
 651    }
 652
 653    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 654        self.known_thread_states
 655            .get(&thread_id)
 656            .copied()
 657            .or(self.global_state)
 658    }
 659
 660    fn exit_thread(&mut self, thread_id: ThreadId) {
 661        self.known_thread_states
 662            .insert(thread_id, ThreadStatus::Exited);
 663    }
 664
 665    fn any_stopped_thread(&self) -> bool {
 666        self.global_state
 667            .is_some_and(|state| state == ThreadStatus::Stopped)
 668            || self
 669                .known_thread_states
 670                .values()
 671                .any(|status| *status == ThreadStatus::Stopped)
 672    }
 673}
 674
 675// TODO(debugger): Wrap dap types with reference counting so the UI doesn't have to clone them on refresh
 676#[derive(Default)]
 677pub struct SessionSnapshot {
 678    threads: IndexMap<ThreadId, Thread>,
 679    thread_states: ThreadStates,
 680    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 681    stack_frames: IndexMap<StackFrameId, StackFrame>,
 682    locations: HashMap<u64, dap::LocationsResponse>,
 683    modules: Vec<dap::Module>,
 684    loaded_sources: Vec<dap::Source>,
 685}
 686
 687type IsEnabled = bool;
 688
 689#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 690pub struct OutputToken(pub usize);
 691/// Represents a current state of a single debug adapter and provides ways to mutate it.
 692pub struct Session {
 693    pub state: SessionState,
 694    active_snapshot: SessionSnapshot,
 695    snapshots: VecDeque<SessionSnapshot>,
 696    selected_snapshot_index: Option<usize>,
 697    id: SessionId,
 698    label: Option<SharedString>,
 699    adapter: DebugAdapterName,
 700    pub(super) capabilities: Capabilities,
 701    child_session_ids: HashSet<SessionId>,
 702    parent_session: Option<Entity<Session>>,
 703    output_token: OutputToken,
 704    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 705    watchers: HashMap<SharedString, Watcher>,
 706    is_session_terminated: bool,
 707    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 708    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 709    ignore_breakpoints: bool,
 710    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 711    data_breakpoints: BTreeMap<String, DataBreakpointState>,
 712    background_tasks: Vec<Task<()>>,
 713    restart_task: Option<Task<()>>,
 714    task_context: SharedTaskContext,
 715    memory: memory::Memory,
 716    quirks: SessionQuirks,
 717    remote_client: Option<Entity<RemoteClient>>,
 718    node_runtime: Option<NodeRuntime>,
 719    http_client: Option<Arc<dyn HttpClient>>,
 720    companion_port: Option<u16>,
 721}
 722
 723trait CacheableCommand: Any + Send + Sync {
 724    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 725    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 726    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 727}
 728
 729impl<T> CacheableCommand for T
 730where
 731    T: LocalDapCommand + PartialEq + Eq + Hash,
 732{
 733    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 734        (rhs as &dyn Any).downcast_ref::<Self>() == Some(self)
 735    }
 736
 737    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 738        T::hash(self, &mut hasher);
 739    }
 740
 741    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 742        self
 743    }
 744}
 745
 746pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 747
 748impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 749    fn from(request: T) -> Self {
 750        Self(Arc::new(request))
 751    }
 752}
 753
 754impl PartialEq for RequestSlot {
 755    fn eq(&self, other: &Self) -> bool {
 756        self.0.dyn_eq(other.0.as_ref())
 757    }
 758}
 759
 760impl Eq for RequestSlot {}
 761
 762impl Hash for RequestSlot {
 763    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 764        self.0.dyn_hash(state);
 765        (&*self.0 as &dyn Any).type_id().hash(state)
 766    }
 767}
 768
 769#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 770pub struct CompletionsQuery {
 771    pub query: String,
 772    pub column: u64,
 773    pub line: Option<u64>,
 774    pub frame_id: Option<u64>,
 775}
 776
 777impl CompletionsQuery {
 778    pub fn new(
 779        buffer: &language::Buffer,
 780        cursor_position: language::Anchor,
 781        frame_id: Option<u64>,
 782    ) -> Self {
 783        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 784        Self {
 785            query: buffer.text(),
 786            column: column as u64,
 787            frame_id,
 788            line: Some(row as u64),
 789        }
 790    }
 791}
 792
 793#[derive(Debug)]
 794pub enum SessionEvent {
 795    Modules,
 796    LoadedSources,
 797    Stopped(Option<ThreadId>),
 798    StackTrace,
 799    Variables,
 800    Watchers,
 801    Threads,
 802    InvalidateInlineValue,
 803    CapabilitiesLoaded,
 804    RunInTerminal {
 805        request: RunInTerminalRequestArguments,
 806        sender: mpsc::Sender<Result<u32>>,
 807    },
 808    DataBreakpointInfo,
 809    ConsoleOutput,
 810    HistoricSnapshotSelected,
 811}
 812
 813#[derive(Clone, Debug, PartialEq, Eq)]
 814pub enum SessionStateEvent {
 815    Running,
 816    Shutdown,
 817    Restart,
 818    SpawnChildSession {
 819        request: StartDebuggingRequestArguments,
 820    },
 821}
 822
 823impl EventEmitter<SessionEvent> for Session {}
 824impl EventEmitter<SessionStateEvent> for Session {}
 825
 826// local session will send breakpoint updates to DAP for all new breakpoints
 827// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 828// BreakpointStore notifies session on breakpoint changes
 829impl Session {
 830    pub(crate) fn new(
 831        breakpoint_store: Entity<BreakpointStore>,
 832        session_id: SessionId,
 833        parent_session: Option<Entity<Session>>,
 834        label: Option<SharedString>,
 835        adapter: DebugAdapterName,
 836        task_context: SharedTaskContext,
 837        quirks: SessionQuirks,
 838        remote_client: Option<Entity<RemoteClient>>,
 839        node_runtime: Option<NodeRuntime>,
 840        http_client: Option<Arc<dyn HttpClient>>,
 841        cx: &mut App,
 842    ) -> Entity<Self> {
 843        cx.new::<Self>(|cx| {
 844            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 845                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 846                    if let Some(local) = (!this.ignore_breakpoints)
 847                        .then(|| this.as_running_mut())
 848                        .flatten()
 849                    {
 850                        local
 851                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 852                            .detach();
 853                    };
 854                }
 855                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 856                    if let Some(local) = (!this.ignore_breakpoints)
 857                        .then(|| this.as_running_mut())
 858                        .flatten()
 859                    {
 860                        local.unset_breakpoints_from_paths(paths, cx).detach();
 861                    }
 862                }
 863                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 864            })
 865            .detach();
 866
 867            Self {
 868                state: SessionState::Booting(None),
 869                snapshots: VecDeque::with_capacity(DEBUG_HISTORY_LIMIT),
 870                selected_snapshot_index: None,
 871                active_snapshot: Default::default(),
 872                id: session_id,
 873                child_session_ids: HashSet::default(),
 874                parent_session,
 875                capabilities: Capabilities::default(),
 876                watchers: HashMap::default(),
 877                output_token: OutputToken(0),
 878                output: circular_buffer::CircularBuffer::boxed(),
 879                requests: HashMap::default(),
 880                background_tasks: Vec::default(),
 881                restart_task: None,
 882                is_session_terminated: false,
 883                ignore_breakpoints: false,
 884                breakpoint_store,
 885                data_breakpoints: Default::default(),
 886                exception_breakpoints: Default::default(),
 887                label,
 888                adapter,
 889                task_context,
 890                memory: memory::Memory::new(),
 891                quirks,
 892                remote_client,
 893                node_runtime,
 894                http_client,
 895                companion_port: None,
 896            }
 897        })
 898    }
 899
    /// Returns the task context this session was created with.
    pub fn task_context(&self) -> &SharedTaskContext {
        &self.task_context
    }
 903
 904    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 905        match &self.state {
 906            SessionState::Booting(_) => None,
 907            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 908        }
 909    }
 910
    /// Starts the debug adapter described by `binary` and drives the session
    /// through its start-up sequence.
    ///
    /// A background task (stored in `self.background_tasks`) is spawned first to
    /// drain adapter messages. The returned task then creates the
    /// [`RunningMode`], switches the session state to running, sends the
    /// initialize request and finally runs the initialize sequence.
    ///
    /// # Errors
    ///
    /// The returned task fails if the running mode cannot be created, the
    /// session entity is dropped, or initialization fails. When the initialize
    /// sequence fails, the launch configuration is also written to the session's
    /// console output.
    pub fn boot(
        &mut self,
        binary: DebugAdapterBinary,
        worktree: Entity<Worktree>,
        dap_store: WeakEntity<DapStore>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        // `message_tx` is handed to the running mode; `message_rx` is drained by
        // the background task below.
        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
        // Fires once, when the adapter's `Initialized` event is seen.
        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();

        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
            let mut initialized_tx = Some(initialized_tx);
            while let Some(message) = message_rx.next().await {
                if let Message::Event(event) = message {
                    if let Events::Initialized(_) = *event {
                        // Only the first `Initialized` event is forwarded; the
                        // sender is consumed by `take`.
                        if let Some(tx) = initialized_tx.take() {
                            tx.send(()).ok();
                        }
                    } else {
                        // Stop draining once the session entity is gone.
                        let Ok(_) = this.update(cx, |session, cx| {
                            session.handle_dap_event(event, cx);
                        }) else {
                            break;
                        };
                    }
                } else if let Message::Request(request) = message {
                    // Reverse requests from the adapter; commands other than the
                    // two below are not handled here.
                    let Ok(_) = this.update(cx, |this, cx| {
                        if request.command == StartDebugging::COMMAND {
                            this.handle_start_debugging_request(request, cx)
                                .detach_and_log_err(cx);
                        } else if request.command == RunInTerminal::COMMAND {
                            this.handle_run_in_terminal_request(request, cx)
                                .detach_and_log_err(cx);
                        }
                    }) else {
                        break;
                    };
                }
            }
        })];
        self.background_tasks = background_tasks;
        let id = self.id;
        let parent_session = self.parent_session.clone();

        cx.spawn(async move |this, cx| {
            let mode = RunningMode::new(
                id,
                parent_session,
                worktree.downgrade(),
                binary.clone(),
                message_tx,
                cx,
            )
            .await?;
            this.update(cx, |this, cx| {
                match &mut this.state {
                    // A task stored in the booting state is detached rather than
                    // dropped when the state is replaced.
                    SessionState::Booting(task) if task.is_some() => {
                        task.take().unwrap().detach_and_log_err(cx);
                    }
                    SessionState::Booting(_) => {}
                    SessionState::Running(_) => {
                        debug_panic!("Attempting to boot a session that is already running");
                    }
                };
                this.state = SessionState::Running(mode);
                cx.emit(SessionStateEvent::Running);
            })?;

            this.update(cx, |session, cx| session.request_initialize(cx))?
                .await?;

            let result = this
                .update(cx, |session, cx| {
                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
                })?
                .await;

            // On failure, surface the configuration that was used so the user can
            // see what was attempted.
            if result.is_err() {
                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;

                console
                    .send(format!(
                        "Tried to launch debugger with: {}",
                        serde_json::to_string_pretty(&binary.request_args.configuration)
                            .unwrap_or_default(),
                    ))
                    .await
                    .ok();
            }

            result
        })
    }
1004
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
1008
    /// Returns a copy of the set of child session ids registered on this session.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
1012
    /// Registers `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
1016
    /// Removes `session_id` from this session's children; does nothing if it was
    /// not registered.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
1020
1021    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
1022        self.parent_session
1023            .as_ref()
1024            .map(|session| session.read(cx).id)
1025    }
1026
    /// Returns the parent session entity, if this session has one.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
1030
1031    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1032        let Some(client) = self.adapter_client() else {
1033            return Task::ready(());
1034        };
1035
1036        let supports_terminate = self
1037            .capabilities
1038            .support_terminate_debuggee
1039            .unwrap_or(false);
1040
1041        cx.background_spawn(async move {
1042            if supports_terminate {
1043                client
1044                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1045                        restart: Some(false),
1046                    })
1047                    .await
1048                    .ok();
1049            } else {
1050                client
1051                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1052                        restart: Some(false),
1053                        terminate_debuggee: Some(true),
1054                        suspend_debuggee: Some(false),
1055                    })
1056                    .await
1057                    .ok();
1058            }
1059        })
1060    }
1061
    /// Returns the adapter capabilities currently stored on the session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
1065
1066    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1067        match &self.state {
1068            SessionState::Booting(_) => None,
1069            SessionState::Running(running_mode) => Some(&running_mode.binary),
1070        }
1071    }
1072
    /// Returns a clone of the debug adapter name this session was created with.
    pub fn adapter(&self) -> DebugAdapterName {
        self.adapter.clone()
    }
1076
    /// Returns a clone of the session's label, if it has one.
    pub fn label(&self) -> Option<SharedString> {
        self.label.clone()
    }
1080
    /// Returns the value of the session's `is_session_terminated` flag.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
1084
1085    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1086        let (tx, mut rx) = mpsc::unbounded();
1087
1088        cx.spawn(async move |this, cx| {
1089            while let Some(output) = rx.next().await {
1090                this.update(cx, |this, _| {
1091                    let event = dap::OutputEvent {
1092                        category: None,
1093                        output,
1094                        group: None,
1095                        variables_reference: None,
1096                        source: None,
1097                        line: None,
1098                        column: None,
1099                        data: None,
1100                        location_reference: None,
1101                    };
1102                    this.push_output(event);
1103                })?;
1104            }
1105            anyhow::Ok(())
1106        })
1107        .detach();
1108
1109        tx
1110    }
1111
1112    pub fn is_started(&self) -> bool {
1113        match &self.state {
1114            SessionState::Booting(_) => false,
1115            SessionState::Running(running) => running.is_started,
1116        }
1117    }
1118
1119    pub fn is_building(&self) -> bool {
1120        matches!(self.state, SessionState::Booting(_))
1121    }
1122
1123    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1124        match &mut self.state {
1125            SessionState::Running(local_mode) => Some(local_mode),
1126            SessionState::Booting(_) => None,
1127        }
1128    }
1129
1130    pub fn as_running(&self) -> Option<&RunningMode> {
1131        match &self.state {
1132            SessionState::Running(local_mode) => Some(local_mode),
1133            SessionState::Booting(_) => None,
1134        }
1135    }
1136
1137    fn handle_start_debugging_request(
1138        &mut self,
1139        request: dap::messages::Request,
1140        cx: &mut Context<Self>,
1141    ) -> Task<Result<()>> {
1142        let request_seq = request.seq;
1143
1144        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1145            .arguments
1146            .as_ref()
1147            .map(|value| serde_json::from_value(value.clone()));
1148
1149        let mut success = true;
1150        if let Some(Ok(request)) = launch_request {
1151            cx.emit(SessionStateEvent::SpawnChildSession { request });
1152        } else {
1153            log::error!(
1154                "Failed to parse launch request arguments: {:?}",
1155                request.arguments
1156            );
1157            success = false;
1158        }
1159
1160        cx.spawn(async move |this, cx| {
1161            this.update(cx, |this, cx| {
1162                this.respond_to_client(
1163                    request_seq,
1164                    success,
1165                    StartDebugging::COMMAND.to_string(),
1166                    None,
1167                    cx,
1168                )
1169            })?
1170            .await
1171        })
1172    }
1173
1174    fn handle_run_in_terminal_request(
1175        &mut self,
1176        request: dap::messages::Request,
1177        cx: &mut Context<Self>,
1178    ) -> Task<Result<()>> {
1179        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1180            request.arguments.unwrap_or_default(),
1181        ) {
1182            Ok(args) => args,
1183            Err(error) => {
1184                return cx.spawn(async move |session, cx| {
1185                    let error = serde_json::to_value(dap::ErrorResponse {
1186                        error: Some(dap::Message {
1187                            id: request.seq,
1188                            format: error.to_string(),
1189                            variables: None,
1190                            send_telemetry: None,
1191                            show_user: None,
1192                            url: None,
1193                            url_label: None,
1194                        }),
1195                    })
1196                    .ok();
1197
1198                    session
1199                        .update(cx, |this, cx| {
1200                            this.respond_to_client(
1201                                request.seq,
1202                                false,
1203                                StartDebugging::COMMAND.to_string(),
1204                                error,
1205                                cx,
1206                            )
1207                        })?
1208                        .await?;
1209
1210                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1211                });
1212            }
1213        };
1214
1215        let seq = request.seq;
1216
1217        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1218        cx.emit(SessionEvent::RunInTerminal {
1219            request: request_args,
1220            sender: tx,
1221        });
1222        cx.notify();
1223
1224        cx.spawn(async move |session, cx| {
1225            let result = util::maybe!(async move {
1226                rx.next().await.ok_or_else(|| {
1227                    anyhow!("failed to receive response from spawn terminal".to_string())
1228                })?
1229            })
1230            .await;
1231            let (success, body) = match result {
1232                Ok(pid) => (
1233                    true,
1234                    serde_json::to_value(dap::RunInTerminalResponse {
1235                        process_id: None,
1236                        shell_process_id: Some(pid as u64),
1237                    })
1238                    .ok(),
1239                ),
1240                Err(error) => (
1241                    false,
1242                    serde_json::to_value(dap::ErrorResponse {
1243                        error: Some(dap::Message {
1244                            id: seq,
1245                            format: error.to_string(),
1246                            variables: None,
1247                            send_telemetry: None,
1248                            show_user: None,
1249                            url: None,
1250                            url_label: None,
1251                        }),
1252                    })
1253                    .ok(),
1254                ),
1255            };
1256
1257            session
1258                .update(cx, |session, cx| {
1259                    session.respond_to_client(
1260                        seq,
1261                        success,
1262                        RunInTerminal::COMMAND.to_string(),
1263                        body,
1264                        cx,
1265                    )
1266                })?
1267                .await
1268        })
1269    }
1270
    /// Sends the `Initialize` request and stores the returned capabilities.
    ///
    /// If the request fails, the task asks the running mode for an SSH
    /// reconnect (`reconnect_for_ssh`). When one is available it is awaited and
    /// the request is sent again, repeating until the request succeeds or no
    /// reconnect is offered. On success `SessionEvent::CapabilitiesLoaded` is
    /// emitted.
    ///
    /// # Errors
    ///
    /// Fails immediately when the session is still booting. Otherwise fails
    /// with the request error when no reconnect is available, with the
    /// reconnect error when reconnecting fails, or when the session entity is
    /// dropped.
    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
        let adapter_id = self.adapter().to_string();
        let request = Initialize { adapter_id };

        let SessionState::Running(running) = &self.state else {
            return Task::ready(Err(anyhow!(
                "Cannot send initialize request, task still building"
            )));
        };
        let mut response = running.request(request.clone());

        cx.spawn(async move |this, cx| {
            loop {
                let capabilities = response.await;
                match capabilities {
                    Err(e) => {
                        // Give up with the original error unless a reconnect is
                        // available.
                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
                            this.as_running()
                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
                        }) else {
                            return Err(e);
                        };
                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
                        reconnect.await?;

                        // Re-issue the request; it is awaited on the next loop
                        // iteration.
                        let Ok(Some(r)) = this.update(cx, |this, _| {
                            this.as_running()
                                .map(|running| running.request(request.clone()))
                        }) else {
                            return Err(e);
                        };
                        response = r
                    }
                    Ok(capabilities) => {
                        this.update(cx, |session, cx| {
                            session.capabilities = capabilities;

                            cx.emit(SessionEvent::CapabilitiesLoaded);
                        })?;
                        return Ok(());
                    }
                }
            }
        })
    }
1316
1317    pub(super) fn initialize_sequence(
1318        &mut self,
1319        initialize_rx: oneshot::Receiver<()>,
1320        dap_store: WeakEntity<DapStore>,
1321        cx: &mut Context<Self>,
1322    ) -> Task<Result<()>> {
1323        match &self.state {
1324            SessionState::Running(local_mode) => {
1325                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1326            }
1327            SessionState::Booting(_) => {
1328                Task::ready(Err(anyhow!("cannot initialize, still building")))
1329            }
1330        }
1331    }
1332
1333    pub fn run_to_position(
1334        &mut self,
1335        breakpoint: SourceBreakpoint,
1336        active_thread_id: ThreadId,
1337        cx: &mut Context<Self>,
1338    ) {
1339        match &mut self.state {
1340            SessionState::Running(local_mode) => {
1341                if !matches!(
1342                    self.active_snapshot
1343                        .thread_states
1344                        .thread_state(active_thread_id),
1345                    Some(ThreadStatus::Stopped)
1346                ) {
1347                    return;
1348                };
1349                let path = breakpoint.path.clone();
1350                local_mode.tmp_breakpoint = Some(breakpoint);
1351                let task = local_mode.send_breakpoints_from_path(
1352                    path,
1353                    BreakpointUpdatedReason::Toggled,
1354                    &self.breakpoint_store,
1355                    cx,
1356                );
1357
1358                cx.spawn(async move |this, cx| {
1359                    task.await;
1360                    this.update(cx, |this, cx| {
1361                        this.continue_thread(active_thread_id, cx);
1362                    })
1363                })
1364                .detach();
1365            }
1366            SessionState::Booting(_) => {}
1367        }
1368    }
1369
1370    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1371        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1372    }
1373
1374    pub fn output(
1375        &self,
1376        since: OutputToken,
1377    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1378        if self.output_token.0 == 0 {
1379            return (self.output.range(0..0), OutputToken(0));
1380        };
1381
1382        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1383
1384        let clamped_events_since = events_since.clamp(0, self.output.len());
1385        (
1386            self.output
1387                .range(self.output.len() - clamped_events_since..),
1388            self.output_token,
1389        )
1390    }
1391
1392    pub fn respond_to_client(
1393        &self,
1394        request_seq: u64,
1395        success: bool,
1396        command: String,
1397        body: Option<serde_json::Value>,
1398        cx: &mut Context<Self>,
1399    ) -> Task<Result<()>> {
1400        let Some(local_session) = self.as_running() else {
1401            unreachable!("Cannot respond to remote client");
1402        };
1403        let client = local_session.client.clone();
1404
1405        cx.background_spawn(async move {
1406            client
1407                .send_message(Message::Response(Response {
1408                    body,
1409                    success,
1410                    command,
1411                    seq: request_seq + 1,
1412                    request_seq,
1413                    message: None,
1414                }))
1415                .await
1416        })
1417    }
1418
1419    fn session_state(&self) -> &SessionSnapshot {
1420        self.selected_snapshot_index
1421            .and_then(|ix| self.snapshots.get(ix))
1422            .unwrap_or_else(|| &self.active_snapshot)
1423    }
1424
1425    fn push_to_history(&mut self) {
1426        if !self.has_ever_stopped() {
1427            return;
1428        }
1429
1430        while self.snapshots.len() >= DEBUG_HISTORY_LIMIT {
1431            self.snapshots.pop_front();
1432        }
1433
1434        self.snapshots
1435            .push_back(std::mem::take(&mut self.active_snapshot));
1436    }
1437
    /// Returns the recorded historic snapshots, oldest first.
    pub fn historic_snapshots(&self) -> &VecDeque<SessionSnapshot> {
        &self.snapshots
    }
1441
1442    pub fn select_historic_snapshot(&mut self, ix: Option<usize>, cx: &mut Context<Session>) {
1443        if self.selected_snapshot_index == ix {
1444            return;
1445        }
1446
1447        if self
1448            .selected_snapshot_index
1449            .is_some_and(|ix| self.snapshots.len() <= ix)
1450        {
1451            debug_panic!("Attempted to select a debug session with an out of bounds index");
1452            return;
1453        }
1454
1455        self.selected_snapshot_index = ix;
1456        cx.emit(SessionEvent::HistoricSnapshotSelected);
1457        cx.notify();
1458    }
1459
    /// Returns the index of the selected historic snapshot, or `None` when no
    /// historic snapshot is selected.
    pub fn active_snapshot_index(&self) -> Option<usize> {
        self.selected_snapshot_index
    }
1463
    /// Processes a DAP `stopped` event.
    ///
    /// The previous active snapshot is pushed to the history, any temporary
    /// breakpoint is cleared and the breakpoints for its path re-sent, thread
    /// states are updated, cached data is invalidated, and
    /// `SessionEvent::Stopped` plus `SessionEvent::InvalidateInlineValue` are
    /// emitted.
    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
        self.push_to_history();

        self.state.stopped();
        // todo(debugger): Find a clean way to get around the clone
        let breakpoint_store = self.breakpoint_store.clone();
        // Taking the temporary breakpoint removes it; its path's breakpoints are
        // then sent again.
        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
            let breakpoint = local.tmp_breakpoint.take()?;
            let path = breakpoint.path;
            Some((local, path))
        }) {
            local
                .send_breakpoints_from_path(
                    path,
                    BreakpointUpdatedReason::Toggled,
                    &breakpoint_store,
                    cx,
                )
                .detach();
        };

        // An event without a thread id is treated like "all threads stopped".
        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
            self.active_snapshot.thread_states.stop_all_threads();
            self.invalidate_command_type::<StackTraceCommand>();
        }

        // Even if we stopped all threads we still need to insert the thread_id
        // to our own data
        if let Some(thread_id) = event.thread_id {
            self.active_snapshot
                .thread_states
                .stop_thread(ThreadId(thread_id));

            self.invalidate_state(
                &StackTraceCommand {
                    thread_id,
                    start_frame: None,
                    levels: None,
                }
                .into(),
            );
        }

        self.invalidate_generic();
        self.active_snapshot.threads.clear();
        self.active_snapshot.variables.clear();
        // The thread id is withheld when the adapter asked to preserve focus.
        cx.emit(SessionEvent::Stopped(
            event
                .thread_id
                .map(Into::into)
                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
        ));
        cx.emit(SessionEvent::InvalidateInlineValue);
        cx.notify();
    }
1519
    /// Dispatches a single event received from the debug adapter.
    ///
    /// Thread, module, output, breakpoint and capability events update the
    /// session's state. `Memory`, `Process`, `Progress*` and `Invalidated`
    /// events are ignored. Of the `Other` events, only
    /// `launchBrowserInCompanion` and `killCompanionBrowser` are handled.
    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
        match *event {
            Events::Initialized(_) => {
                // `boot` consumes the `Initialized` event before events reach
                // this method.
                debug_assert!(
                    false,
                    "Initialized event should have been handled in LocalMode"
                );
            }
            Events::Stopped(event) => self.handle_stopped_event(event, cx),
            Events::Continued(event) => {
                if event.all_threads_continued.unwrap_or_default() {
                    self.active_snapshot.thread_states.continue_all_threads();
                    self.breakpoint_store.update(cx, |store, cx| {
                        store.remove_active_position(Some(self.session_id()), cx)
                    });
                } else {
                    self.active_snapshot
                        .thread_states
                        .continue_thread(ThreadId(event.thread_id));
                }
                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
                self.invalidate_generic();
            }
            Events::Exited(_event) => {
                self.clear_active_debug_line(cx);
            }
            Events::Terminated(_) => {
                self.shutdown(cx).detach();
            }
            Events::Thread(event) => {
                let thread_id = ThreadId(event.thread_id);

                match event.reason {
                    // A newly started thread is recorded as continued.
                    dap::ThreadEventReason::Started => {
                        self.active_snapshot
                            .thread_states
                            .continue_thread(thread_id);
                    }
                    dap::ThreadEventReason::Exited => {
                        self.active_snapshot.thread_states.exit_thread(thread_id);
                    }
                    reason => {
                        log::error!("Unhandled thread event reason {:?}", reason);
                    }
                }
                self.invalidate_state(&ThreadsCommand.into());
                cx.notify();
            }
            Events::Output(event) => {
                // Telemetry output is not recorded.
                if event
                    .category
                    .as_ref()
                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
                {
                    return;
                }

                self.push_output(event);
                cx.notify();
            }
            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
            }),
            Events::Module(event) => {
                match event.reason {
                    dap::ModuleEventReason::New => {
                        self.active_snapshot.modules.push(event.module);
                    }
                    dap::ModuleEventReason::Changed => {
                        // A change for a module that is not in the list is
                        // dropped.
                        if let Some(module) = self
                            .active_snapshot
                            .modules
                            .iter_mut()
                            .find(|other| event.module.id == other.id)
                        {
                            *module = event.module;
                        }
                    }
                    dap::ModuleEventReason::Removed => {
                        self.active_snapshot
                            .modules
                            .retain(|other| event.module.id != other.id);
                    }
                }

                // todo(debugger): We should only send the invalidate command to downstream clients.
                // self.invalidate_state(&ModulesCommand.into());
            }
            Events::LoadedSource(_) => {
                self.invalidate_state(&LoadedSourcesCommand.into());
            }
            Events::Capabilities(event) => {
                self.capabilities = self.capabilities.merge(event.capabilities);

                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
                let recent_filters = self
                    .capabilities
                    .exception_breakpoint_filters
                    .iter()
                    .flatten()
                    .map(|filter| (filter.filter.clone(), filter.clone()))
                    .collect::<BTreeMap<_, _>>();
                // Add filters that are new, keeping the state of existing ones.
                for filter in recent_filters.values() {
                    let default = filter.default.unwrap_or_default();
                    self.exception_breakpoints
                        .entry(filter.filter.clone())
                        .or_insert_with(|| (filter.clone(), default));
                }
                // Remove the ones that no longer exist.
                self.exception_breakpoints
                    .retain(|k, _| recent_filters.contains_key(k));
                if self.is_started() {
                    self.send_exception_breakpoints(cx);
                }

                cx.notify();
            }
            Events::Memory(_) => {}
            Events::Process(_) => {}
            Events::ProgressEnd(_) => {}
            Events::ProgressStart(_) => {}
            Events::ProgressUpdate(_) => {}
            Events::Invalidated(_) => {}
            Events::Other(event) => {
                // Custom events; anything other than the two names below is
                // ignored.
                if event.event == "launchBrowserInCompanion" {
                    let Some(request) = serde_json::from_value(event.body).ok() else {
                        log::error!("failed to deserialize launchBrowserInCompanion event");
                        return;
                    };
                    self.launch_browser_for_remote_server(request, cx);
                } else if event.event == "killCompanionBrowser" {
                    let Some(request) = serde_json::from_value(event.body).ok() else {
                        log::error!("failed to deserialize killCompanionBrowser event");
                        return;
                    };
                    self.kill_browser(request, cx);
                }
            }
        }
    }
1660
1661    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1662    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1663        &mut self,
1664        request: T,
1665        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1666        cx: &mut Context<Self>,
1667    ) {
1668        const {
1669            assert!(
1670                T::CACHEABLE,
1671                "Only requests marked as cacheable should invoke `fetch`"
1672            );
1673        }
1674
1675        if (!self.active_snapshot.thread_states.any_stopped_thread()
1676            && request.type_id() != TypeId::of::<ThreadsCommand>())
1677            || self.selected_snapshot_index.is_some()
1678            || self.is_session_terminated
1679        {
1680            return;
1681        }
1682
1683        let request_map = self
1684            .requests
1685            .entry(std::any::TypeId::of::<T>())
1686            .or_default();
1687
1688        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1689            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1690
1691            let task = Self::request_inner::<Arc<T>>(
1692                &self.capabilities,
1693                &self.state,
1694                command,
1695                |this, result, cx| {
1696                    process_result(this, result, cx);
1697                    None
1698                },
1699                cx,
1700            );
1701            let task = cx
1702                .background_executor()
1703                .spawn(async move {
1704                    let _ = task.await?;
1705                    Some(())
1706                })
1707                .shared();
1708
1709            vacant.insert(task);
1710            cx.notify();
1711        }
1712    }
1713
1714    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1715        capabilities: &Capabilities,
1716        mode: &SessionState,
1717        request: T,
1718        process_result: impl FnOnce(
1719            &mut Self,
1720            Result<T::Response>,
1721            &mut Context<Self>,
1722        ) -> Option<T::Response>
1723        + 'static,
1724        cx: &mut Context<Self>,
1725    ) -> Task<Option<T::Response>> {
1726        if !T::is_supported(capabilities) {
1727            log::warn!(
1728                "Attempted to send a DAP request that isn't supported: {:?}",
1729                request
1730            );
1731            let error = Err(anyhow::Error::msg(
1732                "Couldn't complete request because it's not supported",
1733            ));
1734            return cx.spawn(async move |this, cx| {
1735                this.update(cx, |this, cx| process_result(this, error, cx))
1736                    .ok()
1737                    .flatten()
1738            });
1739        }
1740
1741        let request = mode.request_dap(request);
1742        cx.spawn(async move |this, cx| {
1743            let result = request.await;
1744            this.update(cx, |this, cx| process_result(this, result, cx))
1745                .ok()
1746                .flatten()
1747        })
1748    }
1749
1750    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
1751        &self,
1752        request: T,
1753        process_result: impl FnOnce(
1754            &mut Self,
1755            Result<T::Response>,
1756            &mut Context<Self>,
1757        ) -> Option<T::Response>
1758        + 'static,
1759        cx: &mut Context<Self>,
1760    ) -> Task<Option<T::Response>> {
1761        Self::request_inner(&self.capabilities, &self.state, request, process_result, cx)
1762    }
1763
1764    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
1765        self.requests.remove(&std::any::TypeId::of::<Command>());
1766    }
1767
1768    fn invalidate_generic(&mut self) {
1769        self.invalidate_command_type::<ModulesCommand>();
1770        self.invalidate_command_type::<LoadedSourcesCommand>();
1771        self.invalidate_command_type::<ThreadsCommand>();
1772        self.invalidate_command_type::<DataBreakpointInfoCommand>();
1773        self.invalidate_command_type::<ReadMemory>();
1774        let executor = self.as_running().map(|running| running.executor.clone());
1775        if let Some(executor) = executor {
1776            self.memory.clear(&executor);
1777        }
1778    }
1779
1780    fn invalidate_state(&mut self, key: &RequestSlot) {
1781        self.requests
1782            .entry((&*key.0 as &dyn Any).type_id())
1783            .and_modify(|request_map| {
1784                request_map.remove(key);
1785            });
1786    }
1787
1788    fn push_output(&mut self, event: OutputEvent) {
1789        self.output.push_back(event);
1790        self.output_token.0 += 1;
1791    }
1792
1793    pub fn any_stopped_thread(&self) -> bool {
1794        self.active_snapshot.thread_states.any_stopped_thread()
1795    }
1796
1797    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1798        self.active_snapshot.thread_states.thread_status(thread_id)
1799    }
1800
1801    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1802        self.fetch(
1803            dap_command::ThreadsCommand,
1804            |this, result, cx| {
1805                let Some(result) = result.log_err() else {
1806                    return;
1807                };
1808
1809                this.active_snapshot.threads = result
1810                    .into_iter()
1811                    .map(|thread| (ThreadId(thread.id), Thread::from(thread)))
1812                    .collect();
1813
1814                this.invalidate_command_type::<StackTraceCommand>();
1815                cx.emit(SessionEvent::Threads);
1816                cx.notify();
1817            },
1818            cx,
1819        );
1820
1821        let state = self.session_state();
1822        state
1823            .threads
1824            .values()
1825            .map(|thread| {
1826                (
1827                    thread.dap.clone(),
1828                    state.thread_states.thread_status(ThreadId(thread.dap.id)),
1829                )
1830            })
1831            .collect()
1832    }
1833
1834    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1835        self.fetch(
1836            dap_command::ModulesCommand,
1837            |this, result, cx| {
1838                let Some(result) = result.log_err() else {
1839                    return;
1840                };
1841
1842                this.active_snapshot.modules = result;
1843                cx.emit(SessionEvent::Modules);
1844                cx.notify();
1845            },
1846            cx,
1847        );
1848
1849        &self.session_state().modules
1850    }
1851
1852    // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1853    pub fn data_access_size(
1854        &mut self,
1855        frame_id: Option<u64>,
1856        evaluate_name: &str,
1857        cx: &mut Context<Self>,
1858    ) -> Task<Option<u64>> {
1859        let request = self.request(
1860            EvaluateCommand {
1861                expression: format!("?${{sizeof({evaluate_name})}}"),
1862                frame_id,
1863
1864                context: Some(EvaluateArgumentsContext::Repl),
1865                source: None,
1866            },
1867            |_, response, _| response.ok(),
1868            cx,
1869        );
1870        cx.background_spawn(async move {
1871            let result = request.await?;
1872            result.result.parse().ok()
1873        })
1874    }
1875
1876    pub fn memory_reference_of_expr(
1877        &mut self,
1878        frame_id: Option<u64>,
1879        expression: String,
1880        cx: &mut Context<Self>,
1881    ) -> Task<Option<(String, Option<String>)>> {
1882        let request = self.request(
1883            EvaluateCommand {
1884                expression,
1885                frame_id,
1886
1887                context: Some(EvaluateArgumentsContext::Repl),
1888                source: None,
1889            },
1890            |_, response, _| response.ok(),
1891            cx,
1892        );
1893        cx.background_spawn(async move {
1894            let result = request.await?;
1895            result
1896                .memory_reference
1897                .map(|reference| (reference, result.type_))
1898        })
1899    }
1900
1901    pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1902        let data = base64::engine::general_purpose::STANDARD.encode(data);
1903        self.request(
1904            WriteMemoryArguments {
1905                memory_reference: address.to_string(),
1906                data,
1907                allow_partial: None,
1908                offset: None,
1909            },
1910            |this, response, cx| {
1911                this.memory.clear(cx.background_executor());
1912                this.invalidate_command_type::<ReadMemory>();
1913                this.invalidate_command_type::<VariablesCommand>();
1914                cx.emit(SessionEvent::Variables);
1915                response.ok()
1916            },
1917            cx,
1918        )
1919        .detach();
1920    }
1921    pub fn read_memory(
1922        &mut self,
1923        range: RangeInclusive<u64>,
1924        cx: &mut Context<Self>,
1925    ) -> MemoryIterator {
1926        // This function is a bit more involved when it comes to fetching data.
1927        // Since we attempt to read memory in pages, we need to account for some parts
1928        // of memory being unreadable. Therefore, we start off by fetching a page per request.
1929        // In case that fails, we try to re-fetch smaller regions until we have the full range.
1930        let page_range = Memory::memory_range_to_page_range(range.clone());
1931        for page_address in PageAddress::iter_range(page_range) {
1932            self.read_single_page_memory(page_address, cx);
1933        }
1934        self.memory.memory_range(range)
1935    }
1936
1937    fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1938        _ = maybe!({
1939            let builder = self.memory.build_page(page_start)?;
1940
1941            self.memory_read_fetch_page_recursive(builder, cx);
1942            Some(())
1943        });
1944    }
1945    fn memory_read_fetch_page_recursive(
1946        &mut self,
1947        mut builder: MemoryPageBuilder,
1948        cx: &mut Context<Self>,
1949    ) {
1950        let Some(next_request) = builder.next_request() else {
1951            // We're done fetching. Let's grab the page and insert it into our memory store.
1952            let (address, contents) = builder.build();
1953            self.memory.insert_page(address, contents);
1954
1955            return;
1956        };
1957        let size = next_request.size;
1958        self.fetch(
1959            ReadMemory {
1960                memory_reference: format!("0x{:X}", next_request.address),
1961                offset: Some(0),
1962                count: next_request.size,
1963            },
1964            move |this, memory, cx| {
1965                if let Ok(memory) = memory {
1966                    builder.known(memory.content);
1967                    if let Some(unknown) = memory.unreadable_bytes {
1968                        builder.unknown(unknown);
1969                    }
1970                    // This is the recursive bit: if we're not yet done with
1971                    // the whole page, we'll kick off a new request with smaller range.
1972                    // Note that this function is recursive only conceptually;
1973                    // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1974                    this.memory_read_fetch_page_recursive(builder, cx);
1975                } else {
1976                    builder.unknown(size);
1977                }
1978            },
1979            cx,
1980        );
1981    }
1982
1983    pub fn ignore_breakpoints(&self) -> bool {
1984        self.ignore_breakpoints
1985    }
1986
1987    pub fn toggle_ignore_breakpoints(
1988        &mut self,
1989        cx: &mut App,
1990    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1991        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1992    }
1993
1994    pub(crate) fn set_ignore_breakpoints(
1995        &mut self,
1996        ignore: bool,
1997        cx: &mut App,
1998    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1999        if self.ignore_breakpoints == ignore {
2000            return Task::ready(HashMap::default());
2001        }
2002
2003        self.ignore_breakpoints = ignore;
2004
2005        if let Some(local) = self.as_running() {
2006            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
2007        } else {
2008            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
2009            unimplemented!()
2010        }
2011    }
2012
2013    pub fn data_breakpoints(&self) -> impl Iterator<Item = &DataBreakpointState> {
2014        self.data_breakpoints.values()
2015    }
2016
2017    pub fn exception_breakpoints(
2018        &self,
2019    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
2020        self.exception_breakpoints.values()
2021    }
2022
2023    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
2024        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
2025            *is_enabled = !*is_enabled;
2026            self.send_exception_breakpoints(cx);
2027        }
2028    }
2029
2030    fn send_exception_breakpoints(&mut self, cx: &App) {
2031        if let Some(local) = self.as_running() {
2032            let exception_filters = self
2033                .exception_breakpoints
2034                .values()
2035                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
2036                .collect();
2037
2038            let supports_exception_filters = self
2039                .capabilities
2040                .supports_exception_filter_options
2041                .unwrap_or_default();
2042            local
2043                .send_exception_breakpoints(exception_filters, supports_exception_filters)
2044                .detach_and_log_err(cx);
2045        } else {
2046            debug_assert!(false, "Not implemented");
2047        }
2048    }
2049
2050    pub fn toggle_data_breakpoint(&mut self, id: &str, cx: &mut Context<'_, Session>) {
2051        if let Some(state) = self.data_breakpoints.get_mut(id) {
2052            state.is_enabled = !state.is_enabled;
2053            self.send_exception_breakpoints(cx);
2054        }
2055    }
2056
2057    fn send_data_breakpoints(&mut self, cx: &mut Context<Self>) {
2058        if let Some(mode) = self.as_running() {
2059            let breakpoints = self
2060                .data_breakpoints
2061                .values()
2062                .filter_map(|state| state.is_enabled.then(|| state.dap.clone()))
2063                .collect();
2064            let command = SetDataBreakpointsCommand { breakpoints };
2065            mode.request(command).detach_and_log_err(cx);
2066        }
2067    }
2068
2069    pub fn create_data_breakpoint(
2070        &mut self,
2071        context: Arc<DataBreakpointContext>,
2072        data_id: String,
2073        dap: dap::DataBreakpoint,
2074        cx: &mut Context<Self>,
2075    ) {
2076        if self.data_breakpoints.remove(&data_id).is_none() {
2077            self.data_breakpoints.insert(
2078                data_id,
2079                DataBreakpointState {
2080                    dap,
2081                    is_enabled: true,
2082                    context,
2083                },
2084            );
2085        }
2086        self.send_data_breakpoints(cx);
2087    }
2088
2089    pub fn breakpoints_enabled(&self) -> bool {
2090        self.ignore_breakpoints
2091    }
2092
2093    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
2094        self.fetch(
2095            dap_command::LoadedSourcesCommand,
2096            |this, result, cx| {
2097                let Some(result) = result.log_err() else {
2098                    return;
2099                };
2100                this.active_snapshot.loaded_sources = result;
2101                cx.emit(SessionEvent::LoadedSources);
2102                cx.notify();
2103            },
2104            cx,
2105        );
2106        &self.session_state().loaded_sources
2107    }
2108
2109    fn fallback_to_manual_restart(
2110        &mut self,
2111        res: Result<()>,
2112        cx: &mut Context<Self>,
2113    ) -> Option<()> {
2114        if res.log_err().is_none() {
2115            cx.emit(SessionStateEvent::Restart);
2116            return None;
2117        }
2118        Some(())
2119    }
2120
2121    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
2122        res.log_err()?;
2123        Some(())
2124    }
2125
2126    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
2127        thread_id: ThreadId,
2128    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
2129    {
2130        move |this, response, cx| match response.log_err() {
2131            Some(response) => {
2132                this.breakpoint_store.update(cx, |store, cx| {
2133                    store.remove_active_position(Some(this.session_id()), cx)
2134                });
2135                Some(response)
2136            }
2137            None => {
2138                this.active_snapshot.thread_states.stop_thread(thread_id);
2139                cx.notify();
2140                None
2141            }
2142        }
2143    }
2144
2145    fn clear_active_debug_line_response(
2146        &mut self,
2147        response: Result<()>,
2148        cx: &mut Context<Session>,
2149    ) -> Option<()> {
2150        response.log_err()?;
2151        self.clear_active_debug_line(cx);
2152        Some(())
2153    }
2154
2155    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2156        self.breakpoint_store.update(cx, |store, cx| {
2157            store.remove_active_position(Some(self.id), cx)
2158        });
2159    }
2160
2161    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2162        self.request(
2163            PauseCommand {
2164                thread_id: thread_id.0,
2165            },
2166            Self::empty_response,
2167            cx,
2168        )
2169        .detach();
2170    }
2171
2172    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2173        self.request(
2174            RestartStackFrameCommand { stack_frame_id },
2175            Self::empty_response,
2176            cx,
2177        )
2178        .detach();
2179    }
2180
2181    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2182        if self.restart_task.is_some() || self.as_running().is_none() {
2183            return;
2184        }
2185
2186        let supports_dap_restart =
2187            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2188
2189        self.restart_task = Some(cx.spawn(async move |this, cx| {
2190            this.update(cx, |session, cx| {
2191                if supports_dap_restart {
2192                    session.request(
2193                        RestartCommand {
2194                            raw: args.unwrap_or(Value::Null),
2195                        },
2196                        Self::fallback_to_manual_restart,
2197                        cx,
2198                    )
2199                } else {
2200                    cx.emit(SessionStateEvent::Restart);
2201                    Task::ready(None)
2202                }
2203            })
2204            .unwrap_or_else(|_| Task::ready(None))
2205            .await;
2206
2207            this.update(cx, |session, _cx| {
2208                session.restart_task = None;
2209            })
2210            .ok();
2211        }));
2212    }
2213
2214    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2215        if self.is_session_terminated {
2216            return Task::ready(());
2217        }
2218
2219        self.is_session_terminated = true;
2220        self.active_snapshot.thread_states.exit_all_threads();
2221        cx.notify();
2222
2223        let task = match &mut self.state {
2224            SessionState::Running(_) => {
2225                if self
2226                    .capabilities
2227                    .supports_terminate_request
2228                    .unwrap_or_default()
2229                {
2230                    self.request(
2231                        TerminateCommand {
2232                            restart: Some(false),
2233                        },
2234                        Self::clear_active_debug_line_response,
2235                        cx,
2236                    )
2237                } else {
2238                    self.request(
2239                        DisconnectCommand {
2240                            restart: Some(false),
2241                            terminate_debuggee: Some(true),
2242                            suspend_debuggee: Some(false),
2243                        },
2244                        Self::clear_active_debug_line_response,
2245                        cx,
2246                    )
2247                }
2248            }
2249            SessionState::Booting(build_task) => {
2250                build_task.take();
2251                Task::ready(Some(()))
2252            }
2253        };
2254
2255        cx.emit(SessionStateEvent::Shutdown);
2256
2257        cx.spawn(async move |this, cx| {
2258            task.await;
2259            let _ = this.update(cx, |this, _| {
2260                if let Some(adapter_client) = this.adapter_client() {
2261                    adapter_client.kill();
2262                }
2263            });
2264        })
2265    }
2266
2267    pub fn completions(
2268        &mut self,
2269        query: CompletionsQuery,
2270        cx: &mut Context<Self>,
2271    ) -> Task<Result<Vec<dap::CompletionItem>>> {
2272        let task = self.request(query, |_, result, _| result.log_err(), cx);
2273
2274        cx.background_executor().spawn(async move {
2275            anyhow::Ok(
2276                task.await
2277                    .map(|response| response.targets)
2278                    .context("failed to fetch completions")?,
2279            )
2280        })
2281    }
2282
2283    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2284        self.select_historic_snapshot(None, cx);
2285
2286        let supports_single_thread_execution_requests =
2287            self.capabilities.supports_single_thread_execution_requests;
2288        self.active_snapshot
2289            .thread_states
2290            .continue_thread(thread_id);
2291        self.request(
2292            ContinueCommand {
2293                args: ContinueArguments {
2294                    thread_id: thread_id.0,
2295                    single_thread: supports_single_thread_execution_requests,
2296                },
2297            },
2298            Self::on_step_response::<ContinueCommand>(thread_id),
2299            cx,
2300        )
2301        .detach();
2302    }
2303
2304    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2305        match self.state {
2306            SessionState::Running(ref local) => Some(local.client.clone()),
2307            SessionState::Booting(_) => None,
2308        }
2309    }
2310
2311    pub fn has_ever_stopped(&self) -> bool {
2312        self.state.has_ever_stopped()
2313    }
2314
2315    pub fn step_over(
2316        &mut self,
2317        thread_id: ThreadId,
2318        granularity: SteppingGranularity,
2319        cx: &mut Context<Self>,
2320    ) {
2321        self.select_historic_snapshot(None, cx);
2322
2323        let supports_single_thread_execution_requests =
2324            self.capabilities.supports_single_thread_execution_requests;
2325        let supports_stepping_granularity = self
2326            .capabilities
2327            .supports_stepping_granularity
2328            .unwrap_or_default();
2329
2330        let command = NextCommand {
2331            inner: StepCommand {
2332                thread_id: thread_id.0,
2333                granularity: supports_stepping_granularity.then(|| granularity),
2334                single_thread: supports_single_thread_execution_requests,
2335            },
2336        };
2337
2338        self.active_snapshot.thread_states.process_step(thread_id);
2339        self.request(
2340            command,
2341            Self::on_step_response::<NextCommand>(thread_id),
2342            cx,
2343        )
2344        .detach();
2345    }
2346
2347    pub fn step_in(
2348        &mut self,
2349        thread_id: ThreadId,
2350        granularity: SteppingGranularity,
2351        cx: &mut Context<Self>,
2352    ) {
2353        self.select_historic_snapshot(None, cx);
2354
2355        let supports_single_thread_execution_requests =
2356            self.capabilities.supports_single_thread_execution_requests;
2357        let supports_stepping_granularity = self
2358            .capabilities
2359            .supports_stepping_granularity
2360            .unwrap_or_default();
2361
2362        let command = StepInCommand {
2363            inner: StepCommand {
2364                thread_id: thread_id.0,
2365                granularity: supports_stepping_granularity.then(|| granularity),
2366                single_thread: supports_single_thread_execution_requests,
2367            },
2368        };
2369
2370        self.active_snapshot.thread_states.process_step(thread_id);
2371        self.request(
2372            command,
2373            Self::on_step_response::<StepInCommand>(thread_id),
2374            cx,
2375        )
2376        .detach();
2377    }
2378
2379    pub fn step_out(
2380        &mut self,
2381        thread_id: ThreadId,
2382        granularity: SteppingGranularity,
2383        cx: &mut Context<Self>,
2384    ) {
2385        self.select_historic_snapshot(None, cx);
2386
2387        let supports_single_thread_execution_requests =
2388            self.capabilities.supports_single_thread_execution_requests;
2389        let supports_stepping_granularity = self
2390            .capabilities
2391            .supports_stepping_granularity
2392            .unwrap_or_default();
2393
2394        let command = StepOutCommand {
2395            inner: StepCommand {
2396                thread_id: thread_id.0,
2397                granularity: supports_stepping_granularity.then(|| granularity),
2398                single_thread: supports_single_thread_execution_requests,
2399            },
2400        };
2401
2402        self.active_snapshot.thread_states.process_step(thread_id);
2403        self.request(
2404            command,
2405            Self::on_step_response::<StepOutCommand>(thread_id),
2406            cx,
2407        )
2408        .detach();
2409    }
2410
2411    pub fn step_back(
2412        &mut self,
2413        thread_id: ThreadId,
2414        granularity: SteppingGranularity,
2415        cx: &mut Context<Self>,
2416    ) {
2417        self.select_historic_snapshot(None, cx);
2418
2419        let supports_single_thread_execution_requests =
2420            self.capabilities.supports_single_thread_execution_requests;
2421        let supports_stepping_granularity = self
2422            .capabilities
2423            .supports_stepping_granularity
2424            .unwrap_or_default();
2425
2426        let command = StepBackCommand {
2427            inner: StepCommand {
2428                thread_id: thread_id.0,
2429                granularity: supports_stepping_granularity.then(|| granularity),
2430                single_thread: supports_single_thread_execution_requests,
2431            },
2432        };
2433
2434        self.active_snapshot.thread_states.process_step(thread_id);
2435
2436        self.request(
2437            command,
2438            Self::on_step_response::<StepBackCommand>(thread_id),
2439            cx,
2440        )
2441        .detach();
2442    }
2443
2444    pub fn stack_frames(
2445        &mut self,
2446        thread_id: ThreadId,
2447        cx: &mut Context<Self>,
2448    ) -> Result<Vec<StackFrame>> {
2449        if self.active_snapshot.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2450            && self.requests.contains_key(&ThreadsCommand.type_id())
2451            && self.active_snapshot.threads.contains_key(&thread_id)
2452        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2453        // We could still be using an old thread id and have sent a new thread's request
2454        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2455        // But it very well could cause a minor bug in the future that is hard to track down
2456        {
2457            self.fetch(
2458                super::dap_command::StackTraceCommand {
2459                    thread_id: thread_id.0,
2460                    start_frame: None,
2461                    levels: None,
2462                },
2463                move |this, stack_frames, cx| {
2464                    let entry =
2465                        this.active_snapshot
2466                            .threads
2467                            .entry(thread_id)
2468                            .and_modify(|thread| match &stack_frames {
2469                                Ok(stack_frames) => {
2470                                    thread.stack_frames = stack_frames
2471                                        .iter()
2472                                        .cloned()
2473                                        .map(StackFrame::from)
2474                                        .collect();
2475                                    thread.stack_frames_error = None;
2476                                }
2477                                Err(error) => {
2478                                    thread.stack_frames.clear();
2479                                    thread.stack_frames_error = Some(error.to_string().into());
2480                                }
2481                            });
2482                    debug_assert!(
2483                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2484                        "Sent request for thread_id that doesn't exist"
2485                    );
2486                    if let Ok(stack_frames) = stack_frames {
2487                        this.active_snapshot.stack_frames.extend(
2488                            stack_frames
2489                                .into_iter()
2490                                .filter(|frame| {
2491                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2492                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2493                                    !(frame.id == 0
2494                                        && frame.line == 0
2495                                        && frame.column == 0
2496                                        && frame.presentation_hint
2497                                            == Some(StackFramePresentationHint::Label))
2498                                })
2499                                .map(|frame| (frame.id, StackFrame::from(frame))),
2500                        );
2501                    }
2502
2503                    this.invalidate_command_type::<ScopesCommand>();
2504                    this.invalidate_command_type::<VariablesCommand>();
2505
2506                    cx.emit(SessionEvent::StackTrace);
2507                },
2508                cx,
2509            );
2510        }
2511
2512        match self.session_state().threads.get(&thread_id) {
2513            Some(thread) => {
2514                if let Some(error) = &thread.stack_frames_error {
2515                    Err(anyhow!(error.to_string()))
2516                } else {
2517                    Ok(thread.stack_frames.clone())
2518                }
2519            }
2520            None => Ok(Vec::new()),
2521        }
2522    }
2523
2524    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2525        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2526            && self
2527                .requests
2528                .contains_key(&TypeId::of::<StackTraceCommand>())
2529        {
2530            self.fetch(
2531                ScopesCommand { stack_frame_id },
2532                move |this, scopes, cx| {
2533                    let Some(scopes) = scopes.log_err() else {
2534                        return
2535                    };
2536
2537                    for scope in scopes.iter() {
2538                        this.variables(scope.variables_reference, cx);
2539                    }
2540
2541                    let entry = this
2542                        .active_snapshot
2543                        .stack_frames
2544                        .entry(stack_frame_id)
2545                        .and_modify(|stack_frame| {
2546                            stack_frame.scopes = scopes;
2547                        });
2548
2549                    cx.emit(SessionEvent::Variables);
2550
2551                    debug_assert!(
2552                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2553                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2554                    );
2555                },
2556                cx,
2557            );
2558        }
2559
2560        self.session_state()
2561            .stack_frames
2562            .get(&stack_frame_id)
2563            .map(|frame| frame.scopes.as_slice())
2564            .unwrap_or_default()
2565    }
2566
2567    pub fn variables_by_stack_frame_id(
2568        &self,
2569        stack_frame_id: StackFrameId,
2570        globals: bool,
2571        locals: bool,
2572    ) -> Vec<dap::Variable> {
2573        let state = self.session_state();
2574        let Some(stack_frame) = state.stack_frames.get(&stack_frame_id) else {
2575            return Vec::new();
2576        };
2577
2578        stack_frame
2579            .scopes
2580            .iter()
2581            .filter(|scope| {
2582                (scope.name.to_lowercase().contains("local") && locals)
2583                    || (scope.name.to_lowercase().contains("global") && globals)
2584            })
2585            .filter_map(|scope| state.variables.get(&scope.variables_reference))
2586            .flatten()
2587            .cloned()
2588            .collect()
2589    }
2590
2591    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2592        &self.watchers
2593    }
2594
2595    pub fn add_watcher(
2596        &mut self,
2597        expression: SharedString,
2598        frame_id: u64,
2599        cx: &mut Context<Self>,
2600    ) -> Task<Result<()>> {
2601        let request = self.state.request_dap(EvaluateCommand {
2602            expression: expression.to_string(),
2603            context: Some(EvaluateArgumentsContext::Watch),
2604            frame_id: Some(frame_id),
2605            source: None,
2606        });
2607
2608        cx.spawn(async move |this, cx| {
2609            let response = request.await?;
2610
2611            this.update(cx, |session, cx| {
2612                session.watchers.insert(
2613                    expression.clone(),
2614                    Watcher {
2615                        expression,
2616                        value: response.result.into(),
2617                        variables_reference: response.variables_reference,
2618                        presentation_hint: response.presentation_hint,
2619                    },
2620                );
2621                cx.emit(SessionEvent::Watchers);
2622            })
2623        })
2624    }
2625
2626    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2627        let watches = self.watchers.clone();
2628        for (_, watch) in watches.into_iter() {
2629            self.add_watcher(watch.expression.clone(), frame_id, cx)
2630                .detach();
2631        }
2632    }
2633
2634    pub fn remove_watcher(&mut self, expression: SharedString) {
2635        self.watchers.remove(&expression);
2636    }
2637
2638    pub fn variables(
2639        &mut self,
2640        variables_reference: VariableReference,
2641        cx: &mut Context<Self>,
2642    ) -> Vec<dap::Variable> {
2643        let command = VariablesCommand {
2644            variables_reference,
2645            filter: None,
2646            start: None,
2647            count: None,
2648            format: None,
2649        };
2650
2651        self.fetch(
2652            command,
2653            move |this, variables, cx| {
2654                let Some(mut variables) = variables.log_err() else {
2655                    return;
2656                };
2657
2658                if this.adapter.0.as_ref() == "Debugpy" {
2659                    for variable in variables.iter_mut() {
2660                        if variable.type_ == Some("str".into()) {
2661                            // reverse Python repr() escaping
2662                            let mut unescaped = String::with_capacity(variable.value.len());
2663                            let mut chars = variable.value.chars();
2664                            while let Some(c) = chars.next() {
2665                                if c != '\\' {
2666                                    unescaped.push(c);
2667                                } else {
2668                                    match chars.next() {
2669                                        Some('\\') => unescaped.push('\\'),
2670                                        Some('n') => unescaped.push('\n'),
2671                                        Some('t') => unescaped.push('\t'),
2672                                        Some('r') => unescaped.push('\r'),
2673                                        Some('\'') => unescaped.push('\''),
2674                                        Some('"') => unescaped.push('"'),
2675                                        Some(c) => {
2676                                            unescaped.push('\\');
2677                                            unescaped.push(c);
2678                                        }
2679                                        None => {}
2680                                    }
2681                                }
2682                            }
2683                            variable.value = unescaped;
2684                        }
2685                    }
2686                }
2687
2688                this.active_snapshot
2689                    .variables
2690                    .insert(variables_reference, variables);
2691
2692                cx.emit(SessionEvent::Variables);
2693                cx.emit(SessionEvent::InvalidateInlineValue);
2694            },
2695            cx,
2696        );
2697
2698        self.session_state()
2699            .variables
2700            .get(&variables_reference)
2701            .cloned()
2702            .unwrap_or_default()
2703    }
2704
2705    pub fn data_breakpoint_info(
2706        &mut self,
2707        context: Arc<DataBreakpointContext>,
2708        mode: Option<String>,
2709        cx: &mut Context<Self>,
2710    ) -> Task<Option<dap::DataBreakpointInfoResponse>> {
2711        let command = DataBreakpointInfoCommand { context, mode };
2712
2713        self.request(command, |_, response, _| response.ok(), cx)
2714    }
2715
2716    pub fn set_variable_value(
2717        &mut self,
2718        stack_frame_id: u64,
2719        variables_reference: u64,
2720        name: String,
2721        value: String,
2722        cx: &mut Context<Self>,
2723    ) {
2724        if self.capabilities.supports_set_variable.unwrap_or_default() {
2725            self.request(
2726                SetVariableValueCommand {
2727                    name,
2728                    value,
2729                    variables_reference,
2730                },
2731                move |this, response, cx| {
2732                    let response = response.log_err()?;
2733                    this.invalidate_command_type::<VariablesCommand>();
2734                    this.invalidate_command_type::<ReadMemory>();
2735                    this.memory.clear(cx.background_executor());
2736                    this.refresh_watchers(stack_frame_id, cx);
2737                    cx.emit(SessionEvent::Variables);
2738                    Some(response)
2739                },
2740                cx,
2741            )
2742            .detach();
2743        }
2744    }
2745
2746    pub fn evaluate(
2747        &mut self,
2748        expression: String,
2749        context: Option<EvaluateArgumentsContext>,
2750        frame_id: Option<u64>,
2751        source: Option<Source>,
2752        cx: &mut Context<Self>,
2753    ) -> Task<()> {
2754        let event = dap::OutputEvent {
2755            category: None,
2756            output: format!("> {expression}"),
2757            group: None,
2758            variables_reference: None,
2759            source: None,
2760            line: None,
2761            column: None,
2762            data: None,
2763            location_reference: None,
2764        };
2765        self.push_output(event);
2766        let request = self.state.request_dap(EvaluateCommand {
2767            expression,
2768            context,
2769            frame_id,
2770            source,
2771        });
2772        cx.spawn(async move |this, cx| {
2773            let response = request.await;
2774            this.update(cx, |this, cx| {
2775                this.memory.clear(cx.background_executor());
2776                this.invalidate_command_type::<ReadMemory>();
2777                this.invalidate_command_type::<VariablesCommand>();
2778                cx.emit(SessionEvent::Variables);
2779                match response {
2780                    Ok(response) => {
2781                        let event = dap::OutputEvent {
2782                            category: None,
2783                            output: format!("< {}", &response.result),
2784                            group: None,
2785                            variables_reference: Some(response.variables_reference),
2786                            source: None,
2787                            line: None,
2788                            column: None,
2789                            data: None,
2790                            location_reference: None,
2791                        };
2792                        this.push_output(event);
2793                    }
2794                    Err(e) => {
2795                        let event = dap::OutputEvent {
2796                            category: None,
2797                            output: format!("{}", e),
2798                            group: None,
2799                            variables_reference: None,
2800                            source: None,
2801                            line: None,
2802                            column: None,
2803                            data: None,
2804                            location_reference: None,
2805                        };
2806                        this.push_output(event);
2807                    }
2808                };
2809                cx.notify();
2810            })
2811            .ok();
2812        })
2813    }
2814
2815    pub fn location(
2816        &mut self,
2817        reference: u64,
2818        cx: &mut Context<Self>,
2819    ) -> Option<dap::LocationsResponse> {
2820        self.fetch(
2821            LocationsCommand { reference },
2822            move |this, response, _| {
2823                let Some(response) = response.log_err() else {
2824                    return;
2825                };
2826                this.active_snapshot.locations.insert(reference, response);
2827            },
2828            cx,
2829        );
2830        self.session_state().locations.get(&reference).cloned()
2831    }
2832
2833    pub fn is_attached(&self) -> bool {
2834        let SessionState::Running(local_mode) = &self.state else {
2835            return false;
2836        };
2837        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2838    }
2839
2840    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2841        let command = DisconnectCommand {
2842            restart: Some(false),
2843            terminate_debuggee: Some(false),
2844            suspend_debuggee: Some(false),
2845        };
2846
2847        self.request(command, Self::empty_response, cx).detach()
2848    }
2849
2850    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2851        if self
2852            .capabilities
2853            .supports_terminate_threads_request
2854            .unwrap_or_default()
2855        {
2856            self.request(
2857                TerminateThreadsCommand {
2858                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2859                },
2860                Self::clear_active_debug_line_response,
2861                cx,
2862            )
2863            .detach();
2864        } else {
2865            self.shutdown(cx).detach();
2866        }
2867    }
2868
2869    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2870        self.session_state().thread_states.thread_state(thread_id)
2871    }
2872
2873    pub fn quirks(&self) -> SessionQuirks {
2874        self.quirks
2875    }
2876
    /// Launches a browser through the js-debug companion on behalf of a remote project.
    ///
    /// Returns without doing anything unless `remote_client`, `http_client` and `node_runtime`
    /// are all available (only the missing remote client is logged). Otherwise a task is
    /// spawned that:
    ///
    /// 1. sets `proxyUri` on the request, spawning a port-forwarding process first when the
    ///    remote client does not share the local network interface,
    /// 2. reuses the recorded `companion_port` or spawns a new companion process,
    /// 3. waits until the companion accepts TCP connections (10 attempts, 100ms apart),
    /// 4. posts the request to the companion's `/launch-and-attach` endpoint,
    /// 5. on success stores the process-watching tasks and the companion port on the session.
    ///
    /// Failures are reported to the console output where shown below and logged by the wrapper
    /// task that is pushed onto `self.background_tasks`.
    fn launch_browser_for_remote_server(
        &mut self,
        mut request: LaunchBrowserInCompanionParams,
        cx: &mut Context<Self>,
    ) {
        let Some(remote_client) = self.remote_client.clone() else {
            log::error!("can't launch browser in companion for non-remote project");
            return;
        };
        let Some(http_client) = self.http_client.clone() else {
            return;
        };
        let Some(node_runtime) = self.node_runtime.clone() else {
            return;
        };

        let mut console_output = self.console_output(cx);
        let task = cx.spawn(async move |this, cx| {
            // Decide how the companion reaches the debug server: directly when the network
            // interface is shared, otherwise through a freshly forwarded local port.
            let forward_ports_process = if remote_client
                .read_with(cx, |client, _| client.shares_network_interface())
            {
                request.other.insert(
                    "proxyUri".into(),
                    format!("127.0.0.1:{}", request.server_port).into(),
                );
                None
            } else {
                let port = TcpTransport::unused_port(Ipv4Addr::LOCALHOST)
                    .await
                    .context("getting port for DAP")?;
                request
                    .other
                    .insert("proxyUri".into(), format!("127.0.0.1:{port}").into());
                let mut port_forwards = vec![(port, "localhost".to_owned(), request.server_port)];

                // Also forward the port of the `url` parameter (same port number on both
                // ends) when the request has a parseable URL for which `url.port()` yields one.
                if let Some(value) = request.params.get("url")
                    && let Some(url) = value.as_str()
                    && let Some(url) = Url::parse(url).ok()
                    && let Some(frontend_port) = url.port()
                {
                    port_forwards.push((frontend_port, "localhost".to_owned(), frontend_port));
                }

                let child = remote_client.update(cx, |client, _| {
                    let command = client.build_forward_ports_command(port_forwards)?;
                    let child = new_command(command.program)
                        .args(command.args)
                        .envs(command.env)
                        .spawn()
                        .context("spawning port forwarding process")?;
                    anyhow::Ok(child)
                })?;
                Some(child)
            };

            // Reuse the companion recorded on the session if there is one; otherwise spawn a
            // new one and remember the child so it can be watched below.
            let mut companion_process = None;
            let companion_port =
                if let Some(companion_port) = this.read_with(cx, |this, _| this.companion_port)? {
                    companion_port
                } else {
                    let task = cx.spawn(async move |cx| spawn_companion(node_runtime, cx).await);
                    match task.await {
                        Ok((port, child)) => {
                            companion_process = Some(child);
                            port
                        }
                        Err(e) => {
                            console_output
                                .send(format!("Failed to launch browser companion process: {e}"))
                                .await
                                .ok();
                            return Err(e);
                        }
                    }
                };

            // Tasks that wait on the spawned processes. They are only handed to the session
            // at the very end, after the launch request succeeded.
            let mut background_tasks = Vec::new();
            if let Some(mut forward_ports_process) = forward_ports_process {
                background_tasks.push(cx.spawn(async move |_| {
                    forward_ports_process.status().await.log_err();
                }));
            };
            if let Some(mut companion_process) = companion_process {
                // Relay the companion's stderr to the console output, line by line.
                if let Some(stderr) = companion_process.stderr.take() {
                    let mut console_output = console_output.clone();
                    background_tasks.push(cx.spawn(async move |_| {
                        let mut stderr = BufReader::new(stderr);
                        let mut line = String::new();
                        while let Ok(n) = stderr.read_line(&mut line).await
                            && n > 0
                        {
                            console_output
                                .send(format!("companion stderr: {line}"))
                                .await
                                .ok();
                            line.clear();
                        }
                    }));
                }
                // Report how the companion process exited.
                background_tasks.push(cx.spawn({
                    let mut console_output = console_output.clone();
                    async move |_| match companion_process.status().await {
                        Ok(status) => {
                            if status.success() {
                                console_output
                                    .send("Companion process exited normally".into())
                                    .await
                                    .ok();
                            } else {
                                console_output
                                    .send(format!(
                                        "Companion process exited abnormally with {status:?}"
                                    ))
                                    .await
                                    .ok();
                            }
                        }
                        Err(e) => {
                            console_output
                                .send(format!("Failed to join companion process: {e}"))
                                .await
                                .ok();
                        }
                    }
                }));
            }

            // TODO pass wslInfo as needed

            // Poll until the companion accepts connections: at most 10 attempts with a 100ms
            // pause after each failed one.
            let companion_address = format!("127.0.0.1:{companion_port}");
            let mut companion_started = false;
            for _ in 0..10 {
                if TcpStream::connect(&companion_address).await.is_ok() {
                    companion_started = true;
                    break;
                }
                cx.background_executor()
                    .timer(Duration::from_millis(100))
                    .await;
            }
            if !companion_started {
                console_output
                    .send("Browser companion failed to start".into())
                    .await
                    .ok();
                bail!("Browser companion failed to start");
            }

            // Hand the (possibly amended) request to the companion as JSON.
            let response = http_client
                .post_json(
                    &format!("http://{companion_address}/launch-and-attach"),
                    serde_json::to_string(&request)
                        .context("serializing request")?
                        .into(),
                )
                .await;
            match response {
                Ok(response) => {
                    if !response.status().is_success() {
                        console_output
                            .send("Launch request to companion failed".into())
                            .await
                            .ok();
                        return Err(anyhow!("launch request failed"));
                    }
                }
                Err(e) => {
                    console_output
                        .send("Failed to read response from companion".into())
                        .await
                        .ok();
                    return Err(e);
                }
            }

            // Success: keep the watcher tasks on the session and remember the companion port
            // so later calls can reuse it.
            this.update(cx, |this, _| {
                this.background_tasks.extend(background_tasks);
                this.companion_port = Some(companion_port);
            })?;

            anyhow::Ok(())
        });
        // Keep the launch task on the session and log its error, if any.
        self.background_tasks.push(cx.spawn(async move |_, _| {
            task.await.log_err();
        }));
    }
3063
3064    fn kill_browser(&self, request: KillCompanionBrowserParams, cx: &mut App) {
3065        let Some(companion_port) = self.companion_port else {
3066            log::error!("received killCompanionBrowser but js-debug-companion is not running");
3067            return;
3068        };
3069        let Some(http_client) = self.http_client.clone() else {
3070            return;
3071        };
3072
3073        cx.spawn(async move |_| {
3074            http_client
3075                .post_json(
3076                    &format!("http://127.0.0.1:{companion_port}/kill"),
3077                    serde_json::to_string(&request)
3078                        .context("serializing request")?
3079                        .into(),
3080                )
3081                .await?;
3082            anyhow::Ok(())
3083        })
3084        .detach_and_log_err(cx)
3085    }
3086}
3087
/// Parameters of a browser launch request that is relayed to the js-debug companion.
///
/// The struct is serialized (with camelCase keys) as the JSON body posted to the companion's
/// `/launch-and-attach` endpoint.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct LaunchBrowserInCompanionParams {
    /// Port the launch code points the companion at: used directly in `proxyUri`, or as the
    /// target of the forwarded port.
    server_port: u16,
    /// Launch parameters; the launch code reads the `url` entry to decide whether an
    /// additional port has to be forwarded.
    params: HashMap<String, serde_json::Value>,
    /// All remaining top-level fields of the request, flattened into the same JSON object.
    /// The launch code inserts `proxyUri` here.
    #[serde(flatten)]
    other: HashMap<String, serde_json::Value>,
}
3096
/// Parameters of a kill request that is relayed to the js-debug companion.
///
/// The struct is serialized (with camelCase keys) as the JSON body posted to the companion's
/// `/kill` endpoint.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct KillCompanionBrowserParams {
    /// NOTE(review): presumably identifies the browser launch to kill — confirm against the
    /// companion protocol.
    launch_id: u64,
}
3102
3103async fn spawn_companion(
3104    node_runtime: NodeRuntime,
3105    cx: &mut AsyncApp,
3106) -> Result<(u16, util::command::Child)> {
3107    let binary_path = node_runtime
3108        .binary_path()
3109        .await
3110        .context("getting node path")?;
3111    let path = cx
3112        .spawn(async move |cx| get_or_install_companion(node_runtime, cx).await)
3113        .await?;
3114    log::info!("will launch js-debug-companion version {path:?}");
3115
3116    let port = {
3117        let listener = TcpListener::bind("127.0.0.1:0")
3118            .await
3119            .context("getting port for companion")?;
3120        listener.local_addr()?.port()
3121    };
3122
3123    let dir = paths::data_dir()
3124        .join("js_debug_companion_state")
3125        .to_string_lossy()
3126        .to_string();
3127
3128    let child = new_command(binary_path)
3129        .arg(path)
3130        .args([
3131            format!("--listen=127.0.0.1:{port}"),
3132            format!("--state={dir}"),
3133        ])
3134        .stdin(Stdio::piped())
3135        .stdout(Stdio::piped())
3136        .stderr(Stdio::piped())
3137        .spawn()
3138        .context("spawning companion child process")?;
3139
3140    Ok((port, child))
3141}
3142
3143async fn get_or_install_companion(node: NodeRuntime, cx: &mut AsyncApp) -> Result<PathBuf> {
3144    const PACKAGE_NAME: &str = "@zed-industries/js-debug-companion-cli";
3145
3146    async fn install_latest_version(dir: PathBuf, node: NodeRuntime) -> Result<PathBuf> {
3147        let temp_dir = tempfile::tempdir().context("creating temporary directory")?;
3148        node.npm_install_packages(temp_dir.path(), &[(PACKAGE_NAME, "latest")])
3149            .await
3150            .context("installing latest companion package")?;
3151        let version = node
3152            .npm_package_installed_version(temp_dir.path(), PACKAGE_NAME)
3153            .await
3154            .context("getting installed companion version")?
3155            .context("companion was not installed")?;
3156        let version_folder = dir.join(version.to_string());
3157        smol::fs::rename(temp_dir.path(), &version_folder)
3158            .await
3159            .context("moving companion package into place")?;
3160        Ok(version_folder)
3161    }
3162
3163    let dir = paths::debug_adapters_dir().join("js-debug-companion");
3164    let (latest_installed_version, latest_version) = cx
3165        .background_spawn({
3166            let dir = dir.clone();
3167            let node = node.clone();
3168            async move {
3169                smol::fs::create_dir_all(&dir)
3170                    .await
3171                    .context("creating companion installation directory")?;
3172
3173                let children = smol::fs::read_dir(&dir)
3174                    .await
3175                    .context("reading companion installation directory")?
3176                    .try_collect::<Vec<_>>()
3177                    .await
3178                    .context("reading companion installation directory entries")?;
3179
3180                let latest_installed_version = children
3181                    .iter()
3182                    .filter_map(|child| {
3183                        Some((
3184                            child.path(),
3185                            semver::Version::parse(child.file_name().to_str()?).ok()?,
3186                        ))
3187                    })
3188                    .max_by_key(|(_, version)| version.clone());
3189
3190                let latest_version = node
3191                    .npm_package_latest_version(PACKAGE_NAME)
3192                    .await
3193                    .log_err();
3194                anyhow::Ok((latest_installed_version, latest_version))
3195            }
3196        })
3197        .await?;
3198
3199    let path = if let Some((installed_path, installed_version)) = latest_installed_version {
3200        if let Some(latest_version) = latest_version
3201            && latest_version != installed_version
3202        {
3203            cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3204                .detach();
3205        }
3206        Ok(installed_path)
3207    } else {
3208        cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3209            .await
3210    };
3211
3212    Ok(path?
3213        .join("node_modules")
3214        .join(PACKAGE_NAME)
3215        .join("out")
3216        .join("cli.js"))
3217}