session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DataBreakpointInfoCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetDataBreakpointsCommand, SetExceptionBreakpoints, SetVariableValueCommand,
   9    StackTraceCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  10    TerminateCommand, TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use crate::debugger::breakpoint_store::BreakpointSessionState;
  14use crate::debugger::dap_command::{DataBreakpointContext, ReadMemory};
  15use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
  16use anyhow::{Context as _, Result, anyhow, bail};
  17use base64::Engine;
  18use collections::{HashMap, HashSet, IndexMap};
  19use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  20use dap::messages::Response;
  21use dap::requests::{Request, RunInTerminal, StartDebugging};
  22use dap::transport::TcpTransport;
  23use dap::{
  24    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  25    SteppingGranularity, StoppedEvent, VariableReference,
  26    client::{DebugAdapterClient, SessionId},
  27    messages::{Events, Message},
  28};
  29use dap::{
  30    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  31    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  32    StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
  33};
  34use futures::channel::mpsc::UnboundedSender;
  35use futures::channel::{mpsc, oneshot};
  36use futures::io::BufReader;
  37use futures::{AsyncBufReadExt as _, SinkExt, StreamExt, TryStreamExt};
  38use futures::{FutureExt, future::Shared};
  39use gpui::{
  40    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  41    Task, WeakEntity,
  42};
  43use http_client::HttpClient;
  44use node_runtime::NodeRuntime;
  45use remote::RemoteClient;
  46use serde::{Deserialize, Serialize};
  47use serde_json::Value;
  48use smol::net::{TcpListener, TcpStream};
  49use std::any::TypeId;
  50use std::collections::{BTreeMap, VecDeque};
  51use std::net::Ipv4Addr;
  52use std::ops::RangeInclusive;
  53use std::path::PathBuf;
  54use std::process::Stdio;
  55use std::time::Duration;
  56use std::u64;
  57use std::{
  58    any::Any,
  59    collections::hash_map::Entry,
  60    hash::{Hash, Hasher},
  61    path::Path,
  62    sync::Arc,
  63};
  64use task::TaskContext;
  65use text::{PointUtf16, ToPointUtf16};
  66use url::Url;
  67use util::command::new_smol_command;
  68use util::{ResultExt, debug_panic, maybe};
  69use worktree::Worktree;
  70
  71const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
  72const DEBUG_HISTORY_LIMIT: usize = 10;
  73
  74#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  75#[repr(transparent)]
  76pub struct ThreadId(pub i64);
  77
  78impl From<i64> for ThreadId {
  79    fn from(id: i64) -> Self {
  80        Self(id)
  81    }
  82}
  83
  84#[derive(Clone, Debug)]
  85pub struct StackFrame {
  86    pub dap: dap::StackFrame,
  87    pub scopes: Vec<dap::Scope>,
  88}
  89
  90impl From<dap::StackFrame> for StackFrame {
  91    fn from(stack_frame: dap::StackFrame) -> Self {
  92        Self {
  93            scopes: vec![],
  94            dap: stack_frame,
  95        }
  96    }
  97}
  98
  99#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
 100pub enum ThreadStatus {
 101    #[default]
 102    Running,
 103    Stopped,
 104    Stepping,
 105    Exited,
 106    Ended,
 107}
 108
 109impl ThreadStatus {
 110    pub fn label(&self) -> &'static str {
 111        match self {
 112            ThreadStatus::Running => "Running",
 113            ThreadStatus::Stopped => "Stopped",
 114            ThreadStatus::Stepping => "Stepping",
 115            ThreadStatus::Exited => "Exited",
 116            ThreadStatus::Ended => "Ended",
 117        }
 118    }
 119}
 120
 121#[derive(Debug, Clone)]
 122pub struct Thread {
 123    dap: dap::Thread,
 124    stack_frames: Vec<StackFrame>,
 125    stack_frames_error: Option<SharedString>,
 126    _has_stopped: bool,
 127}
 128
 129impl From<dap::Thread> for Thread {
 130    fn from(dap: dap::Thread) -> Self {
 131        Self {
 132            dap,
 133            stack_frames: Default::default(),
 134            stack_frames_error: None,
 135            _has_stopped: false,
 136        }
 137    }
 138}
 139
 140#[derive(Debug, Clone, PartialEq)]
 141pub struct Watcher {
 142    pub expression: SharedString,
 143    pub value: SharedString,
 144    pub variables_reference: u64,
 145    pub presentation_hint: Option<VariablePresentationHint>,
 146}
 147
 148#[derive(Debug, Clone, PartialEq)]
 149pub struct DataBreakpointState {
 150    pub dap: dap::DataBreakpoint,
 151    pub is_enabled: bool,
 152    pub context: Arc<DataBreakpointContext>,
 153}
 154
 155pub enum SessionState {
 156    /// Represents a session that is building/initializing
 157    /// even if a session doesn't have a pre build task this state
 158    /// is used to run all the async tasks that are required to start the session
 159    Booting(Option<Task<Result<()>>>),
 160    Running(RunningMode),
 161}
 162
 163#[derive(Clone)]
 164pub struct RunningMode {
 165    client: Arc<DebugAdapterClient>,
 166    binary: DebugAdapterBinary,
 167    tmp_breakpoint: Option<SourceBreakpoint>,
 168    worktree: WeakEntity<Worktree>,
 169    executor: BackgroundExecutor,
 170    is_started: bool,
 171    has_ever_stopped: bool,
 172    messages_tx: UnboundedSender<Message>,
 173}
 174
 175#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
 176pub struct SessionQuirks {
 177    pub compact: bool,
 178    pub prefer_thread_name: bool,
 179}
 180
 181fn client_source(abs_path: &Path) -> dap::Source {
 182    dap::Source {
 183        name: abs_path
 184            .file_name()
 185            .map(|filename| filename.to_string_lossy().into_owned()),
 186        path: Some(abs_path.to_string_lossy().into_owned()),
 187        source_reference: None,
 188        presentation_hint: None,
 189        origin: None,
 190        sources: None,
 191        adapter_data: None,
 192        checksums: None,
 193    }
 194}
 195
 196impl RunningMode {
 197    async fn new(
 198        session_id: SessionId,
 199        parent_session: Option<Entity<Session>>,
 200        worktree: WeakEntity<Worktree>,
 201        binary: DebugAdapterBinary,
 202        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 203        cx: &mut AsyncApp,
 204    ) -> Result<Self> {
 205        let message_handler = Box::new({
 206            let messages_tx = messages_tx.clone();
 207            move |message| {
 208                messages_tx.unbounded_send(message).ok();
 209            }
 210        });
 211
 212        let client = if let Some(client) = parent_session
 213            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 214            .flatten()
 215        {
 216            client
 217                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 218                .await?
 219        } else {
 220            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 221        };
 222
 223        Ok(Self {
 224            client: Arc::new(client),
 225            worktree,
 226            tmp_breakpoint: None,
 227            binary,
 228            executor: cx.background_executor().clone(),
 229            is_started: false,
 230            has_ever_stopped: false,
 231            messages_tx,
 232        })
 233    }
 234
 235    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 236        &self.worktree
 237    }
 238
 239    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 240        let tasks: Vec<_> = paths
 241            .iter()
 242            .map(|path| {
 243                self.request(dap_command::SetBreakpoints {
 244                    source: client_source(path),
 245                    source_modified: None,
 246                    breakpoints: vec![],
 247                })
 248            })
 249            .collect();
 250
 251        cx.background_spawn(async move {
 252            futures::future::join_all(tasks)
 253                .await
 254                .iter()
 255                .for_each(|res| match res {
 256                    Ok(_) => {}
 257                    Err(err) => {
 258                        log::warn!("Set breakpoints request failed: {}", err);
 259                    }
 260                });
 261        })
 262    }
 263
 264    fn send_breakpoints_from_path(
 265        &self,
 266        abs_path: Arc<Path>,
 267        reason: BreakpointUpdatedReason,
 268        breakpoint_store: &Entity<BreakpointStore>,
 269        cx: &mut App,
 270    ) -> Task<()> {
 271        let breakpoints =
 272            breakpoint_store
 273                .read(cx)
 274                .source_breakpoints_from_path(&abs_path, cx)
 275                .into_iter()
 276                .filter(|bp| bp.state.is_enabled())
 277                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 278                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 279                }))
 280                .map(Into::into)
 281                .collect();
 282
 283        let raw_breakpoints = breakpoint_store
 284            .read(cx)
 285            .breakpoints_from_path(&abs_path)
 286            .into_iter()
 287            .filter(|bp| bp.bp.state.is_enabled())
 288            .collect::<Vec<_>>();
 289
 290        let task = self.request(dap_command::SetBreakpoints {
 291            source: client_source(&abs_path),
 292            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 293            breakpoints,
 294        });
 295        let session_id = self.client.id();
 296        let breakpoint_store = breakpoint_store.downgrade();
 297        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 298            Ok(breakpoints) => {
 299                let breakpoints =
 300                    breakpoints
 301                        .into_iter()
 302                        .zip(raw_breakpoints)
 303                        .filter_map(|(dap_bp, zed_bp)| {
 304                            Some((
 305                                zed_bp,
 306                                BreakpointSessionState {
 307                                    id: dap_bp.id?,
 308                                    verified: dap_bp.verified,
 309                                },
 310                            ))
 311                        });
 312                breakpoint_store
 313                    .update(cx, |this, _| {
 314                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 315                    })
 316                    .ok();
 317            }
 318            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 319        })
 320    }
 321
 322    fn send_exception_breakpoints(
 323        &self,
 324        filters: Vec<ExceptionBreakpointsFilter>,
 325        supports_filter_options: bool,
 326    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 327        let arg = if supports_filter_options {
 328            SetExceptionBreakpoints::WithOptions {
 329                filters: filters
 330                    .into_iter()
 331                    .map(|filter| ExceptionFilterOptions {
 332                        filter_id: filter.filter,
 333                        condition: None,
 334                        mode: None,
 335                    })
 336                    .collect(),
 337            }
 338        } else {
 339            SetExceptionBreakpoints::Plain {
 340                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 341            }
 342        };
 343        self.request(arg)
 344    }
 345
 346    fn send_source_breakpoints(
 347        &self,
 348        ignore_breakpoints: bool,
 349        breakpoint_store: &Entity<BreakpointStore>,
 350        cx: &App,
 351    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 352        let mut breakpoint_tasks = Vec::new();
 353        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 354        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 355        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 356        let session_id = self.client.id();
 357        for (path, breakpoints) in breakpoints {
 358            let breakpoints = if ignore_breakpoints {
 359                vec![]
 360            } else {
 361                breakpoints
 362                    .into_iter()
 363                    .filter(|bp| bp.state.is_enabled())
 364                    .map(Into::into)
 365                    .collect()
 366            };
 367
 368            let raw_breakpoints = raw_breakpoints
 369                .remove(&path)
 370                .unwrap_or_default()
 371                .into_iter()
 372                .filter(|bp| bp.bp.state.is_enabled());
 373            let error_path = path.clone();
 374            let send_request = self
 375                .request(dap_command::SetBreakpoints {
 376                    source: client_source(&path),
 377                    source_modified: Some(false),
 378                    breakpoints,
 379                })
 380                .map(|result| result.map_err(move |e| (error_path, e)));
 381
 382            let task = cx.spawn({
 383                let breakpoint_store = breakpoint_store.downgrade();
 384                async move |cx| {
 385                    let breakpoints = cx.background_spawn(send_request).await?;
 386
 387                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 388                        |(dap_bp, zed_bp)| {
 389                            Some((
 390                                zed_bp,
 391                                BreakpointSessionState {
 392                                    id: dap_bp.id?,
 393                                    verified: dap_bp.verified,
 394                                },
 395                            ))
 396                        },
 397                    );
 398                    breakpoint_store
 399                        .update(cx, |this, _| {
 400                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 401                        })
 402                        .ok();
 403
 404                    Ok(())
 405                }
 406            });
 407            breakpoint_tasks.push(task);
 408        }
 409
 410        cx.background_spawn(async move {
 411            futures::future::join_all(breakpoint_tasks)
 412                .await
 413                .into_iter()
 414                .filter_map(Result::err)
 415                .collect::<HashMap<_, _>>()
 416        })
 417    }
 418
 419    fn initialize_sequence(
 420        &self,
 421        capabilities: &Capabilities,
 422        initialized_rx: oneshot::Receiver<()>,
 423        dap_store: WeakEntity<DapStore>,
 424        cx: &mut Context<Session>,
 425    ) -> Task<Result<()>> {
 426        let raw = self.binary.request_args.clone();
 427
 428        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 429        let launch = match raw.request {
 430            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 431                raw: raw.configuration,
 432            }),
 433            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 434                raw: raw.configuration,
 435            }),
 436        };
 437
 438        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 439        // From spec (on initialization sequence):
 440        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 441        //
 442        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 443        let should_send_exception_breakpoints = capabilities
 444            .exception_breakpoint_filters
 445            .as_ref()
 446            .is_some_and(|filters| !filters.is_empty())
 447            || !configuration_done_supported;
 448        let supports_exception_filters = capabilities
 449            .supports_exception_filter_options
 450            .unwrap_or_default();
 451        let this = self.clone();
 452        let worktree = self.worktree().clone();
 453        let mut filters = capabilities
 454            .exception_breakpoint_filters
 455            .clone()
 456            .unwrap_or_default();
 457        let configuration_sequence = cx.spawn({
 458            async move |session, cx| {
 459                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 460                let (breakpoint_store, adapter_defaults) =
 461                    dap_store.read_with(cx, |dap_store, _| {
 462                        (
 463                            dap_store.breakpoint_store().clone(),
 464                            dap_store.adapter_options(&adapter_name),
 465                        )
 466                    })?;
 467                initialized_rx.await?;
 468                let errors_by_path = cx
 469                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 470                    .await;
 471
 472                dap_store.update(cx, |_, cx| {
 473                    let Some(worktree) = worktree.upgrade() else {
 474                        return;
 475                    };
 476
 477                    for (path, error) in &errors_by_path {
 478                        log::error!("failed to set breakpoints for {path:?}: {error}");
 479                    }
 480
 481                    if let Some(failed_path) = errors_by_path.keys().next() {
 482                        let failed_path = failed_path
 483                            .strip_prefix(worktree.read(cx).abs_path())
 484                            .unwrap_or(failed_path)
 485                            .display();
 486                        let message = format!(
 487                            "Failed to set breakpoints for {failed_path}{}",
 488                            match errors_by_path.len() {
 489                                0 => unreachable!(),
 490                                1 => "".into(),
 491                                2 => " and 1 other path".into(),
 492                                n => format!(" and {} other paths", n - 1),
 493                            }
 494                        );
 495                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 496                    }
 497                })?;
 498
 499                if should_send_exception_breakpoints {
 500                    _ = session.update(cx, |this, _| {
 501                        filters.retain(|filter| {
 502                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 503                                defaults
 504                                    .exception_breakpoints
 505                                    .get(&filter.filter)
 506                                    .map(|options| options.enabled)
 507                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 508                            } else {
 509                                filter.default.unwrap_or_default()
 510                            };
 511                            this.exception_breakpoints
 512                                .entry(filter.filter.clone())
 513                                .or_insert_with(|| (filter.clone(), is_enabled));
 514                            is_enabled
 515                        });
 516                    });
 517
 518                    this.send_exception_breakpoints(filters, supports_exception_filters)
 519                        .await
 520                        .ok();
 521                }
 522
 523                if configuration_done_supported {
 524                    this.request(ConfigurationDone {})
 525                } else {
 526                    Task::ready(Ok(()))
 527                }
 528                .await
 529            }
 530        });
 531
 532        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 533
 534        cx.spawn(async move |this, cx| {
 535            let result = task.await;
 536
 537            this.update(cx, |this, cx| {
 538                if let Some(this) = this.as_running_mut() {
 539                    this.is_started = true;
 540                    cx.notify();
 541                }
 542            })
 543            .ok();
 544
 545            result?;
 546            anyhow::Ok(())
 547        })
 548    }
 549
 550    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 551        let client = self.client.clone();
 552        let messages_tx = self.messages_tx.clone();
 553        let message_handler = Box::new(move |message| {
 554            messages_tx.unbounded_send(message).ok();
 555        });
 556        if client.should_reconnect_for_ssh() {
 557            Some(cx.spawn(async move |cx| {
 558                client.connect(message_handler, cx).await?;
 559                anyhow::Ok(())
 560            }))
 561        } else {
 562            None
 563        }
 564    }
 565
 566    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 567    where
 568        <R::DapRequest as dap::requests::Request>::Response: 'static,
 569        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 570    {
 571        let request = Arc::new(request);
 572
 573        let request_clone = request.clone();
 574        let connection = self.client.clone();
 575        self.executor.spawn(async move {
 576            let args = request_clone.to_dap();
 577            let response = connection.request::<R::DapRequest>(args).await?;
 578            request.response_from_dap(response)
 579        })
 580    }
 581}
 582
 583impl SessionState {
 584    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 585    where
 586        <R::DapRequest as dap::requests::Request>::Response: 'static,
 587        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 588    {
 589        match self {
 590            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 591            SessionState::Booting(_) => Task::ready(Err(anyhow!(
 592                "no adapter running to send request: {request:?}"
 593            ))),
 594        }
 595    }
 596
 597    /// Did this debug session stop at least once?
 598    pub(crate) fn has_ever_stopped(&self) -> bool {
 599        match self {
 600            SessionState::Booting(_) => false,
 601            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 602        }
 603    }
 604
 605    fn stopped(&mut self) {
 606        if let SessionState::Running(running) = self {
 607            running.has_ever_stopped = true;
 608        }
 609    }
 610}
 611
 612#[derive(Default)]
 613struct ThreadStates {
 614    global_state: Option<ThreadStatus>,
 615    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 616}
 617
 618impl ThreadStates {
 619    fn stop_all_threads(&mut self) {
 620        self.global_state = Some(ThreadStatus::Stopped);
 621        self.known_thread_states.clear();
 622    }
 623
 624    fn exit_all_threads(&mut self) {
 625        self.global_state = Some(ThreadStatus::Exited);
 626        self.known_thread_states.clear();
 627    }
 628
 629    fn continue_all_threads(&mut self) {
 630        self.global_state = Some(ThreadStatus::Running);
 631        self.known_thread_states.clear();
 632    }
 633
 634    fn stop_thread(&mut self, thread_id: ThreadId) {
 635        self.known_thread_states
 636            .insert(thread_id, ThreadStatus::Stopped);
 637    }
 638
 639    fn continue_thread(&mut self, thread_id: ThreadId) {
 640        self.known_thread_states
 641            .insert(thread_id, ThreadStatus::Running);
 642    }
 643
 644    fn process_step(&mut self, thread_id: ThreadId) {
 645        self.known_thread_states
 646            .insert(thread_id, ThreadStatus::Stepping);
 647    }
 648
 649    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 650        self.thread_state(thread_id)
 651            .unwrap_or(ThreadStatus::Running)
 652    }
 653
 654    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 655        self.known_thread_states
 656            .get(&thread_id)
 657            .copied()
 658            .or(self.global_state)
 659    }
 660
 661    fn exit_thread(&mut self, thread_id: ThreadId) {
 662        self.known_thread_states
 663            .insert(thread_id, ThreadStatus::Exited);
 664    }
 665
 666    fn any_stopped_thread(&self) -> bool {
 667        self.global_state
 668            .is_some_and(|state| state == ThreadStatus::Stopped)
 669            || self
 670                .known_thread_states
 671                .values()
 672                .any(|status| *status == ThreadStatus::Stopped)
 673    }
 674}
 675
 676// TODO(debugger): Wrap dap types with reference counting so the UI doesn't have to clone them on refresh
 677#[derive(Default)]
 678pub struct SessionSnapshot {
 679    threads: IndexMap<ThreadId, Thread>,
 680    thread_states: ThreadStates,
 681    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 682    stack_frames: IndexMap<StackFrameId, StackFrame>,
 683    locations: HashMap<u64, dap::LocationsResponse>,
 684    modules: Vec<dap::Module>,
 685    loaded_sources: Vec<dap::Source>,
 686}
 687
 688type IsEnabled = bool;
 689
 690#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 691pub struct OutputToken(pub usize);
 692/// Represents a current state of a single debug adapter and provides ways to mutate it.
 693pub struct Session {
 694    pub state: SessionState,
 695    active_snapshot: SessionSnapshot,
 696    snapshots: VecDeque<SessionSnapshot>,
 697    selected_snapshot_index: Option<usize>,
 698    id: SessionId,
 699    label: Option<SharedString>,
 700    adapter: DebugAdapterName,
 701    pub(super) capabilities: Capabilities,
 702    child_session_ids: HashSet<SessionId>,
 703    parent_session: Option<Entity<Session>>,
 704    output_token: OutputToken,
 705    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 706    watchers: HashMap<SharedString, Watcher>,
 707    is_session_terminated: bool,
 708    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 709    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 710    ignore_breakpoints: bool,
 711    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 712    data_breakpoints: BTreeMap<String, DataBreakpointState>,
 713    background_tasks: Vec<Task<()>>,
 714    restart_task: Option<Task<()>>,
 715    task_context: TaskContext,
 716    memory: memory::Memory,
 717    quirks: SessionQuirks,
 718    remote_client: Option<Entity<RemoteClient>>,
 719    node_runtime: Option<NodeRuntime>,
 720    http_client: Option<Arc<dyn HttpClient>>,
 721    companion_port: Option<u16>,
 722}
 723
 724trait CacheableCommand: Any + Send + Sync {
 725    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 726    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 727    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 728}
 729
 730impl<T> CacheableCommand for T
 731where
 732    T: LocalDapCommand + PartialEq + Eq + Hash,
 733{
 734    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 735        (rhs as &dyn Any).downcast_ref::<Self>() == Some(self)
 736    }
 737
 738    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 739        T::hash(self, &mut hasher);
 740    }
 741
 742    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 743        self
 744    }
 745}
 746
 747pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 748
 749impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 750    fn from(request: T) -> Self {
 751        Self(Arc::new(request))
 752    }
 753}
 754
 755impl PartialEq for RequestSlot {
 756    fn eq(&self, other: &Self) -> bool {
 757        self.0.dyn_eq(other.0.as_ref())
 758    }
 759}
 760
 761impl Eq for RequestSlot {}
 762
 763impl Hash for RequestSlot {
 764    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 765        self.0.dyn_hash(state);
 766        (&*self.0 as &dyn Any).type_id().hash(state)
 767    }
 768}
 769
 770#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 771pub struct CompletionsQuery {
 772    pub query: String,
 773    pub column: u64,
 774    pub line: Option<u64>,
 775    pub frame_id: Option<u64>,
 776}
 777
 778impl CompletionsQuery {
 779    pub fn new(
 780        buffer: &language::Buffer,
 781        cursor_position: language::Anchor,
 782        frame_id: Option<u64>,
 783    ) -> Self {
 784        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 785        Self {
 786            query: buffer.text(),
 787            column: column as u64,
 788            frame_id,
 789            line: Some(row as u64),
 790        }
 791    }
 792}
 793
 794#[derive(Debug)]
 795pub enum SessionEvent {
 796    Modules,
 797    LoadedSources,
 798    Stopped(Option<ThreadId>),
 799    StackTrace,
 800    Variables,
 801    Watchers,
 802    Threads,
 803    InvalidateInlineValue,
 804    CapabilitiesLoaded,
 805    RunInTerminal {
 806        request: RunInTerminalRequestArguments,
 807        sender: mpsc::Sender<Result<u32>>,
 808    },
 809    DataBreakpointInfo,
 810    ConsoleOutput,
 811    HistoricSnapshotSelected,
 812}
 813
 814#[derive(Clone, Debug, PartialEq, Eq)]
 815pub enum SessionStateEvent {
 816    Running,
 817    Shutdown,
 818    Restart,
 819    SpawnChildSession {
 820        request: StartDebuggingRequestArguments,
 821    },
 822}
 823
 824impl EventEmitter<SessionEvent> for Session {}
 825impl EventEmitter<SessionStateEvent> for Session {}
 826
 827// local session will send breakpoint updates to DAP for all new breakpoints
 828// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 829// BreakpointStore notifies session on breakpoint changes
 830impl Session {
 831    pub(crate) fn new(
 832        breakpoint_store: Entity<BreakpointStore>,
 833        session_id: SessionId,
 834        parent_session: Option<Entity<Session>>,
 835        label: Option<SharedString>,
 836        adapter: DebugAdapterName,
 837        task_context: TaskContext,
 838        quirks: SessionQuirks,
 839        remote_client: Option<Entity<RemoteClient>>,
 840        node_runtime: Option<NodeRuntime>,
 841        http_client: Option<Arc<dyn HttpClient>>,
 842        cx: &mut App,
 843    ) -> Entity<Self> {
 844        cx.new::<Self>(|cx| {
 845            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 846                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 847                    if let Some(local) = (!this.ignore_breakpoints)
 848                        .then(|| this.as_running_mut())
 849                        .flatten()
 850                    {
 851                        local
 852                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 853                            .detach();
 854                    };
 855                }
 856                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 857                    if let Some(local) = (!this.ignore_breakpoints)
 858                        .then(|| this.as_running_mut())
 859                        .flatten()
 860                    {
 861                        local.unset_breakpoints_from_paths(paths, cx).detach();
 862                    }
 863                }
 864                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 865            })
 866            .detach();
 867
 868            Self {
 869                state: SessionState::Booting(None),
 870                snapshots: VecDeque::with_capacity(DEBUG_HISTORY_LIMIT),
 871                selected_snapshot_index: None,
 872                active_snapshot: Default::default(),
 873                id: session_id,
 874                child_session_ids: HashSet::default(),
 875                parent_session,
 876                capabilities: Capabilities::default(),
 877                watchers: HashMap::default(),
 878                output_token: OutputToken(0),
 879                output: circular_buffer::CircularBuffer::boxed(),
 880                requests: HashMap::default(),
 881                background_tasks: Vec::default(),
 882                restart_task: None,
 883                is_session_terminated: false,
 884                ignore_breakpoints: false,
 885                breakpoint_store,
 886                data_breakpoints: Default::default(),
 887                exception_breakpoints: Default::default(),
 888                label,
 889                adapter,
 890                task_context,
 891                memory: memory::Memory::new(),
 892                quirks,
 893                remote_client,
 894                node_runtime,
 895                http_client,
 896                companion_port: None,
 897            }
 898        })
 899    }
 900
    /// Returns the task context this session was created with.
    pub fn task_context(&self) -> &TaskContext {
        &self.task_context
    }
 904
 905    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 906        match &self.state {
 907            SessionState::Booting(_) => None,
 908            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 909        }
 910    }
 911
 912    pub fn boot(
 913        &mut self,
 914        binary: DebugAdapterBinary,
 915        worktree: Entity<Worktree>,
 916        dap_store: WeakEntity<DapStore>,
 917        cx: &mut Context<Self>,
 918    ) -> Task<Result<()>> {
 919        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 920        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 921
 922        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 923            let mut initialized_tx = Some(initialized_tx);
 924            while let Some(message) = message_rx.next().await {
 925                if let Message::Event(event) = message {
 926                    if let Events::Initialized(_) = *event {
 927                        if let Some(tx) = initialized_tx.take() {
 928                            tx.send(()).ok();
 929                        }
 930                    } else {
 931                        let Ok(_) = this.update(cx, |session, cx| {
 932                            session.handle_dap_event(event, cx);
 933                        }) else {
 934                            break;
 935                        };
 936                    }
 937                } else if let Message::Request(request) = message {
 938                    let Ok(_) = this.update(cx, |this, cx| {
 939                        if request.command == StartDebugging::COMMAND {
 940                            this.handle_start_debugging_request(request, cx)
 941                                .detach_and_log_err(cx);
 942                        } else if request.command == RunInTerminal::COMMAND {
 943                            this.handle_run_in_terminal_request(request, cx)
 944                                .detach_and_log_err(cx);
 945                        }
 946                    }) else {
 947                        break;
 948                    };
 949                }
 950            }
 951        })];
 952        self.background_tasks = background_tasks;
 953        let id = self.id;
 954        let parent_session = self.parent_session.clone();
 955
 956        cx.spawn(async move |this, cx| {
 957            let mode = RunningMode::new(
 958                id,
 959                parent_session,
 960                worktree.downgrade(),
 961                binary.clone(),
 962                message_tx,
 963                cx,
 964            )
 965            .await?;
 966            this.update(cx, |this, cx| {
 967                match &mut this.state {
 968                    SessionState::Booting(task) if task.is_some() => {
 969                        task.take().unwrap().detach_and_log_err(cx);
 970                    }
 971                    SessionState::Booting(_) => {}
 972                    SessionState::Running(_) => {
 973                        debug_panic!("Attempting to boot a session that is already running");
 974                    }
 975                };
 976                this.state = SessionState::Running(mode);
 977                cx.emit(SessionStateEvent::Running);
 978            })?;
 979
 980            this.update(cx, |session, cx| session.request_initialize(cx))?
 981                .await?;
 982
 983            let result = this
 984                .update(cx, |session, cx| {
 985                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 986                })?
 987                .await;
 988
 989            if result.is_err() {
 990                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 991
 992                console
 993                    .send(format!(
 994                        "Tried to launch debugger with: {}",
 995                        serde_json::to_string_pretty(&binary.request_args.configuration)
 996                            .unwrap_or_default(),
 997                    ))
 998                    .await
 999                    .ok();
1000            }
1001
1002            result
1003        })
1004    }
1005
    /// Returns this session's identifier.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
