session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2use crate::debugger::dap_command::{DataBreakpointContext, ReadMemory};
   3use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
   4
   5use super::breakpoint_store::{
   6    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   7};
   8use super::dap_command::{
   9    self, Attach, ConfigurationDone, ContinueCommand, DataBreakpointInfoCommand, DisconnectCommand,
  10    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
  11    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  12    ScopesCommand, SetDataBreakpointsCommand, SetExceptionBreakpoints, SetVariableValueCommand,
  13    StackTraceCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  14    TerminateCommand, TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  15};
  16use super::dap_store::DapStore;
  17use anyhow::{Context as _, Result, anyhow, bail};
  18use base64::Engine;
  19use collections::{HashMap, HashSet, IndexMap};
  20use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  21use dap::messages::Response;
  22use dap::requests::{Request, RunInTerminal, StartDebugging};
  23use dap::{
  24    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  25    SteppingGranularity, StoppedEvent, VariableReference,
  26    client::{DebugAdapterClient, SessionId},
  27    messages::{Events, Message},
  28};
  29use dap::{
  30    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  31    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  32    StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
  33};
  34use futures::channel::mpsc::UnboundedSender;
  35use futures::channel::{mpsc, oneshot};
  36use futures::compat::CompatSink;
  37use futures::{FutureExt, future::Shared};
  38use futures::{SinkExt, StreamExt};
  39use gpui::{
  40    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  41    Task, WeakEntity,
  42};
  43
  44use gpui_tokio::Tokio;
  45use remote::RemoteClient;
  46use rpc::ErrorExt;
  47use serde::Deserialize;
  48use serde_json::Value;
  49use smol::net::TcpListener;
  50use std::any::TypeId;
  51use std::collections::BTreeMap;
  52use std::ops::RangeInclusive;
  53use std::pin::Pin;
  54use std::process::Stdio;
  55use std::u64;
  56use std::{
  57    any::Any,
  58    collections::hash_map::Entry,
  59    hash::{Hash, Hasher},
  60    path::Path,
  61    sync::Arc,
  62};
  63use task::TaskContext;
  64use text::{PointUtf16, ToPointUtf16};
  65use util::command::new_smol_command;
  66use util::{ResultExt, debug_panic, maybe};
  67use worktree::Worktree;
  68
  69#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  70#[repr(transparent)]
  71pub struct ThreadId(pub i64);
  72
  73impl From<i64> for ThreadId {
  74    fn from(id: i64) -> Self {
  75        Self(id)
  76    }
  77}
  78
  79#[derive(Clone, Debug)]
  80pub struct StackFrame {
  81    pub dap: dap::StackFrame,
  82    pub scopes: Vec<dap::Scope>,
  83}
  84
  85impl From<dap::StackFrame> for StackFrame {
  86    fn from(stack_frame: dap::StackFrame) -> Self {
  87        Self {
  88            scopes: vec![],
  89            dap: stack_frame,
  90        }
  91    }
  92}
  93
  94#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  95pub enum ThreadStatus {
  96    #[default]
  97    Running,
  98    Stopped,
  99    Stepping,
 100    Exited,
 101    Ended,
 102}
 103
 104impl ThreadStatus {
 105    pub fn label(&self) -> &'static str {
 106        match self {
 107            ThreadStatus::Running => "Running",
 108            ThreadStatus::Stopped => "Stopped",
 109            ThreadStatus::Stepping => "Stepping",
 110            ThreadStatus::Exited => "Exited",
 111            ThreadStatus::Ended => "Ended",
 112        }
 113    }
 114}
 115
 116#[derive(Debug)]
 117pub struct Thread {
 118    dap: dap::Thread,
 119    stack_frames: Vec<StackFrame>,
 120    stack_frames_error: Option<anyhow::Error>,
 121    _has_stopped: bool,
 122}
 123
 124impl From<dap::Thread> for Thread {
 125    fn from(dap: dap::Thread) -> Self {
 126        Self {
 127            dap,
 128            stack_frames: Default::default(),
 129            stack_frames_error: None,
 130            _has_stopped: false,
 131        }
 132    }
 133}
 134
/// A watch entry: an expression together with the value stored for it.
///
/// NOTE(review): field meanings below are inferred from names and the DAP
/// vocabulary; confirm against the code that populates `Session::watchers`.
#[derive(Debug, Clone, PartialEq)]
pub struct Watcher {
    /// The watched expression text.
    pub expression: SharedString,
    /// The value stored for `expression`.
    pub value: SharedString,
    /// Presumably the DAP `variablesReference` for expanding child variables — TODO confirm.
    pub variables_reference: u64,
    /// Optional presentation hint attached to the value.
    pub presentation_hint: Option<VariablePresentationHint>,
}
 142
/// A data breakpoint along with its enabled flag and the context it was created from.
#[derive(Debug, Clone, PartialEq)]
pub struct DataBreakpointState {
    /// The data breakpoint in its DAP representation.
    pub dap: dap::DataBreakpoint,
    /// Whether this breakpoint is currently enabled.
    pub is_enabled: bool,
    /// Shared context describing where the breakpoint came from.
    pub context: Arc<DataBreakpointContext>,
}
 149
/// Lifecycle state of a [`Session`]: either still booting or backed by a running adapter.
pub enum SessionState {
    /// The session is building/initializing.
    ///
    /// This state is used even when the session has no pre-build task: it holds
    /// the async work required to start the session.
    Booting(Option<Task<Result<()>>>),
    /// The session has a debug adapter connection that requests can be sent to.
    Running(RunningMode),
}
 157
/// State of a session that has a live debug adapter connection.
#[derive(Clone)]
pub struct RunningMode {
    /// Connection to the debug adapter; all requests go through it.
    client: Arc<DebugAdapterClient>,
    /// The adapter binary description; its `request_args` drive launch/attach.
    binary: DebugAdapterBinary,
    /// Extra breakpoint that is sent alongside the stored breakpoints of its path.
    tmp_breakpoint: Option<SourceBreakpoint>,
    /// Worktree this session is associated with (weak, may be gone).
    worktree: WeakEntity<Worktree>,
    /// Executor on which adapter requests are spawned.
    executor: BackgroundExecutor,
    /// Set to `true` once the initialize sequence has finished (successfully or not).
    is_started: bool,
    /// Set to `true` the first time the session reports a stop.
    has_ever_stopped: bool,
    /// Channel that message-handler closures use to forward adapter messages.
    messages_tx: UnboundedSender<Message>,
}
 169
