session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2use crate::debugger::dap_command::ReadMemory;
   3use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
   4
   5use super::breakpoint_store::{
   6    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   7};
   8use super::dap_command::{
   9    self, Attach, ConfigurationDone, ContinueCommand, DisconnectCommand, EvaluateCommand,
  10    Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand, ModulesCommand,
  11    NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand, ScopesCommand,
  12    SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand, StepBackCommand,
  13    StepCommand, StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand,
  14    ThreadsCommand, VariablesCommand,
  15};
  16use super::dap_store::DapStore;
  17use anyhow::{Context as _, Result, anyhow};
  18use base64::Engine;
  19use collections::{HashMap, HashSet, IndexMap};
  20use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  21use dap::messages::Response;
  22use dap::requests::{Request, RunInTerminal, StartDebugging};
  23use dap::{
  24    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  25    SteppingGranularity, StoppedEvent, VariableReference,
  26    client::{DebugAdapterClient, SessionId},
  27    messages::{Events, Message},
  28};
  29use dap::{
  30    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  31    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  32    StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
  33};
  34use futures::SinkExt;
  35use futures::channel::mpsc::UnboundedSender;
  36use futures::channel::{mpsc, oneshot};
  37use futures::{FutureExt, future::Shared};
  38use gpui::{
  39    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  40    Task, WeakEntity,
  41};
  42
  43use rpc::ErrorExt;
  44use serde_json::Value;
  45use smol::stream::StreamExt;
  46use std::any::TypeId;
  47use std::collections::BTreeMap;
  48use std::ops::RangeInclusive;
  49use std::u64;
  50use std::{
  51    any::Any,
  52    collections::hash_map::Entry,
  53    hash::{Hash, Hasher},
  54    path::Path,
  55    sync::Arc,
  56};
  57use task::TaskContext;
  58use text::{PointUtf16, ToPointUtf16};
  59use util::{ResultExt, maybe};
  60use worktree::Worktree;
  61
  62#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  63#[repr(transparent)]
  64pub struct ThreadId(pub u64);
  65
  66impl ThreadId {
  67    pub const MIN: ThreadId = ThreadId(u64::MIN);
  68    pub const MAX: ThreadId = ThreadId(u64::MAX);
  69}
  70
  71impl From<u64> for ThreadId {
  72    fn from(id: u64) -> Self {
  73        Self(id)
  74    }
  75}
  76
  77#[derive(Clone, Debug)]
  78pub struct StackFrame {
  79    pub dap: dap::StackFrame,
  80    pub scopes: Vec<dap::Scope>,
  81}
  82
  83impl From<dap::StackFrame> for StackFrame {
  84    fn from(stack_frame: dap::StackFrame) -> Self {
  85        Self {
  86            scopes: vec![],
  87            dap: stack_frame,
  88        }
  89    }
  90}
  91
  92#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  93pub enum ThreadStatus {
  94    #[default]
  95    Running,
  96    Stopped,
  97    Stepping,
  98    Exited,
  99    Ended,
 100}
 101
 102impl ThreadStatus {
 103    pub fn label(&self) -> &'static str {
 104        match self {
 105            ThreadStatus::Running => "Running",
 106            ThreadStatus::Stopped => "Stopped",
 107            ThreadStatus::Stepping => "Stepping",
 108            ThreadStatus::Exited => "Exited",
 109            ThreadStatus::Ended => "Ended",
 110        }
 111    }
 112}
 113
 114#[derive(Debug)]
 115pub struct Thread {
 116    dap: dap::Thread,
 117    stack_frames: Vec<StackFrame>,
 118    stack_frames_error: Option<anyhow::Error>,
 119    _has_stopped: bool,
 120}
 121
 122impl From<dap::Thread> for Thread {
 123    fn from(dap: dap::Thread) -> Self {
 124        Self {
 125            dap,
 126            stack_frames: Default::default(),
 127            stack_frames_error: None,
 128            _has_stopped: false,
 129        }
 130    }
 131}
 132
/// A watch expression together with the data stored for its latest value.
#[derive(Debug, Clone, PartialEq)]
pub struct Watcher {
    /// The expression text being watched.
    pub expression: SharedString,
    /// The stored value of the expression, as display text.
    pub value: SharedString,
    /// NOTE(review): presumably the DAP `variablesReference` for expanding
    /// child variables of the value — confirm against the code that fills it.
    pub variables_reference: u64,
    /// Optional presentation hint attached to the value.
    pub presentation_hint: Option<VariablePresentationHint>,
}
 140
/// Lifecycle state of a session.
pub enum SessionState {
    /// No adapter is running yet; requests sent in this state fail
    /// immediately. Optionally holds a task returning `Result<()>`.
    Building(Option<Task<Result<()>>>),
    /// An adapter connection exists and requests are forwarded to it.
    Running(RunningMode),
}
 145
/// State of a session that has a live debug adapter connection.
#[derive(Clone)]
pub struct RunningMode {
    /// Connection to the debug adapter; all requests go through it.
    client: Arc<DebugAdapterClient>,
    /// The adapter binary description; its `request_args` select launch vs. attach.
    binary: DebugAdapterBinary,
    /// Extra breakpoint that is sent along with the stored breakpoints of
    /// its path when that path's breakpoints are (re)sent.
    tmp_breakpoint: Option<SourceBreakpoint>,
    /// Worktree this session is associated with (held weakly).
    worktree: WeakEntity<Worktree>,
    /// Executor on which request futures are spawned.
    executor: BackgroundExecutor,
    /// Set to `true` once the initialize sequence has finished, whether it
    /// succeeded or failed.
    is_started: bool,
    /// Set to `true` by `SessionState::stopped`.
    has_ever_stopped: bool,
    /// Sender into which messages received from the adapter are forwarded.
    messages_tx: UnboundedSender<Message>,
}
 157