1009
    /// Returns a copy of the set of child session ids registered on this session.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
1013
    /// Registers `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
1017
    /// Removes `session_id` from this session's children; does nothing if it is not registered.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
1021
1022    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
1023        self.parent_session
1024            .as_ref()
1025            .map(|session| session.read(cx).id)
1026    }
1027
    /// Returns the handle of the parent session, if this session has one.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
1031
1032    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1033        let Some(client) = self.adapter_client() else {
1034            return Task::ready(());
1035        };
1036
1037        let supports_terminate = self
1038            .capabilities
1039            .support_terminate_debuggee
1040            .unwrap_or(false);
1041
1042        cx.background_spawn(async move {
1043            if supports_terminate {
1044                client
1045                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1046                        restart: Some(false),
1047                    })
1048                    .await
1049                    .ok();
1050            } else {
1051                client
1052                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1053                        restart: Some(false),
1054                        terminate_debuggee: Some(true),
1055                        suspend_debuggee: Some(false),
1056                    })
1057                    .await
1058                    .ok();
1059            }
1060        })
1061    }
1062
    /// Returns the adapter capabilities currently known to the session: the defaults
    /// until the `initialize` response is stored, afterwards updated by `capabilities` events.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
1066
1067    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1068        match &self.state {
1069            SessionState::Booting(_) => None,
1070            SessionState::Running(running_mode) => Some(&running_mode.binary),
1071        }
1072    }
1073
    /// Returns a clone of the name of the debug adapter this session uses.
    pub fn adapter(&self) -> DebugAdapterName {
        self.adapter.clone()
    }
