session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2use crate::debugger::dap_command::{DataBreakpointContext, ReadMemory};
   3use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
   4
   5use super::breakpoint_store::{
   6    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   7};
   8use super::dap_command::{
   9    self, Attach, ConfigurationDone, ContinueCommand, DataBreakpointInfoCommand, DisconnectCommand,
  10    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
  11    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  12    ScopesCommand, SetDataBreakpointsCommand, SetExceptionBreakpoints, SetVariableValueCommand,
  13    StackTraceCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  14    TerminateCommand, TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  15};
  16use super::dap_store::DapStore;
  17use anyhow::{Context as _, Result, anyhow, bail};
  18use base64::Engine;
  19use collections::{HashMap, HashSet, IndexMap};
  20use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  21use dap::messages::Response;
  22use dap::requests::{Request, RunInTerminal, StartDebugging};
  23use dap::transport::TcpTransport;
  24use dap::{
  25    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  26    SteppingGranularity, StoppedEvent, VariableReference,
  27    client::{DebugAdapterClient, SessionId},
  28    messages::{Events, Message},
  29};
  30use dap::{
  31    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  32    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  33    StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
  34};
  35use futures::channel::mpsc::UnboundedSender;
  36use futures::channel::{mpsc, oneshot};
  37use futures::io::BufReader;
  38use futures::{AsyncBufReadExt as _, SinkExt, StreamExt, TryStreamExt};
  39use futures::{FutureExt, future::Shared};
  40use gpui::{
  41    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  42    Task, WeakEntity,
  43};
  44use http_client::HttpClient;
  45
  46use node_runtime::NodeRuntime;
  47use remote::RemoteClient;
  48use rpc::ErrorExt;
  49use serde::{Deserialize, Serialize};
  50use serde_json::Value;
  51use smol::net::{TcpListener, TcpStream};
  52use std::any::TypeId;
  53use std::collections::BTreeMap;
  54use std::net::Ipv4Addr;
  55use std::ops::RangeInclusive;
  56use std::path::PathBuf;
  57use std::process::Stdio;
  58use std::time::Duration;
  59use std::u64;
  60use std::{
  61    any::Any,
  62    collections::hash_map::Entry,
  63    hash::{Hash, Hasher},
  64    path::Path,
  65    sync::Arc,
  66};
  67use task::TaskContext;
  68use text::{PointUtf16, ToPointUtf16};
  69use url::Url;
  70use util::command::new_smol_command;
  71use util::{ResultExt, debug_panic, maybe};
  72use worktree::Worktree;
  73
  74#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  75#[repr(transparent)]
  76pub struct ThreadId(pub i64);
  77
  78impl From<i64> for ThreadId {
  79    fn from(id: i64) -> Self {
  80        Self(id)
  81    }
  82}
  83
  84#[derive(Clone, Debug)]
  85pub struct StackFrame {
  86    pub dap: dap::StackFrame,
  87    pub scopes: Vec<dap::Scope>,
  88}
  89
  90impl From<dap::StackFrame> for StackFrame {
  91    fn from(stack_frame: dap::StackFrame) -> Self {
  92        Self {
  93            scopes: vec![],
  94            dap: stack_frame,
  95        }
  96    }
  97}
  98
  99#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
 100pub enum ThreadStatus {
 101    #[default]
 102    Running,
 103    Stopped,
 104    Stepping,
 105    Exited,
 106    Ended,
 107}
 108
 109impl ThreadStatus {
 110    pub fn label(&self) -> &'static str {
 111        match self {
 112            ThreadStatus::Running => "Running",
 113            ThreadStatus::Stopped => "Stopped",
 114            ThreadStatus::Stepping => "Stepping",
 115            ThreadStatus::Exited => "Exited",
 116            ThreadStatus::Ended => "Ended",
 117        }
 118    }
 119}
 120
 121#[derive(Debug)]
 122pub struct Thread {
 123    dap: dap::Thread,
 124    stack_frames: Vec<StackFrame>,
 125    stack_frames_error: Option<anyhow::Error>,
 126    _has_stopped: bool,
 127}
 128
 129impl From<dap::Thread> for Thread {
 130    fn from(dap: dap::Thread) -> Self {
 131        Self {
 132            dap,
 133            stack_frames: Default::default(),
 134            stack_frames_error: None,
 135            _has_stopped: false,
 136        }
 137    }
 138}
 139
/// A watched expression and the value last recorded for it.
#[derive(Debug, Clone, PartialEq)]
pub struct Watcher {
    /// The expression being watched.
    pub expression: SharedString,
    /// The value stored for `expression`.
    pub value: SharedString,
    /// NOTE(review): presumably the DAP variables reference for expanding `value` — confirm.
    pub variables_reference: u64,
    /// Optional presentation hint attached to the value.
    pub presentation_hint: Option<VariablePresentationHint>,
}
 147
/// A data breakpoint together with its enabled flag and the context it was created from.
#[derive(Debug, Clone, PartialEq)]
pub struct DataBreakpointState {
    /// The data breakpoint in its DAP representation.
    pub dap: dap::DataBreakpoint,
    /// Whether this data breakpoint is currently enabled.
    pub is_enabled: bool,
    /// Shared context associated with this data breakpoint.
    pub context: Arc<DataBreakpointContext>,
}
 154
/// Lifecycle state of a debug session.
pub enum SessionState {
    /// A session that is still building/initializing.
    ///
    /// Even if a session doesn't have a pre-build task, this state is used to
    /// run all the async tasks that are required to start the session.
    Booting(Option<Task<Result<()>>>),
    /// A session with a debug adapter to talk to; requests are sent through
    /// the contained [`RunningMode`].
    Running(RunningMode),
}
 162
/// State of a session that has a debug adapter client.
#[derive(Clone)]
pub struct RunningMode {
    /// Client used to send requests to the debug adapter.
    client: Arc<DebugAdapterClient>,
    /// Adapter binary description; its `request_args` select launch vs. attach.
    binary: DebugAdapterBinary,
    /// Extra breakpoint that is sent along with the store's breakpoints for its path.
    tmp_breakpoint: Option<SourceBreakpoint>,
    worktree: WeakEntity<Worktree>,
    /// Executor on which request tasks are spawned.
    executor: BackgroundExecutor,
    /// Set to `true` once the initialization sequence has finished (successfully or not).
    is_started: bool,
    /// Set to `true` by `SessionState::stopped`.
    has_ever_stopped: bool,
    /// Channel to which messages received from the adapter are forwarded.
    messages_tx: UnboundedSender<Message>,
}
 174