/// Per-session behavior toggles; both default to `false`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SessionQuirks {
    // NOTE(review): the consumers of these flags are outside this part of
    // the file; the names suggest display preferences — confirm before
    // relying on that.
    pub compact: bool,
    pub prefer_thread_name: bool,
}
 163
 164fn client_source(abs_path: &Path) -> dap::Source {
 165    dap::Source {
 166        name: abs_path
 167            .file_name()
 168            .map(|filename| filename.to_string_lossy().to_string()),
 169        path: Some(abs_path.to_string_lossy().to_string()),
 170        source_reference: None,
 171        presentation_hint: None,
 172        origin: None,
 173        sources: None,
 174        adapter_data: None,
 175        checksums: None,
 176    }
 177}
 178
 179impl RunningMode {
 180    async fn new(
 181        session_id: SessionId,
 182        parent_session: Option<Entity<Session>>,
 183        worktree: WeakEntity<Worktree>,
 184        binary: DebugAdapterBinary,
 185        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 186        cx: &mut AsyncApp,
 187    ) -> Result<Self> {
 188        let message_handler = Box::new({
 189            let messages_tx = messages_tx.clone();
 190            move |message| {
 191                messages_tx.unbounded_send(message).ok();
 192            }
 193        });
 194
 195        let client = if let Some(client) = parent_session
 196            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 197            .flatten()
 198        {
 199            client
 200                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 201                .await?
 202        } else {
 203            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 204        };
 205
 206        Ok(Self {
 207            client: Arc::new(client),
 208            worktree,
 209            tmp_breakpoint: None,
 210            binary,
 211            executor: cx.background_executor().clone(),
 212            is_started: false,
 213            has_ever_stopped: false,
 214            messages_tx,
 215        })
 216    }
 217
    /// Returns the weak handle to the worktree this session was created with.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 221
 222    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 223        let tasks: Vec<_> = paths
 224            .into_iter()
 225            .map(|path| {
 226                self.request(dap_command::SetBreakpoints {
 227                    source: client_source(path),
 228                    source_modified: None,
 229                    breakpoints: vec![],
 230                })
 231            })
 232            .collect();
 233
 234        cx.background_spawn(async move {
 235            futures::future::join_all(tasks)
 236                .await
 237                .iter()
 238                .for_each(|res| match res {
 239                    Ok(_) => {}
 240                    Err(err) => {
 241                        log::warn!("Set breakpoints request failed: {}", err);
 242                    }
 243                });
 244        })
 245    }
 246
 247    fn send_breakpoints_from_path(
 248        &self,
 249        abs_path: Arc<Path>,
 250        reason: BreakpointUpdatedReason,
 251        breakpoint_store: &Entity<BreakpointStore>,
 252        cx: &mut App,
 253    ) -> Task<()> {
 254        let breakpoints =
 255            breakpoint_store
 256                .read(cx)
 257                .source_breakpoints_from_path(&abs_path, cx)
 258                .into_iter()
 259                .filter(|bp| bp.state.is_enabled())
 260                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 261                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 262                }))
 263                .map(Into::into)
 264                .collect();
 265
 266        let raw_breakpoints = breakpoint_store
 267            .read(cx)
 268            .breakpoints_from_path(&abs_path)
 269            .into_iter()
 270            .filter(|bp| bp.bp.state.is_enabled())
 271            .collect::<Vec<_>>();
 272
 273        let task = self.request(dap_command::SetBreakpoints {
 274            source: client_source(&abs_path),
 275            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 276            breakpoints,
 277        });
 278        let session_id = self.client.id();
 279        let breakpoint_store = breakpoint_store.downgrade();
 280        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 281            Ok(breakpoints) => {
 282                let breakpoints =
 283                    breakpoints
 284                        .into_iter()
 285                        .zip(raw_breakpoints)
 286                        .filter_map(|(dap_bp, zed_bp)| {
 287                            Some((
 288                                zed_bp,
 289                                BreakpointSessionState {
 290                                    id: dap_bp.id?,
 291                                    verified: dap_bp.verified,
 292                                },
 293                            ))
 294                        });
 295                breakpoint_store
 296                    .update(cx, |this, _| {
 297                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 298                    })
 299                    .ok();
 300            }
 301            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 302        })
 303    }
 304
 305    fn send_exception_breakpoints(
 306        &self,
 307        filters: Vec<ExceptionBreakpointsFilter>,
 308        supports_filter_options: bool,
 309    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 310        let arg = if supports_filter_options {
 311            SetExceptionBreakpoints::WithOptions {
 312                filters: filters
 313                    .into_iter()
 314                    .map(|filter| ExceptionFilterOptions {
 315                        filter_id: filter.filter,
 316                        condition: None,
 317                        mode: None,
 318                    })
 319                    .collect(),
 320            }
 321        } else {
 322            SetExceptionBreakpoints::Plain {
 323                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 324            }
 325        };
 326        self.request(arg)
 327    }
 328
 329    fn send_source_breakpoints(
 330        &self,
 331        ignore_breakpoints: bool,
 332        breakpoint_store: &Entity<BreakpointStore>,
 333        cx: &App,
 334    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 335        let mut breakpoint_tasks = Vec::new();
 336        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 337        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 338        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 339        let session_id = self.client.id();
 340        for (path, breakpoints) in breakpoints {
 341            let breakpoints = if ignore_breakpoints {
 342                vec![]
 343            } else {
 344                breakpoints
 345                    .into_iter()
 346                    .filter(|bp| bp.state.is_enabled())
 347                    .map(Into::into)
 348                    .collect()
 349            };
 350
 351            let raw_breakpoints = raw_breakpoints
 352                .remove(&path)
 353                .unwrap_or_default()
 354                .into_iter()
 355                .filter(|bp| bp.bp.state.is_enabled());
 356            let error_path = path.clone();
 357            let send_request = self
 358                .request(dap_command::SetBreakpoints {
 359                    source: client_source(&path),
 360                    source_modified: Some(false),
 361                    breakpoints,
 362                })
 363                .map(|result| result.map_err(move |e| (error_path, e)));
 364
 365            let task = cx.spawn({
 366                let breakpoint_store = breakpoint_store.downgrade();
 367                async move |cx| {
 368                    let breakpoints = cx.background_spawn(send_request).await?;
 369
 370                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 371                        |(dap_bp, zed_bp)| {
 372                            Some((
 373                                zed_bp,
 374                                BreakpointSessionState {
 375                                    id: dap_bp.id?,
 376                                    verified: dap_bp.verified,
 377                                },
 378                            ))
 379                        },
 380                    );
 381                    breakpoint_store
 382                        .update(cx, |this, _| {
 383                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 384                        })
 385                        .ok();
 386
 387                    Ok(())
 388                }
 389            });
 390            breakpoint_tasks.push(task);
 391        }
 392
 393        cx.background_spawn(async move {
 394            futures::future::join_all(breakpoint_tasks)
 395                .await
 396                .into_iter()
 397                .filter_map(Result::err)
 398                .collect::<HashMap<_, _>>()
 399        })
 400    }
 401
 402    fn initialize_sequence(
 403        &self,
 404        capabilities: &Capabilities,
 405        initialized_rx: oneshot::Receiver<()>,
 406        dap_store: WeakEntity<DapStore>,
 407        cx: &mut Context<Session>,
 408    ) -> Task<Result<()>> {
 409        let raw = self.binary.request_args.clone();
 410
 411        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 412        let launch = match raw.request {
 413            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 414                raw: raw.configuration,
 415            }),
 416            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 417                raw: raw.configuration,
 418            }),
 419        };
 420
 421        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 422        // From spec (on initialization sequence):
 423        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 424        //
 425        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 426        let should_send_exception_breakpoints = capabilities
 427            .exception_breakpoint_filters
 428            .as_ref()
 429            .map_or(false, |filters| !filters.is_empty())
 430            || !configuration_done_supported;
 431        let supports_exception_filters = capabilities
 432            .supports_exception_filter_options
 433            .unwrap_or_default();
 434        let this = self.clone();
 435        let worktree = self.worktree().clone();
 436        let mut filters = capabilities
 437            .exception_breakpoint_filters
 438            .clone()
 439            .unwrap_or_default();
 440        let configuration_sequence = cx.spawn({
 441            async move |session, cx| {
 442                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 443                let (breakpoint_store, adapter_defaults) =
 444                    dap_store.read_with(cx, |dap_store, _| {
 445                        (
 446                            dap_store.breakpoint_store().clone(),
 447                            dap_store.adapter_options(&adapter_name),
 448                        )
 449                    })?;
 450                initialized_rx.await?;
 451                let errors_by_path = cx
 452                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 453                    .await;
 454
 455                dap_store.update(cx, |_, cx| {
 456                    let Some(worktree) = worktree.upgrade() else {
 457                        return;
 458                    };
 459
 460                    for (path, error) in &errors_by_path {
 461                        log::error!("failed to set breakpoints for {path:?}: {error}");
 462                    }
 463
 464                    if let Some(failed_path) = errors_by_path.keys().next() {
 465                        let failed_path = failed_path
 466                            .strip_prefix(worktree.read(cx).abs_path())
 467                            .unwrap_or(failed_path)
 468                            .display();
 469                        let message = format!(
 470                            "Failed to set breakpoints for {failed_path}{}",
 471                            match errors_by_path.len() {
 472                                0 => unreachable!(),
 473                                1 => "".into(),
 474                                2 => " and 1 other path".into(),
 475                                n => format!(" and {} other paths", n - 1),
 476                            }
 477                        );
 478                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 479                    }
 480                })?;
 481
 482                if should_send_exception_breakpoints {
 483                    _ = session.update(cx, |this, _| {
 484                        filters.retain(|filter| {
 485                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 486                                defaults
 487                                    .exception_breakpoints
 488                                    .get(&filter.filter)
 489                                    .map(|options| options.enabled)
 490                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 491                            } else {
 492                                filter.default.unwrap_or_default()
 493                            };
 494                            this.exception_breakpoints
 495                                .entry(filter.filter.clone())
 496                                .or_insert_with(|| (filter.clone(), is_enabled));
 497                            is_enabled
 498                        });
 499                    });
 500
 501                    this.send_exception_breakpoints(filters, supports_exception_filters)
 502                        .await
 503                        .ok();
 504                }
 505
 506                let ret = if configuration_done_supported {
 507                    this.request(ConfigurationDone {})
 508                } else {
 509                    Task::ready(Ok(()))
 510                }
 511                .await;
 512                ret
 513            }
 514        });
 515
 516        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 517
 518        cx.spawn(async move |this, cx| {
 519            let result = task.await;
 520
 521            this.update(cx, |this, cx| {
 522                if let Some(this) = this.as_running_mut() {
 523                    this.is_started = true;
 524                    cx.notify();
 525                }
 526            })
 527            .ok();
 528
 529            result?;
 530            anyhow::Ok(())
 531        })
 532    }
 533
 534    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 535        let client = self.client.clone();
 536        let messages_tx = self.messages_tx.clone();
 537        let message_handler = Box::new(move |message| {
 538            messages_tx.unbounded_send(message).ok();
 539        });
 540        if client.should_reconnect_for_ssh() {
 541            Some(cx.spawn(async move |cx| {
 542                client.connect(message_handler, cx).await?;
 543                anyhow::Ok(())
 544            }))
 545        } else {
 546            None
 547        }
 548    }
 549
 550    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 551    where
 552        <R::DapRequest as dap::requests::Request>::Response: 'static,
 553        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 554    {
 555        let request = Arc::new(request);
 556
 557        let request_clone = request.clone();
 558        let connection = self.client.clone();
 559        self.executor.spawn(async move {
 560            let args = request_clone.to_dap();
 561            let response = connection.request::<R::DapRequest>(args).await?;
 562            request.response_from_dap(response)
 563        })
 564    }
 565}
 566
 567impl SessionState {
 568    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 569    where
 570        <R::DapRequest as dap::requests::Request>::Response: 'static,
 571        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 572    {
 573        match self {
 574            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 575            SessionState::Building(_) => Task::ready(Err(anyhow!(
 576                "no adapter running to send request: {request:?}"
 577            ))),
 578        }
 579    }
 580
 581    /// Did this debug session stop at least once?
 582    pub(crate) fn has_ever_stopped(&self) -> bool {
 583        match self {
 584            SessionState::Building(_) => false,
 585            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 586        }
 587    }
 588
 589    fn stopped(&mut self) {
 590        if let SessionState::Running(running) = self {
 591            running.has_ever_stopped = true;
 592        }
 593    }
 594}
 595
 596#[derive(Default)]
 597struct ThreadStates {
 598    global_state: Option<ThreadStatus>,
 599    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 600}
 601
 602impl ThreadStates {
 603    fn stop_all_threads(&mut self) {
 604        self.global_state = Some(ThreadStatus::Stopped);
 605        self.known_thread_states.clear();
 606    }
 607
 608    fn exit_all_threads(&mut self) {
 609        self.global_state = Some(ThreadStatus::Exited);
 610        self.known_thread_states.clear();
 611    }
 612
 613    fn continue_all_threads(&mut self) {
 614        self.global_state = Some(ThreadStatus::Running);
 615        self.known_thread_states.clear();
 616    }
 617
 618    fn stop_thread(&mut self, thread_id: ThreadId) {
 619        self.known_thread_states
 620            .insert(thread_id, ThreadStatus::Stopped);
 621    }
 622
 623    fn continue_thread(&mut self, thread_id: ThreadId) {
 624        self.known_thread_states
 625            .insert(thread_id, ThreadStatus::Running);
 626    }
 627
 628    fn process_step(&mut self, thread_id: ThreadId) {
 629        self.known_thread_states
 630            .insert(thread_id, ThreadStatus::Stepping);
 631    }
 632
 633    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 634        self.thread_state(thread_id)
 635            .unwrap_or(ThreadStatus::Running)
 636    }
 637
 638    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 639        self.known_thread_states
 640            .get(&thread_id)
 641            .copied()
 642            .or(self.global_state)
 643    }
 644
 645    fn exit_thread(&mut self, thread_id: ThreadId) {
 646        self.known_thread_states
 647            .insert(thread_id, ThreadStatus::Exited);
 648    }
 649
 650    fn any_stopped_thread(&self) -> bool {
 651        self.global_state
 652            .is_some_and(|state| state == ThreadStatus::Stopped)
 653            || self
 654                .known_thread_states
 655                .values()
 656                .any(|status| *status == ThreadStatus::Stopped)
 657    }
 658}