/// Per-session behaviour toggles; both default to `false`.
///
/// NOTE(review): the consumers of these flags are not visible in this part of
/// the file, so the exact effect of each flag should be confirmed there.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SessionQuirks {
    /// Presumably requests a compact presentation of the session — TODO confirm.
    pub compact: bool,
    /// Presumably prefers the thread name when labelling — TODO confirm.
    pub prefer_thread_name: bool,
}
 175
 176fn client_source(abs_path: &Path) -> dap::Source {
 177    dap::Source {
 178        name: abs_path
 179            .file_name()
 180            .map(|filename| filename.to_string_lossy().to_string()),
 181        path: Some(abs_path.to_string_lossy().to_string()),
 182        source_reference: None,
 183        presentation_hint: None,
 184        origin: None,
 185        sources: None,
 186        adapter_data: None,
 187        checksums: None,
 188    }
 189}
 190
 191impl RunningMode {
 192    async fn new(
 193        session_id: SessionId,
 194        parent_session: Option<Entity<Session>>,
 195        worktree: WeakEntity<Worktree>,
 196        binary: DebugAdapterBinary,
 197        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 198        cx: &mut AsyncApp,
 199    ) -> Result<Self> {
 200        let message_handler = Box::new({
 201            let messages_tx = messages_tx.clone();
 202            move |message| {
 203                messages_tx.unbounded_send(message).ok();
 204            }
 205        });
 206
 207        let client = if let Some(client) = parent_session
 208            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 209            .flatten()
 210        {
 211            client
 212                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 213                .await?
 214        } else {
 215            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 216        };
 217
 218        Ok(Self {
 219            client: Arc::new(client),
 220            worktree,
 221            tmp_breakpoint: None,
 222            binary,
 223            executor: cx.background_executor().clone(),
 224            is_started: false,
 225            has_ever_stopped: false,
 226            messages_tx,
 227        })
 228    }
 229
    /// Returns the weak handle to the worktree this running session was created with.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 233
 234    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 235        let tasks: Vec<_> = paths
 236            .iter()
 237            .map(|path| {
 238                self.request(dap_command::SetBreakpoints {
 239                    source: client_source(path),
 240                    source_modified: None,
 241                    breakpoints: vec![],
 242                })
 243            })
 244            .collect();
 245
 246        cx.background_spawn(async move {
 247            futures::future::join_all(tasks)
 248                .await
 249                .iter()
 250                .for_each(|res| match res {
 251                    Ok(_) => {}
 252                    Err(err) => {
 253                        log::warn!("Set breakpoints request failed: {}", err);
 254                    }
 255                });
 256        })
 257    }
 258
    /// Sends the current set of enabled breakpoints for `abs_path` to the adapter
    /// and records the adapter's verification result in the breakpoint store.
    ///
    /// `reason` only influences the `source_modified` flag of the request
    /// (`true` for `BreakpointUpdatedReason::FileSaved`). Request failures are
    /// logged as warnings; the returned task never fails.
    fn send_breakpoints_from_path(
        &self,
        abs_path: Arc<Path>,
        reason: BreakpointUpdatedReason,
        breakpoint_store: &Entity<BreakpointStore>,
        cx: &mut App,
    ) -> Task<()> {
        // Enabled breakpoints from the store, followed by the temporary breakpoint
        // when it belongs to this path.
        let breakpoints =
            breakpoint_store
                .read(cx)
                .source_breakpoints_from_path(&abs_path, cx)
                .into_iter()
                .filter(|bp| bp.state.is_enabled())
                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
                }))
                .map(Into::into)
                .collect();

        // Store-side counterparts of the enabled breakpoints (the temporary
        // breakpoint is not included here).
        let raw_breakpoints = breakpoint_store
            .read(cx)
            .breakpoints_from_path(&abs_path)
            .into_iter()
            .filter(|bp| bp.bp.state.is_enabled())
            .collect::<Vec<_>>();

        let task = self.request(dap_command::SetBreakpoints {
            source: client_source(&abs_path),
            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
            breakpoints,
        });
        let session_id = self.client.id();
        let breakpoint_store = breakpoint_store.downgrade();
        cx.spawn(async move |cx| match cx.background_spawn(task).await {
            Ok(breakpoints) => {
                // Responses are paired with store breakpoints by position; the zip
                // stops at the shorter list, so a trailing temporary breakpoint is
                // left unpaired. Responses without an id are skipped.
                // NOTE(review): assumes both store queries yield the same order — confirm.
                let breakpoints =
                    breakpoints
                        .into_iter()
                        .zip(raw_breakpoints)
                        .filter_map(|(dap_bp, zed_bp)| {
                            Some((
                                zed_bp,
                                BreakpointSessionState {
                                    id: dap_bp.id?,
                                    verified: dap_bp.verified,
                                },
                            ))
                        });
                // The store may already be gone; that is not an error.
                breakpoint_store
                    .update(cx, |this, _| {
                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
                    })
                    .ok();
            }
            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
        })
    }
 316
 317    fn send_exception_breakpoints(
 318        &self,
 319        filters: Vec<ExceptionBreakpointsFilter>,
 320        supports_filter_options: bool,
 321    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 322        let arg = if supports_filter_options {
 323            SetExceptionBreakpoints::WithOptions {
 324                filters: filters
 325                    .into_iter()
 326                    .map(|filter| ExceptionFilterOptions {
 327                        filter_id: filter.filter,
 328                        condition: None,
 329                        mode: None,
 330                    })
 331                    .collect(),
 332            }
 333        } else {
 334            SetExceptionBreakpoints::Plain {
 335                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 336            }
 337        };
 338        self.request(arg)
 339    }
 340
    /// Sends the breakpoints of every path known to the breakpoint store to the adapter.
    ///
    /// One `SetBreakpoints` request is issued per path. With `ignore_breakpoints`
    /// set, each request carries an empty list; otherwise it carries the enabled
    /// breakpoints. Successful responses are recorded in the store via
    /// `mark_breakpoints_verified`.
    ///
    /// The returned task resolves to a map from path to the error of each
    /// request that failed; an empty map means every request succeeded.
    fn send_source_breakpoints(
        &self,
        ignore_breakpoints: bool,
        breakpoint_store: &Entity<BreakpointStore>,
        cx: &App,
    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
        let mut breakpoint_tasks = Vec::new();
        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
        // Both views are expected to cover the same set of paths.
        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
        let session_id = self.client.id();
        for (path, breakpoints) in breakpoints {
            let breakpoints = if ignore_breakpoints {
                vec![]
            } else {
                breakpoints
                    .into_iter()
                    .filter(|bp| bp.state.is_enabled())
                    .map(Into::into)
                    .collect()
            };

            // Store-side counterparts of the enabled breakpoints for this path.
            let raw_breakpoints = raw_breakpoints
                .remove(&path)
                .unwrap_or_default()
                .into_iter()
                .filter(|bp| bp.bp.state.is_enabled());
            let error_path = path.clone();
            // Tag a failure with its path so it can be collected into the result map.
            let send_request = self
                .request(dap_command::SetBreakpoints {
                    source: client_source(&path),
                    source_modified: Some(false),
                    breakpoints,
                })
                .map(|result| result.map_err(move |e| (error_path, e)));

            let task = cx.spawn({
                let breakpoint_store = breakpoint_store.downgrade();
                async move |cx| {
                    let breakpoints = cx.background_spawn(send_request).await?;

                    // Pair responses with store breakpoints by position; responses
                    // without an id are skipped.
                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
                        |(dap_bp, zed_bp)| {
                            Some((
                                zed_bp,
                                BreakpointSessionState {
                                    id: dap_bp.id?,
                                    verified: dap_bp.verified,
                                },
                            ))
                        },
                    );
                    // The store may already be gone; that is not an error.
                    breakpoint_store
                        .update(cx, |this, _| {
                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
                        })
                        .ok();

                    Ok(())
                }
            });
            breakpoint_tasks.push(task);
        }

        // Keep only the failures, keyed by the path whose request failed.
        cx.background_spawn(async move {
            futures::future::join_all(breakpoint_tasks)
                .await
                .into_iter()
                .filter_map(Result::err)
                .collect::<HashMap<_, _>>()
        })
    }
 413
    /// Runs the launch/attach request together with the configuration sequence.
    ///
    /// The launch or attach request (chosen from the binary's request args) is
    /// issued immediately. Concurrently, the configuration sequence waits for
    /// `initialized_rx`, then sends source breakpoints, optionally exception
    /// breakpoints, and finally `ConfigurationDone` when the adapter supports it.
    ///
    /// Once both have finished, `is_started` is set on the running session
    /// regardless of the outcome, and the combined result is returned.
    ///
    /// # Errors
    /// Returns the first error of the launch/attach request or the
    /// configuration sequence. Failures to set source or exception breakpoints
    /// are reported (log/notification) but do not fail the sequence.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        dap_store: WeakEntity<DapStore>,
        cx: &mut Context<Session>,
    ) -> Task<Result<()>> {
        let raw = self.binary.request_args.clone();

        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = match raw.request {
            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
                raw: raw.configuration,
            }),
            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
                raw: raw.configuration,
            }),
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
        // From spec (on initialization sequence):
        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
        //
        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
        let should_send_exception_breakpoints = capabilities
            .exception_breakpoint_filters
            .as_ref()
            .is_some_and(|filters| !filters.is_empty())
            || !configuration_done_supported;
        let supports_exception_filters = capabilities
            .supports_exception_filter_options
            .unwrap_or_default();
        let this = self.clone();
        let worktree = self.worktree().clone();
        let mut filters = capabilities
            .exception_breakpoint_filters
            .clone()
            .unwrap_or_default();
        let configuration_sequence = cx.spawn({
            async move |session, cx| {
                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
                let (breakpoint_store, adapter_defaults) =
                    dap_store.read_with(cx, |dap_store, _| {
                        (
                            dap_store.breakpoint_store().clone(),
                            dap_store.adapter_options(&adapter_name),
                        )
                    })?;
                // Nothing is configured until the `initialized` signal arrives.
                initialized_rx.await?;
                let errors_by_path = cx
                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
                    .await;

                dap_store.update(cx, |_, cx| {
                    // Without a worktree neither the log lines nor the notification
                    // below are produced.
                    let Some(worktree) = worktree.upgrade() else {
                        return;
                    };

                    for (path, error) in &errors_by_path {
                        log::error!("failed to set breakpoints for {path:?}: {error}");
                    }

                    // Surface one notification naming a single failed path plus a
                    // count of the remaining ones.
                    if let Some(failed_path) = errors_by_path.keys().next() {
                        let failed_path = failed_path
                            .strip_prefix(worktree.read(cx).abs_path())
                            .unwrap_or(failed_path)
                            .display();
                        let message = format!(
                            "Failed to set breakpoints for {failed_path}{}",
                            match errors_by_path.len() {
                                0 => unreachable!(),
                                1 => "".into(),
                                2 => " and 1 other path".into(),
                                n => format!(" and {} other paths", n - 1),
                            }
                        );
                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
                    }
                })?;

                if should_send_exception_breakpoints {
                    _ = session.update(cx, |this, _| {
                        // Keep only the enabled filters for the request, while
                        // recording every filter (with its enabled flag) on the
                        // session unless an entry already exists.
                        filters.retain(|filter| {
                            // Adapter-specific options win over the filter's own default.
                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
                                defaults
                                    .exception_breakpoints
                                    .get(&filter.filter)
                                    .map(|options| options.enabled)
                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
                            } else {
                                filter.default.unwrap_or_default()
                            };
                            this.exception_breakpoints
                                .entry(filter.filter.clone())
                                .or_insert_with(|| (filter.clone(), is_enabled));
                            is_enabled
                        });
                    });

                    // A failure here is deliberately not fatal for the sequence.
                    this.send_exception_breakpoints(filters, supports_exception_filters)
                        .await
                        .ok();
                }

                if configuration_done_supported {
                    this.request(ConfigurationDone {})
                } else {
                    Task::ready(Ok(()))
                }
                .await
            }
        });

        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));

        cx.spawn(async move |this, cx| {
            let result = task.await;

            // Mark the session as started before propagating any error.
            this.update(cx, |this, cx| {
                if let Some(this) = this.as_running_mut() {
                    this.is_started = true;
                    cx.notify();
                }
            })
            .ok();

            result?;
            anyhow::Ok(())
        })
    }
 544
 545    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 546        let client = self.client.clone();
 547        let messages_tx = self.messages_tx.clone();
 548        let message_handler = Box::new(move |message| {
 549            messages_tx.unbounded_send(message).ok();
 550        });
 551        if client.should_reconnect_for_ssh() {
 552            Some(cx.spawn(async move |cx| {
 553                client.connect(message_handler, cx).await?;
 554                anyhow::Ok(())
 555            }))
 556        } else {
 557            None
 558        }
 559    }
 560
 561    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 562    where
 563        <R::DapRequest as dap::requests::Request>::Response: 'static,
 564        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 565    {
 566        let request = Arc::new(request);
 567
 568        let request_clone = request.clone();
 569        let connection = self.client.clone();
 570        self.executor.spawn(async move {
 571            let args = request_clone.to_dap();
 572            let response = connection.request::<R::DapRequest>(args).await?;
 573            request.response_from_dap(response)
 574        })
 575    }
 576}
 577
 578impl SessionState {
 579    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 580    where
 581        <R::DapRequest as dap::requests::Request>::Response: 'static,
 582        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 583    {
 584        match self {
 585            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 586            SessionState::Booting(_) => Task::ready(Err(anyhow!(
 587                "no adapter running to send request: {request:?}"
 588            ))),
 589        }
 590    }
 591
 592    /// Did this debug session stop at least once?
 593    pub(crate) fn has_ever_stopped(&self) -> bool {
 594        match self {
 595            SessionState::Booting(_) => false,
 596            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 597        }
 598    }
 599
 600    fn stopped(&mut self) {
 601        if let SessionState::Running(running) = self {
 602            running.has_ever_stopped = true;
 603        }
 604    }
 605}
 606
 607#[derive(Default)]
 608struct ThreadStates {
 609    global_state: Option<ThreadStatus>,
 610    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 611}
 612
 613impl ThreadStates {
 614    fn stop_all_threads(&mut self) {
 615        self.global_state = Some(ThreadStatus::Stopped);
 616        self.known_thread_states.clear();
 617    }
 618
 619    fn exit_all_threads(&mut self) {
 620        self.global_state = Some(ThreadStatus::Exited);
 621        self.known_thread_states.clear();
 622    }
 623
 624    fn continue_all_threads(&mut self) {
 625        self.global_state = Some(ThreadStatus::Running);
 626        self.known_thread_states.clear();
 627    }
 628
 629    fn stop_thread(&mut self, thread_id: ThreadId) {
 630        self.known_thread_states
 631            .insert(thread_id, ThreadStatus::Stopped);
 632    }
 633
 634    fn continue_thread(&mut self, thread_id: ThreadId) {
 635        self.known_thread_states
 636            .insert(thread_id, ThreadStatus::Running);
 637    }
 638
 639    fn process_step(&mut self, thread_id: ThreadId) {
 640        self.known_thread_states
 641            .insert(thread_id, ThreadStatus::Stepping);
 642    }
 643
 644    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 645        self.thread_state(thread_id)
 646            .unwrap_or(ThreadStatus::Running)
 647    }
 648
 649    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 650        self.known_thread_states
 651            .get(&thread_id)
 652            .copied()
 653            .or(self.global_state)
 654    }
 655
 656    fn exit_thread(&mut self, thread_id: ThreadId) {
 657        self.known_thread_states
 658            .insert(thread_id, ThreadStatus::Exited);
 659    }
 660
 661    fn any_stopped_thread(&self) -> bool {
 662        self.global_state
 663            .is_some_and(|state| state == ThreadStatus::Stopped)
 664            || self
 665                .known_thread_states
 666                .values()
 667                .any(|status| *status == ThreadStatus::Stopped)
 668    }
 669}