/// Per-session behavior toggles; both default to `false`.
///
/// NOTE(review): the consumers of these flags are not visible in this part of
/// the file — the field descriptions below are inferred from the names; confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SessionQuirks {
    /// Presumably requests a compact presentation of the session.
    pub compact: bool,
    /// Presumably prefers the thread name when labeling the session.
    pub prefer_thread_name: bool,
}
 180
 181fn client_source(abs_path: &Path) -> dap::Source {
 182    dap::Source {
 183        name: abs_path
 184            .file_name()
 185            .map(|filename| filename.to_string_lossy().into_owned()),
 186        path: Some(abs_path.to_string_lossy().into_owned()),
 187        source_reference: None,
 188        presentation_hint: None,
 189        origin: None,
 190        sources: None,
 191        adapter_data: None,
 192        checksums: None,
 193    }
 194}
 195
 196impl RunningMode {
 197    async fn new(
 198        session_id: SessionId,
 199        parent_session: Option<Entity<Session>>,
 200        worktree: WeakEntity<Worktree>,
 201        binary: DebugAdapterBinary,
 202        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 203        cx: &mut AsyncApp,
 204    ) -> Result<Self> {
 205        let message_handler = Box::new({
 206            let messages_tx = messages_tx.clone();
 207            move |message| {
 208                messages_tx.unbounded_send(message).ok();
 209            }
 210        });
 211
 212        let client = if let Some(client) = parent_session
 213            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 214            .flatten()
 215        {
 216            client
 217                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 218                .await?
 219        } else {
 220            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 221        };
 222
 223        Ok(Self {
 224            client: Arc::new(client),
 225            worktree,
 226            tmp_breakpoint: None,
 227            binary,
 228            executor: cx.background_executor().clone(),
 229            is_started: false,
 230            has_ever_stopped: false,
 231            messages_tx,
 232        })
 233    }
 234
    /// Returns the weak handle to the worktree this running session was created with.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 238
 239    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 240        let tasks: Vec<_> = paths
 241            .iter()
 242            .map(|path| {
 243                self.request(dap_command::SetBreakpoints {
 244                    source: client_source(path),
 245                    source_modified: None,
 246                    breakpoints: vec![],
 247                })
 248            })
 249            .collect();
 250
 251        cx.background_spawn(async move {
 252            futures::future::join_all(tasks)
 253                .await
 254                .iter()
 255                .for_each(|res| match res {
 256                    Ok(_) => {}
 257                    Err(err) => {
 258                        log::warn!("Set breakpoints request failed: {}", err);
 259                    }
 260                });
 261        })
 262    }
 263
 264    fn send_breakpoints_from_path(
 265        &self,
 266        abs_path: Arc<Path>,
 267        reason: BreakpointUpdatedReason,
 268        breakpoint_store: &Entity<BreakpointStore>,
 269        cx: &mut App,
 270    ) -> Task<()> {
 271        let breakpoints =
 272            breakpoint_store
 273                .read(cx)
 274                .source_breakpoints_from_path(&abs_path, cx)
 275                .into_iter()
 276                .filter(|bp| bp.state.is_enabled())
 277                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 278                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 279                }))
 280                .map(Into::into)
 281                .collect();
 282
 283        let raw_breakpoints = breakpoint_store
 284            .read(cx)
 285            .breakpoints_from_path(&abs_path)
 286            .into_iter()
 287            .filter(|bp| bp.bp.state.is_enabled())
 288            .collect::<Vec<_>>();
 289
 290        let task = self.request(dap_command::SetBreakpoints {
 291            source: client_source(&abs_path),
 292            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 293            breakpoints,
 294        });
 295        let session_id = self.client.id();
 296        let breakpoint_store = breakpoint_store.downgrade();
 297        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 298            Ok(breakpoints) => {
 299                let breakpoints =
 300                    breakpoints
 301                        .into_iter()
 302                        .zip(raw_breakpoints)
 303                        .filter_map(|(dap_bp, zed_bp)| {
 304                            Some((
 305                                zed_bp,
 306                                BreakpointSessionState {
 307                                    id: dap_bp.id?,
 308                                    verified: dap_bp.verified,
 309                                },
 310                            ))
 311                        });
 312                breakpoint_store
 313                    .update(cx, |this, _| {
 314                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 315                    })
 316                    .ok();
 317            }
 318            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 319        })
 320    }
 321
 322    fn send_exception_breakpoints(
 323        &self,
 324        filters: Vec<ExceptionBreakpointsFilter>,
 325        supports_filter_options: bool,
 326    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 327        let arg = if supports_filter_options {
 328            SetExceptionBreakpoints::WithOptions {
 329                filters: filters
 330                    .into_iter()
 331                    .map(|filter| ExceptionFilterOptions {
 332                        filter_id: filter.filter,
 333                        condition: None,
 334                        mode: None,
 335                    })
 336                    .collect(),
 337            }
 338        } else {
 339            SetExceptionBreakpoints::Plain {
 340                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 341            }
 342        };
 343        self.request(arg)
 344    }
 345
 346    fn send_source_breakpoints(
 347        &self,
 348        ignore_breakpoints: bool,
 349        breakpoint_store: &Entity<BreakpointStore>,
 350        cx: &App,
 351    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 352        let mut breakpoint_tasks = Vec::new();
 353        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 354        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 355        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 356        let session_id = self.client.id();
 357        for (path, breakpoints) in breakpoints {
 358            let breakpoints = if ignore_breakpoints {
 359                vec![]
 360            } else {
 361                breakpoints
 362                    .into_iter()
 363                    .filter(|bp| bp.state.is_enabled())
 364                    .map(Into::into)
 365                    .collect()
 366            };
 367
 368            let raw_breakpoints = raw_breakpoints
 369                .remove(&path)
 370                .unwrap_or_default()
 371                .into_iter()
 372                .filter(|bp| bp.bp.state.is_enabled());
 373            let error_path = path.clone();
 374            let send_request = self
 375                .request(dap_command::SetBreakpoints {
 376                    source: client_source(&path),
 377                    source_modified: Some(false),
 378                    breakpoints,
 379                })
 380                .map(|result| result.map_err(move |e| (error_path, e)));
 381
 382            let task = cx.spawn({
 383                let breakpoint_store = breakpoint_store.downgrade();
 384                async move |cx| {
 385                    let breakpoints = cx.background_spawn(send_request).await?;
 386
 387                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 388                        |(dap_bp, zed_bp)| {
 389                            Some((
 390                                zed_bp,
 391                                BreakpointSessionState {
 392                                    id: dap_bp.id?,
 393                                    verified: dap_bp.verified,
 394                                },
 395                            ))
 396                        },
 397                    );
 398                    breakpoint_store
 399                        .update(cx, |this, _| {
 400                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 401                        })
 402                        .ok();
 403
 404                    Ok(())
 405                }
 406            });
 407            breakpoint_tasks.push(task);
 408        }
 409
 410        cx.background_spawn(async move {
 411            futures::future::join_all(breakpoint_tasks)
 412                .await
 413                .into_iter()
 414                .filter_map(Result::err)
 415                .collect::<HashMap<_, _>>()
 416        })
 417    }
 418
 419    fn initialize_sequence(
 420        &self,
 421        capabilities: &Capabilities,
 422        initialized_rx: oneshot::Receiver<()>,
 423        dap_store: WeakEntity<DapStore>,
 424        cx: &mut Context<Session>,
 425    ) -> Task<Result<()>> {
 426        let raw = self.binary.request_args.clone();
 427
 428        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 429        let launch = match raw.request {
 430            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 431                raw: raw.configuration,
 432            }),
 433            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 434                raw: raw.configuration,
 435            }),
 436        };
 437
 438        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 439        // From spec (on initialization sequence):
 440        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 441        //
 442        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 443        let should_send_exception_breakpoints = capabilities
 444            .exception_breakpoint_filters
 445            .as_ref()
 446            .is_some_and(|filters| !filters.is_empty())
 447            || !configuration_done_supported;
 448        let supports_exception_filters = capabilities
 449            .supports_exception_filter_options
 450            .unwrap_or_default();
 451        let this = self.clone();
 452        let worktree = self.worktree().clone();
 453        let mut filters = capabilities
 454            .exception_breakpoint_filters
 455            .clone()
 456            .unwrap_or_default();
 457        let configuration_sequence = cx.spawn({
 458            async move |session, cx| {
 459                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 460                let (breakpoint_store, adapter_defaults) =
 461                    dap_store.read_with(cx, |dap_store, _| {
 462                        (
 463                            dap_store.breakpoint_store().clone(),
 464                            dap_store.adapter_options(&adapter_name),
 465                        )
 466                    })?;
 467                initialized_rx.await?;
 468                let errors_by_path = cx
 469                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 470                    .await;
 471
 472                dap_store.update(cx, |_, cx| {
 473                    let Some(worktree) = worktree.upgrade() else {
 474                        return;
 475                    };
 476
 477                    for (path, error) in &errors_by_path {
 478                        log::error!("failed to set breakpoints for {path:?}: {error}");
 479                    }
 480
 481                    if let Some(failed_path) = errors_by_path.keys().next() {
 482                        let failed_path = failed_path
 483                            .strip_prefix(worktree.read(cx).abs_path())
 484                            .unwrap_or(failed_path)
 485                            .display();
 486                        let message = format!(
 487                            "Failed to set breakpoints for {failed_path}{}",
 488                            match errors_by_path.len() {
 489                                0 => unreachable!(),
 490                                1 => "".into(),
 491                                2 => " and 1 other path".into(),
 492                                n => format!(" and {} other paths", n - 1),
 493                            }
 494                        );
 495                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 496                    }
 497                })?;
 498
 499                if should_send_exception_breakpoints {
 500                    _ = session.update(cx, |this, _| {
 501                        filters.retain(|filter| {
 502                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 503                                defaults
 504                                    .exception_breakpoints
 505                                    .get(&filter.filter)
 506                                    .map(|options| options.enabled)
 507                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 508                            } else {
 509                                filter.default.unwrap_or_default()
 510                            };
 511                            this.exception_breakpoints
 512                                .entry(filter.filter.clone())
 513                                .or_insert_with(|| (filter.clone(), is_enabled));
 514                            is_enabled
 515                        });
 516                    });
 517
 518                    this.send_exception_breakpoints(filters, supports_exception_filters)
 519                        .await
 520                        .ok();
 521                }
 522
 523                if configuration_done_supported {
 524                    this.request(ConfigurationDone {})
 525                } else {
 526                    Task::ready(Ok(()))
 527                }
 528                .await
 529            }
 530        });
 531
 532        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 533
 534        cx.spawn(async move |this, cx| {
 535            let result = task.await;
 536
 537            this.update(cx, |this, cx| {
 538                if let Some(this) = this.as_running_mut() {
 539                    this.is_started = true;
 540                    cx.notify();
 541                }
 542            })
 543            .ok();
 544
 545            result?;
 546            anyhow::Ok(())
 547        })
 548    }
 549
 550    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 551        let client = self.client.clone();
 552        let messages_tx = self.messages_tx.clone();
 553        let message_handler = Box::new(move |message| {
 554            messages_tx.unbounded_send(message).ok();
 555        });
 556        if client.should_reconnect_for_ssh() {
 557            Some(cx.spawn(async move |cx| {
 558                client.connect(message_handler, cx).await?;
 559                anyhow::Ok(())
 560            }))
 561        } else {
 562            None
 563        }
 564    }
 565
 566    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 567    where
 568        <R::DapRequest as dap::requests::Request>::Response: 'static,
 569        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 570    {
 571        let request = Arc::new(request);
 572
 573        let request_clone = request.clone();
 574        let connection = self.client.clone();
 575        self.executor.spawn(async move {
 576            let args = request_clone.to_dap();
 577            let response = connection.request::<R::DapRequest>(args).await?;
 578            request.response_from_dap(response)
 579        })
 580    }
 581}
 582
 583impl SessionState {
 584    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 585    where
 586        <R::DapRequest as dap::requests::Request>::Response: 'static,
 587        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 588    {
 589        match self {
 590            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 591            SessionState::Booting(_) => Task::ready(Err(anyhow!(
 592                "no adapter running to send request: {request:?}"
 593            ))),
 594        }
 595    }
 596
 597    /// Did this debug session stop at least once?
 598    pub(crate) fn has_ever_stopped(&self) -> bool {
 599        match self {
 600            SessionState::Booting(_) => false,
 601            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 602        }
 603    }
 604
 605    fn stopped(&mut self) {
 606        if let SessionState::Running(running) = self {
 607            running.has_ever_stopped = true;
 608        }
 609    }
 610}
 611
 612#[derive(Default)]
 613struct ThreadStates {
 614    global_state: Option<ThreadStatus>,
 615    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 616}
 617
 618impl ThreadStates {
 619    fn stop_all_threads(&mut self) {
 620        self.global_state = Some(ThreadStatus::Stopped);
 621        self.known_thread_states.clear();
 622    }
 623
 624    fn exit_all_threads(&mut self) {
 625        self.global_state = Some(ThreadStatus::Exited);
 626        self.known_thread_states.clear();
 627    }
 628
 629    fn continue_all_threads(&mut self) {
 630        self.global_state = Some(ThreadStatus::Running);
 631        self.known_thread_states.clear();
 632    }
 633
 634    fn stop_thread(&mut self, thread_id: ThreadId) {
 635        self.known_thread_states
 636            .insert(thread_id, ThreadStatus::Stopped);
 637    }
 638
 639    fn continue_thread(&mut self, thread_id: ThreadId) {
 640        self.known_thread_states
 641            .insert(thread_id, ThreadStatus::Running);
 642    }
 643
 644    fn process_step(&mut self, thread_id: ThreadId) {
 645        self.known_thread_states
 646            .insert(thread_id, ThreadStatus::Stepping);
 647    }
 648
 649    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 650        self.thread_state(thread_id)
 651            .unwrap_or(ThreadStatus::Running)
 652    }
 653
 654    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 655        self.known_thread_states
 656            .get(&thread_id)
 657            .copied()
 658            .or(self.global_state)
 659    }
 660
 661    fn exit_thread(&mut self, thread_id: ThreadId) {
 662        self.known_thread_states
 663            .insert(thread_id, ThreadStatus::Exited);
 664    }
 665
 666    fn any_stopped_thread(&self) -> bool {
 667        self.global_state
 668            .is_some_and(|state| state == ThreadStatus::Stopped)
 669            || self
 670                .known_thread_states
 671                .values()
 672                .any(|status| *status == ThreadStatus::Stopped)
 673    }
 674}
/// Capacity of the circular buffer in which a `Session` keeps adapter output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Enabled flag stored next to each exception breakpoint filter.
type IsEnabled = bool;

/// NOTE(review): presumably a position marker into a session's output — its
/// usage is not visible in this part of the file; confirm.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents the current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether the session is still booting or has a running adapter.
    pub mode: SessionState,
    id: SessionId,
    label: Option<SharedString>,
    adapter: DebugAdapterName,
    pub(super) capabilities: Capabilities,
    child_session_ids: HashSet<SessionId>,
    parent_session: Option<Entity<Session>>,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Output events, bounded to `MAX_TRACKED_OUTPUT_EVENTS` entries.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    /// Watchers keyed by a `SharedString` (presumably the watched expression — confirm).
    watchers: HashMap<SharedString, Watcher>,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Shared request tasks, grouped by `TypeId` and keyed by the request itself.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// When set, breakpoint store updates are not forwarded to the adapter.
    ignore_breakpoints: bool,
    /// Exception breakpoint filters keyed by filter id, each with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    data_breakpoints: BTreeMap<String, DataBreakpointState>,
    background_tasks: Vec<Task<()>>,
    restart_task: Option<Task<()>>,
    task_context: TaskContext,
    memory: memory::Memory,
    quirks: SessionQuirks,
    remote_client: Option<Entity<RemoteClient>>,
    node_runtime: Option<NodeRuntime>,
    http_client: Option<Arc<dyn HttpClient>>,
    companion_port: Option<u16>,
}
 716
 717trait CacheableCommand: Any + Send + Sync {
 718    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 719    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 720    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 721}
 722
 723impl<T> CacheableCommand for T
 724where
 725    T: LocalDapCommand + PartialEq + Eq + Hash,
 726{
 727    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 728        (rhs as &dyn Any).downcast_ref::<Self>() == Some(self)
 729    }
 730
 731    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 732        T::hash(self, &mut hasher);
 733    }
 734
 735    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 736        self
 737    }
 738}
 739
 740pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 741
 742impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 743    fn from(request: T) -> Self {
 744        Self(Arc::new(request))
 745    }
 746}
 747
 748impl PartialEq for RequestSlot {
 749    fn eq(&self, other: &Self) -> bool {
 750        self.0.dyn_eq(other.0.as_ref())
 751    }
 752}
 753
 754impl Eq for RequestSlot {}
 755
 756impl Hash for RequestSlot {
 757    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 758        self.0.dyn_hash(state);
 759        (&*self.0 as &dyn Any).type_id().hash(state)
 760    }
 761}
 762
 763#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 764pub struct CompletionsQuery {
 765    pub query: String,
 766    pub column: u64,
 767    pub line: Option<u64>,
 768    pub frame_id: Option<u64>,
 769}
 770
 771impl CompletionsQuery {
 772    pub fn new(
 773        buffer: &language::Buffer,
 774        cursor_position: language::Anchor,
 775        frame_id: Option<u64>,
 776    ) -> Self {
 777        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 778        Self {
 779            query: buffer.text(),
 780            column: column as u64,
 781            frame_id,
 782            line: Some(row as u64),
 783        }
 784    }
 785}
 786