1077
    /// Returns a clone of the label given at construction, if any.
    pub fn label(&self) -> Option<SharedString> {
        self.label.clone()
    }
1081
    /// Reports whether the session has been marked as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
1085
1086    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1087        let (tx, mut rx) = mpsc::unbounded();
1088
1089        cx.spawn(async move |this, cx| {
1090            while let Some(output) = rx.next().await {
1091                this.update(cx, |this, _| {
1092                    let event = dap::OutputEvent {
1093                        category: None,
1094                        output,
1095                        group: None,
1096                        variables_reference: None,
1097                        source: None,
1098                        line: None,
1099                        column: None,
1100                        data: None,
1101                        location_reference: None,
1102                    };
1103                    this.push_output(event);
1104                })?;
1105            }
1106            anyhow::Ok(())
1107        })
1108        .detach();
1109
1110        tx
1111    }
1112
1113    pub fn is_started(&self) -> bool {
1114        match &self.state {
1115            SessionState::Booting(_) => false,
1116            SessionState::Running(running) => running.is_started,
1117        }
1118    }
1119
    /// Reports whether the session is still in its `Booting` state, i.e. has not yet
    /// switched to `Running`.
    pub fn is_building(&self) -> bool {
        matches!(self.state, SessionState::Booting(_))
    }
1123
1124    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1125        match &mut self.state {
1126            SessionState::Running(local_mode) => Some(local_mode),
1127            SessionState::Booting(_) => None,
1128        }
1129    }
1130
1131    pub fn as_running(&self) -> Option<&RunningMode> {
1132        match &self.state {
1133            SessionState::Running(local_mode) => Some(local_mode),
1134            SessionState::Booting(_) => None,
1135        }
1136    }
1137
1138    fn handle_start_debugging_request(
1139        &mut self,
1140        request: dap::messages::Request,
1141        cx: &mut Context<Self>,
1142    ) -> Task<Result<()>> {
1143        let request_seq = request.seq;
1144
1145        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1146            .arguments
1147            .as_ref()
1148            .map(|value| serde_json::from_value(value.clone()));
1149
1150        let mut success = true;
1151        if let Some(Ok(request)) = launch_request {
1152            cx.emit(SessionStateEvent::SpawnChildSession { request });
1153        } else {
1154            log::error!(
1155                "Failed to parse launch request arguments: {:?}",
1156                request.arguments
1157            );
1158            success = false;
1159        }
1160
1161        cx.spawn(async move |this, cx| {
1162            this.update(cx, |this, cx| {
1163                this.respond_to_client(
1164                    request_seq,
1165                    success,
1166                    StartDebugging::COMMAND.to_string(),
1167                    None,
1168                    cx,
1169                )
1170            })?
1171            .await
1172        })
1173    }
1174
1175    fn handle_run_in_terminal_request(
1176        &mut self,
1177        request: dap::messages::Request,
1178        cx: &mut Context<Self>,
1179    ) -> Task<Result<()>> {
1180        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1181            request.arguments.unwrap_or_default(),
1182        ) {
1183            Ok(args) => args,
1184            Err(error) => {
1185                return cx.spawn(async move |session, cx| {
1186                    let error = serde_json::to_value(dap::ErrorResponse {
1187                        error: Some(dap::Message {
1188                            id: request.seq,
1189                            format: error.to_string(),
1190                            variables: None,
1191                            send_telemetry: None,
1192                            show_user: None,
1193                            url: None,
1194                            url_label: None,
1195                        }),
1196                    })
1197                    .ok();
1198
1199                    session
1200                        .update(cx, |this, cx| {
1201                            this.respond_to_client(
1202                                request.seq,
1203                                false,
1204                                StartDebugging::COMMAND.to_string(),
1205                                error,
1206                                cx,
1207                            )
1208                        })?
1209                        .await?;
1210
1211                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1212                });
1213            }
1214        };
1215
1216        let seq = request.seq;
1217
1218        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1219        cx.emit(SessionEvent::RunInTerminal {
1220            request: request_args,
1221            sender: tx,
1222        });
1223        cx.notify();
1224
1225        cx.spawn(async move |session, cx| {
1226            let result = util::maybe!(async move {
1227                rx.next().await.ok_or_else(|| {
1228                    anyhow!("failed to receive response from spawn terminal".to_string())
1229                })?
1230            })
1231            .await;
1232            let (success, body) = match result {
1233                Ok(pid) => (
1234                    true,
1235                    serde_json::to_value(dap::RunInTerminalResponse {
1236                        process_id: None,
1237                        shell_process_id: Some(pid as u64),
1238                    })
1239                    .ok(),
1240                ),
1241                Err(error) => (
1242                    false,
1243                    serde_json::to_value(dap::ErrorResponse {
1244                        error: Some(dap::Message {
1245                            id: seq,
1246                            format: error.to_string(),
1247                            variables: None,
1248                            send_telemetry: None,
1249                            show_user: None,
1250                            url: None,
1251                            url_label: None,
1252                        }),
1253                    })
1254                    .ok(),
1255                ),
1256            };
1257
1258            session
1259                .update(cx, |session, cx| {
1260                    session.respond_to_client(
1261                        seq,
1262                        success,
1263                        RunInTerminal::COMMAND.to_string(),
1264                        body,
1265                        cx,
1266                    )
1267                })?
1268                .await
1269        })
1270    }
1271
    /// Sends the `initialize` request to the adapter and stores the capabilities it
    /// returns, emitting [`SessionEvent::CapabilitiesLoaded`] on success.
    ///
    /// Fails immediately if the session is not running yet. When the request fails and
    /// the running mode offers a reconnect via `reconnect_for_ssh`, the reconnect is
    /// awaited and the request is sent again; otherwise the request error is returned.
    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
        let adapter_id = self.adapter().to_string();
        let request = Initialize { adapter_id };

        let SessionState::Running(running) = &self.state else {
            return Task::ready(Err(anyhow!(
                "Cannot send initialize request, task still building"
            )));
        };
        let mut response = running.request(request.clone());

        cx.spawn(async move |this, cx| {
            // Each iteration awaits one attempt; the loop only continues after a
            // successful reconnect has produced a fresh request.
            loop {
                let capabilities = response.await;
                match capabilities {
                    Err(e) => {
                        // No reconnect available (or the session is gone): give up with
                        // the original request error.
                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
                            this.as_running()
                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
                        }) else {
                            return Err(e);
                        };
                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
                        reconnect.await?;

                        // Re-issue the same request for the next iteration.
                        let Ok(Some(r)) = this.update(cx, |this, _| {
                            this.as_running()
                                .map(|running| running.request(request.clone()))
                        }) else {
                            return Err(e);
                        };
                        response = r
                    }
                    Ok(capabilities) => {
                        this.update(cx, |session, cx| {
                            session.capabilities = capabilities;

                            cx.emit(SessionEvent::CapabilitiesLoaded);
                        })?;
                        return Ok(());
                    }
                }
            }
        })
    }
