session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DataBreakpointInfoCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetDataBreakpointsCommand, SetExceptionBreakpoints, SetVariableValueCommand,
   9    StackTraceCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  10    TerminateCommand, TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use crate::debugger::breakpoint_store::BreakpointSessionState;
  14use crate::debugger::dap_command::{DataBreakpointContext, ReadMemory};
  15use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
  16use anyhow::{Context as _, Result, anyhow, bail};
  17use base64::Engine;
  18use collections::{HashMap, HashSet, IndexMap};
  19use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  20use dap::messages::Response;
  21use dap::requests::{Request, RunInTerminal, StartDebugging};
  22use dap::transport::TcpTransport;
  23use dap::{
  24    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  25    SteppingGranularity, StoppedEvent, VariableReference,
  26    client::{DebugAdapterClient, SessionId},
  27    messages::{Events, Message},
  28};
  29use dap::{
  30    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  31    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  32    StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
  33};
  34use futures::channel::mpsc::UnboundedSender;
  35use futures::channel::{mpsc, oneshot};
  36use futures::io::BufReader;
  37use futures::{AsyncBufReadExt as _, SinkExt, StreamExt, TryStreamExt};
  38use futures::{FutureExt, future::Shared};
  39use gpui::{
  40    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  41    Task, WeakEntity,
  42};
  43use http_client::HttpClient;
  44use node_runtime::NodeRuntime;
  45use remote::RemoteClient;
  46use serde::{Deserialize, Serialize};
  47use serde_json::Value;
  48use smol::net::{TcpListener, TcpStream};
  49use std::any::TypeId;
  50use std::collections::{BTreeMap, VecDeque};
  51use std::net::Ipv4Addr;
  52use std::ops::RangeInclusive;
  53use std::path::PathBuf;
  54use std::process::Stdio;
  55use std::time::Duration;
  56use std::u64;
  57use std::{
  58    any::Any,
  59    collections::hash_map::Entry,
  60    hash::{Hash, Hasher},
  61    path::Path,
  62    sync::Arc,
  63};
  64use task::TaskContext;
  65use text::{PointUtf16, ToPointUtf16};
  66use url::Url;
  67use util::command::new_smol_command;
  68use util::{ResultExt, debug_panic, maybe};
  69use worktree::Worktree;
  70
  71const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
  72const DEBUG_HISTORY_LIMIT: usize = 10;
  73
  74#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  75#[repr(transparent)]
  76pub struct ThreadId(pub i64);
  77
  78impl From<i64> for ThreadId {
  79    fn from(id: i64) -> Self {
  80        Self(id)
  81    }
  82}
  83
  84#[derive(Clone, Debug)]
  85pub struct StackFrame {
  86    pub dap: dap::StackFrame,
  87    pub scopes: Vec<dap::Scope>,
  88}
  89
  90impl From<dap::StackFrame> for StackFrame {
  91    fn from(stack_frame: dap::StackFrame) -> Self {
  92        Self {
  93            scopes: vec![],
  94            dap: stack_frame,
  95        }
  96    }
  97}
  98
  99#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
 100pub enum ThreadStatus {
 101    #[default]
 102    Running,
 103    Stopped,
 104    Stepping,
 105    Exited,
 106    Ended,
 107}
 108
 109impl ThreadStatus {
 110    pub fn label(&self) -> &'static str {
 111        match self {
 112            ThreadStatus::Running => "Running",
 113            ThreadStatus::Stopped => "Stopped",
 114            ThreadStatus::Stepping => "Stepping",
 115            ThreadStatus::Exited => "Exited",
 116            ThreadStatus::Ended => "Ended",
 117        }
 118    }
 119}
 120
 121#[derive(Debug, Clone)]
 122pub struct Thread {
 123    dap: dap::Thread,
 124    stack_frames: Vec<StackFrame>,
 125    stack_frames_error: Option<SharedString>,
 126    _has_stopped: bool,
 127}
 128
 129impl From<dap::Thread> for Thread {
 130    fn from(dap: dap::Thread) -> Self {
 131        Self {
 132            dap,
 133            stack_frames: Default::default(),
 134            stack_frames_error: None,
 135            _has_stopped: false,
 136        }
 137    }
 138}
 139
 140#[derive(Debug, Clone, PartialEq)]
 141pub struct Watcher {
 142    pub expression: SharedString,
 143    pub value: SharedString,
 144    pub variables_reference: u64,
 145    pub presentation_hint: Option<VariablePresentationHint>,
 146}
 147
 148#[derive(Debug, Clone, PartialEq)]
 149pub struct DataBreakpointState {
 150    pub dap: dap::DataBreakpoint,
 151    pub is_enabled: bool,
 152    pub context: Arc<DataBreakpointContext>,
 153}
 154
 155pub enum SessionState {
 156    /// Represents a session that is building/initializing
 157    /// even if a session doesn't have a pre build task this state
 158    /// is used to run all the async tasks that are required to start the session
 159    Booting(Option<Task<Result<()>>>),
 160    Running(RunningMode),
 161}
 162
 163#[derive(Clone)]
 164pub struct RunningMode {
 165    client: Arc<DebugAdapterClient>,
 166    binary: DebugAdapterBinary,
 167    tmp_breakpoint: Option<SourceBreakpoint>,
 168    worktree: WeakEntity<Worktree>,
 169    executor: BackgroundExecutor,
 170    is_started: bool,
 171    has_ever_stopped: bool,
 172    messages_tx: UnboundedSender<Message>,
 173}
 174
 175#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
 176pub struct SessionQuirks {
 177    pub compact: bool,
 178    pub prefer_thread_name: bool,
 179}
 180
 181fn client_source(abs_path: &Path) -> dap::Source {
 182    dap::Source {
 183        name: abs_path
 184            .file_name()
 185            .map(|filename| filename.to_string_lossy().into_owned()),
 186        path: Some(abs_path.to_string_lossy().into_owned()),
 187        source_reference: None,
 188        presentation_hint: None,
 189        origin: None,
 190        sources: None,
 191        adapter_data: None,
 192        checksums: None,
 193    }
 194}
 195
 196impl RunningMode {
 197    async fn new(
 198        session_id: SessionId,
 199        parent_session: Option<Entity<Session>>,
 200        worktree: WeakEntity<Worktree>,
 201        binary: DebugAdapterBinary,
 202        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 203        cx: &mut AsyncApp,
 204    ) -> Result<Self> {
 205        let message_handler = Box::new({
 206            let messages_tx = messages_tx.clone();
 207            move |message| {
 208                messages_tx.unbounded_send(message).ok();
 209            }
 210        });
 211
 212        let client = if let Some(client) = parent_session
 213            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 214            .flatten()
 215        {
 216            client
 217                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 218                .await?
 219        } else {
 220            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 221        };
 222
 223        Ok(Self {
 224            client: Arc::new(client),
 225            worktree,
 226            tmp_breakpoint: None,
 227            binary,
 228            executor: cx.background_executor().clone(),
 229            is_started: false,
 230            has_ever_stopped: false,
 231            messages_tx,
 232        })
 233    }
 234
 235    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 236        &self.worktree
 237    }
 238
 239    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 240        let tasks: Vec<_> = paths
 241            .iter()
 242            .map(|path| {
 243                self.request(dap_command::SetBreakpoints {
 244                    source: client_source(path),
 245                    source_modified: None,
 246                    breakpoints: vec![],
 247                })
 248            })
 249            .collect();
 250
 251        cx.background_spawn(async move {
 252            futures::future::join_all(tasks)
 253                .await
 254                .iter()
 255                .for_each(|res| match res {
 256                    Ok(_) => {}
 257                    Err(err) => {
 258                        log::warn!("Set breakpoints request failed: {}", err);
 259                    }
 260                });
 261        })
 262    }
 263
 264    fn send_breakpoints_from_path(
 265        &self,
 266        abs_path: Arc<Path>,
 267        reason: BreakpointUpdatedReason,
 268        breakpoint_store: &Entity<BreakpointStore>,
 269        cx: &mut App,
 270    ) -> Task<()> {
 271        let breakpoints =
 272            breakpoint_store
 273                .read(cx)
 274                .source_breakpoints_from_path(&abs_path, cx)
 275                .into_iter()
 276                .filter(|bp| bp.state.is_enabled())
 277                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 278                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 279                }))
 280                .map(Into::into)
 281                .collect();
 282
 283        let raw_breakpoints = breakpoint_store
 284            .read(cx)
 285            .breakpoints_from_path(&abs_path)
 286            .into_iter()
 287            .filter(|bp| bp.bp.state.is_enabled())
 288            .collect::<Vec<_>>();
 289
 290        let task = self.request(dap_command::SetBreakpoints {
 291            source: client_source(&abs_path),
 292            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 293            breakpoints,
 294        });
 295        let session_id = self.client.id();
 296        let breakpoint_store = breakpoint_store.downgrade();
 297        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 298            Ok(breakpoints) => {
 299                let breakpoints =
 300                    breakpoints
 301                        .into_iter()
 302                        .zip(raw_breakpoints)
 303                        .filter_map(|(dap_bp, zed_bp)| {
 304                            Some((
 305                                zed_bp,
 306                                BreakpointSessionState {
 307                                    id: dap_bp.id?,
 308                                    verified: dap_bp.verified,
 309                                },
 310                            ))
 311                        });
 312                breakpoint_store
 313                    .update(cx, |this, _| {
 314                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 315                    })
 316                    .ok();
 317            }
 318            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 319        })
 320    }
 321
 322    fn send_exception_breakpoints(
 323        &self,
 324        filters: Vec<ExceptionBreakpointsFilter>,
 325        supports_filter_options: bool,
 326    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 327        let arg = if supports_filter_options {
 328            SetExceptionBreakpoints::WithOptions {
 329                filters: filters
 330                    .into_iter()
 331                    .map(|filter| ExceptionFilterOptions {
 332                        filter_id: filter.filter,
 333                        condition: None,
 334                        mode: None,
 335                    })
 336                    .collect(),
 337            }
 338        } else {
 339            SetExceptionBreakpoints::Plain {
 340                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 341            }
 342        };
 343        self.request(arg)
 344    }
 345
 346    fn send_source_breakpoints(
 347        &self,
 348        ignore_breakpoints: bool,
 349        breakpoint_store: &Entity<BreakpointStore>,
 350        cx: &App,
 351    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 352        let mut breakpoint_tasks = Vec::new();
 353        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 354        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 355        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 356        let session_id = self.client.id();
 357        for (path, breakpoints) in breakpoints {
 358            let breakpoints = if ignore_breakpoints {
 359                vec![]
 360            } else {
 361                breakpoints
 362                    .into_iter()
 363                    .filter(|bp| bp.state.is_enabled())
 364                    .map(Into::into)
 365                    .collect()
 366            };
 367
 368            let raw_breakpoints = raw_breakpoints
 369                .remove(&path)
 370                .unwrap_or_default()
 371                .into_iter()
 372                .filter(|bp| bp.bp.state.is_enabled());
 373            let error_path = path.clone();
 374            let send_request = self
 375                .request(dap_command::SetBreakpoints {
 376                    source: client_source(&path),
 377                    source_modified: Some(false),
 378                    breakpoints,
 379                })
 380                .map(|result| result.map_err(move |e| (error_path, e)));
 381
 382            let task = cx.spawn({
 383                let breakpoint_store = breakpoint_store.downgrade();
 384                async move |cx| {
 385                    let breakpoints = cx.background_spawn(send_request).await?;
 386
 387                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 388                        |(dap_bp, zed_bp)| {
 389                            Some((
 390                                zed_bp,
 391                                BreakpointSessionState {
 392                                    id: dap_bp.id?,
 393                                    verified: dap_bp.verified,
 394                                },
 395                            ))
 396                        },
 397                    );
 398                    breakpoint_store
 399                        .update(cx, |this, _| {
 400                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 401                        })
 402                        .ok();
 403
 404                    Ok(())
 405                }
 406            });
 407            breakpoint_tasks.push(task);
 408        }
 409
 410        cx.background_spawn(async move {
 411            futures::future::join_all(breakpoint_tasks)
 412                .await
 413                .into_iter()
 414                .filter_map(Result::err)
 415                .collect::<HashMap<_, _>>()
 416        })
 417    }
 418
 419    fn initialize_sequence(
 420        &self,
 421        capabilities: &Capabilities,
 422        initialized_rx: oneshot::Receiver<()>,
 423        dap_store: WeakEntity<DapStore>,
 424        cx: &mut Context<Session>,
 425    ) -> Task<Result<()>> {
 426        let raw = self.binary.request_args.clone();
 427
 428        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 429        let launch = match raw.request {
 430            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 431                raw: raw.configuration,
 432            }),
 433            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 434                raw: raw.configuration,
 435            }),
 436        };
 437
 438        // From spec (on initialization sequence):
 439        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 440        //
 441        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 442        let should_send_exception_breakpoints = capabilities
 443            .exception_breakpoint_filters
 444            .as_ref()
 445            .is_some_and(|filters| !filters.is_empty());
 446        let supports_exception_filters = capabilities
 447            .supports_exception_filter_options
 448            .unwrap_or_default();
 449        let this = self.clone();
 450        let worktree = self.worktree().clone();
 451        let mut filters = capabilities
 452            .exception_breakpoint_filters
 453            .clone()
 454            .unwrap_or_default();
 455        let capabilities = capabilities.clone();
 456        let configuration_sequence = cx.spawn({
 457            async move |session, cx| {
 458                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 459                let configuration_done_supported = if adapter_name.as_ref() == "GDB" {
 460                    // Until GDB 17 is officially out and in circulation, as a workaround to misconfigured handling of DAP in GDB,
 461                    // we do not use ConfigurationDone for communications.
 462                    // See https://github.com/zed-industries/zed/issues/41753
 463                    false
 464                } else {
 465                    ConfigurationDone::is_supported(&capabilities)
 466                };
 467                let (breakpoint_store, adapter_defaults) =
 468                    dap_store.read_with(cx, |dap_store, _| {
 469                        (
 470                            dap_store.breakpoint_store().clone(),
 471                            dap_store.adapter_options(&adapter_name),
 472                        )
 473                    })?;
 474                initialized_rx.await?;
 475                let errors_by_path = cx
 476                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 477                    .await;
 478
 479                dap_store.update(cx, |_, cx| {
 480                    let Some(worktree) = worktree.upgrade() else {
 481                        return;
 482                    };
 483
 484                    for (path, error) in &errors_by_path {
 485                        log::error!("failed to set breakpoints for {path:?}: {error}");
 486                    }
 487
 488                    if let Some(failed_path) = errors_by_path.keys().next() {
 489                        let failed_path = failed_path
 490                            .strip_prefix(worktree.read(cx).abs_path())
 491                            .unwrap_or(failed_path)
 492                            .display();
 493                        let message = format!(
 494                            "Failed to set breakpoints for {failed_path}{}",
 495                            match errors_by_path.len() {
 496                                0 => unreachable!(),
 497                                1 => "".into(),
 498                                2 => " and 1 other path".into(),
 499                                n => format!(" and {} other paths", n - 1),
 500                            }
 501                        );
 502                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 503                    }
 504                })?;
 505
 506                if should_send_exception_breakpoints || !configuration_done_supported {
 507                    _ = session.update(cx, |this, _| {
 508                        filters.retain(|filter| {
 509                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 510                                defaults
 511                                    .exception_breakpoints
 512                                    .get(&filter.filter)
 513                                    .map(|options| options.enabled)
 514                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 515                            } else {
 516                                filter.default.unwrap_or_default()
 517                            };
 518                            this.exception_breakpoints
 519                                .entry(filter.filter.clone())
 520                                .or_insert_with(|| (filter.clone(), is_enabled));
 521                            is_enabled
 522                        });
 523                    });
 524
 525                    this.send_exception_breakpoints(filters, supports_exception_filters)
 526                        .await
 527                        .ok();
 528                }
 529
 530                if configuration_done_supported {
 531                    this.request(ConfigurationDone {})
 532                } else {
 533                    Task::ready(Ok(()))
 534                }
 535                .await
 536            }
 537        });
 538
 539        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 540
 541        cx.spawn(async move |this, cx| {
 542            let result = task.await;
 543
 544            this.update(cx, |this, cx| {
 545                if let Some(this) = this.as_running_mut() {
 546                    this.is_started = true;
 547                    cx.notify();
 548                }
 549            })
 550            .ok();
 551
 552            result?;
 553            anyhow::Ok(())
 554        })
 555    }
 556
 557    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 558        let client = self.client.clone();
 559        let messages_tx = self.messages_tx.clone();
 560        let message_handler = Box::new(move |message| {
 561            messages_tx.unbounded_send(message).ok();
 562        });
 563        if client.should_reconnect_for_ssh() {
 564            Some(cx.spawn(async move |cx| {
 565                client.connect(message_handler, cx).await?;
 566                anyhow::Ok(())
 567            }))
 568        } else {
 569            None
 570        }
 571    }
 572
 573    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 574    where
 575        <R::DapRequest as dap::requests::Request>::Response: 'static,
 576        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 577    {
 578        let request = Arc::new(request);
 579
 580        let request_clone = request.clone();
 581        let connection = self.client.clone();
 582        self.executor.spawn(async move {
 583            let args = request_clone.to_dap();
 584            let response = connection.request::<R::DapRequest>(args).await?;
 585            request.response_from_dap(response)
 586        })
 587    }
 588}
 589
 590impl SessionState {
 591    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 592    where
 593        <R::DapRequest as dap::requests::Request>::Response: 'static,
 594        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 595    {
 596        match self {
 597            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 598            SessionState::Booting(_) => Task::ready(Err(anyhow!(
 599                "no adapter running to send request: {request:?}"
 600            ))),
 601        }
 602    }
 603
 604    /// Did this debug session stop at least once?
 605    pub(crate) fn has_ever_stopped(&self) -> bool {
 606        match self {
 607            SessionState::Booting(_) => false,
 608            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 609        }
 610    }
 611
 612    fn stopped(&mut self) {
 613        if let SessionState::Running(running) = self {
 614            running.has_ever_stopped = true;
 615        }
 616    }
 617}
 618
 619#[derive(Default)]
 620struct ThreadStates {
 621    global_state: Option<ThreadStatus>,
 622    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 623}
 624
 625impl ThreadStates {
 626    fn stop_all_threads(&mut self) {
 627        self.global_state = Some(ThreadStatus::Stopped);
 628        self.known_thread_states.clear();
 629    }
 630
 631    fn exit_all_threads(&mut self) {
 632        self.global_state = Some(ThreadStatus::Exited);
 633        self.known_thread_states.clear();
 634    }
 635
 636    fn continue_all_threads(&mut self) {
 637        self.global_state = Some(ThreadStatus::Running);
 638        self.known_thread_states.clear();
 639    }
 640
 641    fn stop_thread(&mut self, thread_id: ThreadId) {
 642        self.known_thread_states
 643            .insert(thread_id, ThreadStatus::Stopped);
 644    }
 645
 646    fn continue_thread(&mut self, thread_id: ThreadId) {
 647        self.known_thread_states
 648            .insert(thread_id, ThreadStatus::Running);
 649    }
 650
 651    fn process_step(&mut self, thread_id: ThreadId) {
 652        self.known_thread_states
 653            .insert(thread_id, ThreadStatus::Stepping);
 654    }
 655
 656    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 657        self.thread_state(thread_id)
 658            .unwrap_or(ThreadStatus::Running)
 659    }
 660
 661    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 662        self.known_thread_states
 663            .get(&thread_id)
 664            .copied()
 665            .or(self.global_state)
 666    }
 667
 668    fn exit_thread(&mut self, thread_id: ThreadId) {
 669        self.known_thread_states
 670            .insert(thread_id, ThreadStatus::Exited);
 671    }
 672
 673    fn any_stopped_thread(&self) -> bool {
 674        self.global_state
 675            .is_some_and(|state| state == ThreadStatus::Stopped)
 676            || self
 677                .known_thread_states
 678                .values()
 679                .any(|status| *status == ThreadStatus::Stopped)
 680    }
 681}
 682
 683// TODO(debugger): Wrap dap types with reference counting so the UI doesn't have to clone them on refresh
 684#[derive(Default)]
 685pub struct SessionSnapshot {
 686    threads: IndexMap<ThreadId, Thread>,
 687    thread_states: ThreadStates,
 688    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 689    stack_frames: IndexMap<StackFrameId, StackFrame>,
 690    locations: HashMap<u64, dap::LocationsResponse>,
 691    modules: Vec<dap::Module>,
 692    loaded_sources: Vec<dap::Source>,
 693}
 694
 695type IsEnabled = bool;
 696
 697#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 698pub struct OutputToken(pub usize);
 699/// Represents a current state of a single debug adapter and provides ways to mutate it.
 700pub struct Session {
 701    pub state: SessionState,
 702    active_snapshot: SessionSnapshot,
 703    snapshots: VecDeque<SessionSnapshot>,
 704    selected_snapshot_index: Option<usize>,
 705    id: SessionId,
 706    label: Option<SharedString>,
 707    adapter: DebugAdapterName,
 708    pub(super) capabilities: Capabilities,
 709    child_session_ids: HashSet<SessionId>,
 710    parent_session: Option<Entity<Session>>,
 711    output_token: OutputToken,
 712    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 713    watchers: HashMap<SharedString, Watcher>,
 714    is_session_terminated: bool,
 715    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 716    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 717    ignore_breakpoints: bool,
 718    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 719    data_breakpoints: BTreeMap<String, DataBreakpointState>,
 720    background_tasks: Vec<Task<()>>,
 721    restart_task: Option<Task<()>>,
 722    task_context: TaskContext,
 723    memory: memory::Memory,
 724    quirks: SessionQuirks,
 725    remote_client: Option<Entity<RemoteClient>>,
 726    node_runtime: Option<NodeRuntime>,
 727    http_client: Option<Arc<dyn HttpClient>>,
 728    companion_port: Option<u16>,
 729}
 730
 731trait CacheableCommand: Any + Send + Sync {
 732    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 733    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 734    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 735}
 736
 737impl<T> CacheableCommand for T
 738where
 739    T: LocalDapCommand + PartialEq + Eq + Hash,
 740{
 741    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 742        (rhs as &dyn Any).downcast_ref::<Self>() == Some(self)
 743    }
 744
 745    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 746        T::hash(self, &mut hasher);
 747    }
 748
 749    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 750        self
 751    }
 752}
 753
 754pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 755
 756impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 757    fn from(request: T) -> Self {
 758        Self(Arc::new(request))
 759    }
 760}
 761
 762impl PartialEq for RequestSlot {
 763    fn eq(&self, other: &Self) -> bool {
 764        self.0.dyn_eq(other.0.as_ref())
 765    }
 766}
 767
 768impl Eq for RequestSlot {}
 769
 770impl Hash for RequestSlot {
 771    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 772        self.0.dyn_hash(state);
 773        (&*self.0 as &dyn Any).type_id().hash(state)
 774    }
 775}
 776
 777#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 778pub struct CompletionsQuery {
 779    pub query: String,
 780    pub column: u64,
 781    pub line: Option<u64>,
 782    pub frame_id: Option<u64>,
 783}
 784
 785impl CompletionsQuery {
 786    pub fn new(
 787        buffer: &language::Buffer,
 788        cursor_position: language::Anchor,
 789        frame_id: Option<u64>,
 790    ) -> Self {
 791        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 792        Self {
 793            query: buffer.text(),
 794            column: column as u64,
 795            frame_id,
 796            line: Some(row as u64),
 797        }
 798    }
 799}
 800
 801#[derive(Debug)]
 802pub enum SessionEvent {
 803    Modules,
 804    LoadedSources,
 805    Stopped(Option<ThreadId>),
 806    StackTrace,
 807    Variables,
 808    Watchers,
 809    Threads,
 810    InvalidateInlineValue,
 811    CapabilitiesLoaded,
 812    RunInTerminal {
 813        request: RunInTerminalRequestArguments,
 814        sender: mpsc::Sender<Result<u32>>,
 815    },
 816    DataBreakpointInfo,
 817    ConsoleOutput,
 818    HistoricSnapshotSelected,
 819}
 820
 821#[derive(Clone, Debug, PartialEq, Eq)]
 822pub enum SessionStateEvent {
 823    Running,
 824    Shutdown,
 825    Restart,
 826    SpawnChildSession {
 827        request: StartDebuggingRequestArguments,
 828    },
 829}
 830
 831impl EventEmitter<SessionEvent> for Session {}
 832impl EventEmitter<SessionStateEvent> for Session {}
 833
 834// local session will send breakpoint updates to DAP for all new breakpoints
 835// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 836// BreakpointStore notifies session on breakpoint changes
 837impl Session {
 838    pub(crate) fn new(
 839        breakpoint_store: Entity<BreakpointStore>,
 840        session_id: SessionId,
 841        parent_session: Option<Entity<Session>>,
 842        label: Option<SharedString>,
 843        adapter: DebugAdapterName,
 844        task_context: TaskContext,
 845        quirks: SessionQuirks,
 846        remote_client: Option<Entity<RemoteClient>>,
 847        node_runtime: Option<NodeRuntime>,
 848        http_client: Option<Arc<dyn HttpClient>>,
 849        cx: &mut App,
 850    ) -> Entity<Self> {
 851        cx.new::<Self>(|cx| {
 852            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 853                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 854                    if let Some(local) = (!this.ignore_breakpoints)
 855                        .then(|| this.as_running_mut())
 856                        .flatten()
 857                    {
 858                        local
 859                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 860                            .detach();
 861                    };
 862                }
 863                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 864                    if let Some(local) = (!this.ignore_breakpoints)
 865                        .then(|| this.as_running_mut())
 866                        .flatten()
 867                    {
 868                        local.unset_breakpoints_from_paths(paths, cx).detach();
 869                    }
 870                }
 871                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 872            })
 873            .detach();
 874
 875            Self {
 876                state: SessionState::Booting(None),
 877                snapshots: VecDeque::with_capacity(DEBUG_HISTORY_LIMIT),
 878                selected_snapshot_index: None,
 879                active_snapshot: Default::default(),
 880                id: session_id,
 881                child_session_ids: HashSet::default(),
 882                parent_session,
 883                capabilities: Capabilities::default(),
 884                watchers: HashMap::default(),
 885                output_token: OutputToken(0),
 886                output: circular_buffer::CircularBuffer::boxed(),
 887                requests: HashMap::default(),
 888                background_tasks: Vec::default(),
 889                restart_task: None,
 890                is_session_terminated: false,
 891                ignore_breakpoints: false,
 892                breakpoint_store,
 893                data_breakpoints: Default::default(),
 894                exception_breakpoints: Default::default(),
 895                label,
 896                adapter,
 897                task_context,
 898                memory: memory::Memory::new(),
 899                quirks,
 900                remote_client,
 901                node_runtime,
 902                http_client,
 903                companion_port: None,
 904            }
 905        })
 906    }
 907
 908    pub fn task_context(&self) -> &TaskContext {
 909        &self.task_context
 910    }
 911
 912    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 913        match &self.state {
 914            SessionState::Booting(_) => None,
 915            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 916        }
 917    }
 918
 919    pub fn boot(
 920        &mut self,
 921        binary: DebugAdapterBinary,
 922        worktree: Entity<Worktree>,
 923        dap_store: WeakEntity<DapStore>,
 924        cx: &mut Context<Self>,
 925    ) -> Task<Result<()>> {
 926        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 927        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 928
 929        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 930            let mut initialized_tx = Some(initialized_tx);
 931            while let Some(message) = message_rx.next().await {
 932                if let Message::Event(event) = message {
 933                    if let Events::Initialized(_) = *event {
 934                        if let Some(tx) = initialized_tx.take() {
 935                            tx.send(()).ok();
 936                        }
 937                    } else {
 938                        let Ok(_) = this.update(cx, |session, cx| {
 939                            session.handle_dap_event(event, cx);
 940                        }) else {
 941                            break;
 942                        };
 943                    }
 944                } else if let Message::Request(request) = message {
 945                    let Ok(_) = this.update(cx, |this, cx| {
 946                        if request.command == StartDebugging::COMMAND {
 947                            this.handle_start_debugging_request(request, cx)
 948                                .detach_and_log_err(cx);
 949                        } else if request.command == RunInTerminal::COMMAND {
 950                            this.handle_run_in_terminal_request(request, cx)
 951                                .detach_and_log_err(cx);
 952                        }
 953                    }) else {
 954                        break;
 955                    };
 956                }
 957            }
 958        })];
 959        self.background_tasks = background_tasks;
 960        let id = self.id;
 961        let parent_session = self.parent_session.clone();
 962
 963        cx.spawn(async move |this, cx| {
 964            let mode = RunningMode::new(
 965                id,
 966                parent_session,
 967                worktree.downgrade(),
 968                binary.clone(),
 969                message_tx,
 970                cx,
 971            )
 972            .await?;
 973            this.update(cx, |this, cx| {
 974                match &mut this.state {
 975                    SessionState::Booting(task) if task.is_some() => {
 976                        task.take().unwrap().detach_and_log_err(cx);
 977                    }
 978                    SessionState::Booting(_) => {}
 979                    SessionState::Running(_) => {
 980                        debug_panic!("Attempting to boot a session that is already running");
 981                    }
 982                };
 983                this.state = SessionState::Running(mode);
 984                cx.emit(SessionStateEvent::Running);
 985            })?;
 986
 987            this.update(cx, |session, cx| session.request_initialize(cx))?
 988                .await?;
 989
 990            let result = this
 991                .update(cx, |session, cx| {
 992                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 993                })?
 994                .await;
 995
 996            if result.is_err() {
 997                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 998
 999                console
1000                    .send(format!(
1001                        "Tried to launch debugger with: {}",
1002                        serde_json::to_string_pretty(&binary.request_args.configuration)
1003                            .unwrap_or_default(),
1004                    ))
1005                    .await
1006                    .ok();
1007            }
1008
1009            result
1010        })
1011    }
1012
1013    pub fn session_id(&self) -> SessionId {
1014        self.id
1015    }
1016
1017    pub fn child_session_ids(&self) -> HashSet<SessionId> {
1018        self.child_session_ids.clone()
1019    }
1020
1021    pub fn add_child_session_id(&mut self, session_id: SessionId) {
1022        self.child_session_ids.insert(session_id);
1023    }
1024
1025    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
1026        self.child_session_ids.remove(&session_id);
1027    }
1028
1029    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
1030        self.parent_session
1031            .as_ref()
1032            .map(|session| session.read(cx).id)
1033    }
1034
1035    pub fn parent_session(&self) -> Option<&Entity<Self>> {
1036        self.parent_session.as_ref()
1037    }
1038
1039    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1040        let Some(client) = self.adapter_client() else {
1041            return Task::ready(());
1042        };
1043
1044        let supports_terminate = self
1045            .capabilities
1046            .support_terminate_debuggee
1047            .unwrap_or(false);
1048
1049        cx.background_spawn(async move {
1050            if supports_terminate {
1051                client
1052                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1053                        restart: Some(false),
1054                    })
1055                    .await
1056                    .ok();
1057            } else {
1058                client
1059                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1060                        restart: Some(false),
1061                        terminate_debuggee: Some(true),
1062                        suspend_debuggee: Some(false),
1063                    })
1064                    .await
1065                    .ok();
1066            }
1067        })
1068    }
1069
1070    pub fn capabilities(&self) -> &Capabilities {
1071        &self.capabilities
1072    }
1073
1074    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1075        match &self.state {
1076            SessionState::Booting(_) => None,
1077            SessionState::Running(running_mode) => Some(&running_mode.binary),
1078        }
1079    }
1080
1081    pub fn adapter(&self) -> DebugAdapterName {
1082        self.adapter.clone()
1083    }
1084
1085    pub fn label(&self) -> Option<SharedString> {
1086        self.label.clone()
1087    }
1088
1089    pub fn is_terminated(&self) -> bool {
1090        self.is_session_terminated
1091    }
1092
1093    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1094        let (tx, mut rx) = mpsc::unbounded();
1095
1096        cx.spawn(async move |this, cx| {
1097            while let Some(output) = rx.next().await {
1098                this.update(cx, |this, _| {
1099                    let event = dap::OutputEvent {
1100                        category: None,
1101                        output,
1102                        group: None,
1103                        variables_reference: None,
1104                        source: None,
1105                        line: None,
1106                        column: None,
1107                        data: None,
1108                        location_reference: None,
1109                    };
1110                    this.push_output(event);
1111                })?;
1112            }
1113            anyhow::Ok(())
1114        })
1115        .detach();
1116
1117        tx
1118    }
1119
1120    pub fn is_started(&self) -> bool {
1121        match &self.state {
1122            SessionState::Booting(_) => false,
1123            SessionState::Running(running) => running.is_started,
1124        }
1125    }
1126
1127    pub fn is_building(&self) -> bool {
1128        matches!(self.state, SessionState::Booting(_))
1129    }
1130
1131    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1132        match &mut self.state {
1133            SessionState::Running(local_mode) => Some(local_mode),
1134            SessionState::Booting(_) => None,
1135        }
1136    }
1137
1138    pub fn as_running(&self) -> Option<&RunningMode> {
1139        match &self.state {
1140            SessionState::Running(local_mode) => Some(local_mode),
1141            SessionState::Booting(_) => None,
1142        }
1143    }
1144
1145    fn handle_start_debugging_request(
1146        &mut self,
1147        request: dap::messages::Request,
1148        cx: &mut Context<Self>,
1149    ) -> Task<Result<()>> {
1150        let request_seq = request.seq;
1151
1152        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1153            .arguments
1154            .as_ref()
1155            .map(|value| serde_json::from_value(value.clone()));
1156
1157        let mut success = true;
1158        if let Some(Ok(request)) = launch_request {
1159            cx.emit(SessionStateEvent::SpawnChildSession { request });
1160        } else {
1161            log::error!(
1162                "Failed to parse launch request arguments: {:?}",
1163                request.arguments
1164            );
1165            success = false;
1166        }
1167
1168        cx.spawn(async move |this, cx| {
1169            this.update(cx, |this, cx| {
1170                this.respond_to_client(
1171                    request_seq,
1172                    success,
1173                    StartDebugging::COMMAND.to_string(),
1174                    None,
1175                    cx,
1176                )
1177            })?
1178            .await
1179        })
1180    }
1181
1182    fn handle_run_in_terminal_request(
1183        &mut self,
1184        request: dap::messages::Request,
1185        cx: &mut Context<Self>,
1186    ) -> Task<Result<()>> {
1187        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1188            request.arguments.unwrap_or_default(),
1189        ) {
1190            Ok(args) => args,
1191            Err(error) => {
1192                return cx.spawn(async move |session, cx| {
1193                    let error = serde_json::to_value(dap::ErrorResponse {
1194                        error: Some(dap::Message {
1195                            id: request.seq,
1196                            format: error.to_string(),
1197                            variables: None,
1198                            send_telemetry: None,
1199                            show_user: None,
1200                            url: None,
1201                            url_label: None,
1202                        }),
1203                    })
1204                    .ok();
1205
1206                    session
1207                        .update(cx, |this, cx| {
1208                            this.respond_to_client(
1209                                request.seq,
1210                                false,
1211                                StartDebugging::COMMAND.to_string(),
1212                                error,
1213                                cx,
1214                            )
1215                        })?
1216                        .await?;
1217
1218                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1219                });
1220            }
1221        };
1222
1223        let seq = request.seq;
1224
1225        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1226        cx.emit(SessionEvent::RunInTerminal {
1227            request: request_args,
1228            sender: tx,
1229        });
1230        cx.notify();
1231
1232        cx.spawn(async move |session, cx| {
1233            let result = util::maybe!(async move {
1234                rx.next().await.ok_or_else(|| {
1235                    anyhow!("failed to receive response from spawn terminal".to_string())
1236                })?
1237            })
1238            .await;
1239            let (success, body) = match result {
1240                Ok(pid) => (
1241                    true,
1242                    serde_json::to_value(dap::RunInTerminalResponse {
1243                        process_id: None,
1244                        shell_process_id: Some(pid as u64),
1245                    })
1246                    .ok(),
1247                ),
1248                Err(error) => (
1249                    false,
1250                    serde_json::to_value(dap::ErrorResponse {
1251                        error: Some(dap::Message {
1252                            id: seq,
1253                            format: error.to_string(),
1254                            variables: None,
1255                            send_telemetry: None,
1256                            show_user: None,
1257                            url: None,
1258                            url_label: None,
1259                        }),
1260                    })
1261                    .ok(),
1262                ),
1263            };
1264
1265            session
1266                .update(cx, |session, cx| {
1267                    session.respond_to_client(
1268                        seq,
1269                        success,
1270                        RunInTerminal::COMMAND.to_string(),
1271                        body,
1272                        cx,
1273                    )
1274                })?
1275                .await
1276        })
1277    }
1278
1279    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1280        let adapter_id = self.adapter().to_string();
1281        let request = Initialize { adapter_id };
1282
1283        let SessionState::Running(running) = &self.state else {
1284            return Task::ready(Err(anyhow!(
1285                "Cannot send initialize request, task still building"
1286            )));
1287        };
1288        let mut response = running.request(request.clone());
1289
1290        cx.spawn(async move |this, cx| {
1291            loop {
1292                let capabilities = response.await;
1293                match capabilities {
1294                    Err(e) => {
1295                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1296                            this.as_running()
1297                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1298                        }) else {
1299                            return Err(e);
1300                        };
1301                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1302                        reconnect.await?;
1303
1304                        let Ok(Some(r)) = this.update(cx, |this, _| {
1305                            this.as_running()
1306                                .map(|running| running.request(request.clone()))
1307                        }) else {
1308                            return Err(e);
1309                        };
1310                        response = r
1311                    }
1312                    Ok(capabilities) => {
1313                        this.update(cx, |session, cx| {
1314                            session.capabilities = capabilities;
1315
1316                            cx.emit(SessionEvent::CapabilitiesLoaded);
1317                        })?;
1318                        return Ok(());
1319                    }
1320                }
1321            }
1322        })
1323    }
1324
1325    pub(super) fn initialize_sequence(
1326        &mut self,
1327        initialize_rx: oneshot::Receiver<()>,
1328        dap_store: WeakEntity<DapStore>,
1329        cx: &mut Context<Self>,
1330    ) -> Task<Result<()>> {
1331        match &self.state {
1332            SessionState::Running(local_mode) => {
1333                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1334            }
1335            SessionState::Booting(_) => {
1336                Task::ready(Err(anyhow!("cannot initialize, still building")))
1337            }
1338        }
1339    }
1340
1341    pub fn run_to_position(
1342        &mut self,
1343        breakpoint: SourceBreakpoint,
1344        active_thread_id: ThreadId,
1345        cx: &mut Context<Self>,
1346    ) {
1347        match &mut self.state {
1348            SessionState::Running(local_mode) => {
1349                if !matches!(
1350                    self.active_snapshot
1351                        .thread_states
1352                        .thread_state(active_thread_id),
1353                    Some(ThreadStatus::Stopped)
1354                ) {
1355                    return;
1356                };
1357                let path = breakpoint.path.clone();
1358                local_mode.tmp_breakpoint = Some(breakpoint);
1359                let task = local_mode.send_breakpoints_from_path(
1360                    path,
1361                    BreakpointUpdatedReason::Toggled,
1362                    &self.breakpoint_store,
1363                    cx,
1364                );
1365
1366                cx.spawn(async move |this, cx| {
1367                    task.await;
1368                    this.update(cx, |this, cx| {
1369                        this.continue_thread(active_thread_id, cx);
1370                    })
1371                })
1372                .detach();
1373            }
1374            SessionState::Booting(_) => {}
1375        }
1376    }
1377
1378    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1379        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1380    }
1381
1382    pub fn output(
1383        &self,
1384        since: OutputToken,
1385    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1386        if self.output_token.0 == 0 {
1387            return (self.output.range(0..0), OutputToken(0));
1388        };
1389
1390        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1391
1392        let clamped_events_since = events_since.clamp(0, self.output.len());
1393        (
1394            self.output
1395                .range(self.output.len() - clamped_events_since..),
1396            self.output_token,
1397        )
1398    }
1399
1400    pub fn respond_to_client(
1401        &self,
1402        request_seq: u64,
1403        success: bool,
1404        command: String,
1405        body: Option<serde_json::Value>,
1406        cx: &mut Context<Self>,
1407    ) -> Task<Result<()>> {
1408        let Some(local_session) = self.as_running() else {
1409            unreachable!("Cannot respond to remote client");
1410        };
1411        let client = local_session.client.clone();
1412
1413        cx.background_spawn(async move {
1414            client
1415                .send_message(Message::Response(Response {
1416                    body,
1417                    success,
1418                    command,
1419                    seq: request_seq + 1,
1420                    request_seq,
1421                    message: None,
1422                }))
1423                .await
1424        })
1425    }
1426
1427    fn session_state(&self) -> &SessionSnapshot {
1428        self.selected_snapshot_index
1429            .and_then(|ix| self.snapshots.get(ix))
1430            .unwrap_or_else(|| &self.active_snapshot)
1431    }
1432
1433    fn push_to_history(&mut self) {
1434        if !self.has_ever_stopped() {
1435            return;
1436        }
1437
1438        while self.snapshots.len() >= DEBUG_HISTORY_LIMIT {
1439            self.snapshots.pop_front();
1440        }
1441
1442        self.snapshots
1443            .push_back(std::mem::take(&mut self.active_snapshot));
1444    }
1445
1446    pub fn historic_snapshots(&self) -> &VecDeque<SessionSnapshot> {
1447        &self.snapshots
1448    }
1449
1450    pub fn select_historic_snapshot(&mut self, ix: Option<usize>, cx: &mut Context<Session>) {
1451        if self.selected_snapshot_index == ix {
1452            return;
1453        }
1454
1455        if self
1456            .selected_snapshot_index
1457            .is_some_and(|ix| self.snapshots.len() <= ix)
1458        {
1459            debug_panic!("Attempted to select a debug session with an out of bounds index");
1460            return;
1461        }
1462
1463        self.selected_snapshot_index = ix;
1464        cx.emit(SessionEvent::HistoricSnapshotSelected);
1465        cx.notify();
1466    }
1467
1468    pub fn active_snapshot_index(&self) -> Option<usize> {
1469        self.selected_snapshot_index
1470    }
1471
1472    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1473        self.push_to_history();
1474
1475        self.state.stopped();
1476        // todo(debugger): Find a clean way to get around the clone
1477        let breakpoint_store = self.breakpoint_store.clone();
1478        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1479            let breakpoint = local.tmp_breakpoint.take()?;
1480            let path = breakpoint.path;
1481            Some((local, path))
1482        }) {
1483            local
1484                .send_breakpoints_from_path(
1485                    path,
1486                    BreakpointUpdatedReason::Toggled,
1487                    &breakpoint_store,
1488                    cx,
1489                )
1490                .detach();
1491        };
1492
1493        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1494            self.active_snapshot.thread_states.stop_all_threads();
1495            self.invalidate_command_type::<StackTraceCommand>();
1496        }
1497
1498        // Event if we stopped all threads we still need to insert the thread_id
1499        // to our own data
1500        if let Some(thread_id) = event.thread_id {
1501            self.active_snapshot
1502                .thread_states
1503                .stop_thread(ThreadId(thread_id));
1504
1505            self.invalidate_state(
1506                &StackTraceCommand {
1507                    thread_id,
1508                    start_frame: None,
1509                    levels: None,
1510                }
1511                .into(),
1512            );
1513        }
1514
1515        self.invalidate_generic();
1516        self.active_snapshot.threads.clear();
1517        self.active_snapshot.variables.clear();
1518        cx.emit(SessionEvent::Stopped(
1519            event
1520                .thread_id
1521                .map(Into::into)
1522                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1523        ));
1524        cx.emit(SessionEvent::InvalidateInlineValue);
1525        cx.notify();
1526    }
1527
1528    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1529        match *event {
1530            Events::Initialized(_) => {
1531                debug_assert!(
1532                    false,
1533                    "Initialized event should have been handled in LocalMode"
1534                );
1535            }
1536            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1537            Events::Continued(event) => {
1538                if event.all_threads_continued.unwrap_or_default() {
1539                    self.active_snapshot.thread_states.continue_all_threads();
1540                    self.breakpoint_store.update(cx, |store, cx| {
1541                        store.remove_active_position(Some(self.session_id()), cx)
1542                    });
1543                } else {
1544                    self.active_snapshot
1545                        .thread_states
1546                        .continue_thread(ThreadId(event.thread_id));
1547                }
1548                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1549                self.invalidate_generic();
1550            }
1551            Events::Exited(_event) => {
1552                self.clear_active_debug_line(cx);
1553            }
1554            Events::Terminated(_) => {
1555                self.shutdown(cx).detach();
1556            }
1557            Events::Thread(event) => {
1558                let thread_id = ThreadId(event.thread_id);
1559
1560                match event.reason {
1561                    dap::ThreadEventReason::Started => {
1562                        self.active_snapshot
1563                            .thread_states
1564                            .continue_thread(thread_id);
1565                    }
1566                    dap::ThreadEventReason::Exited => {
1567                        self.active_snapshot.thread_states.exit_thread(thread_id);
1568                    }
1569                    reason => {
1570                        log::error!("Unhandled thread event reason {:?}", reason);
1571                    }
1572                }
1573                self.invalidate_state(&ThreadsCommand.into());
1574                cx.notify();
1575            }
1576            Events::Output(event) => {
1577                if event
1578                    .category
1579                    .as_ref()
1580                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1581                {
1582                    return;
1583                }
1584
1585                self.push_output(event);
1586                cx.notify();
1587            }
1588            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1589                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1590            }),
1591            Events::Module(event) => {
1592                match event.reason {
1593                    dap::ModuleEventReason::New => {
1594                        self.active_snapshot.modules.push(event.module);
1595                    }
1596                    dap::ModuleEventReason::Changed => {
1597                        if let Some(module) = self
1598                            .active_snapshot
1599                            .modules
1600                            .iter_mut()
1601                            .find(|other| event.module.id == other.id)
1602                        {
1603                            *module = event.module;
1604                        }
1605                    }
1606                    dap::ModuleEventReason::Removed => {
1607                        self.active_snapshot
1608                            .modules
1609                            .retain(|other| event.module.id != other.id);
1610                    }
1611                }
1612
1613                // todo(debugger): We should only send the invalidate command to downstream clients.
1614                // self.invalidate_state(&ModulesCommand.into());
1615            }
1616            Events::LoadedSource(_) => {
1617                self.invalidate_state(&LoadedSourcesCommand.into());
1618            }
1619            Events::Capabilities(event) => {
1620                self.capabilities = self.capabilities.merge(event.capabilities);
1621
1622                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1623                let recent_filters = self
1624                    .capabilities
1625                    .exception_breakpoint_filters
1626                    .iter()
1627                    .flatten()
1628                    .map(|filter| (filter.filter.clone(), filter.clone()))
1629                    .collect::<BTreeMap<_, _>>();
1630                for filter in recent_filters.values() {
1631                    let default = filter.default.unwrap_or_default();
1632                    self.exception_breakpoints
1633                        .entry(filter.filter.clone())
1634                        .or_insert_with(|| (filter.clone(), default));
1635                }
1636                self.exception_breakpoints
1637                    .retain(|k, _| recent_filters.contains_key(k));
1638                if self.is_started() {
1639                    self.send_exception_breakpoints(cx);
1640                }
1641
1642                // Remove the ones that no longer exist.
1643                cx.notify();
1644            }
1645            Events::Memory(_) => {}
1646            Events::Process(_) => {}
1647            Events::ProgressEnd(_) => {}
1648            Events::ProgressStart(_) => {}
1649            Events::ProgressUpdate(_) => {}
1650            Events::Invalidated(_) => {}
1651            Events::Other(event) => {
1652                if event.event == "launchBrowserInCompanion" {
1653                    let Some(request) = serde_json::from_value(event.body).ok() else {
1654                        log::error!("failed to deserialize launchBrowserInCompanion event");
1655                        return;
1656                    };
1657                    self.launch_browser_for_remote_server(request, cx);
1658                } else if event.event == "killCompanionBrowser" {
1659                    let Some(request) = serde_json::from_value(event.body).ok() else {
1660                        log::error!("failed to deserialize killCompanionBrowser event");
1661                        return;
1662                    };
1663                    self.kill_browser(request, cx);
1664                }
1665            }
1666        }
1667    }
1668
1669    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1670    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1671        &mut self,
1672        request: T,
1673        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1674        cx: &mut Context<Self>,
1675    ) {
1676        const {
1677            assert!(
1678                T::CACHEABLE,
1679                "Only requests marked as cacheable should invoke `fetch`"
1680            );
1681        }
1682
1683        if (!self.active_snapshot.thread_states.any_stopped_thread()
1684            && request.type_id() != TypeId::of::<ThreadsCommand>())
1685            || self.selected_snapshot_index.is_some()
1686            || self.is_session_terminated
1687        {
1688            return;
1689        }
1690
1691        let request_map = self
1692            .requests
1693            .entry(std::any::TypeId::of::<T>())
1694            .or_default();
1695
1696        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1697            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1698
1699            let task = Self::request_inner::<Arc<T>>(
1700                &self.capabilities,
1701                &self.state,
1702                command,
1703                |this, result, cx| {
1704                    process_result(this, result, cx);
1705                    None
1706                },
1707                cx,
1708            );
1709            let task = cx
1710                .background_executor()
1711                .spawn(async move {
1712                    let _ = task.await?;
1713                    Some(())
1714                })
1715                .shared();
1716
1717            vacant.insert(task);
1718            cx.notify();
1719        }
1720    }
1721
1722    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1723        capabilities: &Capabilities,
1724        mode: &SessionState,
1725        request: T,
1726        process_result: impl FnOnce(
1727            &mut Self,
1728            Result<T::Response>,
1729            &mut Context<Self>,
1730        ) -> Option<T::Response>
1731        + 'static,
1732        cx: &mut Context<Self>,
1733    ) -> Task<Option<T::Response>> {
1734        if !T::is_supported(capabilities) {
1735            log::warn!(
1736                "Attempted to send a DAP request that isn't supported: {:?}",
1737                request
1738            );
1739            let error = Err(anyhow::Error::msg(
1740                "Couldn't complete request because it's not supported",
1741            ));
1742            return cx.spawn(async move |this, cx| {
1743                this.update(cx, |this, cx| process_result(this, error, cx))
1744                    .ok()
1745                    .flatten()
1746            });
1747        }
1748
1749        let request = mode.request_dap(request);
1750        cx.spawn(async move |this, cx| {
1751            let result = request.await;
1752            this.update(cx, |this, cx| process_result(this, result, cx))
1753                .ok()
1754                .flatten()
1755        })
1756    }
1757
1758    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
1759        &self,
1760        request: T,
1761        process_result: impl FnOnce(
1762            &mut Self,
1763            Result<T::Response>,
1764            &mut Context<Self>,
1765        ) -> Option<T::Response>
1766        + 'static,
1767        cx: &mut Context<Self>,
1768    ) -> Task<Option<T::Response>> {
1769        Self::request_inner(&self.capabilities, &self.state, request, process_result, cx)
1770    }
1771
1772    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
1773        self.requests.remove(&std::any::TypeId::of::<Command>());
1774    }
1775
1776    fn invalidate_generic(&mut self) {
1777        self.invalidate_command_type::<ModulesCommand>();
1778        self.invalidate_command_type::<LoadedSourcesCommand>();
1779        self.invalidate_command_type::<ThreadsCommand>();
1780        self.invalidate_command_type::<DataBreakpointInfoCommand>();
1781        self.invalidate_command_type::<ReadMemory>();
1782        let executor = self.as_running().map(|running| running.executor.clone());
1783        if let Some(executor) = executor {
1784            self.memory.clear(&executor);
1785        }
1786    }
1787
1788    fn invalidate_state(&mut self, key: &RequestSlot) {
1789        self.requests
1790            .entry((&*key.0 as &dyn Any).type_id())
1791            .and_modify(|request_map| {
1792                request_map.remove(key);
1793            });
1794    }
1795
1796    fn push_output(&mut self, event: OutputEvent) {
1797        self.output.push_back(event);
1798        self.output_token.0 += 1;
1799    }
1800
1801    pub fn any_stopped_thread(&self) -> bool {
1802        self.active_snapshot.thread_states.any_stopped_thread()
1803    }
1804
1805    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1806        self.active_snapshot.thread_states.thread_status(thread_id)
1807    }
1808
1809    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1810        self.fetch(
1811            dap_command::ThreadsCommand,
1812            |this, result, cx| {
1813                let Some(result) = result.log_err() else {
1814                    return;
1815                };
1816
1817                this.active_snapshot.threads = result
1818                    .into_iter()
1819                    .map(|thread| (ThreadId(thread.id), Thread::from(thread)))
1820                    .collect();
1821
1822                this.invalidate_command_type::<StackTraceCommand>();
1823                cx.emit(SessionEvent::Threads);
1824                cx.notify();
1825            },
1826            cx,
1827        );
1828
1829        let state = self.session_state();
1830        state
1831            .threads
1832            .values()
1833            .map(|thread| {
1834                (
1835                    thread.dap.clone(),
1836                    state.thread_states.thread_status(ThreadId(thread.dap.id)),
1837                )
1838            })
1839            .collect()
1840    }
1841
1842    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1843        self.fetch(
1844            dap_command::ModulesCommand,
1845            |this, result, cx| {
1846                let Some(result) = result.log_err() else {
1847                    return;
1848                };
1849
1850                this.active_snapshot.modules = result;
1851                cx.emit(SessionEvent::Modules);
1852                cx.notify();
1853            },
1854            cx,
1855        );
1856
1857        &self.session_state().modules
1858    }
1859
1860    // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1861    pub fn data_access_size(
1862        &mut self,
1863        frame_id: Option<u64>,
1864        evaluate_name: &str,
1865        cx: &mut Context<Self>,
1866    ) -> Task<Option<u64>> {
1867        let request = self.request(
1868            EvaluateCommand {
1869                expression: format!("?${{sizeof({evaluate_name})}}"),
1870                frame_id,
1871
1872                context: Some(EvaluateArgumentsContext::Repl),
1873                source: None,
1874            },
1875            |_, response, _| response.ok(),
1876            cx,
1877        );
1878        cx.background_spawn(async move {
1879            let result = request.await?;
1880            result.result.parse().ok()
1881        })
1882    }
1883
1884    pub fn memory_reference_of_expr(
1885        &mut self,
1886        frame_id: Option<u64>,
1887        expression: String,
1888        cx: &mut Context<Self>,
1889    ) -> Task<Option<(String, Option<String>)>> {
1890        let request = self.request(
1891            EvaluateCommand {
1892                expression,
1893                frame_id,
1894
1895                context: Some(EvaluateArgumentsContext::Repl),
1896                source: None,
1897            },
1898            |_, response, _| response.ok(),
1899            cx,
1900        );
1901        cx.background_spawn(async move {
1902            let result = request.await?;
1903            result
1904                .memory_reference
1905                .map(|reference| (reference, result.type_))
1906        })
1907    }
1908
1909    pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1910        let data = base64::engine::general_purpose::STANDARD.encode(data);
1911        self.request(
1912            WriteMemoryArguments {
1913                memory_reference: address.to_string(),
1914                data,
1915                allow_partial: None,
1916                offset: None,
1917            },
1918            |this, response, cx| {
1919                this.memory.clear(cx.background_executor());
1920                this.invalidate_command_type::<ReadMemory>();
1921                this.invalidate_command_type::<VariablesCommand>();
1922                cx.emit(SessionEvent::Variables);
1923                response.ok()
1924            },
1925            cx,
1926        )
1927        .detach();
1928    }
1929    pub fn read_memory(
1930        &mut self,
1931        range: RangeInclusive<u64>,
1932        cx: &mut Context<Self>,
1933    ) -> MemoryIterator {
1934        // This function is a bit more involved when it comes to fetching data.
1935        // Since we attempt to read memory in pages, we need to account for some parts
1936        // of memory being unreadable. Therefore, we start off by fetching a page per request.
1937        // In case that fails, we try to re-fetch smaller regions until we have the full range.
1938        let page_range = Memory::memory_range_to_page_range(range.clone());
1939        for page_address in PageAddress::iter_range(page_range) {
1940            self.read_single_page_memory(page_address, cx);
1941        }
1942        self.memory.memory_range(range)
1943    }
1944
1945    fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1946        _ = maybe!({
1947            let builder = self.memory.build_page(page_start)?;
1948
1949            self.memory_read_fetch_page_recursive(builder, cx);
1950            Some(())
1951        });
1952    }
1953    fn memory_read_fetch_page_recursive(
1954        &mut self,
1955        mut builder: MemoryPageBuilder,
1956        cx: &mut Context<Self>,
1957    ) {
1958        let Some(next_request) = builder.next_request() else {
1959            // We're done fetching. Let's grab the page and insert it into our memory store.
1960            let (address, contents) = builder.build();
1961            self.memory.insert_page(address, contents);
1962
1963            return;
1964        };
1965        let size = next_request.size;
1966        self.fetch(
1967            ReadMemory {
1968                memory_reference: format!("0x{:X}", next_request.address),
1969                offset: Some(0),
1970                count: next_request.size,
1971            },
1972            move |this, memory, cx| {
1973                if let Ok(memory) = memory {
1974                    builder.known(memory.content);
1975                    if let Some(unknown) = memory.unreadable_bytes {
1976                        builder.unknown(unknown);
1977                    }
1978                    // This is the recursive bit: if we're not yet done with
1979                    // the whole page, we'll kick off a new request with smaller range.
1980                    // Note that this function is recursive only conceptually;
1981                    // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1982                    this.memory_read_fetch_page_recursive(builder, cx);
1983                } else {
1984                    builder.unknown(size);
1985                }
1986            },
1987            cx,
1988        );
1989    }
1990
1991    pub fn ignore_breakpoints(&self) -> bool {
1992        self.ignore_breakpoints
1993    }
1994
1995    pub fn toggle_ignore_breakpoints(
1996        &mut self,
1997        cx: &mut App,
1998    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1999        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
2000    }
2001
2002    pub(crate) fn set_ignore_breakpoints(
2003        &mut self,
2004        ignore: bool,
2005        cx: &mut App,
2006    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
2007        if self.ignore_breakpoints == ignore {
2008            return Task::ready(HashMap::default());
2009        }
2010
2011        self.ignore_breakpoints = ignore;
2012
2013        if let Some(local) = self.as_running() {
2014            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
2015        } else {
2016            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
2017            unimplemented!()
2018        }
2019    }
2020
2021    pub fn data_breakpoints(&self) -> impl Iterator<Item = &DataBreakpointState> {
2022        self.data_breakpoints.values()
2023    }
2024
2025    pub fn exception_breakpoints(
2026        &self,
2027    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
2028        self.exception_breakpoints.values()
2029    }
2030
2031    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
2032        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
2033            *is_enabled = !*is_enabled;
2034            self.send_exception_breakpoints(cx);
2035        }
2036    }
2037
2038    fn send_exception_breakpoints(&mut self, cx: &App) {
2039        if let Some(local) = self.as_running() {
2040            let exception_filters = self
2041                .exception_breakpoints
2042                .values()
2043                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
2044                .collect();
2045
2046            let supports_exception_filters = self
2047                .capabilities
2048                .supports_exception_filter_options
2049                .unwrap_or_default();
2050            local
2051                .send_exception_breakpoints(exception_filters, supports_exception_filters)
2052                .detach_and_log_err(cx);
2053        } else {
2054            debug_assert!(false, "Not implemented");
2055        }
2056    }
2057
2058    pub fn toggle_data_breakpoint(&mut self, id: &str, cx: &mut Context<'_, Session>) {
2059        if let Some(state) = self.data_breakpoints.get_mut(id) {
2060            state.is_enabled = !state.is_enabled;
2061            self.send_exception_breakpoints(cx);
2062        }
2063    }
2064
2065    fn send_data_breakpoints(&mut self, cx: &mut Context<Self>) {
2066        if let Some(mode) = self.as_running() {
2067            let breakpoints = self
2068                .data_breakpoints
2069                .values()
2070                .filter_map(|state| state.is_enabled.then(|| state.dap.clone()))
2071                .collect();
2072            let command = SetDataBreakpointsCommand { breakpoints };
2073            mode.request(command).detach_and_log_err(cx);
2074        }
2075    }
2076
2077    pub fn create_data_breakpoint(
2078        &mut self,
2079        context: Arc<DataBreakpointContext>,
2080        data_id: String,
2081        dap: dap::DataBreakpoint,
2082        cx: &mut Context<Self>,
2083    ) {
2084        if self.data_breakpoints.remove(&data_id).is_none() {
2085            self.data_breakpoints.insert(
2086                data_id,
2087                DataBreakpointState {
2088                    dap,
2089                    is_enabled: true,
2090                    context,
2091                },
2092            );
2093        }
2094        self.send_data_breakpoints(cx);
2095    }
2096
2097    pub fn breakpoints_enabled(&self) -> bool {
2098        self.ignore_breakpoints
2099    }
2100
2101    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
2102        self.fetch(
2103            dap_command::LoadedSourcesCommand,
2104            |this, result, cx| {
2105                let Some(result) = result.log_err() else {
2106                    return;
2107                };
2108                this.active_snapshot.loaded_sources = result;
2109                cx.emit(SessionEvent::LoadedSources);
2110                cx.notify();
2111            },
2112            cx,
2113        );
2114        &self.session_state().loaded_sources
2115    }
2116
2117    fn fallback_to_manual_restart(
2118        &mut self,
2119        res: Result<()>,
2120        cx: &mut Context<Self>,
2121    ) -> Option<()> {
2122        if res.log_err().is_none() {
2123            cx.emit(SessionStateEvent::Restart);
2124            return None;
2125        }
2126        Some(())
2127    }
2128
2129    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
2130        res.log_err()?;
2131        Some(())
2132    }
2133
2134    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
2135        thread_id: ThreadId,
2136    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
2137    {
2138        move |this, response, cx| match response.log_err() {
2139            Some(response) => {
2140                this.breakpoint_store.update(cx, |store, cx| {
2141                    store.remove_active_position(Some(this.session_id()), cx)
2142                });
2143                Some(response)
2144            }
2145            None => {
2146                this.active_snapshot.thread_states.stop_thread(thread_id);
2147                cx.notify();
2148                None
2149            }
2150        }
2151    }
2152
2153    fn clear_active_debug_line_response(
2154        &mut self,
2155        response: Result<()>,
2156        cx: &mut Context<Session>,
2157    ) -> Option<()> {
2158        response.log_err()?;
2159        self.clear_active_debug_line(cx);
2160        Some(())
2161    }
2162
2163    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2164        self.breakpoint_store.update(cx, |store, cx| {
2165            store.remove_active_position(Some(self.id), cx)
2166        });
2167    }
2168
2169    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2170        self.request(
2171            PauseCommand {
2172                thread_id: thread_id.0,
2173            },
2174            Self::empty_response,
2175            cx,
2176        )
2177        .detach();
2178    }
2179
2180    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2181        self.request(
2182            RestartStackFrameCommand { stack_frame_id },
2183            Self::empty_response,
2184            cx,
2185        )
2186        .detach();
2187    }
2188
2189    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2190        if self.restart_task.is_some() || self.as_running().is_none() {
2191            return;
2192        }
2193
2194        let supports_dap_restart =
2195            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2196
2197        self.restart_task = Some(cx.spawn(async move |this, cx| {
2198            let _ = this.update(cx, |session, cx| {
2199                if supports_dap_restart {
2200                    session
2201                        .request(
2202                            RestartCommand {
2203                                raw: args.unwrap_or(Value::Null),
2204                            },
2205                            Self::fallback_to_manual_restart,
2206                            cx,
2207                        )
2208                        .detach();
2209                } else {
2210                    cx.emit(SessionStateEvent::Restart);
2211                }
2212            });
2213        }));
2214    }
2215
2216    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2217        if self.is_session_terminated {
2218            return Task::ready(());
2219        }
2220
2221        self.is_session_terminated = true;
2222        self.active_snapshot.thread_states.exit_all_threads();
2223        cx.notify();
2224
2225        let task = match &mut self.state {
2226            SessionState::Running(_) => {
2227                if self
2228                    .capabilities
2229                    .supports_terminate_request
2230                    .unwrap_or_default()
2231                {
2232                    self.request(
2233                        TerminateCommand {
2234                            restart: Some(false),
2235                        },
2236                        Self::clear_active_debug_line_response,
2237                        cx,
2238                    )
2239                } else {
2240                    self.request(
2241                        DisconnectCommand {
2242                            restart: Some(false),
2243                            terminate_debuggee: Some(true),
2244                            suspend_debuggee: Some(false),
2245                        },
2246                        Self::clear_active_debug_line_response,
2247                        cx,
2248                    )
2249                }
2250            }
2251            SessionState::Booting(build_task) => {
2252                build_task.take();
2253                Task::ready(Some(()))
2254            }
2255        };
2256
2257        cx.emit(SessionStateEvent::Shutdown);
2258
2259        cx.spawn(async move |this, cx| {
2260            task.await;
2261            let _ = this.update(cx, |this, _| {
2262                if let Some(adapter_client) = this.adapter_client() {
2263                    adapter_client.kill();
2264                }
2265            });
2266        })
2267    }
2268
2269    pub fn completions(
2270        &mut self,
2271        query: CompletionsQuery,
2272        cx: &mut Context<Self>,
2273    ) -> Task<Result<Vec<dap::CompletionItem>>> {
2274        let task = self.request(query, |_, result, _| result.log_err(), cx);
2275
2276        cx.background_executor().spawn(async move {
2277            anyhow::Ok(
2278                task.await
2279                    .map(|response| response.targets)
2280                    .context("failed to fetch completions")?,
2281            )
2282        })
2283    }
2284
2285    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2286        self.select_historic_snapshot(None, cx);
2287
2288        let supports_single_thread_execution_requests =
2289            self.capabilities.supports_single_thread_execution_requests;
2290        self.active_snapshot
2291            .thread_states
2292            .continue_thread(thread_id);
2293        self.request(
2294            ContinueCommand {
2295                args: ContinueArguments {
2296                    thread_id: thread_id.0,
2297                    single_thread: supports_single_thread_execution_requests,
2298                },
2299            },
2300            Self::on_step_response::<ContinueCommand>(thread_id),
2301            cx,
2302        )
2303        .detach();
2304    }
2305
2306    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2307        match self.state {
2308            SessionState::Running(ref local) => Some(local.client.clone()),
2309            SessionState::Booting(_) => None,
2310        }
2311    }
2312
2313    pub fn has_ever_stopped(&self) -> bool {
2314        self.state.has_ever_stopped()
2315    }
2316
2317    pub fn step_over(
2318        &mut self,
2319        thread_id: ThreadId,
2320        granularity: SteppingGranularity,
2321        cx: &mut Context<Self>,
2322    ) {
2323        self.select_historic_snapshot(None, cx);
2324
2325        let supports_single_thread_execution_requests =
2326            self.capabilities.supports_single_thread_execution_requests;
2327        let supports_stepping_granularity = self
2328            .capabilities
2329            .supports_stepping_granularity
2330            .unwrap_or_default();
2331
2332        let command = NextCommand {
2333            inner: StepCommand {
2334                thread_id: thread_id.0,
2335                granularity: supports_stepping_granularity.then(|| granularity),
2336                single_thread: supports_single_thread_execution_requests,
2337            },
2338        };
2339
2340        self.active_snapshot.thread_states.process_step(thread_id);
2341        self.request(
2342            command,
2343            Self::on_step_response::<NextCommand>(thread_id),
2344            cx,
2345        )
2346        .detach();
2347    }
2348
2349    pub fn step_in(
2350        &mut self,
2351        thread_id: ThreadId,
2352        granularity: SteppingGranularity,
2353        cx: &mut Context<Self>,
2354    ) {
2355        self.select_historic_snapshot(None, cx);
2356
2357        let supports_single_thread_execution_requests =
2358            self.capabilities.supports_single_thread_execution_requests;
2359        let supports_stepping_granularity = self
2360            .capabilities
2361            .supports_stepping_granularity
2362            .unwrap_or_default();
2363
2364        let command = StepInCommand {
2365            inner: StepCommand {
2366                thread_id: thread_id.0,
2367                granularity: supports_stepping_granularity.then(|| granularity),
2368                single_thread: supports_single_thread_execution_requests,
2369            },
2370        };
2371
2372        self.active_snapshot.thread_states.process_step(thread_id);
2373        self.request(
2374            command,
2375            Self::on_step_response::<StepInCommand>(thread_id),
2376            cx,
2377        )
2378        .detach();
2379    }
2380
2381    pub fn step_out(
2382        &mut self,
2383        thread_id: ThreadId,
2384        granularity: SteppingGranularity,
2385        cx: &mut Context<Self>,
2386    ) {
2387        self.select_historic_snapshot(None, cx);
2388
2389        let supports_single_thread_execution_requests =
2390            self.capabilities.supports_single_thread_execution_requests;
2391        let supports_stepping_granularity = self
2392            .capabilities
2393            .supports_stepping_granularity
2394            .unwrap_or_default();
2395
2396        let command = StepOutCommand {
2397            inner: StepCommand {
2398                thread_id: thread_id.0,
2399                granularity: supports_stepping_granularity.then(|| granularity),
2400                single_thread: supports_single_thread_execution_requests,
2401            },
2402        };
2403
2404        self.active_snapshot.thread_states.process_step(thread_id);
2405        self.request(
2406            command,
2407            Self::on_step_response::<StepOutCommand>(thread_id),
2408            cx,
2409        )
2410        .detach();
2411    }
2412
2413    pub fn step_back(
2414        &mut self,
2415        thread_id: ThreadId,
2416        granularity: SteppingGranularity,
2417        cx: &mut Context<Self>,
2418    ) {
2419        self.select_historic_snapshot(None, cx);
2420
2421        let supports_single_thread_execution_requests =
2422            self.capabilities.supports_single_thread_execution_requests;
2423        let supports_stepping_granularity = self
2424            .capabilities
2425            .supports_stepping_granularity
2426            .unwrap_or_default();
2427
2428        let command = StepBackCommand {
2429            inner: StepCommand {
2430                thread_id: thread_id.0,
2431                granularity: supports_stepping_granularity.then(|| granularity),
2432                single_thread: supports_single_thread_execution_requests,
2433            },
2434        };
2435
2436        self.active_snapshot.thread_states.process_step(thread_id);
2437
2438        self.request(
2439            command,
2440            Self::on_step_response::<StepBackCommand>(thread_id),
2441            cx,
2442        )
2443        .detach();
2444    }
2445
2446    pub fn stack_frames(
2447        &mut self,
2448        thread_id: ThreadId,
2449        cx: &mut Context<Self>,
2450    ) -> Result<Vec<StackFrame>> {
2451        if self.active_snapshot.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2452            && self.requests.contains_key(&ThreadsCommand.type_id())
2453            && self.active_snapshot.threads.contains_key(&thread_id)
2454        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2455        // We could still be using an old thread id and have sent a new thread's request
2456        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2457        // But it very well could cause a minor bug in the future that is hard to track down
2458        {
2459            self.fetch(
2460                super::dap_command::StackTraceCommand {
2461                    thread_id: thread_id.0,
2462                    start_frame: None,
2463                    levels: None,
2464                },
2465                move |this, stack_frames, cx| {
2466                    let entry =
2467                        this.active_snapshot
2468                            .threads
2469                            .entry(thread_id)
2470                            .and_modify(|thread| match &stack_frames {
2471                                Ok(stack_frames) => {
2472                                    thread.stack_frames = stack_frames
2473                                        .iter()
2474                                        .cloned()
2475                                        .map(StackFrame::from)
2476                                        .collect();
2477                                    thread.stack_frames_error = None;
2478                                }
2479                                Err(error) => {
2480                                    thread.stack_frames.clear();
2481                                    thread.stack_frames_error = Some(error.to_string().into());
2482                                }
2483                            });
2484                    debug_assert!(
2485                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2486                        "Sent request for thread_id that doesn't exist"
2487                    );
2488                    if let Ok(stack_frames) = stack_frames {
2489                        this.active_snapshot.stack_frames.extend(
2490                            stack_frames
2491                                .into_iter()
2492                                .filter(|frame| {
2493                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2494                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2495                                    !(frame.id == 0
2496                                        && frame.line == 0
2497                                        && frame.column == 0
2498                                        && frame.presentation_hint
2499                                            == Some(StackFramePresentationHint::Label))
2500                                })
2501                                .map(|frame| (frame.id, StackFrame::from(frame))),
2502                        );
2503                    }
2504
2505                    this.invalidate_command_type::<ScopesCommand>();
2506                    this.invalidate_command_type::<VariablesCommand>();
2507
2508                    cx.emit(SessionEvent::StackTrace);
2509                },
2510                cx,
2511            );
2512        }
2513
2514        match self.session_state().threads.get(&thread_id) {
2515            Some(thread) => {
2516                if let Some(error) = &thread.stack_frames_error {
2517                    Err(anyhow!(error.to_string()))
2518                } else {
2519                    Ok(thread.stack_frames.clone())
2520                }
2521            }
2522            None => Ok(Vec::new()),
2523        }
2524    }
2525
2526    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2527        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2528            && self
2529                .requests
2530                .contains_key(&TypeId::of::<StackTraceCommand>())
2531        {
2532            self.fetch(
2533                ScopesCommand { stack_frame_id },
2534                move |this, scopes, cx| {
2535                    let Some(scopes) = scopes.log_err() else {
2536                        return
2537                    };
2538
2539                    for scope in scopes.iter() {
2540                        this.variables(scope.variables_reference, cx);
2541                    }
2542
2543                    let entry = this
2544                        .active_snapshot
2545                        .stack_frames
2546                        .entry(stack_frame_id)
2547                        .and_modify(|stack_frame| {
2548                            stack_frame.scopes = scopes;
2549                        });
2550
2551                    cx.emit(SessionEvent::Variables);
2552
2553                    debug_assert!(
2554                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2555                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2556                    );
2557                },
2558                cx,
2559            );
2560        }
2561
2562        self.session_state()
2563            .stack_frames
2564            .get(&stack_frame_id)
2565            .map(|frame| frame.scopes.as_slice())
2566            .unwrap_or_default()
2567    }
2568
2569    pub fn variables_by_stack_frame_id(
2570        &self,
2571        stack_frame_id: StackFrameId,
2572        globals: bool,
2573        locals: bool,
2574    ) -> Vec<dap::Variable> {
2575        let state = self.session_state();
2576        let Some(stack_frame) = state.stack_frames.get(&stack_frame_id) else {
2577            return Vec::new();
2578        };
2579
2580        stack_frame
2581            .scopes
2582            .iter()
2583            .filter(|scope| {
2584                (scope.name.to_lowercase().contains("local") && locals)
2585                    || (scope.name.to_lowercase().contains("global") && globals)
2586            })
2587            .filter_map(|scope| state.variables.get(&scope.variables_reference))
2588            .flatten()
2589            .cloned()
2590            .collect()
2591    }
2592
2593    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2594        &self.watchers
2595    }
2596
2597    pub fn add_watcher(
2598        &mut self,
2599        expression: SharedString,
2600        frame_id: u64,
2601        cx: &mut Context<Self>,
2602    ) -> Task<Result<()>> {
2603        let request = self.state.request_dap(EvaluateCommand {
2604            expression: expression.to_string(),
2605            context: Some(EvaluateArgumentsContext::Watch),
2606            frame_id: Some(frame_id),
2607            source: None,
2608        });
2609
2610        cx.spawn(async move |this, cx| {
2611            let response = request.await?;
2612
2613            this.update(cx, |session, cx| {
2614                session.watchers.insert(
2615                    expression.clone(),
2616                    Watcher {
2617                        expression,
2618                        value: response.result.into(),
2619                        variables_reference: response.variables_reference,
2620                        presentation_hint: response.presentation_hint,
2621                    },
2622                );
2623                cx.emit(SessionEvent::Watchers);
2624            })
2625        })
2626    }
2627
2628    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2629        let watches = self.watchers.clone();
2630        for (_, watch) in watches.into_iter() {
2631            self.add_watcher(watch.expression.clone(), frame_id, cx)
2632                .detach();
2633        }
2634    }
2635
2636    pub fn remove_watcher(&mut self, expression: SharedString) {
2637        self.watchers.remove(&expression);
2638    }
2639
2640    pub fn variables(
2641        &mut self,
2642        variables_reference: VariableReference,
2643        cx: &mut Context<Self>,
2644    ) -> Vec<dap::Variable> {
2645        let command = VariablesCommand {
2646            variables_reference,
2647            filter: None,
2648            start: None,
2649            count: None,
2650            format: None,
2651        };
2652
2653        self.fetch(
2654            command,
2655            move |this, variables, cx| {
2656                let Some(variables) = variables.log_err() else {
2657                    return;
2658                };
2659
2660                this.active_snapshot
2661                    .variables
2662                    .insert(variables_reference, variables);
2663
2664                cx.emit(SessionEvent::Variables);
2665                cx.emit(SessionEvent::InvalidateInlineValue);
2666            },
2667            cx,
2668        );
2669
2670        self.session_state()
2671            .variables
2672            .get(&variables_reference)
2673            .cloned()
2674            .unwrap_or_default()
2675    }
2676
2677    pub fn data_breakpoint_info(
2678        &mut self,
2679        context: Arc<DataBreakpointContext>,
2680        mode: Option<String>,
2681        cx: &mut Context<Self>,
2682    ) -> Task<Option<dap::DataBreakpointInfoResponse>> {
2683        let command = DataBreakpointInfoCommand { context, mode };
2684
2685        self.request(command, |_, response, _| response.ok(), cx)
2686    }
2687
2688    pub fn set_variable_value(
2689        &mut self,
2690        stack_frame_id: u64,
2691        variables_reference: u64,
2692        name: String,
2693        value: String,
2694        cx: &mut Context<Self>,
2695    ) {
2696        if self.capabilities.supports_set_variable.unwrap_or_default() {
2697            self.request(
2698                SetVariableValueCommand {
2699                    name,
2700                    value,
2701                    variables_reference,
2702                },
2703                move |this, response, cx| {
2704                    let response = response.log_err()?;
2705                    this.invalidate_command_type::<VariablesCommand>();
2706                    this.invalidate_command_type::<ReadMemory>();
2707                    this.memory.clear(cx.background_executor());
2708                    this.refresh_watchers(stack_frame_id, cx);
2709                    cx.emit(SessionEvent::Variables);
2710                    Some(response)
2711                },
2712                cx,
2713            )
2714            .detach();
2715        }
2716    }
2717
2718    pub fn evaluate(
2719        &mut self,
2720        expression: String,
2721        context: Option<EvaluateArgumentsContext>,
2722        frame_id: Option<u64>,
2723        source: Option<Source>,
2724        cx: &mut Context<Self>,
2725    ) -> Task<()> {
2726        let event = dap::OutputEvent {
2727            category: None,
2728            output: format!("> {expression}"),
2729            group: None,
2730            variables_reference: None,
2731            source: None,
2732            line: None,
2733            column: None,
2734            data: None,
2735            location_reference: None,
2736        };
2737        self.push_output(event);
2738        let request = self.state.request_dap(EvaluateCommand {
2739            expression,
2740            context,
2741            frame_id,
2742            source,
2743        });
2744        cx.spawn(async move |this, cx| {
2745            let response = request.await;
2746            this.update(cx, |this, cx| {
2747                this.memory.clear(cx.background_executor());
2748                this.invalidate_command_type::<ReadMemory>();
2749                this.invalidate_command_type::<VariablesCommand>();
2750                cx.emit(SessionEvent::Variables);
2751                match response {
2752                    Ok(response) => {
2753                        let event = dap::OutputEvent {
2754                            category: None,
2755                            output: format!("< {}", &response.result),
2756                            group: None,
2757                            variables_reference: Some(response.variables_reference),
2758                            source: None,
2759                            line: None,
2760                            column: None,
2761                            data: None,
2762                            location_reference: None,
2763                        };
2764                        this.push_output(event);
2765                    }
2766                    Err(e) => {
2767                        let event = dap::OutputEvent {
2768                            category: None,
2769                            output: format!("{}", e),
2770                            group: None,
2771                            variables_reference: None,
2772                            source: None,
2773                            line: None,
2774                            column: None,
2775                            data: None,
2776                            location_reference: None,
2777                        };
2778                        this.push_output(event);
2779                    }
2780                };
2781                cx.notify();
2782            })
2783            .ok();
2784        })
2785    }
2786
2787    pub fn location(
2788        &mut self,
2789        reference: u64,
2790        cx: &mut Context<Self>,
2791    ) -> Option<dap::LocationsResponse> {
2792        self.fetch(
2793            LocationsCommand { reference },
2794            move |this, response, _| {
2795                let Some(response) = response.log_err() else {
2796                    return;
2797                };
2798                this.active_snapshot.locations.insert(reference, response);
2799            },
2800            cx,
2801        );
2802        self.session_state().locations.get(&reference).cloned()
2803    }
2804
2805    pub fn is_attached(&self) -> bool {
2806        let SessionState::Running(local_mode) = &self.state else {
2807            return false;
2808        };
2809        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2810    }
2811
2812    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2813        let command = DisconnectCommand {
2814            restart: Some(false),
2815            terminate_debuggee: Some(false),
2816            suspend_debuggee: Some(false),
2817        };
2818
2819        self.request(command, Self::empty_response, cx).detach()
2820    }
2821
2822    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2823        if self
2824            .capabilities
2825            .supports_terminate_threads_request
2826            .unwrap_or_default()
2827        {
2828            self.request(
2829                TerminateThreadsCommand {
2830                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2831                },
2832                Self::clear_active_debug_line_response,
2833                cx,
2834            )
2835            .detach();
2836        } else {
2837            self.shutdown(cx).detach();
2838        }
2839    }
2840
2841    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2842        self.session_state().thread_states.thread_state(thread_id)
2843    }
2844
2845    pub fn quirks(&self) -> SessionQuirks {
2846        self.quirks
2847    }
2848
2849    fn launch_browser_for_remote_server(
2850        &mut self,
2851        mut request: LaunchBrowserInCompanionParams,
2852        cx: &mut Context<Self>,
2853    ) {
2854        let Some(remote_client) = self.remote_client.clone() else {
2855            log::error!("can't launch browser in companion for non-remote project");
2856            return;
2857        };
2858        let Some(http_client) = self.http_client.clone() else {
2859            return;
2860        };
2861        let Some(node_runtime) = self.node_runtime.clone() else {
2862            return;
2863        };
2864
2865        let mut console_output = self.console_output(cx);
2866        let task = cx.spawn(async move |this, cx| {
2867            let forward_ports_process = if remote_client
2868                .read_with(cx, |client, _| client.shares_network_interface())?
2869            {
2870                request.other.insert(
2871                    "proxyUri".into(),
2872                    format!("127.0.0.1:{}", request.server_port).into(),
2873                );
2874                None
2875            } else {
2876                let port = TcpTransport::unused_port(Ipv4Addr::LOCALHOST)
2877                    .await
2878                    .context("getting port for DAP")?;
2879                request
2880                    .other
2881                    .insert("proxyUri".into(), format!("127.0.0.1:{port}").into());
2882                let mut port_forwards = vec![(port, "localhost".to_owned(), request.server_port)];
2883
2884                if let Some(value) = request.params.get("url")
2885                    && let Some(url) = value.as_str()
2886                    && let Some(url) = Url::parse(url).ok()
2887                    && let Some(frontend_port) = url.port()
2888                {
2889                    port_forwards.push((frontend_port, "localhost".to_owned(), frontend_port));
2890                }
2891
2892                let child = remote_client.update(cx, |client, _| {
2893                    let command = client.build_forward_ports_command(port_forwards)?;
2894                    let child = new_smol_command(command.program)
2895                        .args(command.args)
2896                        .envs(command.env)
2897                        .spawn()
2898                        .context("spawning port forwarding process")?;
2899                    anyhow::Ok(child)
2900                })??;
2901                Some(child)
2902            };
2903
2904            let mut companion_process = None;
2905            let companion_port =
2906                if let Some(companion_port) = this.read_with(cx, |this, _| this.companion_port)? {
2907                    companion_port
2908                } else {
2909                    let task = cx.spawn(async move |cx| spawn_companion(node_runtime, cx).await);
2910                    match task.await {
2911                        Ok((port, child)) => {
2912                            companion_process = Some(child);
2913                            port
2914                        }
2915                        Err(e) => {
2916                            console_output
2917                                .send(format!("Failed to launch browser companion process: {e}"))
2918                                .await
2919                                .ok();
2920                            return Err(e);
2921                        }
2922                    }
2923                };
2924
2925            let mut background_tasks = Vec::new();
2926            if let Some(mut forward_ports_process) = forward_ports_process {
2927                background_tasks.push(cx.spawn(async move |_| {
2928                    forward_ports_process.status().await.log_err();
2929                }));
2930            };
2931            if let Some(mut companion_process) = companion_process {
2932                if let Some(stderr) = companion_process.stderr.take() {
2933                    let mut console_output = console_output.clone();
2934                    background_tasks.push(cx.spawn(async move |_| {
2935                        let mut stderr = BufReader::new(stderr);
2936                        let mut line = String::new();
2937                        while let Ok(n) = stderr.read_line(&mut line).await
2938                            && n > 0
2939                        {
2940                            console_output
2941                                .send(format!("companion stderr: {line}"))
2942                                .await
2943                                .ok();
2944                            line.clear();
2945                        }
2946                    }));
2947                }
2948                background_tasks.push(cx.spawn({
2949                    let mut console_output = console_output.clone();
2950                    async move |_| match companion_process.status().await {
2951                        Ok(status) => {
2952                            if status.success() {
2953                                console_output
2954                                    .send("Companion process exited normally".into())
2955                                    .await
2956                                    .ok();
2957                            } else {
2958                                console_output
2959                                    .send(format!(
2960                                        "Companion process exited abnormally with {status:?}"
2961                                    ))
2962                                    .await
2963                                    .ok();
2964                            }
2965                        }
2966                        Err(e) => {
2967                            console_output
2968                                .send(format!("Failed to join companion process: {e}"))
2969                                .await
2970                                .ok();
2971                        }
2972                    }
2973                }));
2974            }
2975
2976            // TODO pass wslInfo as needed
2977
2978            let companion_address = format!("127.0.0.1:{companion_port}");
2979            let mut companion_started = false;
2980            for _ in 0..10 {
2981                if TcpStream::connect(&companion_address).await.is_ok() {
2982                    companion_started = true;
2983                    break;
2984                }
2985                cx.background_executor()
2986                    .timer(Duration::from_millis(100))
2987                    .await;
2988            }
2989            if !companion_started {
2990                console_output
2991                    .send("Browser companion failed to start".into())
2992                    .await
2993                    .ok();
2994                bail!("Browser companion failed to start");
2995            }
2996
2997            let response = http_client
2998                .post_json(
2999                    &format!("http://{companion_address}/launch-and-attach"),
3000                    serde_json::to_string(&request)
3001                        .context("serializing request")?
3002                        .into(),
3003                )
3004                .await;
3005            match response {
3006                Ok(response) => {
3007                    if !response.status().is_success() {
3008                        console_output
3009                            .send("Launch request to companion failed".into())
3010                            .await
3011                            .ok();
3012                        return Err(anyhow!("launch request failed"));
3013                    }
3014                }
3015                Err(e) => {
3016                    console_output
3017                        .send("Failed to read response from companion".into())
3018                        .await
3019                        .ok();
3020                    return Err(e);
3021                }
3022            }
3023
3024            this.update(cx, |this, _| {
3025                this.background_tasks.extend(background_tasks);
3026                this.companion_port = Some(companion_port);
3027            })?;
3028
3029            anyhow::Ok(())
3030        });
3031        self.background_tasks.push(cx.spawn(async move |_, _| {
3032            task.await.log_err();
3033        }));
3034    }
3035
3036    fn kill_browser(&self, request: KillCompanionBrowserParams, cx: &mut App) {
3037        let Some(companion_port) = self.companion_port else {
3038            log::error!("received killCompanionBrowser but js-debug-companion is not running");
3039            return;
3040        };
3041        let Some(http_client) = self.http_client.clone() else {
3042            return;
3043        };
3044
3045        cx.spawn(async move |_| {
3046            http_client
3047                .post_json(
3048                    &format!("http://127.0.0.1:{companion_port}/kill"),
3049                    serde_json::to_string(&request)
3050                        .context("serializing request")?
3051                        .into(),
3052                )
3053                .await?;
3054            anyhow::Ok(())
3055        })
3056        .detach_and_log_err(cx)
3057    }
3058}
3059
3060#[derive(Serialize, Deserialize, Debug)]
3061#[serde(rename_all = "camelCase")]
3062struct LaunchBrowserInCompanionParams {
3063    server_port: u16,
3064    params: HashMap<String, serde_json::Value>,
3065    #[serde(flatten)]
3066    other: HashMap<String, serde_json::Value>,
3067}
3068
3069#[derive(Serialize, Deserialize, Debug)]
3070#[serde(rename_all = "camelCase")]
3071struct KillCompanionBrowserParams {
3072    launch_id: u64,
3073}
3074
3075async fn spawn_companion(
3076    node_runtime: NodeRuntime,
3077    cx: &mut AsyncApp,
3078) -> Result<(u16, smol::process::Child)> {
3079    let binary_path = node_runtime
3080        .binary_path()
3081        .await
3082        .context("getting node path")?;
3083    let path = cx
3084        .spawn(async move |cx| get_or_install_companion(node_runtime, cx).await)
3085        .await?;
3086    log::info!("will launch js-debug-companion version {path:?}");
3087
3088    let port = {
3089        let listener = TcpListener::bind("127.0.0.1:0")
3090            .await
3091            .context("getting port for companion")?;
3092        listener.local_addr()?.port()
3093    };
3094
3095    let dir = paths::data_dir()
3096        .join("js_debug_companion_state")
3097        .to_string_lossy()
3098        .to_string();
3099
3100    let child = new_smol_command(binary_path)
3101        .arg(path)
3102        .args([
3103            format!("--listen=127.0.0.1:{port}"),
3104            format!("--state={dir}"),
3105        ])
3106        .stdin(Stdio::piped())
3107        .stdout(Stdio::piped())
3108        .stderr(Stdio::piped())
3109        .spawn()
3110        .context("spawning companion child process")?;
3111
3112    Ok((port, child))
3113}
3114
3115async fn get_or_install_companion(node: NodeRuntime, cx: &mut AsyncApp) -> Result<PathBuf> {
3116    const PACKAGE_NAME: &str = "@zed-industries/js-debug-companion-cli";
3117
3118    async fn install_latest_version(dir: PathBuf, node: NodeRuntime) -> Result<PathBuf> {
3119        let temp_dir = tempfile::tempdir().context("creating temporary directory")?;
3120        node.npm_install_packages(temp_dir.path(), &[(PACKAGE_NAME, "latest")])
3121            .await
3122            .context("installing latest companion package")?;
3123        let version = node
3124            .npm_package_installed_version(temp_dir.path(), PACKAGE_NAME)
3125            .await
3126            .context("getting installed companion version")?
3127            .context("companion was not installed")?;
3128        let version_folder = dir.join(version.to_string());
3129        smol::fs::rename(temp_dir.path(), &version_folder)
3130            .await
3131            .context("moving companion package into place")?;
3132        Ok(version_folder)
3133    }
3134
3135    let dir = paths::debug_adapters_dir().join("js-debug-companion");
3136    let (latest_installed_version, latest_version) = cx
3137        .background_spawn({
3138            let dir = dir.clone();
3139            let node = node.clone();
3140            async move {
3141                smol::fs::create_dir_all(&dir)
3142                    .await
3143                    .context("creating companion installation directory")?;
3144
3145                let children = smol::fs::read_dir(&dir)
3146                    .await
3147                    .context("reading companion installation directory")?
3148                    .try_collect::<Vec<_>>()
3149                    .await
3150                    .context("reading companion installation directory entries")?;
3151
3152                let latest_installed_version = children
3153                    .iter()
3154                    .filter_map(|child| {
3155                        Some((
3156                            child.path(),
3157                            semver::Version::parse(child.file_name().to_str()?).ok()?,
3158                        ))
3159                    })
3160                    .max_by_key(|(_, version)| version.clone());
3161
3162                let latest_version = node
3163                    .npm_package_latest_version(PACKAGE_NAME)
3164                    .await
3165                    .log_err();
3166                anyhow::Ok((latest_installed_version, latest_version))
3167            }
3168        })
3169        .await?;
3170
3171    let path = if let Some((installed_path, installed_version)) = latest_installed_version {
3172        if let Some(latest_version) = latest_version
3173            && latest_version != installed_version
3174        {
3175            cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3176                .detach();
3177        }
3178        Ok(installed_path)
3179    } else {
3180        cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3181            .await
3182    };
3183
3184    Ok(path?
3185        .join("node_modules")
3186        .join(PACKAGE_NAME)
3187        .join("out")
3188        .join("cli.js"))
3189}