/// Capacity of the circular buffer that stores a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Readability alias for the "is this breakpoint enabled" flag stored next to a filter.
type IsEnabled = bool;

/// Ordered, copyable `usize` token associated with a session's output; starts at 0.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether the session is still booting or has a running adapter.
    pub mode: SessionState,
    id: SessionId,
    label: Option<SharedString>,
    adapter: DebugAdapterName,
    /// Adapter capabilities; default-initialised until loaded.
    pub(super) capabilities: Capabilities,
    child_session_ids: HashSet<SessionId>,
    /// Parent session, if this session was created with one.
    parent_session: Option<Entity<Session>>,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Fixed-capacity buffer of output events (at most `MAX_TRACKED_OUTPUT_EVENTS`).
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    /// Watchers keyed by their expression type (`SharedString`).
    watchers: HashMap<SharedString, Watcher>,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Shared request tasks, grouped by command type and then by the concrete command value.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// When set, breakpoint store updates are not forwarded to the adapter.
    ignore_breakpoints: bool,
    /// Exception breakpoint filters keyed by filter id, each with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    data_breakpoints: BTreeMap<String, DataBreakpointState>,
    background_tasks: Vec<Task<()>>,
    restart_task: Option<Task<()>>,
    task_context: TaskContext,
    memory: memory::Memory,
    quirks: SessionQuirks,
    remote_client: Option<Entity<RemoteClient>>,
}
 708
/// Object-safe view of a command that supports equality and hashing, so
/// commands of different concrete types can sit behind one `dyn` pointer
/// (see `RequestSlot`).
trait CacheableCommand: Any + Send + Sync {
    /// Returns `true` when `rhs` has the same concrete type as `self` and compares equal.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds this command's hash into a type-erased hasher.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the shared pointer into an `Arc<dyn Any>`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 714
 715impl<T> CacheableCommand for T
 716where
 717    T: LocalDapCommand + PartialEq + Eq + Hash,
 718{
 719    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 720        (rhs as &dyn Any).downcast_ref::<Self>() == Some(self)
 721    }
 722
 723    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 724        T::hash(self, &mut hasher);
 725    }
 726
 727    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 728        self
 729    }
 730}
 731
 732pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 733
 734impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 735    fn from(request: T) -> Self {
 736        Self(Arc::new(request))
 737    }
 738}
 739
 740impl PartialEq for RequestSlot {
 741    fn eq(&self, other: &Self) -> bool {
 742        self.0.dyn_eq(other.0.as_ref())
 743    }
 744}
 745
 746impl Eq for RequestSlot {}
 747
 748impl Hash for RequestSlot {
 749    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 750        self.0.dyn_hash(state);
 751        (&*self.0 as &dyn Any).type_id().hash(state)
 752    }
 753}
 754
 755#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 756pub struct CompletionsQuery {
 757    pub query: String,
 758    pub column: u64,
 759    pub line: Option<u64>,
 760    pub frame_id: Option<u64>,
 761}
 762
 763impl CompletionsQuery {
 764    pub fn new(
 765        buffer: &language::Buffer,
 766        cursor_position: language::Anchor,
 767        frame_id: Option<u64>,
 768    ) -> Self {
 769        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 770        Self {
 771            query: buffer.text(),
 772            column: column as u64,
 773            frame_id,
 774            line: Some(row as u64),
 775        }
 776    }
 777}
 778
/// Events emitted by a [`Session`].
///
/// NOTE(review): the emit sites are outside this part of the file; the variant
/// names indicate which piece of session data each event relates to.
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries the id of the thread concerned, when there is one.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Watchers,
    Threads,
    InvalidateInlineValue,
    CapabilitiesLoaded,
    /// Carries the `RunInTerminal` request arguments and a channel on which
    /// the receiver replies with a `Result<u32>`.
    RunInTerminal {
        request: RunInTerminalRequestArguments,
        sender: mpsc::Sender<Result<u32>>,
    },
    DataBreakpointInfo,
    ConsoleOutput,
}
 797
/// Lifecycle events emitted by a [`Session`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    Running,
    Shutdown,
    Restart,
    /// Carries the `StartDebugging` request arguments for the child session to spawn.
    SpawnChildSession {
        request: StartDebuggingRequestArguments,
    },
}
 807
// `Session` emits two kinds of events: `SessionEvent` and `SessionStateEvent`.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 810
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// `BreakpointStore` notifies the session whenever breakpoints change.
 814impl Session {
 815    pub(crate) fn new(
 816        breakpoint_store: Entity<BreakpointStore>,
 817        session_id: SessionId,
 818        parent_session: Option<Entity<Session>>,
 819        label: Option<SharedString>,
 820        adapter: DebugAdapterName,
 821        task_context: TaskContext,
 822        quirks: SessionQuirks,
 823        remote_client: Option<Entity<RemoteClient>>,
 824        cx: &mut App,
 825    ) -> Entity<Self> {
 826        cx.new::<Self>(|cx| {
 827            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 828                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 829                    if let Some(local) = (!this.ignore_breakpoints)
 830                        .then(|| this.as_running_mut())
 831                        .flatten()
 832                    {
 833                        local
 834                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 835                            .detach();
 836                    };
 837                }
 838                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 839                    if let Some(local) = (!this.ignore_breakpoints)
 840                        .then(|| this.as_running_mut())
 841                        .flatten()
 842                    {
 843                        local.unset_breakpoints_from_paths(paths, cx).detach();
 844                    }
 845                }
 846                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 847            })
 848            .detach();
 849
 850            Self {
 851                mode: SessionState::Booting(None),
 852                id: session_id,
 853                child_session_ids: HashSet::default(),
 854                parent_session,
 855                capabilities: Capabilities::default(),
 856                watchers: HashMap::default(),
 857                variables: Default::default(),
 858                stack_frames: Default::default(),
 859                thread_states: ThreadStates::default(),
 860                output_token: OutputToken(0),
 861                output: circular_buffer::CircularBuffer::boxed(),
 862                requests: HashMap::default(),
 863                modules: Vec::default(),
 864                loaded_sources: Vec::default(),
 865                threads: IndexMap::default(),
 866                background_tasks: Vec::default(),
 867                restart_task: None,
 868                locations: Default::default(),
 869                is_session_terminated: false,
 870                ignore_breakpoints: false,
 871                breakpoint_store,
 872                data_breakpoints: Default::default(),
 873                exception_breakpoints: Default::default(),
 874                label,
 875                adapter,
 876                task_context,
 877                memory: memory::Memory::new(),
 878                quirks,
 879                remote_client,
 880            }
 881        })
 882    }
 883
 884    pub fn task_context(&self) -> &TaskContext {
 885        &self.task_context
 886    }
 887
 888    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 889        match &self.mode {
 890            SessionState::Booting(_) => None,
 891            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 892        }
 893    }
 894
 895    pub fn boot(
 896        &mut self,
 897        binary: DebugAdapterBinary,
 898        worktree: Entity<Worktree>,
 899        dap_store: WeakEntity<DapStore>,
 900        cx: &mut Context<Self>,
 901    ) -> Task<Result<()>> {
 902        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 903        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 904
 905        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 906            let mut initialized_tx = Some(initialized_tx);
 907            while let Some(message) = message_rx.next().await {
 908                if let Message::Event(event) = message {
 909                    if let Events::Initialized(_) = *event {
 910                        if let Some(tx) = initialized_tx.take() {
 911                            tx.send(()).ok();
 912                        }
 913                    } else {
 914                        let Ok(_) = this.update(cx, |session, cx| {
 915                            session.handle_dap_event(event, cx);
 916                        }) else {
 917                            break;
 918                        };
 919                    }
 920                } else if let Message::Request(request) = message {
 921                    let Ok(_) = this.update(cx, |this, cx| {
 922                        if request.command == StartDebugging::COMMAND {
 923                            this.handle_start_debugging_request(request, cx)
 924                                .detach_and_log_err(cx);
 925                        } else if request.command == RunInTerminal::COMMAND {
 926                            this.handle_run_in_terminal_request(request, cx)
 927                                .detach_and_log_err(cx);
 928                        }
 929                    }) else {
 930                        break;
 931                    };
 932                }
 933            }
 934        })];
 935        self.background_tasks = background_tasks;
 936        let id = self.id;
 937        let parent_session = self.parent_session.clone();
 938
 939        cx.spawn(async move |this, cx| {
 940            let mode = RunningMode::new(
 941                id,
 942                parent_session,
 943                worktree.downgrade(),
 944                binary.clone(),
 945                message_tx,
 946                cx,
 947            )
 948            .await?;
 949            this.update(cx, |this, cx| {
 950                match &mut this.mode {
 951                    SessionState::Booting(task) if task.is_some() => {
 952                        task.take().unwrap().detach_and_log_err(cx);
 953                    }
 954                    SessionState::Booting(_) => {}
 955                    SessionState::Running(_) => {
 956                        debug_panic!("Attempting to boot a session that is already running");
 957                    }
 958                };
 959                this.mode = SessionState::Running(mode);
 960                cx.emit(SessionStateEvent::Running);
 961            })?;
 962
 963            this.update(cx, |session, cx| session.request_initialize(cx))?
 964                .await?;
 965
 966            let result = this
 967                .update(cx, |session, cx| {
 968                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 969                })?
 970                .await;
 971
 972            if result.is_err() {
 973                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 974
 975                console
 976                    .send(format!(
 977                        "Tried to launch debugger with: {}",
 978                        serde_json::to_string_pretty(&binary.request_args.configuration)
 979                            .unwrap_or_default(),
 980                    ))
 981                    .await
 982                    .ok();
 983            }
 984
 985            result
 986        })
 987    }
 988
 989    pub fn session_id(&self) -> SessionId {
 990        self.id
 991    }
 992
 993    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 994        self.child_session_ids.clone()
 995    }
 996
 997    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 998        self.child_session_ids.insert(session_id);
 999    }