/// Capacity of the circular buffer that stores a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Readability alias for the enabled flag stored next to each exception
/// breakpoint filter.
type IsEnabled = bool;

/// Counter associated with a session's output.
// NOTE(review): presumably identifies a position in the output stream so
// consumers can ask for newer events — confirm where it is incremented.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether the session is still building or has a running adapter.
    pub mode: SessionState,
    /// Identifier of this session.
    id: SessionId,
    /// Optional display label supplied at construction.
    label: Option<SharedString>,
    /// Name of the debug adapter backing this session.
    adapter: DebugAdapterName,
    /// Adapter capabilities; starts out as `Capabilities::default()`.
    pub(super) capabilities: Capabilities,
    /// Ids of sessions spawned as children of this one.
    child_session_ids: HashSet<SessionId>,
    /// The session this one was spawned from, if any.
    parent_session: Option<Entity<Session>>,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Output events, bounded by `MAX_TRACKED_OUTPUT_EVENTS`.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    /// Known threads keyed by id, in insertion order.
    threads: IndexMap<ThreadId, Thread>,
    /// Session-wide and per-thread execution statuses.
    thread_states: ThreadStates,
    /// Watch expressions keyed by expression text.
    watchers: HashMap<SharedString, Watcher>,
    /// Variables keyed by the variable reference they are stored under.
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    /// Stack frames keyed by frame id, in insertion order.
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    // NOTE(review): keyed by command type and then by the command itself;
    // looks like a cache of shared request tasks — confirm at the use sites.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Store whose breakpoint events this session subscribes to.
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// When `true`, breakpoint store updates are not forwarded to the adapter.
    ignore_breakpoints: bool,
    /// Exception breakpoint filters keyed by filter id, each with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    background_tasks: Vec<Task<()>>,
    restart_task: Option<Task<()>>,
    /// Task context supplied at construction.
    task_context: TaskContext,
    memory: memory::Memory,
    /// Behavior toggles supplied at construction.
    quirks: SessionQuirks,
}
 695
/// Object-safe view of a command that supports equality and hashing, so
/// commands of different concrete types can sit behind one trait object.
trait CacheableCommand: Any + Send + Sync {
    /// Compares `self` with another type-erased command.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds `self` into a type-erased hasher.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts a shared command into a shared `Any` for downcasting.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 701
 702impl<T> CacheableCommand for T
 703where
 704    T: LocalDapCommand + PartialEq + Eq + Hash,
 705{
 706    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 707        (rhs as &dyn Any)
 708            .downcast_ref::<Self>()
 709            .map_or(false, |rhs| self == rhs)
 710    }
 711
 712    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 713        T::hash(self, &mut hasher);
 714    }
 715
 716    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 717        self
 718    }
 719}
 720
 721pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 722
 723impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 724    fn from(request: T) -> Self {
 725        Self(Arc::new(request))
 726    }
 727}
 728
 729impl PartialEq for RequestSlot {
 730    fn eq(&self, other: &Self) -> bool {
 731        self.0.dyn_eq(other.0.as_ref())
 732    }
 733}
 734
 735impl Eq for RequestSlot {}
 736
 737impl Hash for RequestSlot {
 738    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 739        self.0.dyn_hash(state);
 740        (&*self.0 as &dyn Any).type_id().hash(state)
 741    }
 742}
 743
 744#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 745pub struct CompletionsQuery {
 746    pub query: String,
 747    pub column: u64,
 748    pub line: Option<u64>,
 749    pub frame_id: Option<u64>,
 750}
 751
 752impl CompletionsQuery {
 753    pub fn new(
 754        buffer: &language::Buffer,
 755        cursor_position: language::Anchor,
 756        frame_id: Option<u64>,
 757    ) -> Self {
 758        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 759        Self {
 760            query: buffer.text(),
 761            column: column as u64,
 762            frame_id,
 763            line: Some(row as u64),
 764        }
 765    }
 766}
 767
/// Events a `Session` emits about changes to its debugging data.
// NOTE(review): the emit sites are outside this part of the file; the
// variant names suggest which piece of state changed — confirm there.
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Watchers,
    Threads,
    InvalidateInlineValue,
    CapabilitiesLoaded,
    /// Carries run-in-terminal request arguments plus a sender for the result.
    RunInTerminal {
        request: RunInTerminalRequestArguments,
        // NOTE(review): the `u32` is presumably a process id — confirm.
        sender: mpsc::Sender<Result<u32>>,
    },
    ConsoleOutput,
}
 785
/// Events a `Session` emits about its own lifecycle.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    Running,
    Shutdown,
    Restart,
    /// Carries the start-debugging request arguments for a child session.
    SpawnChildSession {
        request: StartDebuggingRequestArguments,
    },
}
 795
// A session emits both data events and lifecycle events.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 798
 799// local session will send breakpoint updates to DAP for all new breakpoints
 800// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 801// BreakpointStore notifies session on breakpoint changes
 802impl Session {
 803    pub(crate) fn new(
 804        breakpoint_store: Entity<BreakpointStore>,
 805        session_id: SessionId,
 806        parent_session: Option<Entity<Session>>,
 807        label: Option<SharedString>,
 808        adapter: DebugAdapterName,
 809        task_context: TaskContext,
 810        quirks: SessionQuirks,
 811        cx: &mut App,
 812    ) -> Entity<Self> {
 813        cx.new::<Self>(|cx| {
 814            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 815                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 816                    if let Some(local) = (!this.ignore_breakpoints)
 817                        .then(|| this.as_running_mut())
 818                        .flatten()
 819                    {
 820                        local
 821                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 822                            .detach();
 823                    };
 824                }
 825                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 826                    if let Some(local) = (!this.ignore_breakpoints)
 827                        .then(|| this.as_running_mut())
 828                        .flatten()
 829                    {
 830                        local.unset_breakpoints_from_paths(paths, cx).detach();
 831                    }
 832                }
 833                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 834            })
 835            .detach();
 836
 837            let this = Self {
 838                mode: SessionState::Building(None),
 839                id: session_id,
 840                child_session_ids: HashSet::default(),
 841                parent_session,
 842                capabilities: Capabilities::default(),
 843                watchers: HashMap::default(),
 844                variables: Default::default(),
 845                stack_frames: Default::default(),
 846                thread_states: ThreadStates::default(),
 847                output_token: OutputToken(0),
 848                output: circular_buffer::CircularBuffer::boxed(),
 849                requests: HashMap::default(),
 850                modules: Vec::default(),
 851                loaded_sources: Vec::default(),
 852                threads: IndexMap::default(),
 853                background_tasks: Vec::default(),
 854                restart_task: None,
 855                locations: Default::default(),
 856                is_session_terminated: false,
 857                ignore_breakpoints: false,
 858                breakpoint_store,
 859                exception_breakpoints: Default::default(),
 860                label,
 861                adapter,
 862                task_context,
 863                memory: memory::Memory::new(),
 864                quirks,
 865            };
 866
 867            this
 868        })
 869    }
 870
    /// Returns the task context this session was created with.
    pub fn task_context(&self) -> &TaskContext {
        &self.task_context
    }
 874
 875    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 876        match &self.mode {
 877            SessionState::Building(_) => None,
 878            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 879        }
 880    }
 881
    /// Boots the debug adapter for this session and runs the initialization handshake.
    ///
    /// This does three things:
    /// 1. Spawns a background task (stored in `self.background_tasks`) that drains the
    ///    messages coming from the adapter: the first `Initialized` event is forwarded
    ///    through a oneshot channel, every other event goes to
    ///    [`Self::handle_dap_event`], and `StartDebugging` / `RunInTerminal` reverse
    ///    requests are dispatched to their handlers. Other requests are ignored here.
    /// 2. Creates the [`RunningMode`], switches the session to `SessionState::Running`
    ///    and emits `SessionStateEvent::Running`.
    /// 3. Sends the `initialize` request and then runs the initialization sequence.
    ///
    /// If the initialization sequence fails, the adapter's launch configuration is
    /// written to the session console before the error is returned.
    pub fn boot(
        &mut self,
        binary: DebugAdapterBinary,
        worktree: Entity<Worktree>,
        dap_store: WeakEntity<DapStore>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();

        // Message pump: runs until the channel closes or the session entity is gone.
        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
            // Wrapped in an `Option` so the oneshot sender is only used once; any
            // later `Initialized` events are dropped.
            let mut initialized_tx = Some(initialized_tx);
            while let Some(message) = message_rx.next().await {
                if let Message::Event(event) = message {
                    if let Events::Initialized(_) = *event {
                        if let Some(tx) = initialized_tx.take() {
                            tx.send(()).ok();
                        }
                    } else {
                        let Ok(_) = this.update(cx, |session, cx| {
                            session.handle_dap_event(event, cx);
                        }) else {
                            break;
                        };
                    }
                } else if let Message::Request(request) = message {
                    // Reverse requests sent by the adapter to the client.
                    let Ok(_) = this.update(cx, |this, cx| {
                        if request.command == StartDebugging::COMMAND {
                            this.handle_start_debugging_request(request, cx)
                                .detach_and_log_err(cx);
                        } else if request.command == RunInTerminal::COMMAND {
                            this.handle_run_in_terminal_request(request, cx)
                                .detach_and_log_err(cx);
                        }
                    }) else {
                        break;
                    };
                }
            }
        })];
        self.background_tasks = background_tasks;
        let id = self.id;
        let parent_session = self.parent_session.clone();

        cx.spawn(async move |this, cx| {
            // `binary` is cloned because it is still needed below for the error message.
            let mode = RunningMode::new(
                id,
                parent_session,
                worktree.downgrade(),
                binary.clone(),
                message_tx,
                cx,
            )
            .await?;
            this.update(cx, |this, cx| {
                match &mut this.mode {
                    // A pending build task is detached (with error logging) rather than
                    // dropped when the mode is replaced below.
                    SessionState::Building(task) if task.is_some() => {
                        task.take().unwrap().detach_and_log_err(cx);
                    }
                    _ => {
                        debug_assert!(
                            this.parent_session.is_some(),
                            "Booting a root debug session without a boot task"
                        );
                    }
                };
                this.mode = SessionState::Running(mode);
                cx.emit(SessionStateEvent::Running);
            })?;

            this.update(cx, |session, cx| session.request_initialize(cx))?
                .await?;

            let result = this
                .update(cx, |session, cx| {
                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
                })?
                .await;

            // On failure, surface the launch configuration in the console.
            if result.is_err() {
                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;

                console
                    .send(format!(
                        "Tried to launch debugger with: {}",
                        serde_json::to_string_pretty(&binary.request_args.configuration)
                            .unwrap_or_default(),
                    ))
                    .await
                    .ok();
            }

            result
        })
    }
 977
    /// Returns the identifier of this session.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 981
    /// Returns a copy of the set of child session ids registered on this session.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 985
    /// Registers `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 989
    /// Removes `session_id` from this session's set of children, if present.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 993
 994    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 995        self.parent_session
 996            .as_ref()
 997            .map(|session| session.read(cx).id)
 998    }
 999
    /// Returns the parent session entity, or `None` if this session has no parent.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