/// Events emitted by a `Session`.
///
/// NOTE(review): the emit sites are not visible in this part of the file; the
/// variant names suggest each one signals that the matching piece of session
/// data changed — confirm against the emitters.
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id (presumably the thread that stopped — confirm).
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Watchers,
    Threads,
    InvalidateInlineValue,
    CapabilitiesLoaded,
    /// Carries the `runInTerminal` request arguments and a channel for a
    /// `Result<u32>` reply (presumably the spawned process id — confirm).
    RunInTerminal {
        request: RunInTerminalRequestArguments,
        sender: mpsc::Sender<Result<u32>>,
    },
    DataBreakpointInfo,
    ConsoleOutput,
}
 805
/// Lifecycle events emitted by a `Session`.
///
/// NOTE(review): the emit sites are not visible in this part of the file;
/// confirm when each variant is emitted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    Running,
    Shutdown,
    Restart,
    /// Carries the `startDebugging` request arguments for a child session.
    SpawnChildSession {
        request: StartDebuggingRequestArguments,
    },
}
 815
// A `Session` emits both data events and lifecycle events.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 818
// A local session sends breakpoint updates to the DAP for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session on breakpoint changes.
 822impl Session {
 823    pub(crate) fn new(
 824        breakpoint_store: Entity<BreakpointStore>,
 825        session_id: SessionId,
 826        parent_session: Option<Entity<Session>>,
 827        label: Option<SharedString>,
 828        adapter: DebugAdapterName,
 829        task_context: TaskContext,
 830        quirks: SessionQuirks,
 831        remote_client: Option<Entity<RemoteClient>>,
 832        node_runtime: Option<NodeRuntime>,
 833        http_client: Option<Arc<dyn HttpClient>>,
 834        cx: &mut App,
 835    ) -> Entity<Self> {
 836        cx.new::<Self>(|cx| {
 837            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 838                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 839                    if let Some(local) = (!this.ignore_breakpoints)
 840                        .then(|| this.as_running_mut())
 841                        .flatten()
 842                    {
 843                        local
 844                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 845                            .detach();
 846                    };
 847                }
 848                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 849                    if let Some(local) = (!this.ignore_breakpoints)
 850                        .then(|| this.as_running_mut())
 851                        .flatten()
 852                    {
 853                        local.unset_breakpoints_from_paths(paths, cx).detach();
 854                    }
 855                }
 856                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 857            })
 858            .detach();
 859
 860            Self {
 861                mode: SessionState::Booting(None),
 862                id: session_id,
 863                child_session_ids: HashSet::default(),
 864                parent_session,
 865                capabilities: Capabilities::default(),
 866                watchers: HashMap::default(),
 867                variables: Default::default(),
 868                stack_frames: Default::default(),
 869                thread_states: ThreadStates::default(),
 870                output_token: OutputToken(0),
 871                output: circular_buffer::CircularBuffer::boxed(),
 872                requests: HashMap::default(),
 873                modules: Vec::default(),
 874                loaded_sources: Vec::default(),
 875                threads: IndexMap::default(),
 876                background_tasks: Vec::default(),
 877                restart_task: None,
 878                locations: Default::default(),
 879                is_session_terminated: false,
 880                ignore_breakpoints: false,
 881                breakpoint_store,
 882                data_breakpoints: Default::default(),
 883                exception_breakpoints: Default::default(),
 884                label,
 885                adapter,
 886                task_context,
 887                memory: memory::Memory::new(),
 888                quirks,
 889                remote_client,
 890                node_runtime,
 891                http_client,
 892                companion_port: None,
 893            }
 894        })
 895    }
 896
    /// Returns the task context this session was created with.
    pub fn task_context(&self) -> &TaskContext {
        &self.task_context
    }
 900
 901    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 902        match &self.mode {
 903            SessionState::Booting(_) => None,
 904            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 905        }
 906    }
 907
 908    pub fn boot(
 909        &mut self,
 910        binary: DebugAdapterBinary,
 911        worktree: Entity<Worktree>,
 912        dap_store: WeakEntity<DapStore>,
 913        cx: &mut Context<Self>,
 914    ) -> Task<Result<()>> {
 915        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 916        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 917
 918        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 919            let mut initialized_tx = Some(initialized_tx);
 920            while let Some(message) = message_rx.next().await {
 921                if let Message::Event(event) = message {
 922                    if let Events::Initialized(_) = *event {
 923                        if let Some(tx) = initialized_tx.take() {
 924                            tx.send(()).ok();
 925                        }
 926                    } else {
 927                        let Ok(_) = this.update(cx, |session, cx| {
 928                            session.handle_dap_event(event, cx);
 929                        }) else {
 930                            break;
 931                        };
 932                    }
 933                } else if let Message::Request(request) = message {
 934                    let Ok(_) = this.update(cx, |this, cx| {
 935                        if request.command == StartDebugging::COMMAND {
 936                            this.handle_start_debugging_request(request, cx)
 937                                .detach_and_log_err(cx);
 938                        } else if request.command == RunInTerminal::COMMAND {
 939                            this.handle_run_in_terminal_request(request, cx)
 940                                .detach_and_log_err(cx);
 941                        }
 942                    }) else {
 943                        break;
 944                    };
 945                }
 946            }
 947        })];
 948        self.background_tasks = background_tasks;
 949        let id = self.id;
 950        let parent_session = self.parent_session.clone();
 951
 952        cx.spawn(async move |this, cx| {
 953            let mode = RunningMode::new(
 954                id,
 955                parent_session,
 956                worktree.downgrade(),
 957                binary.clone(),
 958                message_tx,
 959                cx,
 960            )
 961            .await?;
 962            this.update(cx, |this, cx| {
 963                match &mut this.mode {
 964                    SessionState::Booting(task) if task.is_some() => {
 965                        task.take().unwrap().detach_and_log_err(cx);
 966                    }
 967                    SessionState::Booting(_) => {}
 968                    SessionState::Running(_) => {
 969                        debug_panic!("Attempting to boot a session that is already running");
 970                    }
 971                };
 972                this.mode = SessionState::Running(mode);
 973                cx.emit(SessionStateEvent::Running);
 974            })?;
 975
 976            this.update(cx, |session, cx| session.request_initialize(cx))?
 977                .await?;
 978
 979            let result = this
 980                .update(cx, |session, cx| {
 981                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 982                })?
 983                .await;
 984
 985            if result.is_err() {
 986                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 987
 988                console
 989                    .send(format!(
 990                        "Tried to launch debugger with: {}",
 991                        serde_json::to_string_pretty(&binary.request_args.configuration)
 992                            .unwrap_or_default(),
 993                    ))
 994                    .await
 995                    .ok();
 996            }
 997
 998            result
 999        })
1000    }
1001
    /// Returns the id this session was created with.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
1005
    /// Returns a copy of the set of registered child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
1009
    /// Registers `session_id` as a child of this session. Adding an id that
    /// is already registered has no effect.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
1013
    /// Unregisters `session_id` as a child of this session. Removing an
    /// unknown id has no effect.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
1017
1018    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
1019        self.parent_session
1020            .as_ref()
1021            .map(|session| session.read(cx).id)
1022    }
1023
    /// Returns the parent session entity, if this session has one.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
1027
1028    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1029        let Some(client) = self.adapter_client() else {
1030            return Task::ready(());
1031        };
1032
1033        let supports_terminate = self
1034            .capabilities
1035            .support_terminate_debuggee
1036            .unwrap_or(false);
1037
1038        cx.background_spawn(async move {
1039            if supports_terminate {
1040                client
1041                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1042                        restart: Some(false),
1043                    })
1044                    .await
1045                    .ok();
1046            } else {
1047                client
1048                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1049                        restart: Some(false),
1050                        terminate_debuggee: Some(true),
1051                        suspend_debuggee: Some(false),
1052                    })
1053                    .await
1054                    .ok();
1055            }
1056        })
1057    }
1058
    /// Returns the capabilities currently recorded for this session's adapter.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
1062
1063    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1064        match &self.mode {
1065            SessionState::Booting(_) => None,
1066            SessionState::Running(running_mode) => Some(&running_mode.binary),
1067        }
1068    }
1069
    /// Returns a clone of the name of the debug adapter used by this session.
    pub fn adapter(&self) -> DebugAdapterName {
        self.adapter.clone()
    }
1073
    /// Returns a clone of the session's label, if one was provided.
    pub fn label(&self) -> Option<SharedString> {
        self.label.clone()
    }
1077
    /// Returns the value of the session's `is_session_terminated` flag.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