1317
1318    pub(super) fn initialize_sequence(
1319        &mut self,
1320        initialize_rx: oneshot::Receiver<()>,
1321        dap_store: WeakEntity<DapStore>,
1322        cx: &mut Context<Self>,
1323    ) -> Task<Result<()>> {
1324        match &self.state {
1325            SessionState::Running(local_mode) => {
1326                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1327            }
1328            SessionState::Booting(_) => {
1329                Task::ready(Err(anyhow!("cannot initialize, still building")))
1330            }
1331        }
1332    }
1333
1334    pub fn run_to_position(
1335        &mut self,
1336        breakpoint: SourceBreakpoint,
1337        active_thread_id: ThreadId,
1338        cx: &mut Context<Self>,
1339    ) {
1340        match &mut self.state {
1341            SessionState::Running(local_mode) => {
1342                if !matches!(
1343                    self.active_snapshot
1344                        .thread_states
1345                        .thread_state(active_thread_id),
1346                    Some(ThreadStatus::Stopped)
1347                ) {
1348                    return;
1349                };
1350                let path = breakpoint.path.clone();
1351                local_mode.tmp_breakpoint = Some(breakpoint);
1352                let task = local_mode.send_breakpoints_from_path(
1353                    path,
1354                    BreakpointUpdatedReason::Toggled,
1355                    &self.breakpoint_store,
1356                    cx,
1357                );
1358
1359                cx.spawn(async move |this, cx| {
1360                    task.await;
1361                    this.update(cx, |this, cx| {
1362                        this.continue_thread(active_thread_id, cx);
1363                    })
1364                })
1365                .detach();
1366            }
1367            SessionState::Booting(_) => {}
1368        }
1369    }
1370
1371    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1372        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1373    }
1374
1375    pub fn output(
1376        &self,
1377        since: OutputToken,
1378    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1379        if self.output_token.0 == 0 {
1380            return (self.output.range(0..0), OutputToken(0));
1381        };
1382
1383        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1384
1385        let clamped_events_since = events_since.clamp(0, self.output.len());
1386        (
1387            self.output
1388                .range(self.output.len() - clamped_events_since..),
1389            self.output_token,
1390        )
1391    }
1392
1393    pub fn respond_to_client(
1394        &self,
1395        request_seq: u64,
1396        success: bool,
1397        command: String,
1398        body: Option<serde_json::Value>,
1399        cx: &mut Context<Self>,
1400    ) -> Task<Result<()>> {
1401        let Some(local_session) = self.as_running() else {
1402            unreachable!("Cannot respond to remote client");
1403        };
1404        let client = local_session.client.clone();
1405
1406        cx.background_spawn(async move {
1407            client
1408                .send_message(Message::Response(Response {
1409                    body,
1410                    success,
1411                    command,
1412                    seq: request_seq + 1,
1413                    request_seq,
1414                    message: None,
1415                }))
1416                .await
1417        })
1418    }
1419
1420    fn session_state(&self) -> &SessionSnapshot {
1421        self.selected_snapshot_index
1422            .and_then(|ix| self.snapshots.get(ix))
1423            .unwrap_or_else(|| &self.active_snapshot)
1424    }
1425
1426    fn push_to_history(&mut self) {
1427        if !self.has_ever_stopped() {
1428            return;
1429        }
1430
1431        while self.snapshots.len() >= DEBUG_HISTORY_LIMIT {
1432            self.snapshots.pop_front();
1433        }
1434
1435        self.snapshots
1436            .push_back(std::mem::take(&mut self.active_snapshot));
1437    }
1438
    /// Returns the snapshots kept in the history, oldest first.
    pub fn historic_snapshots(&self) -> &VecDeque<SessionSnapshot> {
        &self.snapshots
    }
1442
1443    pub fn select_historic_snapshot(&mut self, ix: Option<usize>, cx: &mut Context<Session>) {
1444        if self.selected_snapshot_index == ix {
1445            return;
1446        }
1447
1448        if self
1449            .selected_snapshot_index
1450            .is_some_and(|ix| self.snapshots.len() <= ix)
1451        {
1452            debug_panic!("Attempted to select a debug session with an out of bounds index");
1453            return;
1454        }
1455
1456        self.selected_snapshot_index = ix;
1457
1458        if ix.is_some() {
1459            cx.emit(SessionEvent::HistoricSnapshotSelected);
1460        }
1461
1462        cx.notify();
1463    }
1464
    /// Returns the index of the selected historic snapshot, or `None` when the live
    /// snapshot is in use.
    pub fn active_snapshot_index(&self) -> Option<usize> {
        self.selected_snapshot_index
    }
1468
    /// Processes a `stopped` event from the adapter.
    ///
    /// The previous live snapshot is pushed to the history, a pending temporary
    /// breakpoint (set by `run_to_position`) is cleared and the breakpoints for its path
    /// are re-sent, thread states are updated, cached data is invalidated, and
    /// [`SessionEvent::Stopped`] plus [`SessionEvent::InvalidateInlineValue`] are emitted.
    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
        // Archive the snapshot of the previous stop before it gets overwritten.
        self.push_to_history();

        self.state.stopped();
        // todo(debugger): Find a clean way to get around the clone
        let breakpoint_store = self.breakpoint_store.clone();
        // Taking the temporary breakpoint removes it; re-send the breakpoints for its
        // path now that it is gone.
        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
            let breakpoint = local.tmp_breakpoint.take()?;
            let path = breakpoint.path;
            Some((local, path))
        }) {
            local
                .send_breakpoints_from_path(
                    path,
                    BreakpointUpdatedReason::Toggled,
                    &breakpoint_store,
                    cx,
                )
                .detach();
        };

        // An event without a thread id is treated like "all threads stopped".
        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
            self.active_snapshot.thread_states.stop_all_threads();
            self.invalidate_command_type::<StackTraceCommand>();
        }

        // Even if we stopped all threads we still need to insert the thread_id
        // to our own data
        if let Some(thread_id) = event.thread_id {
            self.active_snapshot
                .thread_states
                .stop_thread(ThreadId(thread_id));

            self.invalidate_state(
                &StackTraceCommand {
                    thread_id,
                    start_frame: None,
                    levels: None,
                }
                .into(),
            );
        }

        self.invalidate_generic();
        self.active_snapshot.threads.clear();
        self.active_snapshot.variables.clear();
        // The thread id is withheld from subscribers when the adapter set
        // `preserve_focus_hint`.
        cx.emit(SessionEvent::Stopped(
            event
                .thread_id
                .map(Into::into)
                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
        ));
        cx.emit(SessionEvent::InvalidateInlineValue);
        cx.notify();
    }
1524
    /// Applies an event received from the debug adapter to the session's state.
    ///
    /// `initialized` events are expected to be intercepted before they reach this
    /// method. Several event kinds are currently accepted but ignored.
    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
        match *event {
            Events::Initialized(_) => {
                debug_assert!(
                    false,
                    "Initialized event should have been handled in LocalMode"
                );
            }
            Events::Stopped(event) => self.handle_stopped_event(event, cx),
            Events::Continued(event) => {
                if event.all_threads_continued.unwrap_or_default() {
                    self.active_snapshot.thread_states.continue_all_threads();
                    // The active position is only cleared when every thread resumed.
                    self.breakpoint_store.update(cx, |store, cx| {
                        store.remove_active_position(Some(self.session_id()), cx)
                    });
                } else {
                    self.active_snapshot
                        .thread_states
                        .continue_thread(ThreadId(event.thread_id));
                }
                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
                self.invalidate_generic();
            }
            Events::Exited(_event) => {
                self.clear_active_debug_line(cx);
            }
            Events::Terminated(_) => {
                self.shutdown(cx).detach();
            }
            Events::Thread(event) => {
                let thread_id = ThreadId(event.thread_id);

                match event.reason {
                    // A newly started thread is recorded as continued.
                    dap::ThreadEventReason::Started => {
                        self.active_snapshot
                            .thread_states
                            .continue_thread(thread_id);
                    }
                    dap::ThreadEventReason::Exited => {
                        self.active_snapshot.thread_states.exit_thread(thread_id);
                    }
                    reason => {
                        log::error!("Unhandled thread event reason {:?}", reason);
                    }
                }
                self.invalidate_state(&ThreadsCommand.into());
                cx.notify();
            }
            Events::Output(event) => {
                // Telemetry output is dropped and never reaches the output buffer.
                if event
                    .category
                    .as_ref()
                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
                {
                    return;
                }

                self.push_output(event);
                cx.notify();
            }
            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
            }),
            Events::Module(event) => {
                // Keep the module list of the live snapshot in sync; modules are matched by id.
                match event.reason {
                    dap::ModuleEventReason::New => {
                        self.active_snapshot.modules.push(event.module);
                    }
                    dap::ModuleEventReason::Changed => {
                        if let Some(module) = self
                            .active_snapshot
                            .modules
                            .iter_mut()
                            .find(|other| event.module.id == other.id)
                        {
                            *module = event.module;
                        }
                    }
                    dap::ModuleEventReason::Removed => {
                        self.active_snapshot
                            .modules
                            .retain(|other| event.module.id != other.id);
                    }
                }

                // todo(debugger): We should only send the invalidate command to downstream clients.
                // self.invalidate_state(&ModulesCommand.into());
            }
            Events::LoadedSource(_) => {
                self.invalidate_state(&LoadedSourcesCommand.into());
            }
            Events::Capabilities(event) => {
                self.capabilities = self.capabilities.merge(event.capabilities);

                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
                let recent_filters = self
                    .capabilities
                    .exception_breakpoint_filters
                    .iter()
                    .flatten()
                    .map(|filter| (filter.filter.clone(), filter.clone()))
                    .collect::<BTreeMap<_, _>>();
                // Add filters that are new; filters already known keep their current entry.
                for filter in recent_filters.values() {
                    let default = filter.default.unwrap_or_default();
                    self.exception_breakpoints
                        .entry(filter.filter.clone())
                        .or_insert_with(|| (filter.clone(), default));
                }
                // Remove the ones that no longer exist.
                self.exception_breakpoints
                    .retain(|k, _| recent_filters.contains_key(k));
                if self.is_started() {
                    self.send_exception_breakpoints(cx);
                }

                cx.notify();
            }
            // The following events are currently ignored.
            Events::Memory(_) => {}
            Events::Process(_) => {}
            Events::ProgressEnd(_) => {}
            Events::ProgressStart(_) => {}
            Events::ProgressUpdate(_) => {}
            Events::Invalidated(_) => {}
            Events::Other(event) => {
                // Custom events are recognised by name; unknown names are ignored.
                if event.event == "launchBrowserInCompanion" {
                    let Some(request) = serde_json::from_value(event.body).ok() else {
                        log::error!("failed to deserialize launchBrowserInCompanion event");
                        return;
                    };
                    self.launch_browser_for_remote_server(request, cx);
                } else if event.event == "killCompanionBrowser" {
                    let Some(request) = serde_json::from_value(event.body).ok() else {
                        log::error!("failed to deserialize killCompanionBrowser event");
                        return;
                    };
                    self.kill_browser(request, cx);
                }
            }
        }
    }