1003
1004    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1005        let Some(client) = self.adapter_client() else {
1006            return Task::ready(());
1007        };
1008
1009        let supports_terminate = self
1010            .capabilities
1011            .support_terminate_debuggee
1012            .unwrap_or(false);
1013
1014        cx.background_spawn(async move {
1015            if supports_terminate {
1016                client
1017                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1018                        restart: Some(false),
1019                    })
1020                    .await
1021                    .ok();
1022            } else {
1023                client
1024                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1025                        restart: Some(false),
1026                        terminate_debuggee: Some(true),
1027                        suspend_debuggee: Some(false),
1028                    })
1029                    .await
1030                    .ok();
1031            }
1032        })
1033    }
1034
    /// Returns the adapter capabilities currently stored on this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
1038
1039    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1040        match &self.mode {
1041            SessionState::Building(_) => None,
1042            SessionState::Running(running_mode) => Some(&running_mode.binary),
1043        }
1044    }
1045
    /// Returns a clone of the name of the debug adapter used by this session.
    pub fn adapter(&self) -> DebugAdapterName {
        self.adapter.clone()
    }
1049
    /// Returns a clone of the session's label, if one was provided at creation.
    pub fn label(&self) -> Option<SharedString> {
        self.label.clone()
    }
1053
    /// Returns whether this session has been marked as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
1057
1058    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1059        let (tx, mut rx) = mpsc::unbounded();
1060
1061        cx.spawn(async move |this, cx| {
1062            while let Some(output) = rx.next().await {
1063                this.update(cx, |this, _| {
1064                    let event = dap::OutputEvent {
1065                        category: None,
1066                        output,
1067                        group: None,
1068                        variables_reference: None,
1069                        source: None,
1070                        line: None,
1071                        column: None,
1072                        data: None,
1073                        location_reference: None,
1074                    };
1075                    this.push_output(event);
1076                })?;
1077            }
1078            anyhow::Ok(())
1079        })
1080        .detach();
1081
1082        return tx;
1083    }
1084
1085    pub fn is_started(&self) -> bool {
1086        match &self.mode {
1087            SessionState::Building(_) => false,
1088            SessionState::Running(running) => running.is_started,
1089        }
1090    }
1091
1092    pub fn is_building(&self) -> bool {
1093        matches!(self.mode, SessionState::Building(_))
1094    }
1095
1096    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1097        match &mut self.mode {
1098            SessionState::Running(local_mode) => Some(local_mode),
1099            SessionState::Building(_) => None,
1100        }
1101    }
1102
1103    pub fn as_running(&self) -> Option<&RunningMode> {
1104        match &self.mode {
1105            SessionState::Running(local_mode) => Some(local_mode),
1106            SessionState::Building(_) => None,
1107        }
1108    }
1109
1110    fn handle_start_debugging_request(
1111        &mut self,
1112        request: dap::messages::Request,
1113        cx: &mut Context<Self>,
1114    ) -> Task<Result<()>> {
1115        let request_seq = request.seq;
1116
1117        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1118            .arguments
1119            .as_ref()
1120            .map(|value| serde_json::from_value(value.clone()));
1121
1122        let mut success = true;
1123        if let Some(Ok(request)) = launch_request {
1124            cx.emit(SessionStateEvent::SpawnChildSession { request });
1125        } else {
1126            log::error!(
1127                "Failed to parse launch request arguments: {:?}",
1128                request.arguments
1129            );
1130            success = false;
1131        }
1132
1133        cx.spawn(async move |this, cx| {
1134            this.update(cx, |this, cx| {
1135                this.respond_to_client(
1136                    request_seq,
1137                    success,
1138                    StartDebugging::COMMAND.to_string(),
1139                    None,
1140                    cx,
1141                )
1142            })?
1143            .await
1144        })
1145    }
1146
1147    fn handle_run_in_terminal_request(
1148        &mut self,
1149        request: dap::messages::Request,
1150        cx: &mut Context<Self>,
1151    ) -> Task<Result<()>> {
1152        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1153            request.arguments.unwrap_or_default(),
1154        ) {
1155            Ok(args) => args,
1156            Err(error) => {
1157                return cx.spawn(async move |session, cx| {
1158                    let error = serde_json::to_value(dap::ErrorResponse {
1159                        error: Some(dap::Message {
1160                            id: request.seq,
1161                            format: error.to_string(),
1162                            variables: None,
1163                            send_telemetry: None,
1164                            show_user: None,
1165                            url: None,
1166                            url_label: None,
1167                        }),
1168                    })
1169                    .ok();
1170
1171                    session
1172                        .update(cx, |this, cx| {
1173                            this.respond_to_client(
1174                                request.seq,
1175                                false,
1176                                StartDebugging::COMMAND.to_string(),
1177                                error,
1178                                cx,
1179                            )
1180                        })?
1181                        .await?;
1182
1183                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1184                });
1185            }
1186        };
1187
1188        let seq = request.seq;
1189
1190        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1191        cx.emit(SessionEvent::RunInTerminal {
1192            request: request_args,
1193            sender: tx,
1194        });
1195        cx.notify();
1196
1197        cx.spawn(async move |session, cx| {
1198            let result = util::maybe!(async move {
1199                rx.next().await.ok_or_else(|| {
1200                    anyhow!("failed to receive response from spawn terminal".to_string())
1201                })?
1202            })
1203            .await;
1204            let (success, body) = match result {
1205                Ok(pid) => (
1206                    true,
1207                    serde_json::to_value(dap::RunInTerminalResponse {
1208                        process_id: None,
1209                        shell_process_id: Some(pid as u64),
1210                    })
1211                    .ok(),
1212                ),
1213                Err(error) => (
1214                    false,
1215                    serde_json::to_value(dap::ErrorResponse {
1216                        error: Some(dap::Message {
1217                            id: seq,
1218                            format: error.to_string(),
1219                            variables: None,
1220                            send_telemetry: None,
1221                            show_user: None,
1222                            url: None,
1223                            url_label: None,
1224                        }),
1225                    })
1226                    .ok(),
1227                ),
1228            };
1229
1230            session
1231                .update(cx, |session, cx| {
1232                    session.respond_to_client(
1233                        seq,
1234                        success,
1235                        RunInTerminal::COMMAND.to_string(),
1236                        body,
1237                        cx,
1238                    )
1239                })?
1240                .await
1241        })
1242    }
1243
    /// Sends the `Initialize` request to the debug adapter and stores the returned
    /// capabilities on the session.
    ///
    /// Returns an error task immediately if the session is not running yet. If the
    /// request fails and the running mode offers a reconnect via `reconnect_for_ssh`,
    /// the reconnect is awaited and the request is sent again; this repeats until the
    /// request succeeds or no reconnect is available. On success
    /// `SessionEvent::CapabilitiesLoaded` is emitted.
    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
        let adapter_id = self.adapter().to_string();
        let request = Initialize { adapter_id };

        let SessionState::Running(running) = &self.mode else {
            return Task::ready(Err(anyhow!(
                "Cannot send initialize request, task still building"
            )));
        };
        let mut response = running.request(request.clone());

        cx.spawn(async move |this, cx| {
            loop {
                let capabilities = response.await;
                match capabilities {
                    Err(e) => {
                        // Without a reconnect option (or if the session is gone) the
                        // original error is returned as-is.
                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
                            this.as_running()
                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
                        }) else {
                            return Err(e);
                        };
                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
                        reconnect.await?;

                        // Re-issue the initialize request for the next loop iteration.
                        let Ok(Some(r)) = this.update(cx, |this, _| {
                            this.as_running()
                                .map(|running| running.request(request.clone()))
                        }) else {
                            return Err(e);
                        };
                        response = r
                    }
                    Ok(capabilities) => {
                        this.update(cx, |session, cx| {
                            session.capabilities = capabilities;

                            cx.emit(SessionEvent::CapabilitiesLoaded);
                        })?;
                        return Ok(());
                    }
                }
            }
        })
    }