1081
1082    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1083        let (tx, mut rx) = mpsc::unbounded();
1084
1085        cx.spawn(async move |this, cx| {
1086            while let Some(output) = rx.next().await {
1087                this.update(cx, |this, _| {
1088                    let event = dap::OutputEvent {
1089                        category: None,
1090                        output,
1091                        group: None,
1092                        variables_reference: None,
1093                        source: None,
1094                        line: None,
1095                        column: None,
1096                        data: None,
1097                        location_reference: None,
1098                    };
1099                    this.push_output(event);
1100                })?;
1101            }
1102            anyhow::Ok(())
1103        })
1104        .detach();
1105
1106        tx
1107    }
1108
1109    pub fn is_started(&self) -> bool {
1110        match &self.mode {
1111            SessionState::Booting(_) => false,
1112            SessionState::Running(running) => running.is_started,
1113        }
1114    }
1115
1116    pub fn is_building(&self) -> bool {
1117        matches!(self.mode, SessionState::Booting(_))
1118    }
1119
1120    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1121        match &mut self.mode {
1122            SessionState::Running(local_mode) => Some(local_mode),
1123            SessionState::Booting(_) => None,
1124        }
1125    }
1126
1127    pub fn as_running(&self) -> Option<&RunningMode> {
1128        match &self.mode {
1129            SessionState::Running(local_mode) => Some(local_mode),
1130            SessionState::Booting(_) => None,
1131        }
1132    }
1133
1134    fn handle_start_debugging_request(
1135        &mut self,
1136        request: dap::messages::Request,
1137        cx: &mut Context<Self>,
1138    ) -> Task<Result<()>> {
1139        let request_seq = request.seq;
1140
1141        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1142            .arguments
1143            .as_ref()
1144            .map(|value| serde_json::from_value(value.clone()));
1145
1146        let mut success = true;
1147        if let Some(Ok(request)) = launch_request {
1148            cx.emit(SessionStateEvent::SpawnChildSession { request });
1149        } else {
1150            log::error!(
1151                "Failed to parse launch request arguments: {:?}",
1152                request.arguments
1153            );
1154            success = false;
1155        }
1156
1157        cx.spawn(async move |this, cx| {
1158            this.update(cx, |this, cx| {
1159                this.respond_to_client(
1160                    request_seq,
1161                    success,
1162                    StartDebugging::COMMAND.to_string(),
1163                    None,
1164                    cx,
1165                )
1166            })?
1167            .await
1168        })
1169    }
1170
1171    fn handle_run_in_terminal_request(
1172        &mut self,
1173        request: dap::messages::Request,
1174        cx: &mut Context<Self>,
1175    ) -> Task<Result<()>> {
1176        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1177            request.arguments.unwrap_or_default(),
1178        ) {
1179            Ok(args) => args,
1180            Err(error) => {
1181                return cx.spawn(async move |session, cx| {
1182                    let error = serde_json::to_value(dap::ErrorResponse {
1183                        error: Some(dap::Message {
1184                            id: request.seq,
1185                            format: error.to_string(),
1186                            variables: None,
1187                            send_telemetry: None,
1188                            show_user: None,
1189                            url: None,
1190                            url_label: None,
1191                        }),
1192                    })
1193                    .ok();
1194
1195                    session
1196                        .update(cx, |this, cx| {
1197                            this.respond_to_client(
1198                                request.seq,
1199                                false,
1200                                StartDebugging::COMMAND.to_string(),
1201                                error,
1202                                cx,
1203                            )
1204                        })?
1205                        .await?;
1206
1207                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1208                });
1209            }
1210        };
1211
1212        let seq = request.seq;
1213
1214        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1215        cx.emit(SessionEvent::RunInTerminal {
1216            request: request_args,
1217            sender: tx,
1218        });
1219        cx.notify();
1220
1221        cx.spawn(async move |session, cx| {
1222            let result = util::maybe!(async move {
1223                rx.next().await.ok_or_else(|| {
1224                    anyhow!("failed to receive response from spawn terminal".to_string())
1225                })?
1226            })
1227            .await;
1228            let (success, body) = match result {
1229                Ok(pid) => (
1230                    true,
1231                    serde_json::to_value(dap::RunInTerminalResponse {
1232                        process_id: None,
1233                        shell_process_id: Some(pid as u64),
1234                    })
1235                    .ok(),
1236                ),
1237                Err(error) => (
1238                    false,
1239                    serde_json::to_value(dap::ErrorResponse {
1240                        error: Some(dap::Message {
1241                            id: seq,
1242                            format: error.to_string(),
1243                            variables: None,
1244                            send_telemetry: None,
1245                            show_user: None,
1246                            url: None,
1247                            url_label: None,
1248                        }),
1249                    })
1250                    .ok(),
1251                ),
1252            };
1253
1254            session
1255                .update(cx, |session, cx| {
1256                    session.respond_to_client(
1257                        seq,
1258                        success,
1259                        RunInTerminal::COMMAND.to_string(),
1260                        body,
1261                        cx,
1262                    )
1263                })?
1264                .await
1265        })
1266    }
1267
1268    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1269        let adapter_id = self.adapter().to_string();
1270        let request = Initialize { adapter_id };
1271
1272        let SessionState::Running(running) = &self.mode else {
1273            return Task::ready(Err(anyhow!(
1274                "Cannot send initialize request, task still building"
1275            )));
1276        };
1277        let mut response = running.request(request.clone());
1278
1279        cx.spawn(async move |this, cx| {
1280            loop {
1281                let capabilities = response.await;
1282                match capabilities {
1283                    Err(e) => {
1284                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1285                            this.as_running()
1286                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1287                        }) else {
1288                            return Err(e);
1289                        };
1290                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1291                        reconnect.await?;
1292
1293                        let Ok(Some(r)) = this.update(cx, |this, _| {
1294                            this.as_running()
1295                                .map(|running| running.request(request.clone()))
1296                        }) else {
1297                            return Err(e);
1298                        };
1299                        response = r
1300                    }
1301                    Ok(capabilities) => {
1302                        this.update(cx, |session, cx| {
1303                            session.capabilities = capabilities;
1304
1305                            cx.emit(SessionEvent::CapabilitiesLoaded);
1306                        })?;
1307                        return Ok(());
1308                    }
1309                }
1310            }
1311        })
1312    }
1313
1314    pub(super) fn initialize_sequence(
1315        &mut self,
1316        initialize_rx: oneshot::Receiver<()>,
1317        dap_store: WeakEntity<DapStore>,
1318        cx: &mut Context<Self>,
1319    ) -> Task<Result<()>> {
1320        match &self.mode {
1321            SessionState::Running(local_mode) => {
1322                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1323            }
1324            SessionState::Booting(_) => {
1325                Task::ready(Err(anyhow!("cannot initialize, still building")))
1326            }
1327        }
1328    }
1329
1330    pub fn run_to_position(
1331        &mut self,
1332        breakpoint: SourceBreakpoint,
1333        active_thread_id: ThreadId,
1334        cx: &mut Context<Self>,
1335    ) {
1336        match &mut self.mode {
1337            SessionState::Running(local_mode) => {
1338                if !matches!(
1339                    self.thread_states.thread_state(active_thread_id),
1340                    Some(ThreadStatus::Stopped)
1341                ) {
1342                    return;
1343                };
1344                let path = breakpoint.path.clone();
1345                local_mode.tmp_breakpoint = Some(breakpoint);
1346                let task = local_mode.send_breakpoints_from_path(
1347                    path,
1348                    BreakpointUpdatedReason::Toggled,
1349                    &self.breakpoint_store,
1350                    cx,
1351                );
1352
1353                cx.spawn(async move |this, cx| {
1354                    task.await;
1355                    this.update(cx, |this, cx| {
1356                        this.continue_thread(active_thread_id, cx);
1357                    })
1358                })
1359                .detach();
1360            }
1361            SessionState::Booting(_) => {}
1362        }
1363    }
1364
1365    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1366        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1367    }
1368
1369    pub fn output(
1370        &self,
1371        since: OutputToken,
1372    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1373        if self.output_token.0 == 0 {
1374            return (self.output.range(0..0), OutputToken(0));
1375        };
1376
1377        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1378
1379        let clamped_events_since = events_since.clamp(0, self.output.len());
1380        (
1381            self.output
1382                .range(self.output.len() - clamped_events_since..),
1383            self.output_token,
1384        )
1385    }
1386
1387    pub fn respond_to_client(
1388        &self,
1389        request_seq: u64,
1390        success: bool,
1391        command: String,
1392        body: Option<serde_json::Value>,
1393        cx: &mut Context<Self>,
1394    ) -> Task<Result<()>> {
1395        let Some(local_session) = self.as_running() else {
1396            unreachable!("Cannot respond to remote client");
1397        };
1398        let client = local_session.client.clone();
1399
1400        cx.background_spawn(async move {
1401            client
1402                .send_message(Message::Response(Response {
1403                    body,
1404                    success,
1405                    command,
1406                    seq: request_seq + 1,
1407                    request_seq,
1408                    message: None,
1409                }))
1410                .await
1411        })
1412    }
1413
1414    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1415        self.mode.stopped();
1416        // todo(debugger): Find a clean way to get around the clone
1417        let breakpoint_store = self.breakpoint_store.clone();
1418        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1419            let breakpoint = local.tmp_breakpoint.take()?;
1420            let path = breakpoint.path;
1421            Some((local, path))
1422        }) {
1423            local
1424                .send_breakpoints_from_path(
1425                    path,
1426                    BreakpointUpdatedReason::Toggled,
1427                    &breakpoint_store,
1428                    cx,
1429                )
1430                .detach();
1431        };
1432
1433        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1434            self.thread_states.stop_all_threads();
1435            self.invalidate_command_type::<StackTraceCommand>();
1436        }
1437
1438        // Event if we stopped all threads we still need to insert the thread_id
1439        // to our own data
1440        if let Some(thread_id) = event.thread_id {
1441            self.thread_states.stop_thread(ThreadId(thread_id));
1442
1443            self.invalidate_state(
1444                &StackTraceCommand {
1445                    thread_id,
1446                    start_frame: None,
1447                    levels: None,
1448                }
1449                .into(),
1450            );
1451        }
1452
1453        self.invalidate_generic();
1454        self.threads.clear();
1455        self.variables.clear();
1456        cx.emit(SessionEvent::Stopped(
1457            event
1458                .thread_id
1459                .map(Into::into)
1460                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1461        ));
1462        cx.emit(SessionEvent::InvalidateInlineValue);
1463        cx.notify();
1464    }
1465
1466    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1467        match *event {
1468            Events::Initialized(_) => {
1469                debug_assert!(
1470                    false,
1471                    "Initialized event should have been handled in LocalMode"
1472                );
1473            }
1474            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1475            Events::Continued(event) => {
1476                if event.all_threads_continued.unwrap_or_default() {
1477                    self.thread_states.continue_all_threads();
1478                    self.breakpoint_store.update(cx, |store, cx| {
1479                        store.remove_active_position(Some(self.session_id()), cx)
1480                    });
1481                } else {
1482                    self.thread_states
1483                        .continue_thread(ThreadId(event.thread_id));
1484                }
1485                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1486                self.invalidate_generic();
1487            }
1488            Events::Exited(_event) => {
1489                self.clear_active_debug_line(cx);
1490            }
1491            Events::Terminated(_) => {
1492                self.shutdown(cx).detach();
1493            }
1494            Events::Thread(event) => {
1495                let thread_id = ThreadId(event.thread_id);
1496
1497                match event.reason {
1498                    dap::ThreadEventReason::Started => {
1499                        self.thread_states.continue_thread(thread_id);
1500                    }
1501                    dap::ThreadEventReason::Exited => {
1502                        self.thread_states.exit_thread(thread_id);
1503                    }
1504                    reason => {
1505                        log::error!("Unhandled thread event reason {:?}", reason);
1506                    }
1507                }
1508                self.invalidate_state(&ThreadsCommand.into());
1509                cx.notify();
1510            }
1511            Events::Output(event) => {
1512                if event
1513                    .category
1514                    .as_ref()
1515                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1516                {
1517                    return;
1518                }
1519
1520                self.push_output(event);
1521                cx.notify();
1522            }
1523            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1524                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1525            }),
1526            Events::Module(event) => {
1527                match event.reason {
1528                    dap::ModuleEventReason::New => {
1529                        self.modules.push(event.module);
1530                    }
1531                    dap::ModuleEventReason::Changed => {
1532                        if let Some(module) = self
1533                            .modules
1534                            .iter_mut()
1535                            .find(|other| event.module.id == other.id)
1536                        {
1537                            *module = event.module;
1538                        }
1539                    }
1540                    dap::ModuleEventReason::Removed => {
1541                        self.modules.retain(|other| event.module.id != other.id);
1542                    }
1543                }
1544
1545                // todo(debugger): We should only send the invalidate command to downstream clients.
1546                // self.invalidate_state(&ModulesCommand.into());
1547            }
1548            Events::LoadedSource(_) => {
1549                self.invalidate_state(&LoadedSourcesCommand.into());
1550            }
1551            Events::Capabilities(event) => {
1552                self.capabilities = self.capabilities.merge(event.capabilities);
1553
1554                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1555                let recent_filters = self
1556                    .capabilities
1557                    .exception_breakpoint_filters
1558                    .iter()
1559                    .flatten()
1560                    .map(|filter| (filter.filter.clone(), filter.clone()))
1561                    .collect::<BTreeMap<_, _>>();
1562                for filter in recent_filters.values() {
1563                    let default = filter.default.unwrap_or_default();
1564                    self.exception_breakpoints
1565                        .entry(filter.filter.clone())
1566                        .or_insert_with(|| (filter.clone(), default));
1567                }
1568                self.exception_breakpoints
1569                    .retain(|k, _| recent_filters.contains_key(k));
1570                if self.is_started() {
1571                    self.send_exception_breakpoints(cx);
1572                }
1573
1574                // Remove the ones that no longer exist.
1575                cx.notify();
1576            }
1577            Events::Memory(_) => {}
1578            Events::Process(_) => {}
1579            Events::ProgressEnd(_) => {}
1580            Events::ProgressStart(_) => {}
1581            Events::ProgressUpdate(_) => {}
1582            Events::Invalidated(_) => {}
1583            Events::Other(event) => {
1584                if event.event == "launchBrowserInCompanion" {
1585                    let Some(request) = serde_json::from_value(event.body).ok() else {
1586                        log::error!("failed to deserialize launchBrowserInCompanion event");
1587                        return;
1588                    };
1589                    self.launch_browser_for_remote_server(request, cx);
1590                } else if event.event == "killCompanionBrowser" {
1591                    let Some(request) = serde_json::from_value(event.body).ok() else {
1592                        log::error!("failed to deserialize killCompanionBrowser event");
1593                        return;
1594                    };
1595                    self.kill_browser(request, cx);
1596                }
1597            }
1598        }
1599    }
1600
1601    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1602    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1603        &mut self,
1604        request: T,
1605        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1606        cx: &mut Context<Self>,
1607    ) {
1608        const {
1609            assert!(
1610                T::CACHEABLE,
1611                "Only requests marked as cacheable should invoke `fetch`"
1612            );
1613        }
1614
1615        if !self.thread_states.any_stopped_thread()
1616            && request.type_id() != TypeId::of::<ThreadsCommand>()
1617            || self.is_session_terminated
1618        {
1619            return;
1620        }
1621
1622        let request_map = self
1623            .requests
1624            .entry(std::any::TypeId::of::<T>())
1625            .or_default();
1626
1627        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1628            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1629
1630            let task = Self::request_inner::<Arc<T>>(
1631                &self.capabilities,
1632                &self.mode,
1633                command,
1634                |this, result, cx| {
1635                    process_result(this, result, cx);
1636                    None
1637                },
1638                cx,
1639            );
1640            let task = cx
1641                .background_executor()
1642                .spawn(async move {
1643                    let _ = task.await?;
1644                    Some(())
1645                })
1646                .shared();
1647
1648            vacant.insert(task);
1649            cx.notify();
1650        }
1651    }
1652
1653    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1654        capabilities: &Capabilities,
1655        mode: &SessionState,
1656        request: T,
1657        process_result: impl FnOnce(
1658            &mut Self,
1659            Result<T::Response>,
1660            &mut Context<Self>,
1661        ) -> Option<T::Response>
1662        + 'static,
1663        cx: &mut Context<Self>,
1664    ) -> Task<Option<T::Response>> {
1665        if !T::is_supported(capabilities) {
1666            log::warn!(
1667                "Attempted to send a DAP request that isn't supported: {:?}",
1668                request
1669            );
1670            let error = Err(anyhow::Error::msg(
1671                "Couldn't complete request because it's not supported",
1672            ));
1673            return cx.spawn(async move |this, cx| {
1674                this.update(cx, |this, cx| process_result(this, error, cx))
1675                    .ok()
1676                    .flatten()
1677            });
1678        }
1679
1680        let request = mode.request_dap(request);
1681        cx.spawn(async move |this, cx| {
1682            let result = request.await;
1683            this.update(cx, |this, cx| process_result(this, result, cx))
1684                .ok()
1685                .flatten()
1686        })
1687    }
1688
    /// Sends `request` without consulting or updating the request cache.
    ///
    /// Thin wrapper over [`Self::request_inner`] that supplies this session's capabilities and
    /// mode. The returned task resolves to the value produced by `process_result`.
    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(
            &mut Self,
            Result<T::Response>,
            &mut Context<Self>,
        ) -> Option<T::Response>
        + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
    }