1665
1666    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1667    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1668        &mut self,
1669        request: T,
1670        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1671        cx: &mut Context<Self>,
1672    ) {
1673        const {
1674            assert!(
1675                T::CACHEABLE,
1676                "Only requests marked as cacheable should invoke `fetch`"
1677            );
1678        }
1679
1680        if (!self.active_snapshot.thread_states.any_stopped_thread()
1681            && request.type_id() != TypeId::of::<ThreadsCommand>())
1682            || self.selected_snapshot_index.is_some()
1683            || self.is_session_terminated
1684        {
1685            return;
1686        }
1687
1688        let request_map = self
1689            .requests
1690            .entry(std::any::TypeId::of::<T>())
1691            .or_default();
1692
1693        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1694            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1695
1696            let task = Self::request_inner::<Arc<T>>(
1697                &self.capabilities,
1698                &self.state,
1699                command,
1700                |this, result, cx| {
1701                    process_result(this, result, cx);
1702                    None
1703                },
1704                cx,
1705            );
1706            let task = cx
1707                .background_executor()
1708                .spawn(async move {
1709                    let _ = task.await?;
1710                    Some(())
1711                })
1712                .shared();
1713
1714            vacant.insert(task);
1715            cx.notify();
1716        }
1717    }
1718
1719    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1720        capabilities: &Capabilities,
1721        mode: &SessionState,
1722        request: T,
1723        process_result: impl FnOnce(
1724            &mut Self,
1725            Result<T::Response>,
1726            &mut Context<Self>,
1727        ) -> Option<T::Response>
1728        + 'static,
1729        cx: &mut Context<Self>,
1730    ) -> Task<Option<T::Response>> {
1731        if !T::is_supported(capabilities) {
1732            log::warn!(
1733                "Attempted to send a DAP request that isn't supported: {:?}",
1734                request
1735            );
1736            let error = Err(anyhow::Error::msg(
1737                "Couldn't complete request because it's not supported",
1738            ));
1739            return cx.spawn(async move |this, cx| {
1740                this.update(cx, |this, cx| process_result(this, error, cx))
1741                    .ok()
1742                    .flatten()
1743            });
1744        }
1745
1746        let request = mode.request_dap(request);
1747        cx.spawn(async move |this, cx| {
1748            let result = request.await;
1749            this.update(cx, |this, cx| process_result(this, result, cx))
1750                .ok()
1751                .flatten()
1752        })
1753    }
1754
1755    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
1756        &self,
1757        request: T,
1758        process_result: impl FnOnce(
1759            &mut Self,
1760            Result<T::Response>,
1761            &mut Context<Self>,
1762        ) -> Option<T::Response>
1763        + 'static,
1764        cx: &mut Context<Self>,
1765    ) -> Task<Option<T::Response>> {
1766        Self::request_inner(&self.capabilities, &self.state, request, process_result, cx)
1767    }
1768
1769    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
1770        self.requests.remove(&std::any::TypeId::of::<Command>());
1771    }
1772
1773    fn invalidate_generic(&mut self) {
1774        self.invalidate_command_type::<ModulesCommand>();
1775        self.invalidate_command_type::<LoadedSourcesCommand>();
1776        self.invalidate_command_type::<ThreadsCommand>();
1777        self.invalidate_command_type::<DataBreakpointInfoCommand>();
1778        self.invalidate_command_type::<ReadMemory>();
1779        let executor = self.as_running().map(|running| running.executor.clone());
1780        if let Some(executor) = executor {
1781            self.memory.clear(&executor);
1782        }
1783    }
1784
1785    fn invalidate_state(&mut self, key: &RequestSlot) {
1786        self.requests
1787            .entry((&*key.0 as &dyn Any).type_id())
1788            .and_modify(|request_map| {
1789                request_map.remove(key);
1790            });
1791    }
1792
1793    fn push_output(&mut self, event: OutputEvent) {
1794        self.output.push_back(event);
1795        self.output_token.0 += 1;
1796    }
1797
1798    pub fn any_stopped_thread(&self) -> bool {
1799        self.active_snapshot.thread_states.any_stopped_thread()
1800    }
1801
1802    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1803        self.active_snapshot.thread_states.thread_status(thread_id)
1804    }
1805
1806    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1807        self.fetch(
1808            dap_command::ThreadsCommand,
1809            |this, result, cx| {
1810                let Some(result) = result.log_err() else {
1811                    return;
1812                };
1813
1814                this.active_snapshot.threads = result
1815                    .into_iter()
1816                    .map(|thread| (ThreadId(thread.id), Thread::from(thread)))
1817                    .collect();
1818
1819                this.invalidate_command_type::<StackTraceCommand>();
1820                cx.emit(SessionEvent::Threads);
1821                cx.notify();
1822            },
1823            cx,
1824        );
1825
1826        let state = self.session_state();
1827        state
1828            .threads
1829            .values()
1830            .map(|thread| {
1831                (
1832                    thread.dap.clone(),
1833                    state.thread_states.thread_status(ThreadId(thread.dap.id)),
1834                )
1835            })
1836            .collect()
1837    }
1838
1839    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1840        self.fetch(
1841            dap_command::ModulesCommand,
1842            |this, result, cx| {
1843                let Some(result) = result.log_err() else {
1844                    return;
1845                };
1846
1847                this.active_snapshot.modules = result;
1848                cx.emit(SessionEvent::Modules);
1849                cx.notify();
1850            },
1851            cx,
1852        );
1853
1854        &self.session_state().modules
1855    }
1856
1857    // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1858    pub fn data_access_size(
1859        &mut self,
1860        frame_id: Option<u64>,
1861        evaluate_name: &str,
1862        cx: &mut Context<Self>,
1863    ) -> Task<Option<u64>> {
1864        let request = self.request(
1865            EvaluateCommand {
1866                expression: format!("?${{sizeof({evaluate_name})}}"),
1867                frame_id,
1868
1869                context: Some(EvaluateArgumentsContext::Repl),
1870                source: None,
1871            },
1872            |_, response, _| response.ok(),
1873            cx,
1874        );
1875        cx.background_spawn(async move {
1876            let result = request.await?;
1877            result.result.parse().ok()
1878        })
1879    }
1880
1881    pub fn memory_reference_of_expr(
1882        &mut self,
1883        frame_id: Option<u64>,
1884        expression: String,
1885        cx: &mut Context<Self>,
1886    ) -> Task<Option<(String, Option<String>)>> {
1887        let request = self.request(
1888            EvaluateCommand {
1889                expression,
1890                frame_id,
1891
1892                context: Some(EvaluateArgumentsContext::Repl),
1893                source: None,
1894            },
1895            |_, response, _| response.ok(),
1896            cx,
1897        );
1898        cx.background_spawn(async move {
1899            let result = request.await?;
1900            result
1901                .memory_reference
1902                .map(|reference| (reference, result.type_))
1903        })
1904    }
1905
1906    pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1907        let data = base64::engine::general_purpose::STANDARD.encode(data);
1908        self.request(
1909            WriteMemoryArguments {
1910                memory_reference: address.to_string(),
1911                data,
1912                allow_partial: None,
1913                offset: None,
1914            },
1915            |this, response, cx| {
1916                this.memory.clear(cx.background_executor());
1917                this.invalidate_command_type::<ReadMemory>();
1918                this.invalidate_command_type::<VariablesCommand>();
1919                cx.emit(SessionEvent::Variables);
1920                response.ok()
1921            },
1922            cx,
1923        )
1924        .detach();
1925    }
1926    pub fn read_memory(
1927        &mut self,
1928        range: RangeInclusive<u64>,
1929        cx: &mut Context<Self>,
1930    ) -> MemoryIterator {
1931        // This function is a bit more involved when it comes to fetching data.
1932        // Since we attempt to read memory in pages, we need to account for some parts
1933        // of memory being unreadable. Therefore, we start off by fetching a page per request.
1934        // In case that fails, we try to re-fetch smaller regions until we have the full range.
1935        let page_range = Memory::memory_range_to_page_range(range.clone());
1936        for page_address in PageAddress::iter_range(page_range) {
1937            self.read_single_page_memory(page_address, cx);
1938        }
1939        self.memory.memory_range(range)
1940    }
1941
1942    fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1943        _ = maybe!({
1944            let builder = self.memory.build_page(page_start)?;
1945
1946            self.memory_read_fetch_page_recursive(builder, cx);
1947            Some(())
1948        });
1949    }
1950    fn memory_read_fetch_page_recursive(
1951        &mut self,
1952        mut builder: MemoryPageBuilder,
1953        cx: &mut Context<Self>,
1954    ) {
1955        let Some(next_request) = builder.next_request() else {
1956            // We're done fetching. Let's grab the page and insert it into our memory store.
1957            let (address, contents) = builder.build();
1958            self.memory.insert_page(address, contents);
1959
1960            return;
1961        };
1962        let size = next_request.size;
1963        self.fetch(
1964            ReadMemory {
1965                memory_reference: format!("0x{:X}", next_request.address),
1966                offset: Some(0),
1967                count: next_request.size,
1968            },
1969            move |this, memory, cx| {
1970                if let Ok(memory) = memory {
1971                    builder.known(memory.content);
1972                    if let Some(unknown) = memory.unreadable_bytes {
1973                        builder.unknown(unknown);
1974                    }
1975                    // This is the recursive bit: if we're not yet done with
1976                    // the whole page, we'll kick off a new request with smaller range.
1977                    // Note that this function is recursive only conceptually;
1978                    // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1979                    this.memory_read_fetch_page_recursive(builder, cx);
1980                } else {
1981                    builder.unknown(size);
1982                }
1983            },
1984            cx,
1985        );
1986    }
1987
    /// Returns the session's `ignore_breakpoints` flag.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1991