1000
1001    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
1002        self.child_session_ids.remove(&session_id);
1003    }
1004
1005    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
1006        self.parent_session
1007            .as_ref()
1008            .map(|session| session.read(cx).id)
1009    }
1010
1011    pub fn parent_session(&self) -> Option<&Entity<Self>> {
1012        self.parent_session.as_ref()
1013    }
1014
1015    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1016        let Some(client) = self.adapter_client() else {
1017            return Task::ready(());
1018        };
1019
1020        let supports_terminate = self
1021            .capabilities
1022            .support_terminate_debuggee
1023            .unwrap_or(false);
1024
1025        cx.background_spawn(async move {
1026            if supports_terminate {
1027                client
1028                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1029                        restart: Some(false),
1030                    })
1031                    .await
1032                    .ok();
1033            } else {
1034                client
1035                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1036                        restart: Some(false),
1037                        terminate_debuggee: Some(true),
1038                        suspend_debuggee: Some(false),
1039                    })
1040                    .await
1041                    .ok();
1042            }
1043        })
1044    }
1045
1046    pub fn capabilities(&self) -> &Capabilities {
1047        &self.capabilities
1048    }
1049
1050    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1051        match &self.mode {
1052            SessionState::Booting(_) => None,
1053            SessionState::Running(running_mode) => Some(&running_mode.binary),
1054        }
1055    }
1056
1057    pub fn adapter(&self) -> DebugAdapterName {
1058        self.adapter.clone()
1059    }
1060
1061    pub fn label(&self) -> Option<SharedString> {
1062        self.label.clone()
1063    }
1064
1065    pub fn is_terminated(&self) -> bool {
1066        self.is_session_terminated
1067    }
1068
1069    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1070        let (tx, mut rx) = mpsc::unbounded();
1071
1072        cx.spawn(async move |this, cx| {
1073            while let Some(output) = rx.next().await {
1074                this.update(cx, |this, _| {
1075                    let event = dap::OutputEvent {
1076                        category: None,
1077                        output,
1078                        group: None,
1079                        variables_reference: None,
1080                        source: None,
1081                        line: None,
1082                        column: None,
1083                        data: None,
1084                        location_reference: None,
1085                    };
1086                    this.push_output(event);
1087                })?;
1088            }
1089            anyhow::Ok(())
1090        })
1091        .detach();
1092
1093        tx
1094    }
1095
1096    pub fn is_started(&self) -> bool {
1097        match &self.mode {
1098            SessionState::Booting(_) => false,
1099            SessionState::Running(running) => running.is_started,
1100        }
1101    }
1102
1103    pub fn is_building(&self) -> bool {
1104        matches!(self.mode, SessionState::Booting(_))
1105    }
1106
1107    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1108        match &mut self.mode {
1109            SessionState::Running(local_mode) => Some(local_mode),
1110            SessionState::Booting(_) => None,
1111        }
1112    }
1113
1114    pub fn as_running(&self) -> Option<&RunningMode> {
1115        match &self.mode {
1116            SessionState::Running(local_mode) => Some(local_mode),
1117            SessionState::Booting(_) => None,
1118        }
1119    }
1120
1121    fn handle_start_debugging_request(
1122        &mut self,
1123        request: dap::messages::Request,
1124        cx: &mut Context<Self>,
1125    ) -> Task<Result<()>> {
1126        let request_seq = request.seq;
1127
1128        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1129            .arguments
1130            .as_ref()
1131            .map(|value| serde_json::from_value(value.clone()));
1132
1133        let mut success = true;
1134        if let Some(Ok(request)) = launch_request {
1135            cx.emit(SessionStateEvent::SpawnChildSession { request });
1136        } else {
1137            log::error!(
1138                "Failed to parse launch request arguments: {:?}",
1139                request.arguments
1140            );
1141            success = false;
1142        }
1143
1144        cx.spawn(async move |this, cx| {
1145            this.update(cx, |this, cx| {
1146                this.respond_to_client(
1147                    request_seq,
1148                    success,
1149                    StartDebugging::COMMAND.to_string(),
1150                    None,
1151                    cx,
1152                )
1153            })?
1154            .await
1155        })
1156    }
1157
1158    fn handle_run_in_terminal_request(
1159        &mut self,
1160        request: dap::messages::Request,
1161        cx: &mut Context<Self>,
1162    ) -> Task<Result<()>> {
1163        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1164            request.arguments.unwrap_or_default(),
1165        ) {
1166            Ok(args) => args,
1167            Err(error) => {
1168                return cx.spawn(async move |session, cx| {
1169                    let error = serde_json::to_value(dap::ErrorResponse {
1170                        error: Some(dap::Message {
1171                            id: request.seq,
1172                            format: error.to_string(),
1173                            variables: None,
1174                            send_telemetry: None,
1175                            show_user: None,
1176                            url: None,
1177                            url_label: None,
1178                        }),
1179                    })
1180                    .ok();
1181
1182                    session
1183                        .update(cx, |this, cx| {
1184                            this.respond_to_client(
1185                                request.seq,
1186                                false,
1187                                StartDebugging::COMMAND.to_string(),
1188                                error,
1189                                cx,
1190                            )
1191                        })?
1192                        .await?;
1193
1194                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1195                });
1196            }
1197        };
1198
1199        let seq = request.seq;
1200
1201        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1202        cx.emit(SessionEvent::RunInTerminal {
1203            request: request_args,
1204            sender: tx,
1205        });
1206        cx.notify();
1207
1208        cx.spawn(async move |session, cx| {
1209            let result = util::maybe!(async move {
1210                rx.next().await.ok_or_else(|| {
1211                    anyhow!("failed to receive response from spawn terminal".to_string())
1212                })?
1213            })
1214            .await;
1215            let (success, body) = match result {
1216                Ok(pid) => (
1217                    true,
1218                    serde_json::to_value(dap::RunInTerminalResponse {
1219                        process_id: None,
1220                        shell_process_id: Some(pid as u64),
1221                    })
1222                    .ok(),
1223                ),
1224                Err(error) => (
1225                    false,
1226                    serde_json::to_value(dap::ErrorResponse {
1227                        error: Some(dap::Message {
1228                            id: seq,
1229                            format: error.to_string(),
1230                            variables: None,
1231                            send_telemetry: None,
1232                            show_user: None,
1233                            url: None,
1234                            url_label: None,
1235                        }),
1236                    })
1237                    .ok(),
1238                ),
1239            };
1240
1241            session
1242                .update(cx, |session, cx| {
1243                    session.respond_to_client(
1244                        seq,
1245                        success,
1246                        RunInTerminal::COMMAND.to_string(),
1247                        body,
1248                        cx,
1249                    )
1250                })?
1251                .await
1252        })
1253    }
1254
1255    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1256        let adapter_id = self.adapter().to_string();
1257        let request = Initialize { adapter_id };
1258
1259        let SessionState::Running(running) = &self.mode else {
1260            return Task::ready(Err(anyhow!(
1261                "Cannot send initialize request, task still building"
1262            )));
1263        };
1264        let mut response = running.request(request.clone());
1265
1266        cx.spawn(async move |this, cx| {
1267            loop {
1268                let capabilities = response.await;
1269                match capabilities {
1270                    Err(e) => {
1271                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1272                            this.as_running()
1273                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1274                        }) else {
1275                            return Err(e);
1276                        };
1277                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1278                        reconnect.await?;
1279
1280                        let Ok(Some(r)) = this.update(cx, |this, _| {
1281                            this.as_running()
1282                                .map(|running| running.request(request.clone()))
1283                        }) else {
1284                            return Err(e);
1285                        };
1286                        response = r
1287                    }
1288                    Ok(capabilities) => {
1289                        this.update(cx, |session, cx| {
1290                            session.capabilities = capabilities;
1291
1292                            cx.emit(SessionEvent::CapabilitiesLoaded);
1293                        })?;
1294                        return Ok(());
1295                    }
1296                }
1297            }
1298        })
1299    }
1300
1301    pub(super) fn initialize_sequence(
1302        &mut self,
1303        initialize_rx: oneshot::Receiver<()>,
1304        dap_store: WeakEntity<DapStore>,
1305        cx: &mut Context<Self>,
1306    ) -> Task<Result<()>> {
1307        match &self.mode {
1308            SessionState::Running(local_mode) => {
1309                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1310            }
1311            SessionState::Booting(_) => {
1312                Task::ready(Err(anyhow!("cannot initialize, still building")))
1313            }
1314        }
1315    }
1316
1317    pub fn run_to_position(
1318        &mut self,
1319        breakpoint: SourceBreakpoint,
1320        active_thread_id: ThreadId,
1321        cx: &mut Context<Self>,
1322    ) {
1323        match &mut self.mode {
1324            SessionState::Running(local_mode) => {
1325                if !matches!(
1326                    self.thread_states.thread_state(active_thread_id),
1327                    Some(ThreadStatus::Stopped)
1328                ) {
1329                    return;
1330                };
1331                let path = breakpoint.path.clone();
1332                local_mode.tmp_breakpoint = Some(breakpoint);
1333                let task = local_mode.send_breakpoints_from_path(
1334                    path,
1335                    BreakpointUpdatedReason::Toggled,
1336                    &self.breakpoint_store,
1337                    cx,
1338                );
1339
1340                cx.spawn(async move |this, cx| {
1341                    task.await;
1342                    this.update(cx, |this, cx| {
1343                        this.continue_thread(active_thread_id, cx);
1344                    })
1345                })
1346                .detach();
1347            }
1348            SessionState::Booting(_) => {}
1349        }
1350    }
1351
1352    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1353        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1354    }
1355
1356    pub fn output(
1357        &self,
1358        since: OutputToken,
1359    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1360        if self.output_token.0 == 0 {
1361            return (self.output.range(0..0), OutputToken(0));
1362        };
1363
1364        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1365
1366        let clamped_events_since = events_since.clamp(0, self.output.len());
1367        (
1368            self.output
1369                .range(self.output.len() - clamped_events_since..),
1370            self.output_token,
1371        )
1372    }
1373
1374    pub fn respond_to_client(
1375        &self,
1376        request_seq: u64,
1377        success: bool,
1378        command: String,
1379        body: Option<serde_json::Value>,
1380        cx: &mut Context<Self>,
1381    ) -> Task<Result<()>> {
1382        let Some(local_session) = self.as_running() else {
1383            unreachable!("Cannot respond to remote client");
1384        };
1385        let client = local_session.client.clone();
1386
1387        cx.background_spawn(async move {
1388            client
1389                .send_message(Message::Response(Response {
1390                    body,
1391                    success,
1392                    command,
1393                    seq: request_seq + 1,
1394                    request_seq,
1395                    message: None,
1396                }))
1397                .await
1398        })
1399    }
1400
1401    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1402        self.mode.stopped();
1403        // todo(debugger): Find a clean way to get around the clone
1404        let breakpoint_store = self.breakpoint_store.clone();
1405        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1406            let breakpoint = local.tmp_breakpoint.take()?;
1407            let path = breakpoint.path;
1408            Some((local, path))
1409        }) {
1410            local
1411                .send_breakpoints_from_path(
1412                    path,
1413                    BreakpointUpdatedReason::Toggled,
1414                    &breakpoint_store,
1415                    cx,
1416                )
1417                .detach();
1418        };
1419
1420        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1421            self.thread_states.stop_all_threads();
1422            self.invalidate_command_type::<StackTraceCommand>();
1423        }
1424
1425        // Event if we stopped all threads we still need to insert the thread_id
1426        // to our own data
1427        if let Some(thread_id) = event.thread_id {
1428            self.thread_states.stop_thread(ThreadId(thread_id));
1429
1430            self.invalidate_state(
1431                &StackTraceCommand {
1432                    thread_id,
1433                    start_frame: None,
1434                    levels: None,
1435                }
1436                .into(),
1437            );
1438        }
1439
1440        self.invalidate_generic();
1441        self.threads.clear();
1442        self.variables.clear();
1443        cx.emit(SessionEvent::Stopped(
1444            event
1445                .thread_id
1446                .map(Into::into)
1447                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1448        ));
1449        cx.emit(SessionEvent::InvalidateInlineValue);
1450        cx.notify();
1451    }
1452
1453    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1454        match *event {
1455            Events::Initialized(_) => {
1456                debug_assert!(
1457                    false,
1458                    "Initialized event should have been handled in LocalMode"
1459                );
1460            }
1461            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1462            Events::Continued(event) => {
1463                if event.all_threads_continued.unwrap_or_default() {
1464                    self.thread_states.continue_all_threads();
1465                    self.breakpoint_store.update(cx, |store, cx| {
1466                        store.remove_active_position(Some(self.session_id()), cx)
1467                    });
1468                } else {
1469                    self.thread_states
1470                        .continue_thread(ThreadId(event.thread_id));
1471                }
1472                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1473                self.invalidate_generic();
1474            }
1475            Events::Exited(_event) => {
1476                self.clear_active_debug_line(cx);
1477            }
1478            Events::Terminated(_) => {
1479                self.shutdown(cx).detach();
1480            }
1481            Events::Thread(event) => {
1482                let thread_id = ThreadId(event.thread_id);
1483
1484                match event.reason {
1485                    dap::ThreadEventReason::Started => {
1486                        self.thread_states.continue_thread(thread_id);
1487                    }
1488                    dap::ThreadEventReason::Exited => {
1489                        self.thread_states.exit_thread(thread_id);
1490                    }
1491                    reason => {
1492                        log::error!("Unhandled thread event reason {:?}", reason);
1493                    }
1494                }
1495                self.invalidate_state(&ThreadsCommand.into());
1496                cx.notify();
1497            }
1498            Events::Output(event) => {
1499                if event
1500                    .category
1501                    .as_ref()
1502                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1503                {
1504                    return;
1505                }
1506
1507                self.push_output(event);
1508                cx.notify();
1509            }
1510            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1511                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1512            }),
1513            Events::Module(event) => {
1514                match event.reason {
1515                    dap::ModuleEventReason::New => {
1516                        self.modules.push(event.module);
1517                    }
1518                    dap::ModuleEventReason::Changed => {
1519                        if let Some(module) = self
1520                            .modules
1521                            .iter_mut()
1522                            .find(|other| event.module.id == other.id)
1523                        {
1524                            *module = event.module;
1525                        }
1526                    }
1527                    dap::ModuleEventReason::Removed => {
1528                        self.modules.retain(|other| event.module.id != other.id);
1529                    }
1530                }
1531
1532                // todo(debugger): We should only send the invalidate command to downstream clients.
1533                // self.invalidate_state(&ModulesCommand.into());
1534            }
1535            Events::LoadedSource(_) => {
1536                self.invalidate_state(&LoadedSourcesCommand.into());
1537            }
1538            Events::Capabilities(event) => {
1539                self.capabilities = self.capabilities.merge(event.capabilities);
1540
1541                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1542                let recent_filters = self
1543                    .capabilities
1544                    .exception_breakpoint_filters
1545                    .iter()
1546                    .flatten()
1547                    .map(|filter| (filter.filter.clone(), filter.clone()))
1548                    .collect::<BTreeMap<_, _>>();
1549                for filter in recent_filters.values() {
1550                    let default = filter.default.unwrap_or_default();
1551                    self.exception_breakpoints
1552                        .entry(filter.filter.clone())
1553                        .or_insert_with(|| (filter.clone(), default));
1554                }
1555                self.exception_breakpoints
1556                    .retain(|k, _| recent_filters.contains_key(k));
1557                if self.is_started() {
1558                    self.send_exception_breakpoints(cx);
1559                }
1560
1561                // Remove the ones that no longer exist.
1562                cx.notify();
1563            }
1564            Events::Memory(_) => {}
1565            Events::Process(_) => {}
1566            Events::ProgressEnd(_) => {}
1567            Events::ProgressStart(_) => {}
1568            Events::ProgressUpdate(_) => {}
1569            Events::Invalidated(_) => {}
1570            Events::Other(event) => {
1571                // FIXME handle killRemoteBrowser too
1572                if event.event == "launchBrowserInCompanion" {
1573                    let Some(request) = serde_json::from_value(event.body).ok() else {
1574                        log::error!("failed to deserialize launchBrowserInCompanion event");
1575                        return;
1576                    };
1577                    let Some(remote_client) = self.remote_client.clone() else {
1578                        log::error!(
1579                            "no remote client so not handling launchBrowserInCompanion event"
1580                        );
1581                        return;
1582                    };
1583                    self.launch_browser_for_remote_server(remote_client, request, cx);
1584                }
1585            }
1586        }
1587    }
1588
1589    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1590    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1591        &mut self,
1592        request: T,
1593        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1594        cx: &mut Context<Self>,
1595    ) {
1596        const {
1597            assert!(
1598                T::CACHEABLE,
1599                "Only requests marked as cacheable should invoke `fetch`"
1600            );
1601        }
1602
1603        if !self.thread_states.any_stopped_thread()
1604            && request.type_id() != TypeId::of::<ThreadsCommand>()
1605            || self.is_session_terminated
1606        {
1607            return;
1608        }
1609
1610        let request_map = self
1611            .requests
1612            .entry(std::any::TypeId::of::<T>())
1613            .or_default();
1614
1615        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1616            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1617
1618            let task = Self::request_inner::<Arc<T>>(
1619                &self.capabilities,
1620                &self.mode,
1621                command,
1622                |this, result, cx| {
1623                    process_result(this, result, cx);
1624                    None
1625                },
1626                cx,
1627            );
1628            let task = cx
1629                .background_executor()
1630                .spawn(async move {
1631                    let _ = task.await?;
1632                    Some(())
1633                })
1634                .shared();
1635
1636            vacant.insert(task);
1637            cx.notify();
1638        }
1639    }
1640
1641    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1642        capabilities: &Capabilities,
1643        mode: &SessionState,
1644        request: T,
1645        process_result: impl FnOnce(
1646            &mut Self,
1647            Result<T::Response>,
1648            &mut Context<Self>,
1649        ) -> Option<T::Response>
1650        + 'static,
1651        cx: &mut Context<Self>,
1652    ) -> Task<Option<T::Response>> {
1653        if !T::is_supported(capabilities) {
1654            log::warn!(
1655                "Attempted to send a DAP request that isn't supported: {:?}",
1656                request
1657            );
1658            let error = Err(anyhow::Error::msg(
1659                "Couldn't complete request because it's not supported",
1660            ));
1661            return cx.spawn(async move |this, cx| {
1662                this.update(cx, |this, cx| process_result(this, error, cx))
1663                    .ok()
1664                    .flatten()
1665            });
1666        }
1667
1668        let request = mode.request_dap(request);
1669        cx.spawn(async move |this, cx| {
1670            let result = request.await;
1671            this.update(cx, |this, cx| process_result(this, result, cx))
1672                .ok()
1673                .flatten()
1674        })
1675    }
1676
1677    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
1678        &self,
1679        request: T,
1680        process_result: impl FnOnce(
1681            &mut Self,
1682            Result<T::Response>,
1683            &mut Context<Self>,
1684        ) -> Option<T::Response>
1685        + 'static,
1686        cx: &mut Context<Self>,
1687    ) -> Task<Option<T::Response>> {
1688        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1689    }
1690
1691    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
1692        self.requests.remove(&std::any::TypeId::of::<Command>());
1693    }
1694
1695    fn invalidate_generic(&mut self) {
1696        self.invalidate_command_type::<ModulesCommand>();
1697        self.invalidate_command_type::<LoadedSourcesCommand>();
1698        self.invalidate_command_type::<ThreadsCommand>();
1699        self.invalidate_command_type::<DataBreakpointInfoCommand>();
1700        self.invalidate_command_type::<ReadMemory>();
1701        let executor = self.as_running().map(|running| running.executor.clone());
1702        if let Some(executor) = executor {
1703            self.memory.clear(&executor);
1704        }
1705    }
1706
1707    fn invalidate_state(&mut self, key: &RequestSlot) {
1708        self.requests
1709            .entry((&*key.0 as &dyn Any).type_id())
1710            .and_modify(|request_map| {
1711                request_map.remove(key);
1712            });
1713    }
1714
1715    fn push_output(&mut self, event: OutputEvent) {
1716        self.output.push_back(event);
1717        self.output_token.0 += 1;
1718    }
1719
1720    pub fn any_stopped_thread(&self) -> bool {
1721        self.thread_states.any_stopped_thread()
1722    }
1723
1724    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1725        self.thread_states.thread_status(thread_id)
1726    }
1727
1728    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1729        self.fetch(
1730            dap_command::ThreadsCommand,
1731            |this, result, cx| {
1732                let Some(result) = result.log_err() else {
1733                    return;
1734                };
1735
1736                this.threads = result
1737                    .into_iter()
1738                    .map(|thread| (ThreadId(thread.id), Thread::from(thread)))
1739                    .collect();
1740
1741                this.invalidate_command_type::<StackTraceCommand>();
1742                cx.emit(SessionEvent::Threads);
1743                cx.notify();
1744            },
1745            cx,
1746        );
1747
1748        self.threads
1749            .values()
1750            .map(|thread| {
1751                (
1752                    thread.dap.clone(),
1753                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1754                )
1755            })
1756            .collect()
1757    }
1758
1759    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1760        self.fetch(
1761            dap_command::ModulesCommand,
1762            |this, result, cx| {
1763                let Some(result) = result.log_err() else {
1764                    return;
1765                };
1766
1767                this.modules = result;
1768                cx.emit(SessionEvent::Modules);
1769                cx.notify();
1770            },
1771            cx,
1772        );
1773
1774        &self.modules
1775    }
1776
1777    // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1778    pub fn data_access_size(
1779        &mut self,
1780        frame_id: Option<u64>,
1781        evaluate_name: &str,
1782        cx: &mut Context<Self>,
1783    ) -> Task<Option<u64>> {
1784        let request = self.request(
1785            EvaluateCommand {
1786                expression: format!("?${{sizeof({evaluate_name})}}"),
1787                frame_id,
1788
1789                context: Some(EvaluateArgumentsContext::Repl),
1790                source: None,
1791            },
1792            |_, response, _| response.ok(),
1793            cx,
1794        );
1795        cx.background_spawn(async move {
1796            let result = request.await?;
1797            result.result.parse().ok()
1798        })
1799    }
1800
1801    pub fn memory_reference_of_expr(
1802        &mut self,
1803        frame_id: Option<u64>,
1804        expression: String,
1805        cx: &mut Context<Self>,
1806    ) -> Task<Option<(String, Option<String>)>> {
1807        let request = self.request(
1808            EvaluateCommand {
1809                expression,
1810                frame_id,
1811
1812                context: Some(EvaluateArgumentsContext::Repl),
1813                source: None,
1814            },
1815            |_, response, _| response.ok(),
1816            cx,
1817        );
1818        cx.background_spawn(async move {
1819            let result = request.await?;
1820            result
1821                .memory_reference
1822                .map(|reference| (reference, result.type_))
1823        })
1824    }
1825
1826    pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1827        let data = base64::engine::general_purpose::STANDARD.encode(data);
1828        self.request(
1829            WriteMemoryArguments {
1830                memory_reference: address.to_string(),
1831                data,
1832                allow_partial: None,
1833                offset: None,
1834            },
1835            |this, response, cx| {
1836                this.memory.clear(cx.background_executor());
1837                this.invalidate_command_type::<ReadMemory>();
1838                this.invalidate_command_type::<VariablesCommand>();
1839                cx.emit(SessionEvent::Variables);
1840                response.ok()
1841            },
1842            cx,
1843        )
1844        .detach();
1845    }
1846    pub fn read_memory(
1847        &mut self,
1848        range: RangeInclusive<u64>,
1849        cx: &mut Context<Self>,
1850    ) -> MemoryIterator {
1851        // This function is a bit more involved when it comes to fetching data.
1852        // Since we attempt to read memory in pages, we need to account for some parts
1853        // of memory being unreadable. Therefore, we start off by fetching a page per request.
1854        // In case that fails, we try to re-fetch smaller regions until we have the full range.
1855        let page_range = Memory::memory_range_to_page_range(range.clone());
1856        for page_address in PageAddress::iter_range(page_range) {
1857            self.read_single_page_memory(page_address, cx);
1858        }
1859        self.memory.memory_range(range)
1860    }
1861
1862    fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1863        _ = maybe!({
1864            let builder = self.memory.build_page(page_start)?;
1865
1866            self.memory_read_fetch_page_recursive(builder, cx);
1867            Some(())
1868        });
1869    }
1870    fn memory_read_fetch_page_recursive(
1871        &mut self,
1872        mut builder: MemoryPageBuilder,
1873        cx: &mut Context<Self>,
1874    ) {
1875        let Some(next_request) = builder.next_request() else {
1876            // We're done fetching. Let's grab the page and insert it into our memory store.
1877            let (address, contents) = builder.build();
1878            self.memory.insert_page(address, contents);
1879
1880            return;
1881        };
1882        let size = next_request.size;
1883        self.fetch(
1884            ReadMemory {
1885                memory_reference: format!("0x{:X}", next_request.address),
1886                offset: Some(0),
1887                count: next_request.size,
1888            },
1889            move |this, memory, cx| {
1890                if let Ok(memory) = memory {
1891                    builder.known(memory.content);
1892                    if let Some(unknown) = memory.unreadable_bytes {
1893                        builder.unknown(unknown);
1894                    }
1895                    // This is the recursive bit: if we're not yet done with
1896                    // the whole page, we'll kick off a new request with smaller range.
1897                    // Note that this function is recursive only conceptually;
1898                    // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1899                    this.memory_read_fetch_page_recursive(builder, cx);
1900                } else {
1901                    builder.unknown(size);
1902                }
1903            },
1904            cx,
1905        );
1906    }
1907
1908    pub fn ignore_breakpoints(&self) -> bool {
1909        self.ignore_breakpoints
1910    }
1911
1912    pub fn toggle_ignore_breakpoints(
1913        &mut self,
1914        cx: &mut App,
1915    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1916        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1917    }
1918
1919    pub(crate) fn set_ignore_breakpoints(
1920        &mut self,
1921        ignore: bool,
1922        cx: &mut App,
1923    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1924        if self.ignore_breakpoints == ignore {
1925            return Task::ready(HashMap::default());
1926        }
1927
1928        self.ignore_breakpoints = ignore;
1929
1930        if let Some(local) = self.as_running() {
1931            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1932        } else {
1933            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1934            unimplemented!()
1935        }
1936    }
1937
1938    pub fn data_breakpoints(&self) -> impl Iterator<Item = &DataBreakpointState> {
1939        self.data_breakpoints.values()
1940    }
1941
1942    pub fn exception_breakpoints(
1943        &self,
1944    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1945        self.exception_breakpoints.values()
1946    }
1947
1948    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1949        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1950            *is_enabled = !*is_enabled;
1951            self.send_exception_breakpoints(cx);
1952        }
1953    }
1954
1955    fn send_exception_breakpoints(&mut self, cx: &App) {
1956        if let Some(local) = self.as_running() {
1957            let exception_filters = self
1958                .exception_breakpoints
1959                .values()
1960                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1961                .collect();
1962
1963            let supports_exception_filters = self
1964                .capabilities
1965                .supports_exception_filter_options
1966                .unwrap_or_default();
1967            local
1968                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1969                .detach_and_log_err(cx);
1970        } else {
1971            debug_assert!(false, "Not implemented");
1972        }
1973    }
1974
1975    pub fn toggle_data_breakpoint(&mut self, id: &str, cx: &mut Context<'_, Session>) {
1976        if let Some(state) = self.data_breakpoints.get_mut(id) {
1977            state.is_enabled = !state.is_enabled;
1978            self.send_exception_breakpoints(cx);
1979        }
1980    }
1981
1982    fn send_data_breakpoints(&mut self, cx: &mut Context<Self>) {
1983        if let Some(mode) = self.as_running() {
1984            let breakpoints = self
1985                .data_breakpoints
1986                .values()
1987                .filter_map(|state| state.is_enabled.then(|| state.dap.clone()))
1988                .collect();
1989            let command = SetDataBreakpointsCommand { breakpoints };
1990            mode.request(command).detach_and_log_err(cx);
1991        }
1992    }
1993
1994    pub fn create_data_breakpoint(
1995        &mut self,
1996        context: Arc<DataBreakpointContext>,
1997        data_id: String,
1998        dap: dap::DataBreakpoint,
1999        cx: &mut Context<Self>,
2000    ) {
2001        if self.data_breakpoints.remove(&data_id).is_none() {
2002            self.data_breakpoints.insert(
2003                data_id,
2004                DataBreakpointState {
2005                    dap,
2006                    is_enabled: true,
2007                    context,
2008                },
2009            );
2010        }
2011        self.send_data_breakpoints(cx);
2012    }
2013
2014    pub fn breakpoints_enabled(&self) -> bool {
2015        self.ignore_breakpoints
2016    }
2017
2018    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
2019        self.fetch(
2020            dap_command::LoadedSourcesCommand,
2021            |this, result, cx| {
2022                let Some(result) = result.log_err() else {
2023                    return;
2024                };
2025                this.loaded_sources = result;
2026                cx.emit(SessionEvent::LoadedSources);
2027                cx.notify();
2028            },
2029            cx,
2030        );
2031
2032        &self.loaded_sources
2033    }
2034
2035    fn fallback_to_manual_restart(
2036        &mut self,
2037        res: Result<()>,
2038        cx: &mut Context<Self>,
2039    ) -> Option<()> {
2040        if res.log_err().is_none() {
2041            cx.emit(SessionStateEvent::Restart);
2042            return None;
2043        }
2044        Some(())
2045    }
2046
2047    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
2048        res.log_err()?;
2049        Some(())
2050    }
2051
2052    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
2053        thread_id: ThreadId,
2054    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
2055    {
2056        move |this, response, cx| match response.log_err() {
2057            Some(response) => {
2058                this.breakpoint_store.update(cx, |store, cx| {
2059                    store.remove_active_position(Some(this.session_id()), cx)
2060                });
2061                Some(response)
2062            }
2063            None => {
2064                this.thread_states.stop_thread(thread_id);
2065                cx.notify();
2066                None
2067            }
2068        }
2069    }
2070
2071    fn clear_active_debug_line_response(
2072        &mut self,
2073        response: Result<()>,
2074        cx: &mut Context<Session>,
2075    ) -> Option<()> {
2076        response.log_err()?;
2077        self.clear_active_debug_line(cx);
2078        Some(())
2079    }
2080
2081    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2082        self.breakpoint_store.update(cx, |store, cx| {
2083            store.remove_active_position(Some(self.id), cx)
2084        });
2085    }
2086
2087    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2088        self.request(
2089            PauseCommand {
2090                thread_id: thread_id.0,
2091            },
2092            Self::empty_response,
2093            cx,
2094        )
2095        .detach();
2096    }
2097
2098    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2099        self.request(
2100            RestartStackFrameCommand { stack_frame_id },
2101            Self::empty_response,
2102            cx,
2103        )
2104        .detach();
2105    }
2106
2107    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2108        if self.restart_task.is_some() || self.as_running().is_none() {
2109            return;
2110        }
2111
2112        let supports_dap_restart =
2113            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2114
2115        self.restart_task = Some(cx.spawn(async move |this, cx| {
2116            let _ = this.update(cx, |session, cx| {
2117                if supports_dap_restart {
2118                    session
2119                        .request(
2120                            RestartCommand {
2121                                raw: args.unwrap_or(Value::Null),
2122                            },
2123                            Self::fallback_to_manual_restart,
2124                            cx,
2125                        )
2126                        .detach();
2127                } else {
2128                    cx.emit(SessionStateEvent::Restart);
2129                }
2130            });
2131        }));
2132    }
2133
2134    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2135        if self.is_session_terminated {
2136            return Task::ready(());
2137        }
2138
2139        self.is_session_terminated = true;
2140        self.thread_states.exit_all_threads();
2141        cx.notify();
2142
2143        let task = match &mut self.mode {
2144            SessionState::Running(_) => {
2145                if self
2146                    .capabilities
2147                    .supports_terminate_request
2148                    .unwrap_or_default()
2149                {
2150                    self.request(
2151                        TerminateCommand {
2152                            restart: Some(false),
2153                        },
2154                        Self::clear_active_debug_line_response,
2155                        cx,
2156                    )
2157                } else {
2158                    self.request(
2159                        DisconnectCommand {
2160                            restart: Some(false),
2161                            terminate_debuggee: Some(true),
2162                            suspend_debuggee: Some(false),
2163                        },
2164                        Self::clear_active_debug_line_response,
2165                        cx,
2166                    )
2167                }
2168            }
2169            SessionState::Booting(build_task) => {
2170                build_task.take();
2171                Task::ready(Some(()))
2172            }
2173        };
2174
2175        cx.emit(SessionStateEvent::Shutdown);
2176
2177        cx.spawn(async move |this, cx| {
2178            task.await;
2179            let _ = this.update(cx, |this, _| {
2180                if let Some(adapter_client) = this.adapter_client() {
2181                    adapter_client.kill();
2182                }
2183            });
2184        })
2185    }
2186
2187    pub fn completions(
2188        &mut self,
2189        query: CompletionsQuery,
2190        cx: &mut Context<Self>,
2191    ) -> Task<Result<Vec<dap::CompletionItem>>> {
2192        let task = self.request(query, |_, result, _| result.log_err(), cx);
2193
2194        cx.background_executor().spawn(async move {
2195            anyhow::Ok(
2196                task.await
2197                    .map(|response| response.targets)
2198                    .context("failed to fetch completions")?,
2199            )
2200        })
2201    }
2202
2203    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2204        let supports_single_thread_execution_requests =
2205            self.capabilities.supports_single_thread_execution_requests;
2206        self.thread_states.continue_thread(thread_id);
2207        self.request(
2208            ContinueCommand {
2209                args: ContinueArguments {
2210                    thread_id: thread_id.0,
2211                    single_thread: supports_single_thread_execution_requests,
2212                },
2213            },
2214            Self::on_step_response::<ContinueCommand>(thread_id),
2215            cx,
2216        )
2217        .detach();
2218    }
2219
2220    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2221        match self.mode {
2222            SessionState::Running(ref local) => Some(local.client.clone()),
2223            SessionState::Booting(_) => None,
2224        }
2225    }
2226
2227    pub fn has_ever_stopped(&self) -> bool {
2228        self.mode.has_ever_stopped()
2229    }
2230    pub fn step_over(
2231        &mut self,
2232        thread_id: ThreadId,
2233        granularity: SteppingGranularity,
2234        cx: &mut Context<Self>,
2235    ) {
2236        let supports_single_thread_execution_requests =
2237            self.capabilities.supports_single_thread_execution_requests;
2238        let supports_stepping_granularity = self
2239            .capabilities
2240            .supports_stepping_granularity
2241            .unwrap_or_default();
2242
2243        let command = NextCommand {
2244            inner: StepCommand {
2245                thread_id: thread_id.0,
2246                granularity: supports_stepping_granularity.then(|| granularity),
2247                single_thread: supports_single_thread_execution_requests,
2248            },
2249        };
2250
2251        self.thread_states.process_step(thread_id);
2252        self.request(
2253            command,
2254            Self::on_step_response::<NextCommand>(thread_id),
2255            cx,
2256        )
2257        .detach();
2258    }
2259
2260    pub fn step_in(
2261        &mut self,
2262        thread_id: ThreadId,
2263        granularity: SteppingGranularity,
2264        cx: &mut Context<Self>,
2265    ) {
2266        let supports_single_thread_execution_requests =
2267            self.capabilities.supports_single_thread_execution_requests;
2268        let supports_stepping_granularity = self
2269            .capabilities
2270            .supports_stepping_granularity
2271            .unwrap_or_default();
2272
2273        let command = StepInCommand {
2274            inner: StepCommand {
2275                thread_id: thread_id.0,
2276                granularity: supports_stepping_granularity.then(|| granularity),
2277                single_thread: supports_single_thread_execution_requests,
2278            },
2279        };
2280
2281        self.thread_states.process_step(thread_id);
2282        self.request(
2283            command,
2284            Self::on_step_response::<StepInCommand>(thread_id),
2285            cx,
2286        )
2287        .detach();
2288    }
2289
2290    pub fn step_out(
2291        &mut self,
2292        thread_id: ThreadId,
2293        granularity: SteppingGranularity,
2294        cx: &mut Context<Self>,
2295    ) {
2296        let supports_single_thread_execution_requests =
2297            self.capabilities.supports_single_thread_execution_requests;
2298        let supports_stepping_granularity = self
2299            .capabilities
2300            .supports_stepping_granularity
2301            .unwrap_or_default();
2302
2303        let command = StepOutCommand {
2304            inner: StepCommand {
2305                thread_id: thread_id.0,
2306                granularity: supports_stepping_granularity.then(|| granularity),
2307                single_thread: supports_single_thread_execution_requests,
2308            },
2309        };
2310
2311        self.thread_states.process_step(thread_id);
2312        self.request(
2313            command,
2314            Self::on_step_response::<StepOutCommand>(thread_id),
2315            cx,
2316        )
2317        .detach();
2318    }
2319
2320    pub fn step_back(
2321        &mut self,
2322        thread_id: ThreadId,
2323        granularity: SteppingGranularity,
2324        cx: &mut Context<Self>,
2325    ) {
2326        let supports_single_thread_execution_requests =
2327            self.capabilities.supports_single_thread_execution_requests;
2328        let supports_stepping_granularity = self
2329            .capabilities
2330            .supports_stepping_granularity
2331            .unwrap_or_default();
2332
2333        let command = StepBackCommand {
2334            inner: StepCommand {
2335                thread_id: thread_id.0,
2336                granularity: supports_stepping_granularity.then(|| granularity),
2337                single_thread: supports_single_thread_execution_requests,
2338            },
2339        };
2340
2341        self.thread_states.process_step(thread_id);
2342
2343        self.request(
2344            command,
2345            Self::on_step_response::<StepBackCommand>(thread_id),
2346            cx,
2347        )
2348        .detach();
2349    }
2350
2351    pub fn stack_frames(
2352        &mut self,
2353        thread_id: ThreadId,
2354        cx: &mut Context<Self>,
2355    ) -> Result<Vec<StackFrame>> {
2356        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2357            && self.requests.contains_key(&ThreadsCommand.type_id())
2358            && self.threads.contains_key(&thread_id)
2359        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2360        // We could still be using an old thread id and have sent a new thread's request
2361        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2362        // But it very well could cause a minor bug in the future that is hard to track down
2363        {
2364            self.fetch(
2365                super::dap_command::StackTraceCommand {
2366                    thread_id: thread_id.0,
2367                    start_frame: None,
2368                    levels: None,
2369                },
2370                move |this, stack_frames, cx| {
2371                    let entry =
2372                        this.threads
2373                            .entry(thread_id)
2374                            .and_modify(|thread| match &stack_frames {
2375                                Ok(stack_frames) => {
2376                                    thread.stack_frames = stack_frames
2377                                        .iter()
2378                                        .cloned()
2379                                        .map(StackFrame::from)
2380                                        .collect();
2381                                    thread.stack_frames_error = None;
2382                                }
2383                                Err(error) => {
2384                                    thread.stack_frames.clear();
2385                                    thread.stack_frames_error = Some(error.cloned());
2386                                }
2387                            });
2388                    debug_assert!(
2389                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2390                        "Sent request for thread_id that doesn't exist"
2391                    );
2392                    if let Ok(stack_frames) = stack_frames {
2393                        this.stack_frames.extend(
2394                            stack_frames
2395                                .into_iter()
2396                                .filter(|frame| {
2397                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2398                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2399                                    !(frame.id == 0
2400                                        && frame.line == 0
2401                                        && frame.column == 0
2402                                        && frame.presentation_hint
2403                                            == Some(StackFramePresentationHint::Label))
2404                                })
2405                                .map(|frame| (frame.id, StackFrame::from(frame))),
2406                        );
2407                    }
2408
2409                    this.invalidate_command_type::<ScopesCommand>();
2410                    this.invalidate_command_type::<VariablesCommand>();
2411
2412                    cx.emit(SessionEvent::StackTrace);
2413                },
2414                cx,
2415            );
2416        }
2417
2418        match self.threads.get(&thread_id) {
2419            Some(thread) => {
2420                if let Some(error) = &thread.stack_frames_error {
2421                    Err(error.cloned())
2422                } else {
2423                    Ok(thread.stack_frames.clone())
2424                }
2425            }
2426            None => Ok(Vec::new()),
2427        }
2428    }
2429
2430    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2431        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2432            && self
2433                .requests
2434                .contains_key(&TypeId::of::<StackTraceCommand>())
2435        {
2436            self.fetch(
2437                ScopesCommand { stack_frame_id },
2438                move |this, scopes, cx| {
2439                    let Some(scopes) = scopes.log_err() else {
2440                        return
2441                    };
2442
2443                    for scope in scopes.iter() {
2444                        this.variables(scope.variables_reference, cx);
2445                    }
2446
2447                    let entry = this
2448                        .stack_frames
2449                        .entry(stack_frame_id)
2450                        .and_modify(|stack_frame| {
2451                            stack_frame.scopes = scopes;
2452                        });
2453
2454                    cx.emit(SessionEvent::Variables);
2455
2456                    debug_assert!(
2457                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2458                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2459                    );
2460                },
2461                cx,
2462            );
2463        }
2464
2465        self.stack_frames
2466            .get(&stack_frame_id)
2467            .map(|frame| frame.scopes.as_slice())
2468            .unwrap_or_default()
2469    }
2470
2471    pub fn variables_by_stack_frame_id(
2472        &self,
2473        stack_frame_id: StackFrameId,
2474        globals: bool,
2475        locals: bool,
2476    ) -> Vec<dap::Variable> {
2477        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2478            return Vec::new();
2479        };
2480
2481        stack_frame
2482            .scopes
2483            .iter()
2484            .filter(|scope| {
2485                (scope.name.to_lowercase().contains("local") && locals)
2486                    || (scope.name.to_lowercase().contains("global") && globals)
2487            })
2488            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2489            .flatten()
2490            .cloned()
2491            .collect()
2492    }
2493
2494    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2495        &self.watchers
2496    }
2497
2498    pub fn add_watcher(
2499        &mut self,
2500        expression: SharedString,
2501        frame_id: u64,
2502        cx: &mut Context<Self>,
2503    ) -> Task<Result<()>> {
2504        let request = self.mode.request_dap(EvaluateCommand {
2505            expression: expression.to_string(),
2506            context: Some(EvaluateArgumentsContext::Watch),
2507            frame_id: Some(frame_id),
2508            source: None,
2509        });
2510
2511        cx.spawn(async move |this, cx| {
2512            let response = request.await?;
2513
2514            this.update(cx, |session, cx| {
2515                session.watchers.insert(
2516                    expression.clone(),
2517                    Watcher {
2518                        expression,
2519                        value: response.result.into(),
2520                        variables_reference: response.variables_reference,
2521                        presentation_hint: response.presentation_hint,
2522                    },
2523                );
2524                cx.emit(SessionEvent::Watchers);
2525            })
2526        })
2527    }
2528
2529    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2530        let watches = self.watchers.clone();
2531        for (_, watch) in watches.into_iter() {
2532            self.add_watcher(watch.expression.clone(), frame_id, cx)
2533                .detach();
2534        }
2535    }
2536
    /// Removes the watcher stored under `expression`, if there is one.
    ///
    /// No `SessionEvent` is emitted by this method.
    pub fn remove_watcher(&mut self, expression: SharedString) {
        self.watchers.remove(&expression);
    }