1702
    /// Drops every tracked request of type `Command`, so the next `fetch` of that command
    /// type sends a fresh request.
    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
        self.requests.remove(&std::any::TypeId::of::<Command>());
    }
1706
1707    fn invalidate_generic(&mut self) {
1708        self.invalidate_command_type::<ModulesCommand>();
1709        self.invalidate_command_type::<LoadedSourcesCommand>();
1710        self.invalidate_command_type::<ThreadsCommand>();
1711        self.invalidate_command_type::<DataBreakpointInfoCommand>();
1712        self.invalidate_command_type::<ReadMemory>();
1713        let executor = self.as_running().map(|running| running.executor.clone());
1714        if let Some(executor) = executor {
1715            self.memory.clear(&executor);
1716        }
1717    }
1718
1719    fn invalidate_state(&mut self, key: &RequestSlot) {
1720        self.requests
1721            .entry((&*key.0 as &dyn Any).type_id())
1722            .and_modify(|request_map| {
1723                request_map.remove(key);
1724            });
1725    }
1726
    /// Appends `event` to the session output and increments the output token.
    fn push_output(&mut self, event: OutputEvent) {
        self.output.push_back(event);
        self.output_token.0 += 1;
    }
1731
    /// Reports whether the tracked thread states contain any stopped thread.
    pub fn any_stopped_thread(&self) -> bool {
        self.thread_states.any_stopped_thread()
    }
1735
    /// Returns the status recorded for `thread_id` in the tracked thread states.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1739
1740    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1741        self.fetch(
1742            dap_command::ThreadsCommand,
1743            |this, result, cx| {
1744                let Some(result) = result.log_err() else {
1745                    return;
1746                };
1747
1748                this.threads = result
1749                    .into_iter()
1750                    .map(|thread| (ThreadId(thread.id), Thread::from(thread)))
1751                    .collect();
1752
1753                this.invalidate_command_type::<StackTraceCommand>();
1754                cx.emit(SessionEvent::Threads);
1755                cx.notify();
1756            },
1757            cx,
1758        );
1759
1760        self.threads
1761            .values()
1762            .map(|thread| {
1763                (
1764                    thread.dap.clone(),
1765                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1766                )
1767            })
1768            .collect()
1769    }
1770
1771    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1772        self.fetch(
1773            dap_command::ModulesCommand,
1774            |this, result, cx| {
1775                let Some(result) = result.log_err() else {
1776                    return;
1777                };
1778
1779                this.modules = result;
1780                cx.emit(SessionEvent::Modules);
1781                cx.notify();
1782            },
1783            cx,
1784        );
1785
1786        &self.modules
1787    }
1788
1789    // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1790    pub fn data_access_size(
1791        &mut self,
1792        frame_id: Option<u64>,
1793        evaluate_name: &str,
1794        cx: &mut Context<Self>,
1795    ) -> Task<Option<u64>> {
1796        let request = self.request(
1797            EvaluateCommand {
1798                expression: format!("?${{sizeof({evaluate_name})}}"),
1799                frame_id,
1800
1801                context: Some(EvaluateArgumentsContext::Repl),
1802                source: None,
1803            },
1804            |_, response, _| response.ok(),
1805            cx,
1806        );
1807        cx.background_spawn(async move {
1808            let result = request.await?;
1809            result.result.parse().ok()
1810        })
1811    }
1812
1813    pub fn memory_reference_of_expr(
1814        &mut self,
1815        frame_id: Option<u64>,
1816        expression: String,
1817        cx: &mut Context<Self>,
1818    ) -> Task<Option<(String, Option<String>)>> {
1819        let request = self.request(
1820            EvaluateCommand {
1821                expression,
1822                frame_id,
1823
1824                context: Some(EvaluateArgumentsContext::Repl),
1825                source: None,
1826            },
1827            |_, response, _| response.ok(),
1828            cx,
1829        );
1830        cx.background_spawn(async move {
1831            let result = request.await?;
1832            result
1833                .memory_reference
1834                .map(|reference| (reference, result.type_))
1835        })
1836    }
1837
1838    pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1839        let data = base64::engine::general_purpose::STANDARD.encode(data);
1840        self.request(
1841            WriteMemoryArguments {
1842                memory_reference: address.to_string(),
1843                data,
1844                allow_partial: None,
1845                offset: None,
1846            },
1847            |this, response, cx| {
1848                this.memory.clear(cx.background_executor());
1849                this.invalidate_command_type::<ReadMemory>();
1850                this.invalidate_command_type::<VariablesCommand>();
1851                cx.emit(SessionEvent::Variables);
1852                response.ok()
1853            },
1854            cx,
1855        )
1856        .detach();
1857    }
1858    pub fn read_memory(
1859        &mut self,
1860        range: RangeInclusive<u64>,
1861        cx: &mut Context<Self>,
1862    ) -> MemoryIterator {
1863        // This function is a bit more involved when it comes to fetching data.
1864        // Since we attempt to read memory in pages, we need to account for some parts
1865        // of memory being unreadable. Therefore, we start off by fetching a page per request.
1866        // In case that fails, we try to re-fetch smaller regions until we have the full range.
1867        let page_range = Memory::memory_range_to_page_range(range.clone());
1868        for page_address in PageAddress::iter_range(page_range) {
1869            self.read_single_page_memory(page_address, cx);
1870        }
1871        self.memory.memory_range(range)
1872    }
1873
1874    fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1875        _ = maybe!({
1876            let builder = self.memory.build_page(page_start)?;
1877
1878            self.memory_read_fetch_page_recursive(builder, cx);
1879            Some(())
1880        });
1881    }
1882    fn memory_read_fetch_page_recursive(
1883        &mut self,
1884        mut builder: MemoryPageBuilder,
1885        cx: &mut Context<Self>,
1886    ) {
1887        let Some(next_request) = builder.next_request() else {
1888            // We're done fetching. Let's grab the page and insert it into our memory store.
1889            let (address, contents) = builder.build();
1890            self.memory.insert_page(address, contents);
1891
1892            return;
1893        };
1894        let size = next_request.size;
1895        self.fetch(
1896            ReadMemory {
1897                memory_reference: format!("0x{:X}", next_request.address),
1898                offset: Some(0),
1899                count: next_request.size,
1900            },
1901            move |this, memory, cx| {
1902                if let Ok(memory) = memory {
1903                    builder.known(memory.content);
1904                    if let Some(unknown) = memory.unreadable_bytes {
1905                        builder.unknown(unknown);
1906                    }
1907                    // This is the recursive bit: if we're not yet done with
1908                    // the whole page, we'll kick off a new request with smaller range.
1909                    // Note that this function is recursive only conceptually;
1910                    // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1911                    this.memory_read_fetch_page_recursive(builder, cx);
1912                } else {
1913                    builder.unknown(size);
1914                }
1915            },
1916            cx,
1917        );
1918    }
1919
    /// Returns whether this session is currently set to ignore breakpoints.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1923
1924    pub fn toggle_ignore_breakpoints(
1925        &mut self,
1926        cx: &mut App,
1927    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1928        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1929    }
1930
1931    pub(crate) fn set_ignore_breakpoints(
1932        &mut self,
1933        ignore: bool,
1934        cx: &mut App,
1935    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1936        if self.ignore_breakpoints == ignore {
1937            return Task::ready(HashMap::default());
1938        }
1939
1940        self.ignore_breakpoints = ignore;
1941
1942        if let Some(local) = self.as_running() {
1943            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1944        } else {
1945            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1946            unimplemented!()
1947        }
1948    }
1949
    /// Iterates over every data breakpoint known to this session, enabled or not.
    pub fn data_breakpoints(&self) -> impl Iterator<Item = &DataBreakpointState> {
        self.data_breakpoints.values()
    }