1992    pub fn toggle_ignore_breakpoints(
1993        &mut self,
1994        cx: &mut App,
1995    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1996        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1997    }
1998
1999    pub(crate) fn set_ignore_breakpoints(
2000        &mut self,
2001        ignore: bool,
2002        cx: &mut App,
2003    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
2004        if self.ignore_breakpoints == ignore {
2005            return Task::ready(HashMap::default());
2006        }
2007
2008        self.ignore_breakpoints = ignore;
2009
2010        if let Some(local) = self.as_running() {
2011            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
2012        } else {
2013            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
2014            unimplemented!()
2015        }
2016    }
2017
    /// Iterates over the state of every data breakpoint stored on this session,
    /// regardless of its `is_enabled` flag.
    pub fn data_breakpoints(&self) -> impl Iterator<Item = &DataBreakpointState> {
        self.data_breakpoints.values()
    }
2021
    /// Iterates over every stored exception breakpoint filter paired with its
    /// enabled flag.
    pub fn exception_breakpoints(
        &self,
    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
        self.exception_breakpoints.values()
    }
2027
2028    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
2029        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
2030            *is_enabled = !*is_enabled;
2031            self.send_exception_breakpoints(cx);
2032        }
2033    }
2034
2035    fn send_exception_breakpoints(&mut self, cx: &App) {
2036        if let Some(local) = self.as_running() {
2037            let exception_filters = self
2038                .exception_breakpoints
2039                .values()
2040                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
2041                .collect();
2042
2043            let supports_exception_filters = self
2044                .capabilities
2045                .supports_exception_filter_options
2046                .unwrap_or_default();
2047            local
2048                .send_exception_breakpoints(exception_filters, supports_exception_filters)
2049                .detach_and_log_err(cx);
2050        } else {
2051            debug_assert!(false, "Not implemented");
2052        }
2053    }
2054
2055    pub fn toggle_data_breakpoint(&mut self, id: &str, cx: &mut Context<'_, Session>) {
2056        if let Some(state) = self.data_breakpoints.get_mut(id) {
2057            state.is_enabled = !state.is_enabled;
2058            self.send_exception_breakpoints(cx);
2059        }
2060    }
2061
2062    fn send_data_breakpoints(&mut self, cx: &mut Context<Self>) {
2063        if let Some(mode) = self.as_running() {
2064            let breakpoints = self
2065                .data_breakpoints
2066                .values()
2067                .filter_map(|state| state.is_enabled.then(|| state.dap.clone()))
2068                .collect();
2069            let command = SetDataBreakpointsCommand { breakpoints };
2070            mode.request(command).detach_and_log_err(cx);
2071        }
2072    }
2073
2074    pub fn create_data_breakpoint(
2075        &mut self,
2076        context: Arc<DataBreakpointContext>,
2077        data_id: String,
2078        dap: dap::DataBreakpoint,
2079        cx: &mut Context<Self>,
2080    ) {
2081        if self.data_breakpoints.remove(&data_id).is_none() {
2082            self.data_breakpoints.insert(
2083                data_id,
2084                DataBreakpointState {
2085                    dap,
2086                    is_enabled: true,
2087                    context,
2088                },
2089            );
2090        }
2091        self.send_data_breakpoints(cx);
2092    }
2093
    /// Returns the session's `ignore_breakpoints` flag.
    ///
    /// NOTE(review): the name suggests the opposite of what is returned (`true` here
    /// means breakpoints are being ignored) — confirm against callers before relying
    /// on it or changing it.
    pub fn breakpoints_enabled(&self) -> bool {
        self.ignore_breakpoints
    }
2097
2098    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
2099        self.fetch(
2100            dap_command::LoadedSourcesCommand,
2101            |this, result, cx| {
2102                let Some(result) = result.log_err() else {
2103                    return;
2104                };
2105                this.active_snapshot.loaded_sources = result;
2106                cx.emit(SessionEvent::LoadedSources);
2107                cx.notify();
2108            },
2109            cx,
2110        );
2111        &self.session_state().loaded_sources
2112    }
2113
2114    fn fallback_to_manual_restart(
2115        &mut self,
2116        res: Result<()>,
2117        cx: &mut Context<Self>,
2118    ) -> Option<()> {
2119        if res.log_err().is_none() {
2120            cx.emit(SessionStateEvent::Restart);
2121            return None;
2122        }
2123        Some(())
2124    }
2125
2126    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
2127        res.log_err()?;
2128        Some(())
2129    }
2130
2131    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
2132        thread_id: ThreadId,
2133    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
2134    {
2135        move |this, response, cx| match response.log_err() {
2136            Some(response) => {
2137                this.breakpoint_store.update(cx, |store, cx| {
2138                    store.remove_active_position(Some(this.session_id()), cx)
2139                });
2140                Some(response)
2141            }
2142            None => {
2143                this.active_snapshot.thread_states.stop_thread(thread_id);
2144                cx.notify();
2145                None
2146            }
2147        }
2148    }
2149
2150    fn clear_active_debug_line_response(
2151        &mut self,
2152        response: Result<()>,
2153        cx: &mut Context<Session>,
2154    ) -> Option<()> {
2155        response.log_err()?;
2156        self.clear_active_debug_line(cx);
2157        Some(())
2158    }
2159
2160    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2161        self.breakpoint_store.update(cx, |store, cx| {
2162            store.remove_active_position(Some(self.id), cx)
2163        });
2164    }
2165
2166    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2167        self.request(
2168            PauseCommand {
2169                thread_id: thread_id.0,
2170            },
2171            Self::empty_response,
2172            cx,
2173        )
2174        .detach();
2175    }
2176
2177    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2178        self.request(
2179            RestartStackFrameCommand { stack_frame_id },
2180            Self::empty_response,
2181            cx,
2182        )
2183        .detach();
2184    }
2185
2186    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2187        if self.restart_task.is_some() || self.as_running().is_none() {
2188            return;
2189        }
2190
2191        let supports_dap_restart =
2192            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2193
2194        self.restart_task = Some(cx.spawn(async move |this, cx| {
2195            let _ = this.update(cx, |session, cx| {
2196                if supports_dap_restart {
2197                    session
2198                        .request(
2199                            RestartCommand {
2200                                raw: args.unwrap_or(Value::Null),
2201                            },
2202                            Self::fallback_to_manual_restart,
2203                            cx,
2204                        )
2205                        .detach();
2206                } else {
2207                    cx.emit(SessionStateEvent::Restart);
2208                }
2209            });
2210        }));
2211    }
2212
2213    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2214        if self.is_session_terminated {
2215            return Task::ready(());
2216        }
2217
2218        self.is_session_terminated = true;
2219        self.active_snapshot.thread_states.exit_all_threads();
2220        cx.notify();
2221
2222        let task = match &mut self.state {
2223            SessionState::Running(_) => {
2224                if self
2225                    .capabilities
2226                    .supports_terminate_request
2227                    .unwrap_or_default()
2228                {
2229                    self.request(
2230                        TerminateCommand {
2231                            restart: Some(false),
2232                        },
2233                        Self::clear_active_debug_line_response,
2234                        cx,
2235                    )
2236                } else {
2237                    self.request(
2238                        DisconnectCommand {
2239                            restart: Some(false),
2240                            terminate_debuggee: Some(true),
2241                            suspend_debuggee: Some(false),
2242                        },
2243                        Self::clear_active_debug_line_response,
2244                        cx,
2245                    )
2246                }
2247            }
2248            SessionState::Booting(build_task) => {
2249                build_task.take();
2250                Task::ready(Some(()))
2251            }
2252        };
2253
2254        cx.emit(SessionStateEvent::Shutdown);
2255
2256        cx.spawn(async move |this, cx| {
2257            task.await;
2258            let _ = this.update(cx, |this, _| {
2259                if let Some(adapter_client) = this.adapter_client() {
2260                    adapter_client.kill();
2261                }
2262            });
2263        })
2264    }
2265
2266    pub fn completions(
2267        &mut self,
2268        query: CompletionsQuery,
2269        cx: &mut Context<Self>,
2270    ) -> Task<Result<Vec<dap::CompletionItem>>> {
2271        let task = self.request(query, |_, result, _| result.log_err(), cx);
2272
2273        cx.background_executor().spawn(async move {
2274            anyhow::Ok(
2275                task.await
2276                    .map(|response| response.targets)
2277                    .context("failed to fetch completions")?,
2278            )
2279        })
2280    }
2281
2282    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2283        self.select_historic_snapshot(None, cx);
2284
2285        let supports_single_thread_execution_requests =
2286            self.capabilities.supports_single_thread_execution_requests;
2287        self.active_snapshot
2288            .thread_states
2289            .continue_thread(thread_id);
2290        self.request(
2291            ContinueCommand {
2292                args: ContinueArguments {
2293                    thread_id: thread_id.0,
2294                    single_thread: supports_single_thread_execution_requests,
2295                },
2296            },
2297            Self::on_step_response::<ContinueCommand>(thread_id),
2298            cx,
2299        )
2300        .detach();
2301    }
2302
2303    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2304        match self.state {
2305            SessionState::Running(ref local) => Some(local.client.clone()),
2306            SessionState::Booting(_) => None,
2307        }
2308    }
2309
    /// Forwards to `has_ever_stopped` on the session state.
    pub fn has_ever_stopped(&self) -> bool {
        self.state.has_ever_stopped()
    }