1289
1290    pub(super) fn initialize_sequence(
1291        &mut self,
1292        initialize_rx: oneshot::Receiver<()>,
1293        dap_store: WeakEntity<DapStore>,
1294        cx: &mut Context<Self>,
1295    ) -> Task<Result<()>> {
1296        match &self.mode {
1297            SessionState::Running(local_mode) => {
1298                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1299            }
1300            SessionState::Building(_) => {
1301                Task::ready(Err(anyhow!("cannot initialize, still building")))
1302            }
1303        }
1304    }
1305
1306    pub fn run_to_position(
1307        &mut self,
1308        breakpoint: SourceBreakpoint,
1309        active_thread_id: ThreadId,
1310        cx: &mut Context<Self>,
1311    ) {
1312        match &mut self.mode {
1313            SessionState::Running(local_mode) => {
1314                if !matches!(
1315                    self.thread_states.thread_state(active_thread_id),
1316                    Some(ThreadStatus::Stopped)
1317                ) {
1318                    return;
1319                };
1320                let path = breakpoint.path.clone();
1321                local_mode.tmp_breakpoint = Some(breakpoint);
1322                let task = local_mode.send_breakpoints_from_path(
1323                    path,
1324                    BreakpointUpdatedReason::Toggled,
1325                    &self.breakpoint_store,
1326                    cx,
1327                );
1328
1329                cx.spawn(async move |this, cx| {
1330                    task.await;
1331                    this.update(cx, |this, cx| {
1332                        this.continue_thread(active_thread_id, cx);
1333                    })
1334                })
1335                .detach();
1336            }
1337            SessionState::Building(_) => {}
1338        }
1339    }
1340
1341    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1342        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1343    }
1344
1345    pub fn output(
1346        &self,
1347        since: OutputToken,
1348    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1349        if self.output_token.0 == 0 {
1350            return (self.output.range(0..0), OutputToken(0));
1351        };
1352
1353        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1354
1355        let clamped_events_since = events_since.clamp(0, self.output.len());
1356        (
1357            self.output
1358                .range(self.output.len() - clamped_events_since..),
1359            self.output_token,
1360        )
1361    }
1362
1363    pub fn respond_to_client(
1364        &self,
1365        request_seq: u64,
1366        success: bool,
1367        command: String,
1368        body: Option<serde_json::Value>,
1369        cx: &mut Context<Self>,
1370    ) -> Task<Result<()>> {
1371        let Some(local_session) = self.as_running() else {
1372            unreachable!("Cannot respond to remote client");
1373        };
1374        let client = local_session.client.clone();
1375
1376        cx.background_spawn(async move {
1377            client
1378                .send_message(Message::Response(Response {
1379                    body,
1380                    success,
1381                    command,
1382                    seq: request_seq + 1,
1383                    request_seq,
1384                    message: None,
1385                }))
1386                .await
1387        })
1388    }
1389
    /// Updates session state in response to a `Stopped` event from the adapter.
    ///
    /// In order: takes any temporary breakpoint off the running mode and re-sends the
    /// breakpoints for its path, marks the affected thread(s) as stopped, invalidates
    /// cached stack traces and the generic cached requests, clears the cached threads
    /// and variables, and finally emits `SessionEvent::Stopped` and
    /// `SessionEvent::InvalidateInlineValue`.
    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
        self.mode.stopped();
        // todo(debugger): Find a clean way to get around the clone
        let breakpoint_store = self.breakpoint_store.clone();
        // `take()` clears the temporary breakpoint; its path is then re-sent.
        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
            let breakpoint = local.tmp_breakpoint.take()?;
            let path = breakpoint.path.clone();
            Some((local, path))
        }) {
            local
                .send_breakpoints_from_path(
                    path,
                    BreakpointUpdatedReason::Toggled,
                    &breakpoint_store,
                    cx,
                )
                .detach();
        };

        // An event without a thread id is treated like "all threads stopped".
        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
            self.thread_states.stop_all_threads();
            self.invalidate_command_type::<StackTraceCommand>();
        }

        // Even if we stopped all threads we still need to insert the thread_id
        // to our own data
        if let Some(thread_id) = event.thread_id {
            self.thread_states.stop_thread(ThreadId(thread_id));

            self.invalidate_state(
                &StackTraceCommand {
                    thread_id,
                    start_frame: None,
                    levels: None,
                }
                .into(),
            );
        }

        self.invalidate_generic();
        self.threads.clear();
        self.variables.clear();
        // The stopped thread id is withheld from the event when the adapter sets
        // `preserve_focus_hint`.
        cx.emit(SessionEvent::Stopped(
            event
                .thread_id
                .map(Into::into)
                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
        ));
        cx.emit(SessionEvent::InvalidateInlineValue);
        cx.notify();
    }
1441
    /// Applies an event received from the debug adapter to the session state.
    ///
    /// `Initialized` is not expected here (the message loop in `boot` intercepts it).
    /// `Memory`, `Process`, the `Progress*` events, `Invalidated` and `Other` are
    /// currently ignored.
    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
        match *event {
            Events::Initialized(_) => {
                debug_assert!(
                    false,
                    "Initialized event should have been handled in LocalMode"
                );
            }
            Events::Stopped(event) => self.handle_stopped_event(event, cx),
            Events::Continued(event) => {
                if event.all_threads_continued.unwrap_or_default() {
                    self.thread_states.continue_all_threads();
                    self.breakpoint_store.update(cx, |store, cx| {
                        store.remove_active_position(Some(self.session_id()), cx)
                    });
                } else {
                    self.thread_states
                        .continue_thread(ThreadId(event.thread_id));
                }
                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
                self.invalidate_generic();
            }
            Events::Exited(_event) => {
                self.clear_active_debug_line(cx);
            }
            Events::Terminated(_) => {
                self.shutdown(cx).detach();
            }
            Events::Thread(event) => {
                let thread_id = ThreadId(event.thread_id);

                match event.reason {
                    dap::ThreadEventReason::Started => {
                        self.thread_states.continue_thread(thread_id);
                    }
                    dap::ThreadEventReason::Exited => {
                        self.thread_states.exit_thread(thread_id);
                    }
                    reason => {
                        log::error!("Unhandled thread event reason {:?}", reason);
                    }
                }
                // Drop the cached threads request so it is fetched again.
                self.invalidate_state(&ThreadsCommand.into());
                cx.notify();
            }
            Events::Output(event) => {
                // Telemetry output is not stored in the session output.
                if event
                    .category
                    .as_ref()
                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
                {
                    return;
                }

                self.push_output(event);
                cx.notify();
            }
            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
            }),
            Events::Module(event) => {
                // Keep the local module list in sync with the adapter.
                match event.reason {
                    dap::ModuleEventReason::New => {
                        self.modules.push(event.module);
                    }
                    dap::ModuleEventReason::Changed => {
                        if let Some(module) = self
                            .modules
                            .iter_mut()
                            .find(|other| event.module.id == other.id)
                        {
                            *module = event.module;
                        }
                    }
                    dap::ModuleEventReason::Removed => {
                        self.modules.retain(|other| event.module.id != other.id);
                    }
                }

                // todo(debugger): We should only send the invalidate command to downstream clients.
                // self.invalidate_state(&ModulesCommand.into());
            }
            Events::LoadedSource(_) => {
                self.invalidate_state(&LoadedSourcesCommand.into());
            }
            Events::Capabilities(event) => {
                self.capabilities = self.capabilities.merge(event.capabilities);

                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
                let recent_filters = self
                    .capabilities
                    .exception_breakpoint_filters
                    .iter()
                    .flatten()
                    .map(|filter| (filter.filter.clone(), filter.clone()))
                    .collect::<BTreeMap<_, _>>();
                // Add filters we have not seen yet, using the adapter's default state;
                // entries that already exist are left untouched.
                for filter in recent_filters.values() {
                    let default = filter.default.unwrap_or_default();
                    self.exception_breakpoints
                        .entry(filter.filter.clone())
                        .or_insert_with(|| (filter.clone(), default));
                }
                // Remove the ones that no longer exist.
                self.exception_breakpoints
                    .retain(|k, _| recent_filters.contains_key(k));
                if self.is_started() {
                    self.send_exception_breakpoints(cx);
                }

                cx.notify();
            }
            Events::Memory(_) => {}
            Events::Process(_) => {}
            Events::ProgressEnd(_) => {}
            Events::ProgressStart(_) => {}
            Events::ProgressUpdate(_) => {}
            Events::Invalidated(_) => {}
            Events::Other(_) => {}
        }
    }
1562
    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
    ///
    /// Requests are tracked per command type and per request value in `self.requests`;
    /// a new request is only sent when no entry exists yet for `request`.
    /// `process_result` is invoked with the outcome of a newly sent request.
    ///
    /// Nothing is sent when the session is terminated, or when no thread is stopped
    /// and the request is not a [`ThreadsCommand`].
    ///
    /// Calling this with a command type whose `CACHEABLE` constant is `false` fails
    /// at compile time.
    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
        &mut self,
        request: T,
        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
        cx: &mut Context<Self>,
    ) {
        const {
            assert!(
                T::CACHEABLE,
                "Only requests marked as cacheable should invoke `fetch`"
            );
        }

        // Precedence: (no stopped thread AND not a threads request) OR terminated.
        if !self.thread_states.any_stopped_thread()
            && request.type_id() != TypeId::of::<ThreadsCommand>()
            || self.is_session_terminated
        {
            return;
        }

        let request_map = self
            .requests
            .entry(std::any::TypeId::of::<T>())
            .or_default();

        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
            // Recover the typed command from the type-erased key stored in the map.
            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();

            let task = Self::request_inner::<Arc<T>>(
                &self.capabilities,
                &self.mode,
                command,
                |this, result, cx| {
                    process_result(this, result, cx);
                    None
                },
                cx,
            );
            // Wrapped into a shared task so it can be stored in the request map.
            let task = cx
                .background_executor()
                .spawn(async move {
                    let _ = task.await?;
                    Some(())
                })
                .shared();

            vacant.insert(task);
            cx.notify();
        }
    }