1953
    /// Iterates over every exception breakpoint filter together with its enabled flag.
    pub fn exception_breakpoints(
        &self,
    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
        self.exception_breakpoints.values()
    }
1959
1960    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1961        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1962            *is_enabled = !*is_enabled;
1963            self.send_exception_breakpoints(cx);
1964        }
1965    }
1966
1967    fn send_exception_breakpoints(&mut self, cx: &App) {
1968        if let Some(local) = self.as_running() {
1969            let exception_filters = self
1970                .exception_breakpoints
1971                .values()
1972                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1973                .collect();
1974
1975            let supports_exception_filters = self
1976                .capabilities
1977                .supports_exception_filter_options
1978                .unwrap_or_default();
1979            local
1980                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1981                .detach_and_log_err(cx);
1982        } else {
1983            debug_assert!(false, "Not implemented");
1984        }
1985    }
1986
1987    pub fn toggle_data_breakpoint(&mut self, id: &str, cx: &mut Context<'_, Session>) {
1988        if let Some(state) = self.data_breakpoints.get_mut(id) {
1989            state.is_enabled = !state.is_enabled;
1990            self.send_exception_breakpoints(cx);
1991        }
1992    }
1993
1994    fn send_data_breakpoints(&mut self, cx: &mut Context<Self>) {
1995        if let Some(mode) = self.as_running() {
1996            let breakpoints = self
1997                .data_breakpoints
1998                .values()
1999                .filter_map(|state| state.is_enabled.then(|| state.dap.clone()))
2000                .collect();
2001            let command = SetDataBreakpointsCommand { breakpoints };
2002            mode.request(command).detach_and_log_err(cx);
2003        }
2004    }
2005
2006    pub fn create_data_breakpoint(
2007        &mut self,
2008        context: Arc<DataBreakpointContext>,
2009        data_id: String,
2010        dap: dap::DataBreakpoint,
2011        cx: &mut Context<Self>,
2012    ) {
2013        if self.data_breakpoints.remove(&data_id).is_none() {
2014            self.data_breakpoints.insert(
2015                data_id,
2016                DataBreakpointState {
2017                    dap,
2018                    is_enabled: true,
2019                    context,
2020                },
2021            );
2022        }
2023        self.send_data_breakpoints(cx);
2024    }
2025
    /// Returns the value of the `ignore_breakpoints` flag.
    ///
    /// NOTE(review): despite the name, this returns `true` when breakpoints are being
    /// *ignored*, exactly like [`Self::ignore_breakpoints`]. This looks inverted; confirm
    /// against callers before changing it, since they may rely on the current value.
    pub fn breakpoints_enabled(&self) -> bool {
        self.ignore_breakpoints
    }
2029
2030    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
2031        self.fetch(
2032            dap_command::LoadedSourcesCommand,
2033            |this, result, cx| {
2034                let Some(result) = result.log_err() else {
2035                    return;
2036                };
2037                this.loaded_sources = result;
2038                cx.emit(SessionEvent::LoadedSources);
2039                cx.notify();
2040            },
2041            cx,
2042        );
2043
2044        &self.loaded_sources
2045    }
2046
2047    fn fallback_to_manual_restart(
2048        &mut self,
2049        res: Result<()>,
2050        cx: &mut Context<Self>,
2051    ) -> Option<()> {
2052        if res.log_err().is_none() {
2053            cx.emit(SessionStateEvent::Restart);
2054            return None;
2055        }
2056        Some(())
2057    }
2058
2059    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
2060        res.log_err()?;
2061        Some(())
2062    }
2063
2064    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
2065        thread_id: ThreadId,
2066    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
2067    {
2068        move |this, response, cx| match response.log_err() {
2069            Some(response) => {
2070                this.breakpoint_store.update(cx, |store, cx| {
2071                    store.remove_active_position(Some(this.session_id()), cx)
2072                });
2073                Some(response)
2074            }
2075            None => {
2076                this.thread_states.stop_thread(thread_id);
2077                cx.notify();
2078                None
2079            }
2080        }
2081    }
2082
2083    fn clear_active_debug_line_response(
2084        &mut self,
2085        response: Result<()>,
2086        cx: &mut Context<Session>,
2087    ) -> Option<()> {
2088        response.log_err()?;
2089        self.clear_active_debug_line(cx);
2090        Some(())
2091    }
2092
2093    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2094        self.breakpoint_store.update(cx, |store, cx| {
2095            store.remove_active_position(Some(self.id), cx)
2096        });
2097    }
2098
2099    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2100        self.request(
2101            PauseCommand {
2102                thread_id: thread_id.0,
2103            },
2104            Self::empty_response,
2105            cx,
2106        )
2107        .detach();
2108    }
2109
2110    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2111        self.request(
2112            RestartStackFrameCommand { stack_frame_id },
2113            Self::empty_response,
2114            cx,
2115        )
2116        .detach();
2117    }
2118
2119    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2120        if self.restart_task.is_some() || self.as_running().is_none() {
2121            return;
2122        }
2123
2124        let supports_dap_restart =
2125            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2126
2127        self.restart_task = Some(cx.spawn(async move |this, cx| {
2128            let _ = this.update(cx, |session, cx| {
2129                if supports_dap_restart {
2130                    session
2131                        .request(
2132                            RestartCommand {
2133                                raw: args.unwrap_or(Value::Null),
2134                            },
2135                            Self::fallback_to_manual_restart,
2136                            cx,
2137                        )
2138                        .detach();
2139                } else {
2140                    cx.emit(SessionStateEvent::Restart);
2141                }
2142            });
2143        }));
2144    }
2145
2146    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2147        if self.is_session_terminated {
2148            return Task::ready(());
2149        }
2150
2151        self.is_session_terminated = true;
2152        self.thread_states.exit_all_threads();
2153        cx.notify();
2154
2155        let task = match &mut self.mode {
2156            SessionState::Running(_) => {
2157                if self
2158                    .capabilities
2159                    .supports_terminate_request
2160                    .unwrap_or_default()
2161                {
2162                    self.request(
2163                        TerminateCommand {
2164                            restart: Some(false),
2165                        },
2166                        Self::clear_active_debug_line_response,
2167                        cx,
2168                    )
2169                } else {
2170                    self.request(
2171                        DisconnectCommand {
2172                            restart: Some(false),
2173                            terminate_debuggee: Some(true),
2174                            suspend_debuggee: Some(false),
2175                        },
2176                        Self::clear_active_debug_line_response,
2177                        cx,
2178                    )
2179                }
2180            }
2181            SessionState::Booting(build_task) => {
2182                build_task.take();
2183                Task::ready(Some(()))
2184            }
2185        };
2186
2187        cx.emit(SessionStateEvent::Shutdown);
2188
2189        cx.spawn(async move |this, cx| {
2190            task.await;
2191            let _ = this.update(cx, |this, _| {
2192                if let Some(adapter_client) = this.adapter_client() {
2193                    adapter_client.kill();
2194                }
2195            });
2196        })
2197    }
2198
2199    pub fn completions(
2200        &mut self,
2201        query: CompletionsQuery,
2202        cx: &mut Context<Self>,
2203    ) -> Task<Result<Vec<dap::CompletionItem>>> {
2204        let task = self.request(query, |_, result, _| result.log_err(), cx);
2205
2206        cx.background_executor().spawn(async move {
2207            anyhow::Ok(
2208                task.await
2209                    .map(|response| response.targets)
2210                    .context("failed to fetch completions")?,
2211            )
2212        })
2213    }
2214
2215    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2216        let supports_single_thread_execution_requests =
2217            self.capabilities.supports_single_thread_execution_requests;
2218        self.thread_states.continue_thread(thread_id);
2219        self.request(
2220            ContinueCommand {
2221                args: ContinueArguments {
2222                    thread_id: thread_id.0,
2223                    single_thread: supports_single_thread_execution_requests,
2224                },
2225            },
2226            Self::on_step_response::<ContinueCommand>(thread_id),
2227            cx,
2228        )
2229        .detach();
2230    }
2231
2232    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2233        match self.mode {
2234            SessionState::Running(ref local) => Some(local.client.clone()),
2235            SessionState::Booting(_) => None,
2236        }
2237    }
2238
    /// Forwards to the session mode's `has_ever_stopped`.
    pub fn has_ever_stopped(&self) -> bool {
        self.mode.has_ever_stopped()
    }