2540
2541    pub fn variables(
2542        &mut self,
2543        variables_reference: VariableReference,
2544        cx: &mut Context<Self>,
2545    ) -> Vec<dap::Variable> {
2546        let command = VariablesCommand {
2547            variables_reference,
2548            filter: None,
2549            start: None,
2550            count: None,
2551            format: None,
2552        };
2553
2554        self.fetch(
2555            command,
2556            move |this, variables, cx| {
2557                let Some(variables) = variables.log_err() else {
2558                    return;
2559                };
2560
2561                this.variables.insert(variables_reference, variables);
2562
2563                cx.emit(SessionEvent::Variables);
2564                cx.emit(SessionEvent::InvalidateInlineValue);
2565            },
2566            cx,
2567        );
2568
2569        self.variables
2570            .get(&variables_reference)
2571            .cloned()
2572            .unwrap_or_default()
2573    }
2574
2575    pub fn data_breakpoint_info(
2576        &mut self,
2577        context: Arc<DataBreakpointContext>,
2578        mode: Option<String>,
2579        cx: &mut Context<Self>,
2580    ) -> Task<Option<dap::DataBreakpointInfoResponse>> {
2581        let command = DataBreakpointInfoCommand { context, mode };
2582
2583        self.request(command, |_, response, _| response.ok(), cx)
2584    }
2585
2586    pub fn set_variable_value(
2587        &mut self,
2588        stack_frame_id: u64,
2589        variables_reference: u64,
2590        name: String,
2591        value: String,
2592        cx: &mut Context<Self>,
2593    ) {
2594        if self.capabilities.supports_set_variable.unwrap_or_default() {
2595            self.request(
2596                SetVariableValueCommand {
2597                    name,
2598                    value,
2599                    variables_reference,
2600                },
2601                move |this, response, cx| {
2602                    let response = response.log_err()?;
2603                    this.invalidate_command_type::<VariablesCommand>();
2604                    this.invalidate_command_type::<ReadMemory>();
2605                    this.memory.clear(cx.background_executor());
2606                    this.refresh_watchers(stack_frame_id, cx);
2607                    cx.emit(SessionEvent::Variables);
2608                    Some(response)
2609                },
2610                cx,
2611            )
2612            .detach();
2613        }
2614    }
2615
2616    pub fn evaluate(
2617        &mut self,
2618        expression: String,
2619        context: Option<EvaluateArgumentsContext>,
2620        frame_id: Option<u64>,
2621        source: Option<Source>,
2622        cx: &mut Context<Self>,
2623    ) -> Task<()> {
2624        let event = dap::OutputEvent {
2625            category: None,
2626            output: format!("> {expression}"),
2627            group: None,
2628            variables_reference: None,
2629            source: None,
2630            line: None,
2631            column: None,
2632            data: None,
2633            location_reference: None,
2634        };
2635        self.push_output(event);
2636        let request = self.mode.request_dap(EvaluateCommand {
2637            expression,
2638            context,
2639            frame_id,
2640            source,
2641        });
2642        cx.spawn(async move |this, cx| {
2643            let response = request.await;
2644            this.update(cx, |this, cx| {
2645                this.memory.clear(cx.background_executor());
2646                this.invalidate_command_type::<ReadMemory>();
2647                match response {
2648                    Ok(response) => {
2649                        let event = dap::OutputEvent {
2650                            category: None,
2651                            output: format!("< {}", &response.result),
2652                            group: None,
2653                            variables_reference: Some(response.variables_reference),
2654                            source: None,
2655                            line: None,
2656                            column: None,
2657                            data: None,
2658                            location_reference: None,
2659                        };
2660                        this.push_output(event);
2661                    }
2662                    Err(e) => {
2663                        let event = dap::OutputEvent {
2664                            category: None,
2665                            output: format!("{}", e),
2666                            group: None,
2667                            variables_reference: None,
2668                            source: None,
2669                            line: None,
2670                            column: None,
2671                            data: None,
2672                            location_reference: None,
2673                        };
2674                        this.push_output(event);
2675                    }
2676                };
2677                cx.notify();
2678            })
2679            .ok();
2680        })
2681    }
2682
2683    pub fn location(
2684        &mut self,
2685        reference: u64,
2686        cx: &mut Context<Self>,
2687    ) -> Option<dap::LocationsResponse> {
2688        self.fetch(
2689            LocationsCommand { reference },
2690            move |this, response, _| {
2691                let Some(response) = response.log_err() else {
2692                    return;
2693                };
2694                this.locations.insert(reference, response);
2695            },
2696            cx,
2697        );
2698        self.locations.get(&reference).cloned()
2699    }
2700
2701    pub fn is_attached(&self) -> bool {
2702        let SessionState::Running(local_mode) = &self.mode else {
2703            return false;
2704        };
2705        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2706    }
2707
2708    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2709        let command = DisconnectCommand {
2710            restart: Some(false),
2711            terminate_debuggee: Some(false),
2712            suspend_debuggee: Some(false),
2713        };
2714
2715        self.request(command, Self::empty_response, cx).detach()
2716    }
2717
2718    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2719        if self
2720            .capabilities
2721            .supports_terminate_threads_request
2722            .unwrap_or_default()
2723        {
2724            self.request(
2725                TerminateThreadsCommand {
2726                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2727                },
2728                Self::clear_active_debug_line_response,
2729                cx,
2730            )
2731            .detach();
2732        } else {
2733            self.shutdown(cx).detach();
2734        }
2735    }
2736
    /// Looks up the status of `thread_id` by delegating to
    /// `self.thread_states.thread_state`; returns whatever that lookup yields.
    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
        self.thread_states.thread_state(thread_id)
    }