1614
1615    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1616        capabilities: &Capabilities,
1617        mode: &SessionState,
1618        request: T,
1619        process_result: impl FnOnce(
1620            &mut Self,
1621            Result<T::Response>,
1622            &mut Context<Self>,
1623        ) -> Option<T::Response>
1624        + 'static,
1625        cx: &mut Context<Self>,
1626    ) -> Task<Option<T::Response>> {
1627        if !T::is_supported(&capabilities) {
1628            log::warn!(
1629                "Attempted to send a DAP request that isn't supported: {:?}",
1630                request
1631            );
1632            let error = Err(anyhow::Error::msg(
1633                "Couldn't complete request because it's not supported",
1634            ));
1635            return cx.spawn(async move |this, cx| {
1636                this.update(cx, |this, cx| process_result(this, error, cx))
1637                    .ok()
1638                    .flatten()
1639            });
1640        }
1641
1642        let request = mode.request_dap(request);
1643        cx.spawn(async move |this, cx| {
1644            let result = request.await;
1645            this.update(cx, |this, cx| process_result(this, result, cx))
1646                .ok()
1647                .flatten()
1648        })
1649    }
1650
    /// Sends `request` using this session's capabilities and mode.
    ///
    /// Thin wrapper around [`Self::request_inner`]; unlike [`Self::fetch`] it does
    /// not consult or update the `self.requests` map.
    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(
            &mut Self,
            Result<T::Response>,
            &mut Context<Self>,
        ) -> Option<T::Response>
        + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
    }
1664
    /// Drops every tracked request of the given command type from `self.requests`.
    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
        self.requests.remove(&std::any::TypeId::of::<Command>());
    }