2313
2314    pub fn step_over(
2315        &mut self,
2316        thread_id: ThreadId,
2317        granularity: SteppingGranularity,
2318        cx: &mut Context<Self>,
2319    ) {
2320        self.select_historic_snapshot(None, cx);
2321
2322        let supports_single_thread_execution_requests =
2323            self.capabilities.supports_single_thread_execution_requests;
2324        let supports_stepping_granularity = self
2325            .capabilities
2326            .supports_stepping_granularity
2327            .unwrap_or_default();
2328
2329        let command = NextCommand {
2330            inner: StepCommand {
2331                thread_id: thread_id.0,
2332                granularity: supports_stepping_granularity.then(|| granularity),
2333                single_thread: supports_single_thread_execution_requests,
2334            },
2335        };
2336
2337        self.active_snapshot.thread_states.process_step(thread_id);
2338        self.request(
2339            command,
2340            Self::on_step_response::<NextCommand>(thread_id),
2341            cx,
2342        )
2343        .detach();
2344    }
2345
2346    pub fn step_in(
2347        &mut self,
2348        thread_id: ThreadId,
2349        granularity: SteppingGranularity,
2350        cx: &mut Context<Self>,
2351    ) {
2352        self.select_historic_snapshot(None, cx);
2353
2354        let supports_single_thread_execution_requests =
2355            self.capabilities.supports_single_thread_execution_requests;
2356        let supports_stepping_granularity = self
2357            .capabilities
2358            .supports_stepping_granularity
2359            .unwrap_or_default();
2360
2361        let command = StepInCommand {
2362            inner: StepCommand {
2363                thread_id: thread_id.0,
2364                granularity: supports_stepping_granularity.then(|| granularity),
2365                single_thread: supports_single_thread_execution_requests,
2366            },
2367        };
2368
2369        self.active_snapshot.thread_states.process_step(thread_id);
2370        self.request(
2371            command,
2372            Self::on_step_response::<StepInCommand>(thread_id),
2373            cx,
2374        )
2375        .detach();
2376    }
2377
2378    pub fn step_out(
2379        &mut self,
2380        thread_id: ThreadId,
2381        granularity: SteppingGranularity,
2382        cx: &mut Context<Self>,
2383    ) {
2384        self.select_historic_snapshot(None, cx);
2385
2386        let supports_single_thread_execution_requests =
2387            self.capabilities.supports_single_thread_execution_requests;
2388        let supports_stepping_granularity = self
2389            .capabilities
2390            .supports_stepping_granularity
2391            .unwrap_or_default();
2392
2393        let command = StepOutCommand {
2394            inner: StepCommand {
2395                thread_id: thread_id.0,
2396                granularity: supports_stepping_granularity.then(|| granularity),
2397                single_thread: supports_single_thread_execution_requests,
2398            },
2399        };
2400
2401        self.active_snapshot.thread_states.process_step(thread_id);
2402        self.request(
2403            command,
2404            Self::on_step_response::<StepOutCommand>(thread_id),
2405            cx,
2406        )
2407        .detach();
2408    }
2409
2410    pub fn step_back(
2411        &mut self,
2412        thread_id: ThreadId,
2413        granularity: SteppingGranularity,
2414        cx: &mut Context<Self>,
2415    ) {
2416        self.select_historic_snapshot(None, cx);
2417
2418        let supports_single_thread_execution_requests =
2419            self.capabilities.supports_single_thread_execution_requests;
2420        let supports_stepping_granularity = self
2421            .capabilities
2422            .supports_stepping_granularity
2423            .unwrap_or_default();
2424
2425        let command = StepBackCommand {
2426            inner: StepCommand {
2427                thread_id: thread_id.0,
2428                granularity: supports_stepping_granularity.then(|| granularity),
2429                single_thread: supports_single_thread_execution_requests,
2430            },
2431        };
2432
2433        self.active_snapshot.thread_states.process_step(thread_id);
2434
2435        self.request(
2436            command,
2437            Self::on_step_response::<StepBackCommand>(thread_id),
2438            cx,
2439        )
2440        .detach();
2441    }
2442
2443    pub fn stack_frames(
2444        &mut self,
2445        thread_id: ThreadId,
2446        cx: &mut Context<Self>,
2447    ) -> Result<Vec<StackFrame>> {
2448        if self.active_snapshot.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2449            && self.requests.contains_key(&ThreadsCommand.type_id())
2450            && self.active_snapshot.threads.contains_key(&thread_id)
2451        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2452        // We could still be using an old thread id and have sent a new thread's request
2453        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2454        // But it very well could cause a minor bug in the future that is hard to track down
2455        {
2456            self.fetch(
2457                super::dap_command::StackTraceCommand {
2458                    thread_id: thread_id.0,
2459                    start_frame: None,
2460                    levels: None,
2461                },
2462                move |this, stack_frames, cx| {
2463                    let entry =
2464                        this.active_snapshot
2465                            .threads
2466                            .entry(thread_id)
2467                            .and_modify(|thread| match &stack_frames {
2468                                Ok(stack_frames) => {
2469                                    thread.stack_frames = stack_frames
2470                                        .iter()
2471                                        .cloned()
2472                                        .map(StackFrame::from)
2473                                        .collect();
2474                                    thread.stack_frames_error = None;
2475                                }
2476                                Err(error) => {
2477                                    thread.stack_frames.clear();
2478                                    thread.stack_frames_error = Some(error.to_string().into());
2479                                }
2480                            });
2481                    debug_assert!(
2482                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2483                        "Sent request for thread_id that doesn't exist"
2484                    );
2485                    if let Ok(stack_frames) = stack_frames {
2486                        this.active_snapshot.stack_frames.extend(
2487                            stack_frames
2488                                .into_iter()
2489                                .filter(|frame| {
2490                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2491                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2492                                    !(frame.id == 0
2493                                        && frame.line == 0
2494                                        && frame.column == 0
2495                                        && frame.presentation_hint
2496                                            == Some(StackFramePresentationHint::Label))
2497                                })
2498                                .map(|frame| (frame.id, StackFrame::from(frame))),
2499                        );
2500                    }
2501
2502                    this.invalidate_command_type::<ScopesCommand>();
2503                    this.invalidate_command_type::<VariablesCommand>();
2504
2505                    cx.emit(SessionEvent::StackTrace);
2506                },
2507                cx,
2508            );
2509        }
2510
2511        match self.session_state().threads.get(&thread_id) {
2512            Some(thread) => {
2513                if let Some(error) = &thread.stack_frames_error {
2514                    Err(anyhow!(error.to_string()))
2515                } else {
2516                    Ok(thread.stack_frames.clone())
2517                }
2518            }
2519            None => Ok(Vec::new()),
2520        }
2521    }
2522
2523    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2524        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2525            && self
2526                .requests
2527                .contains_key(&TypeId::of::<StackTraceCommand>())
2528        {
2529            self.fetch(
2530                ScopesCommand { stack_frame_id },
2531                move |this, scopes, cx| {
2532                    let Some(scopes) = scopes.log_err() else {
2533                        return
2534                    };
2535
2536                    for scope in scopes.iter() {
2537                        this.variables(scope.variables_reference, cx);
2538                    }
2539
2540                    let entry = this
2541                        .active_snapshot
2542                        .stack_frames
2543                        .entry(stack_frame_id)
2544                        .and_modify(|stack_frame| {
2545                            stack_frame.scopes = scopes;
2546                        });
2547
2548                    cx.emit(SessionEvent::Variables);
2549
2550                    debug_assert!(
2551                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2552                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2553                    );
2554                },
2555                cx,
2556            );
2557        }
2558
2559        self.session_state()
2560            .stack_frames
2561            .get(&stack_frame_id)
2562            .map(|frame| frame.scopes.as_slice())
2563            .unwrap_or_default()
2564    }
2565
2566    pub fn variables_by_stack_frame_id(
2567        &self,
2568        stack_frame_id: StackFrameId,
2569        globals: bool,
2570        locals: bool,
2571    ) -> Vec<dap::Variable> {
2572        let state = self.session_state();
2573        let Some(stack_frame) = state.stack_frames.get(&stack_frame_id) else {
2574            return Vec::new();
2575        };
2576
2577        stack_frame
2578            .scopes
2579            .iter()
2580            .filter(|scope| {
2581                (scope.name.to_lowercase().contains("local") && locals)
2582                    || (scope.name.to_lowercase().contains("global") && globals)
2583            })
2584            .filter_map(|scope| state.variables.get(&scope.variables_reference))
2585            .flatten()
2586            .cloned()
2587            .collect()
2588    }
2589
2590    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2591        &self.watchers
2592    }
2593
2594    pub fn add_watcher(
2595        &mut self,
2596        expression: SharedString,
2597        frame_id: u64,
2598        cx: &mut Context<Self>,
2599    ) -> Task<Result<()>> {
2600        let request = self.state.request_dap(EvaluateCommand {
2601            expression: expression.to_string(),
2602            context: Some(EvaluateArgumentsContext::Watch),
2603            frame_id: Some(frame_id),
2604            source: None,
2605        });
2606
2607        cx.spawn(async move |this, cx| {
2608            let response = request.await?;
2609
2610            this.update(cx, |session, cx| {
2611                session.watchers.insert(
2612                    expression.clone(),
2613                    Watcher {
2614                        expression,
2615                        value: response.result.into(),
2616                        variables_reference: response.variables_reference,
2617                        presentation_hint: response.presentation_hint,
2618                    },
2619                );
2620                cx.emit(SessionEvent::Watchers);
2621            })
2622        })
2623    }
2624
2625    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2626        let watches = self.watchers.clone();
2627        for (_, watch) in watches.into_iter() {
2628            self.add_watcher(watch.expression.clone(), frame_id, cx)
2629                .detach();
2630        }
2631    }
2632
2633    pub fn remove_watcher(&mut self, expression: SharedString) {
2634        self.watchers.remove(&expression);
2635    }
2636
2637    pub fn variables(
2638        &mut self,
2639        variables_reference: VariableReference,
2640        cx: &mut Context<Self>,
2641    ) -> Vec<dap::Variable> {
2642        let command = VariablesCommand {
2643            variables_reference,
2644            filter: None,
2645            start: None,
2646            count: None,
2647            format: None,
2648        };
2649
2650        self.fetch(
2651            command,
2652            move |this, variables, cx| {
2653                let Some(variables) = variables.log_err() else {
2654                    return;
2655                };
2656
2657                this.active_snapshot
2658                    .variables
2659                    .insert(variables_reference, variables);
2660
2661                cx.emit(SessionEvent::Variables);
2662                cx.emit(SessionEvent::InvalidateInlineValue);
2663            },
2664            cx,
2665        );
2666
2667        self.session_state()
2668            .variables
2669            .get(&variables_reference)
2670            .cloned()
2671            .unwrap_or_default()
2672    }
2673
2674    pub fn data_breakpoint_info(
2675        &mut self,
2676        context: Arc<DataBreakpointContext>,
2677        mode: Option<String>,
2678        cx: &mut Context<Self>,
2679    ) -> Task<Option<dap::DataBreakpointInfoResponse>> {
2680        let command = DataBreakpointInfoCommand { context, mode };
2681
2682        self.request(command, |_, response, _| response.ok(), cx)
2683    }
2684
2685    pub fn set_variable_value(
2686        &mut self,
2687        stack_frame_id: u64,
2688        variables_reference: u64,
2689        name: String,
2690        value: String,
2691        cx: &mut Context<Self>,
2692    ) {
2693        if self.capabilities.supports_set_variable.unwrap_or_default() {
2694            self.request(
2695                SetVariableValueCommand {
2696                    name,
2697                    value,
2698                    variables_reference,
2699                },
2700                move |this, response, cx| {
2701                    let response = response.log_err()?;
2702                    this.invalidate_command_type::<VariablesCommand>();
2703                    this.invalidate_command_type::<ReadMemory>();
2704                    this.memory.clear(cx.background_executor());
2705                    this.refresh_watchers(stack_frame_id, cx);
2706                    cx.emit(SessionEvent::Variables);
2707                    Some(response)
2708                },
2709                cx,
2710            )
2711            .detach();
2712        }
2713    }
2714
2715    pub fn evaluate(
2716        &mut self,
2717        expression: String,
2718        context: Option<EvaluateArgumentsContext>,
2719        frame_id: Option<u64>,
2720        source: Option<Source>,
2721        cx: &mut Context<Self>,
2722    ) -> Task<()> {
2723        let event = dap::OutputEvent {
2724            category: None,
2725            output: format!("> {expression}"),
2726            group: None,
2727            variables_reference: None,
2728            source: None,
2729            line: None,
2730            column: None,
2731            data: None,
2732            location_reference: None,
2733        };
2734        self.push_output(event);
2735        let request = self.state.request_dap(EvaluateCommand {
2736            expression,
2737            context,
2738            frame_id,
2739            source,
2740        });
2741        cx.spawn(async move |this, cx| {
2742            let response = request.await;
2743            this.update(cx, |this, cx| {
2744                this.memory.clear(cx.background_executor());
2745                this.invalidate_command_type::<ReadMemory>();
2746                this.invalidate_command_type::<VariablesCommand>();
2747                cx.emit(SessionEvent::Variables);
2748                match response {
2749                    Ok(response) => {
2750                        let event = dap::OutputEvent {
2751                            category: None,
2752                            output: format!("< {}", &response.result),
2753                            group: None,
2754                            variables_reference: Some(response.variables_reference),
2755                            source: None,
2756                            line: None,
2757                            column: None,
2758                            data: None,
2759                            location_reference: None,
2760                        };
2761                        this.push_output(event);
2762                    }
2763                    Err(e) => {
2764                        let event = dap::OutputEvent {
2765                            category: None,
2766                            output: format!("{}", e),
2767                            group: None,
2768                            variables_reference: None,
2769                            source: None,
2770                            line: None,
2771                            column: None,
2772                            data: None,
2773                            location_reference: None,
2774                        };
2775                        this.push_output(event);
2776                    }
2777                };
2778                cx.notify();
2779            })
2780            .ok();
2781        })
2782    }
2783
2784    pub fn location(
2785        &mut self,
2786        reference: u64,
2787        cx: &mut Context<Self>,
2788    ) -> Option<dap::LocationsResponse> {
2789        self.fetch(
2790            LocationsCommand { reference },
2791            move |this, response, _| {
2792                let Some(response) = response.log_err() else {
2793                    return;
2794                };
2795                this.active_snapshot.locations.insert(reference, response);
2796            },
2797            cx,
2798        );
2799        self.session_state().locations.get(&reference).cloned()
2800    }
2801
2802    pub fn is_attached(&self) -> bool {
2803        let SessionState::Running(local_mode) = &self.state else {
2804            return false;
2805        };
2806        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2807    }
2808
2809    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2810        let command = DisconnectCommand {
2811            restart: Some(false),
2812            terminate_debuggee: Some(false),
2813            suspend_debuggee: Some(false),
2814        };
2815
2816        self.request(command, Self::empty_response, cx).detach()
2817    }
2818
2819    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2820        if self
2821            .capabilities
2822            .supports_terminate_threads_request
2823            .unwrap_or_default()
2824        {
2825            self.request(
2826                TerminateThreadsCommand {
2827                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2828                },
2829                Self::clear_active_debug_line_response,
2830                cx,
2831            )
2832            .detach();
2833        } else {
2834            self.shutdown(cx).detach();
2835        }
2836    }
2837
2838    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2839        self.session_state().thread_states.thread_state(thread_id)
2840    }
2841
2842    pub fn quirks(&self) -> SessionQuirks {
2843        self.quirks
2844    }
2845
2846    fn launch_browser_for_remote_server(
2847        &mut self,
2848        mut request: LaunchBrowserInCompanionParams,
2849        cx: &mut Context<Self>,
2850    ) {
2851        let Some(remote_client) = self.remote_client.clone() else {
2852            log::error!("can't launch browser in companion for non-remote project");
2853            return;
2854        };
2855        let Some(http_client) = self.http_client.clone() else {
2856            return;
2857        };
2858        let Some(node_runtime) = self.node_runtime.clone() else {
2859            return;
2860        };
2861
2862        let mut console_output = self.console_output(cx);
2863        let task = cx.spawn(async move |this, cx| {
2864            let forward_ports_process = if remote_client
2865                .read_with(cx, |client, _| client.shares_network_interface())?
2866            {
2867                request.other.insert(
2868                    "proxyUri".into(),
2869                    format!("127.0.0.1:{}", request.server_port).into(),
2870                );
2871                None
2872            } else {
2873                let port = TcpTransport::unused_port(Ipv4Addr::LOCALHOST)
2874                    .await
2875                    .context("getting port for DAP")?;
2876                request
2877                    .other
2878                    .insert("proxyUri".into(), format!("127.0.0.1:{port}").into());
2879                let mut port_forwards = vec![(port, "localhost".to_owned(), request.server_port)];
2880
2881                if let Some(value) = request.params.get("url")
2882                    && let Some(url) = value.as_str()
2883                    && let Some(url) = Url::parse(url).ok()
2884                    && let Some(frontend_port) = url.port()
2885                {
2886                    port_forwards.push((frontend_port, "localhost".to_owned(), frontend_port));
2887                }
2888
2889                let child = remote_client.update(cx, |client, _| {
2890                    let command = client.build_forward_ports_command(port_forwards)?;
2891                    let child = new_smol_command(command.program)
2892                        .args(command.args)
2893                        .envs(command.env)
2894                        .spawn()
2895                        .context("spawning port forwarding process")?;
2896                    anyhow::Ok(child)
2897                })??;
2898                Some(child)
2899            };
2900
2901            let mut companion_process = None;
2902            let companion_port =
2903                if let Some(companion_port) = this.read_with(cx, |this, _| this.companion_port)? {
2904                    companion_port
2905                } else {
2906                    let task = cx.spawn(async move |cx| spawn_companion(node_runtime, cx).await);
2907                    match task.await {
2908                        Ok((port, child)) => {
2909                            companion_process = Some(child);
2910                            port
2911                        }
2912                        Err(e) => {
2913                            console_output
2914                                .send(format!("Failed to launch browser companion process: {e}"))
2915                                .await
2916                                .ok();
2917                            return Err(e);
2918                        }
2919                    }
2920                };
2921
2922            let mut background_tasks = Vec::new();
2923            if let Some(mut forward_ports_process) = forward_ports_process {
2924                background_tasks.push(cx.spawn(async move |_| {
2925                    forward_ports_process.status().await.log_err();
2926                }));
2927            };
2928            if let Some(mut companion_process) = companion_process {
2929                if let Some(stderr) = companion_process.stderr.take() {
2930                    let mut console_output = console_output.clone();
2931                    background_tasks.push(cx.spawn(async move |_| {
2932                        let mut stderr = BufReader::new(stderr);
2933                        let mut line = String::new();
2934                        while let Ok(n) = stderr.read_line(&mut line).await
2935                            && n > 0
2936                        {
2937                            console_output
2938                                .send(format!("companion stderr: {line}"))
2939                                .await
2940                                .ok();
2941                            line.clear();
2942                        }
2943                    }));
2944                }
2945                background_tasks.push(cx.spawn({
2946                    let mut console_output = console_output.clone();
2947                    async move |_| match companion_process.status().await {
2948                        Ok(status) => {
2949                            if status.success() {
2950                                console_output
2951                                    .send("Companion process exited normally".into())
2952                                    .await
2953                                    .ok();
2954                            } else {
2955                                console_output
2956                                    .send(format!(
2957                                        "Companion process exited abnormally with {status:?}"
2958                                    ))
2959                                    .await
2960                                    .ok();
2961                            }
2962                        }
2963                        Err(e) => {
2964                            console_output
2965                                .send(format!("Failed to join companion process: {e}"))
2966                                .await
2967                                .ok();
2968                        }
2969                    }
2970                }));
2971            }
2972
2973            // TODO pass wslInfo as needed
2974
2975            let companion_address = format!("127.0.0.1:{companion_port}");
2976            let mut companion_started = false;
2977            for _ in 0..10 {
2978                if TcpStream::connect(&companion_address).await.is_ok() {
2979                    companion_started = true;
2980                    break;
2981                }
2982                cx.background_executor()
2983                    .timer(Duration::from_millis(100))
2984                    .await;
2985            }
2986            if !companion_started {
2987                console_output
2988                    .send("Browser companion failed to start".into())
2989                    .await
2990                    .ok();
2991                bail!("Browser companion failed to start");
2992            }
2993
2994            let response = http_client
2995                .post_json(
2996                    &format!("http://{companion_address}/launch-and-attach"),
2997                    serde_json::to_string(&request)
2998                        .context("serializing request")?
2999                        .into(),
3000                )
3001                .await;
3002            match response {
3003                Ok(response) => {
3004                    if !response.status().is_success() {
3005                        console_output
3006                            .send("Launch request to companion failed".into())
3007                            .await
3008                            .ok();
3009                        return Err(anyhow!("launch request failed"));
3010                    }
3011                }
3012                Err(e) => {
3013                    console_output
3014                        .send("Failed to read response from companion".into())
3015                        .await
3016                        .ok();
3017                    return Err(e);
3018                }
3019            }
3020
3021            this.update(cx, |this, _| {
3022                this.background_tasks.extend(background_tasks);
3023                this.companion_port = Some(companion_port);
3024            })?;
3025
3026            anyhow::Ok(())
3027        });
3028        self.background_tasks.push(cx.spawn(async move |_, _| {
3029            task.await.log_err();
3030        }));
3031    }
3032
3033    fn kill_browser(&self, request: KillCompanionBrowserParams, cx: &mut App) {
3034        let Some(companion_port) = self.companion_port else {
3035            log::error!("received killCompanionBrowser but js-debug-companion is not running");
3036            return;
3037        };
3038        let Some(http_client) = self.http_client.clone() else {
3039            return;
3040        };
3041
3042        cx.spawn(async move |_| {
3043            http_client
3044                .post_json(
3045                    &format!("http://127.0.0.1:{companion_port}/kill"),
3046                    serde_json::to_string(&request)
3047                        .context("serializing request")?
3048                        .into(),
3049                )
3050                .await?;
3051            anyhow::Ok(())
3052        })
3053        .detach_and_log_err(cx)
3054    }
3055}
3056
3057#[derive(Serialize, Deserialize, Debug)]
3058#[serde(rename_all = "camelCase")]
3059struct LaunchBrowserInCompanionParams {
3060    server_port: u16,
3061    params: HashMap<String, serde_json::Value>,
3062    #[serde(flatten)]
3063    other: HashMap<String, serde_json::Value>,
3064}
3065
3066#[derive(Serialize, Deserialize, Debug)]
3067#[serde(rename_all = "camelCase")]
3068struct KillCompanionBrowserParams {
3069    launch_id: u64,
3070}
3071
3072async fn spawn_companion(
3073    node_runtime: NodeRuntime,
3074    cx: &mut AsyncApp,
3075) -> Result<(u16, smol::process::Child)> {
3076    let binary_path = node_runtime
3077        .binary_path()
3078        .await
3079        .context("getting node path")?;
3080    let path = cx
3081        .spawn(async move |cx| get_or_install_companion(node_runtime, cx).await)
3082        .await?;
3083    log::info!("will launch js-debug-companion version {path:?}");
3084
3085    let port = {
3086        let listener = TcpListener::bind("127.0.0.1:0")
3087            .await
3088            .context("getting port for companion")?;
3089        listener.local_addr()?.port()
3090    };
3091
3092    let dir = paths::data_dir()
3093        .join("js_debug_companion_state")
3094        .to_string_lossy()
3095        .to_string();
3096
3097    let child = new_smol_command(binary_path)
3098        .arg(path)
3099        .args([
3100            format!("--listen=127.0.0.1:{port}"),
3101            format!("--state={dir}"),
3102        ])
3103        .stdin(Stdio::piped())
3104        .stdout(Stdio::piped())
3105        .stderr(Stdio::piped())
3106        .spawn()
3107        .context("spawning companion child process")?;
3108
3109    Ok((port, child))
3110}
3111
3112async fn get_or_install_companion(node: NodeRuntime, cx: &mut AsyncApp) -> Result<PathBuf> {
3113    const PACKAGE_NAME: &str = "@zed-industries/js-debug-companion-cli";
3114
3115    async fn install_latest_version(dir: PathBuf, node: NodeRuntime) -> Result<PathBuf> {
3116        let temp_dir = tempfile::tempdir().context("creating temporary directory")?;
3117        node.npm_install_packages(temp_dir.path(), &[(PACKAGE_NAME, "latest")])
3118            .await
3119            .context("installing latest companion package")?;
3120        let version = node
3121            .npm_package_installed_version(temp_dir.path(), PACKAGE_NAME)
3122            .await
3123            .context("getting installed companion version")?
3124            .context("companion was not installed")?;
3125        smol::fs::rename(temp_dir.path(), dir.join(&version))
3126            .await
3127            .context("moving companion package into place")?;
3128        Ok(dir.join(version))
3129    }
3130
3131    let dir = paths::debug_adapters_dir().join("js-debug-companion");
3132    let (latest_installed_version, latest_version) = cx
3133        .background_spawn({
3134            let dir = dir.clone();
3135            let node = node.clone();
3136            async move {
3137                smol::fs::create_dir_all(&dir)
3138                    .await
3139                    .context("creating companion installation directory")?;
3140
3141                let mut children = smol::fs::read_dir(&dir)
3142                    .await
3143                    .context("reading companion installation directory")?
3144                    .try_collect::<Vec<_>>()
3145                    .await
3146                    .context("reading companion installation directory entries")?;
3147                children
3148                    .sort_by_key(|child| semver::Version::parse(child.file_name().to_str()?).ok());
3149
3150                let latest_installed_version = children.last().and_then(|child| {
3151                    let version = child.file_name().into_string().ok()?;
3152                    Some((child.path(), version))
3153                });
3154                let latest_version = node
3155                    .npm_package_latest_version(PACKAGE_NAME)
3156                    .await
3157                    .log_err();
3158                anyhow::Ok((latest_installed_version, latest_version))
3159            }
3160        })
3161        .await?;
3162
3163    let path = if let Some((installed_path, installed_version)) = latest_installed_version {
3164        if let Some(latest_version) = latest_version
3165            && latest_version != installed_version
3166        {
3167            cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3168                .detach();
3169        }
3170        Ok(installed_path)
3171    } else {
3172        cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3173            .await
3174    };
3175
3176    Ok(path?
3177        .join("node_modules")
3178        .join(PACKAGE_NAME)
3179        .join("out")
3180        .join("cli.js"))
3181}