2242    pub fn step_over(
2243        &mut self,
2244        thread_id: ThreadId,
2245        granularity: SteppingGranularity,
2246        cx: &mut Context<Self>,
2247    ) {
2248        let supports_single_thread_execution_requests =
2249            self.capabilities.supports_single_thread_execution_requests;
2250        let supports_stepping_granularity = self
2251            .capabilities
2252            .supports_stepping_granularity
2253            .unwrap_or_default();
2254
2255        let command = NextCommand {
2256            inner: StepCommand {
2257                thread_id: thread_id.0,
2258                granularity: supports_stepping_granularity.then(|| granularity),
2259                single_thread: supports_single_thread_execution_requests,
2260            },
2261        };
2262
2263        self.thread_states.process_step(thread_id);
2264        self.request(
2265            command,
2266            Self::on_step_response::<NextCommand>(thread_id),
2267            cx,
2268        )
2269        .detach();
2270    }
2271
2272    pub fn step_in(
2273        &mut self,
2274        thread_id: ThreadId,
2275        granularity: SteppingGranularity,
2276        cx: &mut Context<Self>,
2277    ) {
2278        let supports_single_thread_execution_requests =
2279            self.capabilities.supports_single_thread_execution_requests;
2280        let supports_stepping_granularity = self
2281            .capabilities
2282            .supports_stepping_granularity
2283            .unwrap_or_default();
2284
2285        let command = StepInCommand {
2286            inner: StepCommand {
2287                thread_id: thread_id.0,
2288                granularity: supports_stepping_granularity.then(|| granularity),
2289                single_thread: supports_single_thread_execution_requests,
2290            },
2291        };
2292
2293        self.thread_states.process_step(thread_id);
2294        self.request(
2295            command,
2296            Self::on_step_response::<StepInCommand>(thread_id),
2297            cx,
2298        )
2299        .detach();
2300    }
2301
2302    pub fn step_out(
2303        &mut self,
2304        thread_id: ThreadId,
2305        granularity: SteppingGranularity,
2306        cx: &mut Context<Self>,
2307    ) {
2308        let supports_single_thread_execution_requests =
2309            self.capabilities.supports_single_thread_execution_requests;
2310        let supports_stepping_granularity = self
2311            .capabilities
2312            .supports_stepping_granularity
2313            .unwrap_or_default();
2314
2315        let command = StepOutCommand {
2316            inner: StepCommand {
2317                thread_id: thread_id.0,
2318                granularity: supports_stepping_granularity.then(|| granularity),
2319                single_thread: supports_single_thread_execution_requests,
2320            },
2321        };
2322
2323        self.thread_states.process_step(thread_id);
2324        self.request(
2325            command,
2326            Self::on_step_response::<StepOutCommand>(thread_id),
2327            cx,
2328        )
2329        .detach();
2330    }
2331
2332    pub fn step_back(
2333        &mut self,
2334        thread_id: ThreadId,
2335        granularity: SteppingGranularity,
2336        cx: &mut Context<Self>,
2337    ) {
2338        let supports_single_thread_execution_requests =
2339            self.capabilities.supports_single_thread_execution_requests;
2340        let supports_stepping_granularity = self
2341            .capabilities
2342            .supports_stepping_granularity
2343            .unwrap_or_default();
2344
2345        let command = StepBackCommand {
2346            inner: StepCommand {
2347                thread_id: thread_id.0,
2348                granularity: supports_stepping_granularity.then(|| granularity),
2349                single_thread: supports_single_thread_execution_requests,
2350            },
2351        };
2352
2353        self.thread_states.process_step(thread_id);
2354
2355        self.request(
2356            command,
2357            Self::on_step_response::<StepBackCommand>(thread_id),
2358            cx,
2359        )
2360        .detach();
2361    }
2362
2363    pub fn stack_frames(
2364        &mut self,
2365        thread_id: ThreadId,
2366        cx: &mut Context<Self>,
2367    ) -> Result<Vec<StackFrame>> {
2368        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2369            && self.requests.contains_key(&ThreadsCommand.type_id())
2370            && self.threads.contains_key(&thread_id)
2371        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2372        // We could still be using an old thread id and have sent a new thread's request
2373        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2374        // But it very well could cause a minor bug in the future that is hard to track down
2375        {
2376            self.fetch(
2377                super::dap_command::StackTraceCommand {
2378                    thread_id: thread_id.0,
2379                    start_frame: None,
2380                    levels: None,
2381                },
2382                move |this, stack_frames, cx| {
2383                    let entry =
2384                        this.threads
2385                            .entry(thread_id)
2386                            .and_modify(|thread| match &stack_frames {
2387                                Ok(stack_frames) => {
2388                                    thread.stack_frames = stack_frames
2389                                        .iter()
2390                                        .cloned()
2391                                        .map(StackFrame::from)
2392                                        .collect();
2393                                    thread.stack_frames_error = None;
2394                                }
2395                                Err(error) => {
2396                                    thread.stack_frames.clear();
2397                                    thread.stack_frames_error = Some(error.cloned());
2398                                }
2399                            });
2400                    debug_assert!(
2401                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2402                        "Sent request for thread_id that doesn't exist"
2403                    );
2404                    if let Ok(stack_frames) = stack_frames {
2405                        this.stack_frames.extend(
2406                            stack_frames
2407                                .into_iter()
2408                                .filter(|frame| {
2409                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2410                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2411                                    !(frame.id == 0
2412                                        && frame.line == 0
2413                                        && frame.column == 0
2414                                        && frame.presentation_hint
2415                                            == Some(StackFramePresentationHint::Label))
2416                                })
2417                                .map(|frame| (frame.id, StackFrame::from(frame))),
2418                        );
2419                    }
2420
2421                    this.invalidate_command_type::<ScopesCommand>();
2422                    this.invalidate_command_type::<VariablesCommand>();
2423
2424                    cx.emit(SessionEvent::StackTrace);
2425                },
2426                cx,
2427            );
2428        }
2429
2430        match self.threads.get(&thread_id) {
2431            Some(thread) => {
2432                if let Some(error) = &thread.stack_frames_error {
2433                    Err(error.cloned())
2434                } else {
2435                    Ok(thread.stack_frames.clone())
2436                }
2437            }
2438            None => Ok(Vec::new()),
2439        }
2440    }
2441
2442    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2443        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2444            && self
2445                .requests
2446                .contains_key(&TypeId::of::<StackTraceCommand>())
2447        {
2448            self.fetch(
2449                ScopesCommand { stack_frame_id },
2450                move |this, scopes, cx| {
2451                    let Some(scopes) = scopes.log_err() else {
2452                        return
2453                    };
2454
2455                    for scope in scopes.iter() {
2456                        this.variables(scope.variables_reference, cx);
2457                    }
2458
2459                    let entry = this
2460                        .stack_frames
2461                        .entry(stack_frame_id)
2462                        .and_modify(|stack_frame| {
2463                            stack_frame.scopes = scopes;
2464                        });
2465
2466                    cx.emit(SessionEvent::Variables);
2467
2468                    debug_assert!(
2469                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2470                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2471                    );
2472                },
2473                cx,
2474            );
2475        }
2476
2477        self.stack_frames
2478            .get(&stack_frame_id)
2479            .map(|frame| frame.scopes.as_slice())
2480            .unwrap_or_default()
2481    }
2482
2483    pub fn variables_by_stack_frame_id(
2484        &self,
2485        stack_frame_id: StackFrameId,
2486        globals: bool,
2487        locals: bool,
2488    ) -> Vec<dap::Variable> {
2489        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2490            return Vec::new();
2491        };
2492
2493        stack_frame
2494            .scopes
2495            .iter()
2496            .filter(|scope| {
2497                (scope.name.to_lowercase().contains("local") && locals)
2498                    || (scope.name.to_lowercase().contains("global") && globals)
2499            })
2500            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2501            .flatten()
2502            .cloned()
2503            .collect()
2504    }
2505
2506    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2507        &self.watchers
2508    }
2509
2510    pub fn add_watcher(
2511        &mut self,
2512        expression: SharedString,
2513        frame_id: u64,
2514        cx: &mut Context<Self>,
2515    ) -> Task<Result<()>> {
2516        let request = self.mode.request_dap(EvaluateCommand {
2517            expression: expression.to_string(),
2518            context: Some(EvaluateArgumentsContext::Watch),
2519            frame_id: Some(frame_id),
2520            source: None,
2521        });
2522
2523        cx.spawn(async move |this, cx| {
2524            let response = request.await?;
2525
2526            this.update(cx, |session, cx| {
2527                session.watchers.insert(
2528                    expression.clone(),
2529                    Watcher {
2530                        expression,
2531                        value: response.result.into(),
2532                        variables_reference: response.variables_reference,
2533                        presentation_hint: response.presentation_hint,
2534                    },
2535                );
2536                cx.emit(SessionEvent::Watchers);
2537            })
2538        })
2539    }
2540
2541    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2542        let watches = self.watchers.clone();
2543        for (_, watch) in watches.into_iter() {
2544            self.add_watcher(watch.expression.clone(), frame_id, cx)
2545                .detach();
2546        }
2547    }
2548
2549    pub fn remove_watcher(&mut self, expression: SharedString) {
2550        self.watchers.remove(&expression);
2551    }
2552
2553    pub fn variables(
2554        &mut self,
2555        variables_reference: VariableReference,
2556        cx: &mut Context<Self>,
2557    ) -> Vec<dap::Variable> {
2558        let command = VariablesCommand {
2559            variables_reference,
2560            filter: None,
2561            start: None,
2562            count: None,
2563            format: None,
2564        };
2565
2566        self.fetch(
2567            command,
2568            move |this, variables, cx| {
2569                let Some(variables) = variables.log_err() else {
2570                    return;
2571                };
2572
2573                this.variables.insert(variables_reference, variables);
2574
2575                cx.emit(SessionEvent::Variables);
2576                cx.emit(SessionEvent::InvalidateInlineValue);
2577            },
2578            cx,
2579        );
2580
2581        self.variables
2582            .get(&variables_reference)
2583            .cloned()
2584            .unwrap_or_default()
2585    }
2586
2587    pub fn data_breakpoint_info(
2588        &mut self,
2589        context: Arc<DataBreakpointContext>,
2590        mode: Option<String>,
2591        cx: &mut Context<Self>,
2592    ) -> Task<Option<dap::DataBreakpointInfoResponse>> {
2593        let command = DataBreakpointInfoCommand { context, mode };
2594
2595        self.request(command, |_, response, _| response.ok(), cx)
2596    }
2597
2598    pub fn set_variable_value(
2599        &mut self,
2600        stack_frame_id: u64,
2601        variables_reference: u64,
2602        name: String,
2603        value: String,
2604        cx: &mut Context<Self>,
2605    ) {
2606        if self.capabilities.supports_set_variable.unwrap_or_default() {
2607            self.request(
2608                SetVariableValueCommand {
2609                    name,
2610                    value,
2611                    variables_reference,
2612                },
2613                move |this, response, cx| {
2614                    let response = response.log_err()?;
2615                    this.invalidate_command_type::<VariablesCommand>();
2616                    this.invalidate_command_type::<ReadMemory>();
2617                    this.memory.clear(cx.background_executor());
2618                    this.refresh_watchers(stack_frame_id, cx);
2619                    cx.emit(SessionEvent::Variables);
2620                    Some(response)
2621                },
2622                cx,
2623            )
2624            .detach();
2625        }
2626    }
2627
2628    pub fn evaluate(
2629        &mut self,
2630        expression: String,
2631        context: Option<EvaluateArgumentsContext>,
2632        frame_id: Option<u64>,
2633        source: Option<Source>,
2634        cx: &mut Context<Self>,
2635    ) -> Task<()> {
2636        let event = dap::OutputEvent {
2637            category: None,
2638            output: format!("> {expression}"),
2639            group: None,
2640            variables_reference: None,
2641            source: None,
2642            line: None,
2643            column: None,
2644            data: None,
2645            location_reference: None,
2646        };
2647        self.push_output(event);
2648        let request = self.mode.request_dap(EvaluateCommand {
2649            expression,
2650            context,
2651            frame_id,
2652            source,
2653        });
2654        cx.spawn(async move |this, cx| {
2655            let response = request.await;
2656            this.update(cx, |this, cx| {
2657                this.memory.clear(cx.background_executor());
2658                this.invalidate_command_type::<ReadMemory>();
2659                match response {
2660                    Ok(response) => {
2661                        let event = dap::OutputEvent {
2662                            category: None,
2663                            output: format!("< {}", &response.result),
2664                            group: None,
2665                            variables_reference: Some(response.variables_reference),
2666                            source: None,
2667                            line: None,
2668                            column: None,
2669                            data: None,
2670                            location_reference: None,
2671                        };
2672                        this.push_output(event);
2673                    }
2674                    Err(e) => {
2675                        let event = dap::OutputEvent {
2676                            category: None,
2677                            output: format!("{}", e),
2678                            group: None,
2679                            variables_reference: None,
2680                            source: None,
2681                            line: None,
2682                            column: None,
2683                            data: None,
2684                            location_reference: None,
2685                        };
2686                        this.push_output(event);
2687                    }
2688                };
2689                cx.notify();
2690            })
2691            .ok();
2692        })
2693    }
2694
2695    pub fn location(
2696        &mut self,
2697        reference: u64,
2698        cx: &mut Context<Self>,
2699    ) -> Option<dap::LocationsResponse> {
2700        self.fetch(
2701            LocationsCommand { reference },
2702            move |this, response, _| {
2703                let Some(response) = response.log_err() else {
2704                    return;
2705                };
2706                this.locations.insert(reference, response);
2707            },
2708            cx,
2709        );
2710        self.locations.get(&reference).cloned()
2711    }
2712
2713    pub fn is_attached(&self) -> bool {
2714        let SessionState::Running(local_mode) = &self.mode else {
2715            return false;
2716        };
2717        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2718    }
2719
2720    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2721        let command = DisconnectCommand {
2722            restart: Some(false),
2723            terminate_debuggee: Some(false),
2724            suspend_debuggee: Some(false),
2725        };
2726
2727        self.request(command, Self::empty_response, cx).detach()
2728    }
2729
2730    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2731        if self
2732            .capabilities
2733            .supports_terminate_threads_request
2734            .unwrap_or_default()
2735        {
2736            self.request(
2737                TerminateThreadsCommand {
2738                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2739                },
2740                Self::clear_active_debug_line_response,
2741                cx,
2742            )
2743            .detach();
2744        } else {
2745            self.shutdown(cx).detach();
2746        }
2747    }
2748
2749    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2750        self.thread_states.thread_state(thread_id)
2751    }
2752
2753    pub fn quirks(&self) -> SessionQuirks {
2754        self.quirks
2755    }
2756
2757    fn launch_browser_for_remote_server(
2758        &mut self,
2759        mut request: LaunchBrowserInCompanionParams,
2760        cx: &mut Context<Self>,
2761    ) {
2762        let Some(remote_client) = self.remote_client.clone() else {
2763            log::error!("can't launch browser in companion for non-remote project");
2764            return;
2765        };
2766        let Some(http_client) = self.http_client.clone() else {
2767            return;
2768        };
2769        let Some(node_runtime) = self.node_runtime.clone() else {
2770            return;
2771        };
2772
2773        let mut console_output = self.console_output(cx);
2774        let task = cx.spawn(async move |this, cx| {
2775            let forward_ports_process = if remote_client
2776                .read_with(cx, |client, _| client.shares_network_interface())?
2777            {
2778                request.other.insert(
2779                    "proxyUri".into(),
2780                    format!("127.0.0.1:{}", request.server_port).into(),
2781                );
2782                None
2783            } else {
2784                let port = TcpTransport::unused_port(Ipv4Addr::LOCALHOST)
2785                    .await
2786                    .context("getting port for DAP")?;
2787                request
2788                    .other
2789                    .insert("proxyUri".into(), format!("127.0.0.1:{port}").into());
2790                let mut port_forwards = vec![(port, "localhost".to_owned(), request.server_port)];
2791
2792                if let Some(value) = request.params.get("url")
2793                    && let Some(url) = value.as_str()
2794                    && let Some(url) = Url::parse(url).ok()
2795                    && let Some(frontend_port) = url.port()
2796                {
2797                    port_forwards.push((frontend_port, "localhost".to_owned(), frontend_port));
2798                }
2799
2800                let child = remote_client.update(cx, |client, _| {
2801                    let command = client.build_forward_ports_command(port_forwards)?;
2802                    let child = new_smol_command(command.program)
2803                        .args(command.args)
2804                        .envs(command.env)
2805                        .spawn()
2806                        .context("spawning port forwarding process")?;
2807                    anyhow::Ok(child)
2808                })??;
2809                Some(child)
2810            };
2811
2812            let mut companion_process = None;
2813            let companion_port =
2814                if let Some(companion_port) = this.read_with(cx, |this, _| this.companion_port)? {
2815                    companion_port
2816                } else {
2817                    let task = cx.spawn(async move |cx| spawn_companion(node_runtime, cx).await);
2818                    match task.await {
2819                        Ok((port, child)) => {
2820                            companion_process = Some(child);
2821                            port
2822                        }
2823                        Err(e) => {
2824                            console_output
2825                                .send(format!("Failed to launch browser companion process: {e}"))
2826                                .await
2827                                .ok();
2828                            return Err(e);
2829                        }
2830                    }
2831                };
2832
2833            let mut background_tasks = Vec::new();
2834            if let Some(mut forward_ports_process) = forward_ports_process {
2835                background_tasks.push(cx.spawn(async move |_| {
2836                    forward_ports_process.status().await.log_err();
2837                }));
2838            };
2839            if let Some(mut companion_process) = companion_process {
2840                if let Some(stderr) = companion_process.stderr.take() {
2841                    let mut console_output = console_output.clone();
2842                    background_tasks.push(cx.spawn(async move |_| {
2843                        let mut stderr = BufReader::new(stderr);
2844                        let mut line = String::new();
2845                        while let Ok(n) = stderr.read_line(&mut line).await
2846                            && n > 0
2847                        {
2848                            console_output
2849                                .send(format!("companion stderr: {line}"))
2850                                .await
2851                                .ok();
2852                            line.clear();
2853                        }
2854                    }));
2855                }
2856                background_tasks.push(cx.spawn({
2857                    let mut console_output = console_output.clone();
2858                    async move |_| match companion_process.status().await {
2859                        Ok(status) => {
2860                            if status.success() {
2861                                console_output
2862                                    .send("Companion process exited normally".into())
2863                                    .await
2864                                    .ok();
2865                            } else {
2866                                console_output
2867                                    .send(format!(
2868                                        "Companion process exited abnormally with {status:?}"
2869                                    ))
2870                                    .await
2871                                    .ok();
2872                            }
2873                        }
2874                        Err(e) => {
2875                            console_output
2876                                .send(format!("Failed to join companion process: {e}"))
2877                                .await
2878                                .ok();
2879                        }
2880                    }
2881                }));
2882            }
2883
2884            // TODO pass wslInfo as needed
2885
2886            let companion_address = format!("127.0.0.1:{companion_port}");
2887            let mut companion_started = false;
2888            for _ in 0..10 {
2889                if TcpStream::connect(&companion_address).await.is_ok() {
2890                    companion_started = true;
2891                    break;
2892                }
2893                cx.background_executor()
2894                    .timer(Duration::from_millis(100))
2895                    .await;
2896            }
2897            if !companion_started {
2898                console_output
2899                    .send("Browser companion failed to start".into())
2900                    .await
2901                    .ok();
2902                bail!("Browser companion failed to start");
2903            }
2904
2905            let response = http_client
2906                .post_json(
2907                    &format!("http://{companion_address}/launch-and-attach"),
2908                    serde_json::to_string(&request)
2909                        .context("serializing request")?
2910                        .into(),
2911                )
2912                .await;
2913            match response {
2914                Ok(response) => {
2915                    if !response.status().is_success() {
2916                        console_output
2917                            .send("Launch request to companion failed".into())
2918                            .await
2919                            .ok();
2920                        return Err(anyhow!("launch request failed"));
2921                    }
2922                }
2923                Err(e) => {
2924                    console_output
2925                        .send("Failed to read response from companion".into())
2926                        .await
2927                        .ok();
2928                    return Err(e);
2929                }
2930            }
2931
2932            this.update(cx, |this, _| {
2933                this.background_tasks.extend(background_tasks);
2934                this.companion_port = Some(companion_port);
2935            })?;
2936
2937            anyhow::Ok(())
2938        });
2939        self.background_tasks.push(cx.spawn(async move |_, _| {
2940            task.await.log_err();
2941        }));
2942    }
2943
2944    fn kill_browser(&self, request: KillCompanionBrowserParams, cx: &mut App) {
2945        let Some(companion_port) = self.companion_port else {
2946            log::error!("received killCompanionBrowser but js-debug-companion is not running");
2947            return;
2948        };
2949        let Some(http_client) = self.http_client.clone() else {
2950            return;
2951        };
2952
2953        cx.spawn(async move |_| {
2954            http_client
2955                .post_json(
2956                    &format!("http://127.0.0.1:{companion_port}/kill"),
2957                    serde_json::to_string(&request)
2958                        .context("serializing request")?
2959                        .into(),
2960                )
2961                .await?;
2962            anyhow::Ok(())
2963        })
2964        .detach_and_log_err(cx)
2965    }
2966}
2967
2968#[derive(Serialize, Deserialize, Debug)]
2969#[serde(rename_all = "camelCase")]
2970struct LaunchBrowserInCompanionParams {
2971    server_port: u16,
2972    params: HashMap<String, serde_json::Value>,
2973    #[serde(flatten)]
2974    other: HashMap<String, serde_json::Value>,
2975}
2976
2977#[derive(Serialize, Deserialize, Debug)]
2978#[serde(rename_all = "camelCase")]
2979struct KillCompanionBrowserParams {
2980    launch_id: u64,
2981}
2982
2983async fn spawn_companion(
2984    node_runtime: NodeRuntime,
2985    cx: &mut AsyncApp,
2986) -> Result<(u16, smol::process::Child)> {
2987    let binary_path = node_runtime
2988        .binary_path()
2989        .await
2990        .context("getting node path")?;
2991    let path = cx
2992        .spawn(async move |cx| get_or_install_companion(node_runtime, cx).await)
2993        .await?;
2994    log::info!("will launch js-debug-companion version {path:?}");
2995
2996    let port = {
2997        let listener = TcpListener::bind("127.0.0.1:0")
2998            .await
2999            .context("getting port for companion")?;
3000        listener.local_addr()?.port()
3001    };
3002
3003    let dir = paths::data_dir()
3004        .join("js_debug_companion_state")
3005        .to_string_lossy()
3006        .to_string();
3007
3008    let child = new_smol_command(binary_path)
3009        .arg(path)
3010        .args([
3011            format!("--listen=127.0.0.1:{port}"),
3012            format!("--state={dir}"),
3013        ])
3014        .stdin(Stdio::piped())
3015        .stdout(Stdio::piped())
3016        .stderr(Stdio::piped())
3017        .spawn()
3018        .context("spawning companion child process")?;
3019
3020    Ok((port, child))
3021}
3022
3023async fn get_or_install_companion(node: NodeRuntime, cx: &mut AsyncApp) -> Result<PathBuf> {
3024    const PACKAGE_NAME: &str = "@zed-industries/js-debug-companion-cli";
3025
3026    async fn install_latest_version(dir: PathBuf, node: NodeRuntime) -> Result<PathBuf> {
3027        let temp_dir = tempfile::tempdir().context("creating temporary directory")?;
3028        node.npm_install_packages(temp_dir.path(), &[(PACKAGE_NAME, "latest")])
3029            .await
3030            .context("installing latest companion package")?;
3031        let version = node
3032            .npm_package_installed_version(temp_dir.path(), PACKAGE_NAME)
3033            .await
3034            .context("getting installed companion version")?
3035            .context("companion was not installed")?;
3036        smol::fs::rename(temp_dir.path(), dir.join(&version))
3037            .await
3038            .context("moving companion package into place")?;
3039        Ok(dir.join(version))
3040    }
3041
3042    let dir = paths::debug_adapters_dir().join("js-debug-companion");
3043    let (latest_installed_version, latest_version) = cx
3044        .background_spawn({
3045            let dir = dir.clone();
3046            let node = node.clone();
3047            async move {
3048                smol::fs::create_dir_all(&dir)
3049                    .await
3050                    .context("creating companion installation directory")?;
3051
3052                let mut children = smol::fs::read_dir(&dir)
3053                    .await
3054                    .context("reading companion installation directory")?
3055                    .try_collect::<Vec<_>>()
3056                    .await
3057                    .context("reading companion installation directory entries")?;
3058                children
3059                    .sort_by_key(|child| semver::Version::parse(child.file_name().to_str()?).ok());
3060
3061                let latest_installed_version = children.last().and_then(|child| {
3062                    let version = child.file_name().into_string().ok()?;
3063                    Some((child.path(), version))
3064                });
3065                let latest_version = node
3066                    .npm_package_latest_version(PACKAGE_NAME)
3067                    .await
3068                    .log_err();
3069                anyhow::Ok((latest_installed_version, latest_version))
3070            }
3071        })
3072        .await?;
3073
3074    let path = if let Some((installed_path, installed_version)) = latest_installed_version {
3075        if let Some(latest_version) = latest_version
3076            && latest_version != installed_version
3077        {
3078            cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3079                .detach();
3080        }
3081        Ok(installed_path)
3082    } else {
3083        cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3084            .await
3085    };
3086
3087    Ok(path?
3088        .join("node_modules")
3089        .join(PACKAGE_NAME)
3090        .join("out")
3091        .join("cli.js"))
3092}