1668
1669    fn invalidate_generic(&mut self) {
1670        self.invalidate_command_type::<ModulesCommand>();
1671        self.invalidate_command_type::<LoadedSourcesCommand>();
1672        self.invalidate_command_type::<ThreadsCommand>();
1673        self.invalidate_command_type::<ReadMemory>();
1674        let executor = self.as_running().map(|running| running.executor.clone());
1675        if let Some(executor) = executor {
1676            self.memory.clear(&executor);
1677        }
1678    }
1679
1680    fn invalidate_state(&mut self, key: &RequestSlot) {
1681        self.requests
1682            .entry((&*key.0 as &dyn Any).type_id())
1683            .and_modify(|request_map| {
1684                request_map.remove(&key);
1685            });
1686    }
1687
1688    fn push_output(&mut self, event: OutputEvent) {
1689        self.output.push_back(event);
1690        self.output_token.0 += 1;
1691    }
1692
1693    pub fn any_stopped_thread(&self) -> bool {
1694        self.thread_states.any_stopped_thread()
1695    }
1696
1697    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1698        self.thread_states.thread_status(thread_id)
1699    }
1700
1701    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1702        self.fetch(
1703            dap_command::ThreadsCommand,
1704            |this, result, cx| {
1705                let Some(result) = result.log_err() else {
1706                    return;
1707                };
1708
1709                this.threads = result
1710                    .into_iter()
1711                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1712                    .collect();
1713
1714                this.invalidate_command_type::<StackTraceCommand>();
1715                cx.emit(SessionEvent::Threads);
1716                cx.notify();
1717            },
1718            cx,
1719        );
1720
1721        self.threads
1722            .values()
1723            .map(|thread| {
1724                (
1725                    thread.dap.clone(),
1726                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1727                )
1728            })
1729            .collect()
1730    }
1731
1732    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1733        self.fetch(
1734            dap_command::ModulesCommand,
1735            |this, result, cx| {
1736                let Some(result) = result.log_err() else {
1737                    return;
1738                };
1739
1740                this.modules = result;
1741                cx.emit(SessionEvent::Modules);
1742                cx.notify();
1743            },
1744            cx,
1745        );
1746
1747        &self.modules
1748    }
1749
1750    // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1751    pub fn data_access_size(
1752        &mut self,
1753        frame_id: Option<u64>,
1754        evaluate_name: &str,
1755        cx: &mut Context<Self>,
1756    ) -> Task<Option<u64>> {
1757        let request = self.request(
1758            EvaluateCommand {
1759                expression: format!("?${{sizeof({evaluate_name})}}"),
1760                frame_id,
1761
1762                context: Some(EvaluateArgumentsContext::Repl),
1763                source: None,
1764            },
1765            |_, response, _| response.ok(),
1766            cx,
1767        );
1768        cx.background_spawn(async move {
1769            let result = request.await?;
1770            result.result.parse().ok()
1771        })
1772    }
1773
1774    pub fn memory_reference_of_expr(
1775        &mut self,
1776        frame_id: Option<u64>,
1777        expression: String,
1778        cx: &mut Context<Self>,
1779    ) -> Task<Option<String>> {
1780        let request = self.request(
1781            EvaluateCommand {
1782                expression,
1783                frame_id,
1784
1785                context: Some(EvaluateArgumentsContext::Repl),
1786                source: None,
1787            },
1788            |_, response, _| response.ok(),
1789            cx,
1790        );
1791        cx.background_spawn(async move {
1792            let result = request.await?;
1793            result.memory_reference
1794        })
1795    }
1796
1797    pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1798        let data = base64::engine::general_purpose::STANDARD.encode(data);
1799        self.request(
1800            WriteMemoryArguments {
1801                memory_reference: address.to_string(),
1802                data,
1803                allow_partial: None,
1804                offset: None,
1805            },
1806            |this, response, cx| {
1807                this.memory.clear(cx.background_executor());
1808                this.invalidate_command_type::<ReadMemory>();
1809                this.invalidate_command_type::<VariablesCommand>();
1810                cx.emit(SessionEvent::Variables);
1811                response.ok()
1812            },
1813            cx,
1814        )
1815        .detach();
1816    }
1817    pub fn read_memory(
1818        &mut self,
1819        range: RangeInclusive<u64>,
1820        cx: &mut Context<Self>,
1821    ) -> MemoryIterator {
1822        // This function is a bit more involved when it comes to fetching data.
1823        // Since we attempt to read memory in pages, we need to account for some parts
1824        // of memory being unreadable. Therefore, we start off by fetching a page per request.
1825        // In case that fails, we try to re-fetch smaller regions until we have the full range.
1826        let page_range = Memory::memory_range_to_page_range(range.clone());
1827        for page_address in PageAddress::iter_range(page_range) {
1828            self.read_single_page_memory(page_address, cx);
1829        }
1830        self.memory.memory_range(range)
1831    }
1832
1833    fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1834        _ = maybe!({
1835            let builder = self.memory.build_page(page_start)?;
1836
1837            self.memory_read_fetch_page_recursive(builder, cx);
1838            Some(())
1839        });
1840    }
1841    fn memory_read_fetch_page_recursive(
1842        &mut self,
1843        mut builder: MemoryPageBuilder,
1844        cx: &mut Context<Self>,
1845    ) {
1846        let Some(next_request) = builder.next_request() else {
1847            // We're done fetching. Let's grab the page and insert it into our memory store.
1848            let (address, contents) = builder.build();
1849            self.memory.insert_page(address, contents);
1850
1851            return;
1852        };
1853        let size = next_request.size;
1854        self.fetch(
1855            ReadMemory {
1856                memory_reference: format!("0x{:X}", next_request.address),
1857                offset: Some(0),
1858                count: next_request.size,
1859            },
1860            move |this, memory, cx| {
1861                if let Ok(memory) = memory {
1862                    builder.known(memory.content);
1863                    if let Some(unknown) = memory.unreadable_bytes {
1864                        builder.unknown(unknown);
1865                    }
1866                    // This is the recursive bit: if we're not yet done with
1867                    // the whole page, we'll kick off a new request with smaller range.
1868                    // Note that this function is recursive only conceptually;
1869                    // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1870                    this.memory_read_fetch_page_recursive(builder, cx);
1871                } else {
1872                    builder.unknown(size);
1873                }
1874            },
1875            cx,
1876        );
1877    }
1878
1879    pub fn ignore_breakpoints(&self) -> bool {
1880        self.ignore_breakpoints
1881    }
1882
1883    pub fn toggle_ignore_breakpoints(
1884        &mut self,
1885        cx: &mut App,
1886    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1887        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1888    }
1889
1890    pub(crate) fn set_ignore_breakpoints(
1891        &mut self,
1892        ignore: bool,
1893        cx: &mut App,
1894    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1895        if self.ignore_breakpoints == ignore {
1896            return Task::ready(HashMap::default());
1897        }
1898
1899        self.ignore_breakpoints = ignore;
1900
1901        if let Some(local) = self.as_running() {
1902            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1903        } else {
1904            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1905            unimplemented!()
1906        }
1907    }
1908
1909    pub fn exception_breakpoints(
1910        &self,
1911    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1912        self.exception_breakpoints.values()
1913    }
1914
1915    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1916        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1917            *is_enabled = !*is_enabled;
1918            self.send_exception_breakpoints(cx);
1919        }
1920    }
1921
1922    fn send_exception_breakpoints(&mut self, cx: &App) {
1923        if let Some(local) = self.as_running() {
1924            let exception_filters = self
1925                .exception_breakpoints
1926                .values()
1927                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1928                .collect();
1929
1930            let supports_exception_filters = self
1931                .capabilities
1932                .supports_exception_filter_options
1933                .unwrap_or_default();
1934            local
1935                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1936                .detach_and_log_err(cx);
1937        } else {
1938            debug_assert!(false, "Not implemented");
1939        }
1940    }
1941
1942    pub fn breakpoints_enabled(&self) -> bool {
1943        self.ignore_breakpoints
1944    }
1945
1946    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1947        self.fetch(
1948            dap_command::LoadedSourcesCommand,
1949            |this, result, cx| {
1950                let Some(result) = result.log_err() else {
1951                    return;
1952                };
1953                this.loaded_sources = result;
1954                cx.emit(SessionEvent::LoadedSources);
1955                cx.notify();
1956            },
1957            cx,
1958        );
1959
1960        &self.loaded_sources
1961    }
1962
1963    fn fallback_to_manual_restart(
1964        &mut self,
1965        res: Result<()>,
1966        cx: &mut Context<Self>,
1967    ) -> Option<()> {
1968        if res.log_err().is_none() {
1969            cx.emit(SessionStateEvent::Restart);
1970            return None;
1971        }
1972        Some(())
1973    }
1974
1975    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1976        res.log_err()?;
1977        Some(())
1978    }
1979
1980    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
1981        thread_id: ThreadId,
1982    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1983    {
1984        move |this, response, cx| match response.log_err() {
1985            Some(response) => {
1986                this.breakpoint_store.update(cx, |store, cx| {
1987                    store.remove_active_position(Some(this.session_id()), cx)
1988                });
1989                Some(response)
1990            }
1991            None => {
1992                this.thread_states.stop_thread(thread_id);
1993                cx.notify();
1994                None
1995            }
1996        }
1997    }
1998
1999    fn clear_active_debug_line_response(
2000        &mut self,
2001        response: Result<()>,
2002        cx: &mut Context<Session>,
2003    ) -> Option<()> {
2004        response.log_err()?;
2005        self.clear_active_debug_line(cx);
2006        Some(())
2007    }
2008
2009    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2010        self.breakpoint_store.update(cx, |store, cx| {
2011            store.remove_active_position(Some(self.id), cx)
2012        });
2013    }
2014
2015    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2016        self.request(
2017            PauseCommand {
2018                thread_id: thread_id.0,
2019            },
2020            Self::empty_response,
2021            cx,
2022        )
2023        .detach();
2024    }
2025
2026    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2027        self.request(
2028            RestartStackFrameCommand { stack_frame_id },
2029            Self::empty_response,
2030            cx,
2031        )
2032        .detach();
2033    }
2034
2035    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2036        if self.restart_task.is_some() || self.as_running().is_none() {
2037            return;
2038        }
2039
2040        let supports_dap_restart =
2041            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2042
2043        self.restart_task = Some(cx.spawn(async move |this, cx| {
2044            let _ = this.update(cx, |session, cx| {
2045                if supports_dap_restart {
2046                    session
2047                        .request(
2048                            RestartCommand {
2049                                raw: args.unwrap_or(Value::Null),
2050                            },
2051                            Self::fallback_to_manual_restart,
2052                            cx,
2053                        )
2054                        .detach();
2055                } else {
2056                    cx.emit(SessionStateEvent::Restart);
2057                }
2058            });
2059        }));
2060    }
2061
2062    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2063        if self.is_session_terminated {
2064            return Task::ready(());
2065        }
2066
2067        self.is_session_terminated = true;
2068        self.thread_states.exit_all_threads();
2069        cx.notify();
2070
2071        let task = match &mut self.mode {
2072            SessionState::Running(_) => {
2073                if self
2074                    .capabilities
2075                    .supports_terminate_request
2076                    .unwrap_or_default()
2077                {
2078                    self.request(
2079                        TerminateCommand {
2080                            restart: Some(false),
2081                        },
2082                        Self::clear_active_debug_line_response,
2083                        cx,
2084                    )
2085                } else {
2086                    self.request(
2087                        DisconnectCommand {
2088                            restart: Some(false),
2089                            terminate_debuggee: Some(true),
2090                            suspend_debuggee: Some(false),
2091                        },
2092                        Self::clear_active_debug_line_response,
2093                        cx,
2094                    )
2095                }
2096            }
2097            SessionState::Building(build_task) => {
2098                build_task.take();
2099                Task::ready(Some(()))
2100            }
2101        };
2102
2103        cx.emit(SessionStateEvent::Shutdown);
2104
2105        cx.spawn(async move |this, cx| {
2106            task.await;
2107            let _ = this.update(cx, |this, _| {
2108                if let Some(adapter_client) = this.adapter_client() {
2109                    adapter_client.kill();
2110                }
2111            });
2112        })
2113    }
2114
2115    pub fn completions(
2116        &mut self,
2117        query: CompletionsQuery,
2118        cx: &mut Context<Self>,
2119    ) -> Task<Result<Vec<dap::CompletionItem>>> {
2120        let task = self.request(query, |_, result, _| result.log_err(), cx);
2121
2122        cx.background_executor().spawn(async move {
2123            anyhow::Ok(
2124                task.await
2125                    .map(|response| response.targets)
2126                    .context("failed to fetch completions")?,
2127            )
2128        })
2129    }
2130
2131    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2132        let supports_single_thread_execution_requests =
2133            self.capabilities.supports_single_thread_execution_requests;
2134        self.thread_states.continue_thread(thread_id);
2135        self.request(
2136            ContinueCommand {
2137                args: ContinueArguments {
2138                    thread_id: thread_id.0,
2139                    single_thread: supports_single_thread_execution_requests,
2140                },
2141            },
2142            Self::on_step_response::<ContinueCommand>(thread_id),
2143            cx,
2144        )
2145        .detach();
2146    }
2147
2148    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2149        match self.mode {
2150            SessionState::Running(ref local) => Some(local.client.clone()),
2151            SessionState::Building(_) => None,
2152        }
2153    }
2154
2155    pub fn has_ever_stopped(&self) -> bool {
2156        self.mode.has_ever_stopped()
2157    }
2158    pub fn step_over(
2159        &mut self,
2160        thread_id: ThreadId,
2161        granularity: SteppingGranularity,
2162        cx: &mut Context<Self>,
2163    ) {
2164        let supports_single_thread_execution_requests =
2165            self.capabilities.supports_single_thread_execution_requests;
2166        let supports_stepping_granularity = self
2167            .capabilities
2168            .supports_stepping_granularity
2169            .unwrap_or_default();
2170
2171        let command = NextCommand {
2172            inner: StepCommand {
2173                thread_id: thread_id.0,
2174                granularity: supports_stepping_granularity.then(|| granularity),
2175                single_thread: supports_single_thread_execution_requests,
2176            },
2177        };
2178
2179        self.thread_states.process_step(thread_id);
2180        self.request(
2181            command,
2182            Self::on_step_response::<NextCommand>(thread_id),
2183            cx,
2184        )
2185        .detach();
2186    }
2187
2188    pub fn step_in(
2189        &mut self,
2190        thread_id: ThreadId,
2191        granularity: SteppingGranularity,
2192        cx: &mut Context<Self>,
2193    ) {
2194        let supports_single_thread_execution_requests =
2195            self.capabilities.supports_single_thread_execution_requests;
2196        let supports_stepping_granularity = self
2197            .capabilities
2198            .supports_stepping_granularity
2199            .unwrap_or_default();
2200
2201        let command = StepInCommand {
2202            inner: StepCommand {
2203                thread_id: thread_id.0,
2204                granularity: supports_stepping_granularity.then(|| granularity),
2205                single_thread: supports_single_thread_execution_requests,
2206            },
2207        };
2208
2209        self.thread_states.process_step(thread_id);
2210        self.request(
2211            command,
2212            Self::on_step_response::<StepInCommand>(thread_id),
2213            cx,
2214        )
2215        .detach();
2216    }
2217
2218    pub fn step_out(
2219        &mut self,
2220        thread_id: ThreadId,
2221        granularity: SteppingGranularity,
2222        cx: &mut Context<Self>,
2223    ) {
2224        let supports_single_thread_execution_requests =
2225            self.capabilities.supports_single_thread_execution_requests;
2226        let supports_stepping_granularity = self
2227            .capabilities
2228            .supports_stepping_granularity
2229            .unwrap_or_default();
2230
2231        let command = StepOutCommand {
2232            inner: StepCommand {
2233                thread_id: thread_id.0,
2234                granularity: supports_stepping_granularity.then(|| granularity),
2235                single_thread: supports_single_thread_execution_requests,
2236            },
2237        };
2238
2239        self.thread_states.process_step(thread_id);
2240        self.request(
2241            command,
2242            Self::on_step_response::<StepOutCommand>(thread_id),
2243            cx,
2244        )
2245        .detach();
2246    }
2247
2248    pub fn step_back(
2249        &mut self,
2250        thread_id: ThreadId,
2251        granularity: SteppingGranularity,
2252        cx: &mut Context<Self>,
2253    ) {
2254        let supports_single_thread_execution_requests =
2255            self.capabilities.supports_single_thread_execution_requests;
2256        let supports_stepping_granularity = self
2257            .capabilities
2258            .supports_stepping_granularity
2259            .unwrap_or_default();
2260
2261        let command = StepBackCommand {
2262            inner: StepCommand {
2263                thread_id: thread_id.0,
2264                granularity: supports_stepping_granularity.then(|| granularity),
2265                single_thread: supports_single_thread_execution_requests,
2266            },
2267        };
2268
2269        self.thread_states.process_step(thread_id);
2270
2271        self.request(
2272            command,
2273            Self::on_step_response::<StepBackCommand>(thread_id),
2274            cx,
2275        )
2276        .detach();
2277    }
2278
2279    pub fn stack_frames(
2280        &mut self,
2281        thread_id: ThreadId,
2282        cx: &mut Context<Self>,
2283    ) -> Result<Vec<StackFrame>> {
2284        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2285            && self.requests.contains_key(&ThreadsCommand.type_id())
2286            && self.threads.contains_key(&thread_id)
2287        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2288        // We could still be using an old thread id and have sent a new thread's request
2289        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2290        // But it very well could cause a minor bug in the future that is hard to track down
2291        {
2292            self.fetch(
2293                super::dap_command::StackTraceCommand {
2294                    thread_id: thread_id.0,
2295                    start_frame: None,
2296                    levels: None,
2297                },
2298                move |this, stack_frames, cx| {
2299                    let entry =
2300                        this.threads
2301                            .entry(thread_id)
2302                            .and_modify(|thread| match &stack_frames {
2303                                Ok(stack_frames) => {
2304                                    thread.stack_frames = stack_frames
2305                                        .iter()
2306                                        .cloned()
2307                                        .map(StackFrame::from)
2308                                        .collect();
2309                                    thread.stack_frames_error = None;
2310                                }
2311                                Err(error) => {
2312                                    thread.stack_frames.clear();
2313                                    thread.stack_frames_error = Some(error.cloned());
2314                                }
2315                            });
2316                    debug_assert!(
2317                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2318                        "Sent request for thread_id that doesn't exist"
2319                    );
2320                    if let Ok(stack_frames) = stack_frames {
2321                        this.stack_frames.extend(
2322                            stack_frames
2323                                .into_iter()
2324                                .filter(|frame| {
2325                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2326                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2327                                    !(frame.id == 0
2328                                        && frame.line == 0
2329                                        && frame.column == 0
2330                                        && frame.presentation_hint
2331                                            == Some(StackFramePresentationHint::Label))
2332                                })
2333                                .map(|frame| (frame.id, StackFrame::from(frame))),
2334                        );
2335                    }
2336
2337                    this.invalidate_command_type::<ScopesCommand>();
2338                    this.invalidate_command_type::<VariablesCommand>();
2339
2340                    cx.emit(SessionEvent::StackTrace);
2341                },
2342                cx,
2343            );
2344        }
2345
2346        match self.threads.get(&thread_id) {
2347            Some(thread) => {
2348                if let Some(error) = &thread.stack_frames_error {
2349                    Err(error.cloned())
2350                } else {
2351                    Ok(thread.stack_frames.clone())
2352                }
2353            }
2354            None => Ok(Vec::new()),
2355        }
2356    }
2357
2358    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2359        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2360            && self
2361                .requests
2362                .contains_key(&TypeId::of::<StackTraceCommand>())
2363        {
2364            self.fetch(
2365                ScopesCommand { stack_frame_id },
2366                move |this, scopes, cx| {
2367                    let Some(scopes) = scopes.log_err() else {
2368                        return
2369                    };
2370
2371                    for scope in scopes.iter() {
2372                        this.variables(scope.variables_reference, cx);
2373                    }
2374
2375                    let entry = this
2376                        .stack_frames
2377                        .entry(stack_frame_id)
2378                        .and_modify(|stack_frame| {
2379                            stack_frame.scopes = scopes;
2380                        });
2381
2382                    cx.emit(SessionEvent::Variables);
2383
2384                    debug_assert!(
2385                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2386                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2387                    );
2388                },
2389                cx,
2390            );
2391        }
2392
2393        self.stack_frames
2394            .get(&stack_frame_id)
2395            .map(|frame| frame.scopes.as_slice())
2396            .unwrap_or_default()
2397    }
2398
2399    pub fn variables_by_stack_frame_id(
2400        &self,
2401        stack_frame_id: StackFrameId,
2402        globals: bool,
2403        locals: bool,
2404    ) -> Vec<dap::Variable> {
2405        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2406            return Vec::new();
2407        };
2408
2409        stack_frame
2410            .scopes
2411            .iter()
2412            .filter(|scope| {
2413                (scope.name.to_lowercase().contains("local") && locals)
2414                    || (scope.name.to_lowercase().contains("global") && globals)
2415            })
2416            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2417            .flatten()
2418            .cloned()
2419            .collect()
2420    }
2421
2422    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2423        &self.watchers
2424    }
2425
2426    pub fn add_watcher(
2427        &mut self,
2428        expression: SharedString,
2429        frame_id: u64,
2430        cx: &mut Context<Self>,
2431    ) -> Task<Result<()>> {
2432        let request = self.mode.request_dap(EvaluateCommand {
2433            expression: expression.to_string(),
2434            context: Some(EvaluateArgumentsContext::Watch),
2435            frame_id: Some(frame_id),
2436            source: None,
2437        });
2438
2439        cx.spawn(async move |this, cx| {
2440            let response = request.await?;
2441
2442            this.update(cx, |session, cx| {
2443                session.watchers.insert(
2444                    expression.clone(),
2445                    Watcher {
2446                        expression,
2447                        value: response.result.into(),
2448                        variables_reference: response.variables_reference,
2449                        presentation_hint: response.presentation_hint,
2450                    },
2451                );
2452                cx.emit(SessionEvent::Watchers);
2453            })
2454        })
2455    }
2456
2457    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2458        let watches = self.watchers.clone();
2459        for (_, watch) in watches.into_iter() {
2460            self.add_watcher(watch.expression.clone(), frame_id, cx)
2461                .detach();
2462        }
2463    }
2464
2465    pub fn remove_watcher(&mut self, expression: SharedString) {
2466        self.watchers.remove(&expression);
2467    }
2468
2469    pub fn variables(
2470        &mut self,
2471        variables_reference: VariableReference,
2472        cx: &mut Context<Self>,
2473    ) -> Vec<dap::Variable> {
2474        let command = VariablesCommand {
2475            variables_reference,
2476            filter: None,
2477            start: None,
2478            count: None,
2479            format: None,
2480        };
2481
2482        self.fetch(
2483            command,
2484            move |this, variables, cx| {
2485                let Some(variables) = variables.log_err() else {
2486                    return;
2487                };
2488
2489                this.variables.insert(variables_reference, variables);
2490
2491                cx.emit(SessionEvent::Variables);
2492                cx.emit(SessionEvent::InvalidateInlineValue);
2493            },
2494            cx,
2495        );
2496
2497        self.variables
2498            .get(&variables_reference)
2499            .cloned()
2500            .unwrap_or_default()
2501    }
2502
2503    pub fn set_variable_value(
2504        &mut self,
2505        stack_frame_id: u64,
2506        variables_reference: u64,
2507        name: String,
2508        value: String,
2509        cx: &mut Context<Self>,
2510    ) {
2511        if self.capabilities.supports_set_variable.unwrap_or_default() {
2512            self.request(
2513                SetVariableValueCommand {
2514                    name,
2515                    value,
2516                    variables_reference,
2517                },
2518                move |this, response, cx| {
2519                    let response = response.log_err()?;
2520                    this.invalidate_command_type::<VariablesCommand>();
2521                    this.invalidate_command_type::<ReadMemory>();
2522                    this.memory.clear(cx.background_executor());
2523                    this.refresh_watchers(stack_frame_id, cx);
2524                    cx.emit(SessionEvent::Variables);
2525                    Some(response)
2526                },
2527                cx,
2528            )
2529            .detach();
2530        }
2531    }
2532
2533    pub fn evaluate(
2534        &mut self,
2535        expression: String,
2536        context: Option<EvaluateArgumentsContext>,
2537        frame_id: Option<u64>,
2538        source: Option<Source>,
2539        cx: &mut Context<Self>,
2540    ) -> Task<()> {
2541        let event = dap::OutputEvent {
2542            category: None,
2543            output: format!("> {expression}"),
2544            group: None,
2545            variables_reference: None,
2546            source: None,
2547            line: None,
2548            column: None,
2549            data: None,
2550            location_reference: None,
2551        };
2552        self.push_output(event);
2553        let request = self.mode.request_dap(EvaluateCommand {
2554            expression,
2555            context,
2556            frame_id,
2557            source,
2558        });
2559        cx.spawn(async move |this, cx| {
2560            let response = request.await;
2561            this.update(cx, |this, cx| {
2562                this.memory.clear(cx.background_executor());
2563                this.invalidate_command_type::<ReadMemory>();
2564                match response {
2565                    Ok(response) => {
2566                        let event = dap::OutputEvent {
2567                            category: None,
2568                            output: format!("< {}", &response.result),
2569                            group: None,
2570                            variables_reference: Some(response.variables_reference),
2571                            source: None,
2572                            line: None,
2573                            column: None,
2574                            data: None,
2575                            location_reference: None,
2576                        };
2577                        this.push_output(event);
2578                    }
2579                    Err(e) => {
2580                        let event = dap::OutputEvent {
2581                            category: None,
2582                            output: format!("{}", e),
2583                            group: None,
2584                            variables_reference: None,
2585                            source: None,
2586                            line: None,
2587                            column: None,
2588                            data: None,
2589                            location_reference: None,
2590                        };
2591                        this.push_output(event);
2592                    }
2593                };
2594                cx.notify();
2595            })
2596            .ok();
2597        })
2598    }
2599
2600    pub fn location(
2601        &mut self,
2602        reference: u64,
2603        cx: &mut Context<Self>,
2604    ) -> Option<dap::LocationsResponse> {
2605        self.fetch(
2606            LocationsCommand { reference },
2607            move |this, response, _| {
2608                let Some(response) = response.log_err() else {
2609                    return;
2610                };
2611                this.locations.insert(reference, response);
2612            },
2613            cx,
2614        );
2615        self.locations.get(&reference).cloned()
2616    }
2617
2618    pub fn is_attached(&self) -> bool {
2619        let SessionState::Running(local_mode) = &self.mode else {
2620            return false;
2621        };
2622        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2623    }
2624
2625    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2626        let command = DisconnectCommand {
2627            restart: Some(false),
2628            terminate_debuggee: Some(false),
2629            suspend_debuggee: Some(false),
2630        };
2631
2632        self.request(command, Self::empty_response, cx).detach()
2633    }
2634
2635    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2636        if self
2637            .capabilities
2638            .supports_terminate_threads_request
2639            .unwrap_or_default()
2640        {
2641            self.request(
2642                TerminateThreadsCommand {
2643                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2644                },
2645                Self::clear_active_debug_line_response,
2646                cx,
2647            )
2648            .detach();
2649        } else {
2650            self.shutdown(cx).detach();
2651        }
2652    }
2653
2654    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2655        self.thread_states.thread_state(thread_id)
2656    }
2657
2658    pub fn quirks(&self) -> SessionQuirks {
2659        self.quirks
2660    }
2661}