2740
    /// Returns the session's `quirks` value (by value, not by reference).
    pub fn quirks(&self) -> SessionQuirks {
        self.quirks
    }
2744
2745    fn launch_browser_for_remote_server(
2746        &mut self,
2747        remote_client: Entity<RemoteClient>,
2748        request: LaunchBrowserInCompanionParams,
2749        cx: &mut Context<Self>,
2750    ) {
2751        let task = cx.spawn(async move |_, cx| {
2752            let (port_for_dap, _child) =
2753                if remote_client.read_with(cx, |client, _| client.shares_network_interface())? {
2754                    (request.server_port, None)
2755                } else {
2756                    let port = {
2757                        let listener = TcpListener::bind("127.0.0.1:0")
2758                            .await
2759                            .context("getting port for DAP")?;
2760                        listener.local_addr()?.port()
2761                    };
2762                    let child = remote_client.update(cx, |client, _| {
2763                        let command = client.build_forward_port_command(
2764                            port,
2765                            "localhost".into(),
2766                            request.server_port,
2767                        )?;
2768                        let child = new_smol_command(command.program)
2769                            .args(command.args)
2770                            .envs(command.env)
2771                            .spawn()
2772                            .context("spawning port forwarding process")?;
2773                        anyhow::Ok(child)
2774                    })??;
2775                    (port, Some(child))
2776                };
2777
2778            let port_for_browser = {
2779                let listener = TcpListener::bind("127.0.0.1:0")
2780                    .await
2781                    .context("getting port for browser")?;
2782                listener.local_addr()?.port()
2783            };
2784
2785            let path = request.path.clone();
2786            let _child = spawn_browser(request, port_for_browser)?;
2787
2788            Tokio::spawn(cx, async move {
2789                let url = format!("ws://localhost:{port_for_dap}{path}");
2790                log::info!("will connect to DAP running on remote at {url}");
2791                let (dap_stream, _response) = tokio_tungstenite::connect_async(&url)
2792                    .await
2793                    .context("connecting to DAP")?;
2794                let (mut dap_in, mut dap_out) = dap_stream.split();
2795                log::info!("established websocket connection to DAP running on remote");
2796
2797                let url = format!("ws://localhost:{port_for_browser}");
2798                log::info!("will connect to browser running running locally at {url}");
2799                tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
2800                let (browser_stream, _response) = tokio_tungstenite::connect_async(&url)
2801                    .await
2802                    .context("connecting to browser")?;
2803                let (mut browser_in, mut browser_out) = browser_stream.split();
2804                log::info!("established websocket connection to browser running locally");
2805
2806                let down_task = tokio::spawn(async move {
2807                    while let Some(message) = dap_out.next().await {
2808                        let message = message.context("reading message from DAP")?;
2809                        browser_in
2810                            .send(message)
2811                            .await
2812                            .context("sending message to browser")?;
2813                    }
2814                    anyhow::Ok(())
2815                });
2816                let up_task = tokio::spawn(async move {
2817                    while let Some(message) = browser_out.next().await {
2818                        let message = message.context("reading message from browser")?;
2819                        dap_in
2820                            .send(message)
2821                            .await
2822                            .context("sending message to DAP")?;
2823                    }
2824                    anyhow::Ok(())
2825                });
2826                down_task.await.ok();
2827                up_task.await.ok();
2828                anyhow::Ok(())
2829            })?
2830            .await??;
2831            anyhow::Ok(())
2832        });
2833        self.background_tasks.push(cx.spawn(async move |_, _| {
2834            task.await.log_err();
2835        }));
2836    }
2837}
2838
2839fn spawn_browser(
2840    mut request: LaunchBrowserInCompanionParams,
2841    port_for_browser: u16,
2842) -> Result<smol::process::Child> {
2843    if let Some(ix) = request
2844        .browser_args
2845        .iter()
2846        .position(|arg| arg == "--remote-debugging-pipe")
2847    {
2848        request.browser_args[ix] = format!("--remote-debugging-port={port_for_browser}");
2849        request
2850            .browser_args
2851            .retain(|arg| !arg.starts_with("--remote-debugging-io-pipes"));
2852    } else {
2853        // FIXME
2854        bail!("expected --remote-debugging-pipe")
2855    }
2856
2857    dbg!(&request.browser_args);
2858
2859    // FIXME
2860    let path = match request.r#type.as_str() {
2861        "edge" => "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe",
2862        "chrome" => "C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
2863        other => bail!("unrecognized browser debugging type: {other:?}"),
2864    };
2865
2866    let child = new_smol_command(path)
2867        .args(request.browser_args)
2868        .stdin(Stdio::null())
2869        .stdout(Stdio::null())
2870        .stderr(Stdio::null())
2871        .spawn()
2872        .context("spawning browser")?;
2873    Ok(child)
2874}
2875
/// Parameters of a request to launch a browser on the local machine.
///
/// Deserialized with camelCase field names (e.g. `browserArgs`, `serverPort`).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct LaunchBrowserInCompanionParams {
    /// Browser kind; `spawn_browser` recognizes `"edge"` and `"chrome"`.
    r#type: String,
    /// Command-line arguments for the browser; `spawn_browser` rewrites the
    /// remote-debugging flags before passing them on.
    browser_args: Vec<String>,
    /// Port of the DAP server that the browser connection is relayed to.
    server_port: u16,
    /// Path appended to the DAP websocket URL (`ws://localhost:<port><path>`).
    path: String,
    // NOTE(review): not read anywhere in the visible code — confirm whether it is needed.
    launch_id: u64,
    // NOTE(review): not read anywhere in the visible code — confirm whether it is needed.
    params: serde_json::Value,
}