session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2use crate::debugger::dap_command::{DataBreakpointContext, ReadMemory};
   3use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
   4
   5use super::breakpoint_store::{
   6    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   7};
   8use super::dap_command::{
   9    self, Attach, ConfigurationDone, ContinueCommand, DataBreakpointInfoCommand, DisconnectCommand,
  10    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
  11    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  12    ScopesCommand, SetDataBreakpointsCommand, SetExceptionBreakpoints, SetVariableValueCommand,
  13    StackTraceCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  14    TerminateCommand, TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  15};
  16use super::dap_store::DapStore;
  17use anyhow::{Context as _, Result, anyhow};
  18use base64::Engine;
  19use collections::{HashMap, HashSet, IndexMap};
  20use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  21use dap::messages::Response;
  22use dap::requests::{Request, RunInTerminal, StartDebugging};
  23use dap::{
  24    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  25    SteppingGranularity, StoppedEvent, VariableReference,
  26    client::{DebugAdapterClient, SessionId},
  27    messages::{Events, Message},
  28};
  29use dap::{
  30    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  31    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  32    StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
  33};
  34use futures::SinkExt;
  35use futures::channel::mpsc::UnboundedSender;
  36use futures::channel::{mpsc, oneshot};
  37use futures::{FutureExt, future::Shared};
  38use gpui::{
  39    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  40    Task, WeakEntity,
  41};
  42
  43use rpc::ErrorExt;
  44use serde_json::Value;
  45use smol::stream::StreamExt;
  46use std::any::TypeId;
  47use std::collections::BTreeMap;
  48use std::ops::RangeInclusive;
  49use std::u64;
  50use std::{
  51    any::Any,
  52    collections::hash_map::Entry,
  53    hash::{Hash, Hasher},
  54    path::Path,
  55    sync::Arc,
  56};
  57use task::TaskContext;
  58use text::{PointUtf16, ToPointUtf16};
  59use util::{ResultExt, debug_panic, maybe};
  60use worktree::Worktree;
  61
  62#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  63#[repr(transparent)]
  64pub struct ThreadId(pub i64);
  65
  66impl From<i64> for ThreadId {
  67    fn from(id: i64) -> Self {
  68        Self(id)
  69    }
  70}
  71
  72#[derive(Clone, Debug)]
  73pub struct StackFrame {
  74    pub dap: dap::StackFrame,
  75    pub scopes: Vec<dap::Scope>,
  76}
  77
  78impl From<dap::StackFrame> for StackFrame {
  79    fn from(stack_frame: dap::StackFrame) -> Self {
  80        Self {
  81            scopes: vec![],
  82            dap: stack_frame,
  83        }
  84    }
  85}
  86
  87#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  88pub enum ThreadStatus {
  89    #[default]
  90    Running,
  91    Stopped,
  92    Stepping,
  93    Exited,
  94    Ended,
  95}
  96
  97impl ThreadStatus {
  98    pub fn label(&self) -> &'static str {
  99        match self {
 100            ThreadStatus::Running => "Running",
 101            ThreadStatus::Stopped => "Stopped",
 102            ThreadStatus::Stepping => "Stepping",
 103            ThreadStatus::Exited => "Exited",
 104            ThreadStatus::Ended => "Ended",
 105        }
 106    }
 107}
 108
 109#[derive(Debug)]
 110pub struct Thread {
 111    dap: dap::Thread,
 112    stack_frames: Vec<StackFrame>,
 113    stack_frames_error: Option<anyhow::Error>,
 114    _has_stopped: bool,
 115}
 116
 117impl From<dap::Thread> for Thread {
 118    fn from(dap: dap::Thread) -> Self {
 119        Self {
 120            dap,
 121            stack_frames: Default::default(),
 122            stack_frames_error: None,
 123            _has_stopped: false,
 124        }
 125    }
 126}
 127
 128#[derive(Debug, Clone, PartialEq)]
 129pub struct Watcher {
 130    pub expression: SharedString,
 131    pub value: SharedString,
 132    pub variables_reference: u64,
 133    pub presentation_hint: Option<VariablePresentationHint>,
 134}
 135
 136#[derive(Debug, Clone, PartialEq)]
 137pub struct DataBreakpointState {
 138    pub dap: dap::DataBreakpoint,
 139    pub is_enabled: bool,
 140    pub context: Arc<DataBreakpointContext>,
 141}
 142
 143pub enum SessionState {
 144    /// Represents a session that is building/initializing
 145    /// even if a session doesn't have a pre build task this state
 146    /// is used to run all the async tasks that are required to start the session
 147    Booting(Option<Task<Result<()>>>),
 148    Running(RunningMode),
 149}
 150
 151#[derive(Clone)]
 152pub struct RunningMode {
 153    client: Arc<DebugAdapterClient>,
 154    binary: DebugAdapterBinary,
 155    tmp_breakpoint: Option<SourceBreakpoint>,
 156    worktree: WeakEntity<Worktree>,
 157    executor: BackgroundExecutor,
 158    is_started: bool,
 159    has_ever_stopped: bool,
 160    messages_tx: UnboundedSender<Message>,
 161}
 162
 163#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
 164pub struct SessionQuirks {
 165    pub compact: bool,
 166    pub prefer_thread_name: bool,
 167}
 168
 169fn client_source(abs_path: &Path) -> dap::Source {
 170    dap::Source {
 171        name: abs_path
 172            .file_name()
 173            .map(|filename| filename.to_string_lossy().to_string()),
 174        path: Some(abs_path.to_string_lossy().to_string()),
 175        source_reference: None,
 176        presentation_hint: None,
 177        origin: None,
 178        sources: None,
 179        adapter_data: None,
 180        checksums: None,
 181    }
 182}
 183
 184impl RunningMode {
 185    async fn new(
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        worktree: WeakEntity<Worktree>,
 189        binary: DebugAdapterBinary,
 190        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 191        cx: &mut AsyncApp,
 192    ) -> Result<Self> {
 193        let message_handler = Box::new({
 194            let messages_tx = messages_tx.clone();
 195            move |message| {
 196                messages_tx.unbounded_send(message).ok();
 197            }
 198        });
 199
 200        let client = if let Some(client) = parent_session
 201            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 202            .flatten()
 203        {
 204            client
 205                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 206                .await?
 207        } else {
 208            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 209        };
 210
 211        Ok(Self {
 212            client: Arc::new(client),
 213            worktree,
 214            tmp_breakpoint: None,
 215            binary,
 216            executor: cx.background_executor().clone(),
 217            is_started: false,
 218            has_ever_stopped: false,
 219            messages_tx,
 220        })
 221    }
 222
 223    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 224        &self.worktree
 225    }
 226
 227    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 228        let tasks: Vec<_> = paths
 229            .iter()
 230            .map(|path| {
 231                self.request(dap_command::SetBreakpoints {
 232                    source: client_source(path),
 233                    source_modified: None,
 234                    breakpoints: vec![],
 235                })
 236            })
 237            .collect();
 238
 239        cx.background_spawn(async move {
 240            futures::future::join_all(tasks)
 241                .await
 242                .iter()
 243                .for_each(|res| match res {
 244                    Ok(_) => {}
 245                    Err(err) => {
 246                        log::warn!("Set breakpoints request failed: {}", err);
 247                    }
 248                });
 249        })
 250    }
 251
 252    fn send_breakpoints_from_path(
 253        &self,
 254        abs_path: Arc<Path>,
 255        reason: BreakpointUpdatedReason,
 256        breakpoint_store: &Entity<BreakpointStore>,
 257        cx: &mut App,
 258    ) -> Task<()> {
 259        let breakpoints =
 260            breakpoint_store
 261                .read(cx)
 262                .source_breakpoints_from_path(&abs_path, cx)
 263                .into_iter()
 264                .filter(|bp| bp.state.is_enabled())
 265                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 266                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 267                }))
 268                .map(Into::into)
 269                .collect();
 270
 271        let raw_breakpoints = breakpoint_store
 272            .read(cx)
 273            .breakpoints_from_path(&abs_path)
 274            .into_iter()
 275            .filter(|bp| bp.bp.state.is_enabled())
 276            .collect::<Vec<_>>();
 277
 278        let task = self.request(dap_command::SetBreakpoints {
 279            source: client_source(&abs_path),
 280            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 281            breakpoints,
 282        });
 283        let session_id = self.client.id();
 284        let breakpoint_store = breakpoint_store.downgrade();
 285        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 286            Ok(breakpoints) => {
 287                let breakpoints =
 288                    breakpoints
 289                        .into_iter()
 290                        .zip(raw_breakpoints)
 291                        .filter_map(|(dap_bp, zed_bp)| {
 292                            Some((
 293                                zed_bp,
 294                                BreakpointSessionState {
 295                                    id: dap_bp.id?,
 296                                    verified: dap_bp.verified,
 297                                },
 298                            ))
 299                        });
 300                breakpoint_store
 301                    .update(cx, |this, _| {
 302                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 303                    })
 304                    .ok();
 305            }
 306            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 307        })
 308    }
 309
 310    fn send_exception_breakpoints(
 311        &self,
 312        filters: Vec<ExceptionBreakpointsFilter>,
 313        supports_filter_options: bool,
 314    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 315        let arg = if supports_filter_options {
 316            SetExceptionBreakpoints::WithOptions {
 317                filters: filters
 318                    .into_iter()
 319                    .map(|filter| ExceptionFilterOptions {
 320                        filter_id: filter.filter,
 321                        condition: None,
 322                        mode: None,
 323                    })
 324                    .collect(),
 325            }
 326        } else {
 327            SetExceptionBreakpoints::Plain {
 328                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 329            }
 330        };
 331        self.request(arg)
 332    }
 333
 334    fn send_source_breakpoints(
 335        &self,
 336        ignore_breakpoints: bool,
 337        breakpoint_store: &Entity<BreakpointStore>,
 338        cx: &App,
 339    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 340        let mut breakpoint_tasks = Vec::new();
 341        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 342        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 343        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 344        let session_id = self.client.id();
 345        for (path, breakpoints) in breakpoints {
 346            let breakpoints = if ignore_breakpoints {
 347                vec![]
 348            } else {
 349                breakpoints
 350                    .into_iter()
 351                    .filter(|bp| bp.state.is_enabled())
 352                    .map(Into::into)
 353                    .collect()
 354            };
 355
 356            let raw_breakpoints = raw_breakpoints
 357                .remove(&path)
 358                .unwrap_or_default()
 359                .into_iter()
 360                .filter(|bp| bp.bp.state.is_enabled());
 361            let error_path = path.clone();
 362            let send_request = self
 363                .request(dap_command::SetBreakpoints {
 364                    source: client_source(&path),
 365                    source_modified: Some(false),
 366                    breakpoints,
 367                })
 368                .map(|result| result.map_err(move |e| (error_path, e)));
 369
 370            let task = cx.spawn({
 371                let breakpoint_store = breakpoint_store.downgrade();
 372                async move |cx| {
 373                    let breakpoints = cx.background_spawn(send_request).await?;
 374
 375                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 376                        |(dap_bp, zed_bp)| {
 377                            Some((
 378                                zed_bp,
 379                                BreakpointSessionState {
 380                                    id: dap_bp.id?,
 381                                    verified: dap_bp.verified,
 382                                },
 383                            ))
 384                        },
 385                    );
 386                    breakpoint_store
 387                        .update(cx, |this, _| {
 388                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 389                        })
 390                        .ok();
 391
 392                    Ok(())
 393                }
 394            });
 395            breakpoint_tasks.push(task);
 396        }
 397
 398        cx.background_spawn(async move {
 399            futures::future::join_all(breakpoint_tasks)
 400                .await
 401                .into_iter()
 402                .filter_map(Result::err)
 403                .collect::<HashMap<_, _>>()
 404        })
 405    }
 406
 407    fn initialize_sequence(
 408        &self,
 409        capabilities: &Capabilities,
 410        initialized_rx: oneshot::Receiver<()>,
 411        dap_store: WeakEntity<DapStore>,
 412        cx: &mut Context<Session>,
 413    ) -> Task<Result<()>> {
 414        let raw = self.binary.request_args.clone();
 415
 416        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 417        let launch = match raw.request {
 418            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 419                raw: raw.configuration,
 420            }),
 421            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 422                raw: raw.configuration,
 423            }),
 424        };
 425
 426        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 427        // From spec (on initialization sequence):
 428        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 429        //
 430        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 431        let should_send_exception_breakpoints = capabilities
 432            .exception_breakpoint_filters
 433            .as_ref()
 434            .is_some_and(|filters| !filters.is_empty())
 435            || !configuration_done_supported;
 436        let supports_exception_filters = capabilities
 437            .supports_exception_filter_options
 438            .unwrap_or_default();
 439        let this = self.clone();
 440        let worktree = self.worktree().clone();
 441        let mut filters = capabilities
 442            .exception_breakpoint_filters
 443            .clone()
 444            .unwrap_or_default();
 445        let configuration_sequence = cx.spawn({
 446            async move |session, cx| {
 447                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 448                let (breakpoint_store, adapter_defaults) =
 449                    dap_store.read_with(cx, |dap_store, _| {
 450                        (
 451                            dap_store.breakpoint_store().clone(),
 452                            dap_store.adapter_options(&adapter_name),
 453                        )
 454                    })?;
 455                initialized_rx.await?;
 456                let errors_by_path = cx
 457                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 458                    .await;
 459
 460                dap_store.update(cx, |_, cx| {
 461                    let Some(worktree) = worktree.upgrade() else {
 462                        return;
 463                    };
 464
 465                    for (path, error) in &errors_by_path {
 466                        log::error!("failed to set breakpoints for {path:?}: {error}");
 467                    }
 468
 469                    if let Some(failed_path) = errors_by_path.keys().next() {
 470                        let failed_path = failed_path
 471                            .strip_prefix(worktree.read(cx).abs_path())
 472                            .unwrap_or(failed_path)
 473                            .display();
 474                        let message = format!(
 475                            "Failed to set breakpoints for {failed_path}{}",
 476                            match errors_by_path.len() {
 477                                0 => unreachable!(),
 478                                1 => "".into(),
 479                                2 => " and 1 other path".into(),
 480                                n => format!(" and {} other paths", n - 1),
 481                            }
 482                        );
 483                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 484                    }
 485                })?;
 486
 487                if should_send_exception_breakpoints {
 488                    _ = session.update(cx, |this, _| {
 489                        filters.retain(|filter| {
 490                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 491                                defaults
 492                                    .exception_breakpoints
 493                                    .get(&filter.filter)
 494                                    .map(|options| options.enabled)
 495                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 496                            } else {
 497                                filter.default.unwrap_or_default()
 498                            };
 499                            this.exception_breakpoints
 500                                .entry(filter.filter.clone())
 501                                .or_insert_with(|| (filter.clone(), is_enabled));
 502                            is_enabled
 503                        });
 504                    });
 505
 506                    this.send_exception_breakpoints(filters, supports_exception_filters)
 507                        .await
 508                        .ok();
 509                }
 510
 511                if configuration_done_supported {
 512                    this.request(ConfigurationDone {})
 513                } else {
 514                    Task::ready(Ok(()))
 515                }
 516                .await
 517            }
 518        });
 519
 520        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 521
 522        cx.spawn(async move |this, cx| {
 523            let result = task.await;
 524
 525            this.update(cx, |this, cx| {
 526                if let Some(this) = this.as_running_mut() {
 527                    this.is_started = true;
 528                    cx.notify();
 529                }
 530            })
 531            .ok();
 532
 533            result?;
 534            anyhow::Ok(())
 535        })
 536    }
 537
 538    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 539        let client = self.client.clone();
 540        let messages_tx = self.messages_tx.clone();
 541        let message_handler = Box::new(move |message| {
 542            messages_tx.unbounded_send(message).ok();
 543        });
 544        if client.should_reconnect_for_ssh() {
 545            Some(cx.spawn(async move |cx| {
 546                client.connect(message_handler, cx).await?;
 547                anyhow::Ok(())
 548            }))
 549        } else {
 550            None
 551        }
 552    }
 553
 554    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 555    where
 556        <R::DapRequest as dap::requests::Request>::Response: 'static,
 557        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 558    {
 559        let request = Arc::new(request);
 560
 561        let request_clone = request.clone();
 562        let connection = self.client.clone();
 563        self.executor.spawn(async move {
 564            let args = request_clone.to_dap();
 565            let response = connection.request::<R::DapRequest>(args).await?;
 566            request.response_from_dap(response)
 567        })
 568    }
 569}
 570
 571impl SessionState {
 572    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 573    where
 574        <R::DapRequest as dap::requests::Request>::Response: 'static,
 575        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 576    {
 577        match self {
 578            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 579            SessionState::Booting(_) => Task::ready(Err(anyhow!(
 580                "no adapter running to send request: {request:?}"
 581            ))),
 582        }
 583    }
 584
 585    /// Did this debug session stop at least once?
 586    pub(crate) fn has_ever_stopped(&self) -> bool {
 587        match self {
 588            SessionState::Booting(_) => false,
 589            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 590        }
 591    }
 592
 593    fn stopped(&mut self) {
 594        if let SessionState::Running(running) = self {
 595            running.has_ever_stopped = true;
 596        }
 597    }
 598}
 599
 600#[derive(Default)]
 601struct ThreadStates {
 602    global_state: Option<ThreadStatus>,
 603    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 604}
 605
 606impl ThreadStates {
 607    fn stop_all_threads(&mut self) {
 608        self.global_state = Some(ThreadStatus::Stopped);
 609        self.known_thread_states.clear();
 610    }
 611
 612    fn exit_all_threads(&mut self) {
 613        self.global_state = Some(ThreadStatus::Exited);
 614        self.known_thread_states.clear();
 615    }
 616
 617    fn continue_all_threads(&mut self) {
 618        self.global_state = Some(ThreadStatus::Running);
 619        self.known_thread_states.clear();
 620    }
 621
 622    fn stop_thread(&mut self, thread_id: ThreadId) {
 623        self.known_thread_states
 624            .insert(thread_id, ThreadStatus::Stopped);
 625    }
 626
 627    fn continue_thread(&mut self, thread_id: ThreadId) {
 628        self.known_thread_states
 629            .insert(thread_id, ThreadStatus::Running);
 630    }
 631
 632    fn process_step(&mut self, thread_id: ThreadId) {
 633        self.known_thread_states
 634            .insert(thread_id, ThreadStatus::Stepping);
 635    }
 636
 637    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 638        self.thread_state(thread_id)
 639            .unwrap_or(ThreadStatus::Running)
 640    }
 641
 642    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 643        self.known_thread_states
 644            .get(&thread_id)
 645            .copied()
 646            .or(self.global_state)
 647    }
 648
 649    fn exit_thread(&mut self, thread_id: ThreadId) {
 650        self.known_thread_states
 651            .insert(thread_id, ThreadStatus::Exited);
 652    }
 653
 654    fn any_stopped_thread(&self) -> bool {
 655        self.global_state
 656            .is_some_and(|state| state == ThreadStatus::Stopped)
 657            || self
 658                .known_thread_states
 659                .values()
 660                .any(|status| *status == ThreadStatus::Stopped)
 661    }
 662}
 663const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 664
 665type IsEnabled = bool;
 666
 667#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 668pub struct OutputToken(pub usize);
 669/// Represents a current state of a single debug adapter and provides ways to mutate it.
 670pub struct Session {
 671    pub mode: SessionState,
 672    id: SessionId,
 673    label: Option<SharedString>,
 674    adapter: DebugAdapterName,
 675    pub(super) capabilities: Capabilities,
 676    child_session_ids: HashSet<SessionId>,
 677    parent_session: Option<Entity<Session>>,
 678    modules: Vec<dap::Module>,
 679    loaded_sources: Vec<dap::Source>,
 680    output_token: OutputToken,
 681    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 682    threads: IndexMap<ThreadId, Thread>,
 683    thread_states: ThreadStates,
 684    watchers: HashMap<SharedString, Watcher>,
 685    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 686    stack_frames: IndexMap<StackFrameId, StackFrame>,
 687    locations: HashMap<u64, dap::LocationsResponse>,
 688    is_session_terminated: bool,
 689    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 690    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 691    ignore_breakpoints: bool,
 692    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 693    data_breakpoints: BTreeMap<String, DataBreakpointState>,
 694    background_tasks: Vec<Task<()>>,
 695    restart_task: Option<Task<()>>,
 696    task_context: TaskContext,
 697    memory: memory::Memory,
 698    quirks: SessionQuirks,
 699}
 700
 701trait CacheableCommand: Any + Send + Sync {
 702    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 703    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 704    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 705}
 706
 707impl<T> CacheableCommand for T
 708where
 709    T: LocalDapCommand + PartialEq + Eq + Hash,
 710{
 711    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 712        (rhs as &dyn Any).downcast_ref::<Self>() == Some(self)
 713    }
 714
 715    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 716        T::hash(self, &mut hasher);
 717    }
 718
 719    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 720        self
 721    }
 722}
 723
 724pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 725
 726impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 727    fn from(request: T) -> Self {
 728        Self(Arc::new(request))
 729    }
 730}
 731
 732impl PartialEq for RequestSlot {
 733    fn eq(&self, other: &Self) -> bool {
 734        self.0.dyn_eq(other.0.as_ref())
 735    }
 736}
 737
 738impl Eq for RequestSlot {}
 739
 740impl Hash for RequestSlot {
 741    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 742        self.0.dyn_hash(state);
 743        (&*self.0 as &dyn Any).type_id().hash(state)
 744    }
 745}
 746
 747#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 748pub struct CompletionsQuery {
 749    pub query: String,
 750    pub column: u64,
 751    pub line: Option<u64>,
 752    pub frame_id: Option<u64>,
 753}
 754
 755impl CompletionsQuery {
 756    pub fn new(
 757        buffer: &language::Buffer,
 758        cursor_position: language::Anchor,
 759        frame_id: Option<u64>,
 760    ) -> Self {
 761        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 762        Self {
 763            query: buffer.text(),
 764            column: column as u64,
 765            frame_id,
 766            line: Some(row as u64),
 767        }
 768    }
 769}
 770
 771#[derive(Debug)]
 772pub enum SessionEvent {
 773    Modules,
 774    LoadedSources,
 775    Stopped(Option<ThreadId>),
 776    StackTrace,
 777    Variables,
 778    Watchers,
 779    Threads,
 780    InvalidateInlineValue,
 781    CapabilitiesLoaded,
 782    RunInTerminal {
 783        request: RunInTerminalRequestArguments,
 784        sender: mpsc::Sender<Result<u32>>,
 785    },
 786    DataBreakpointInfo,
 787    ConsoleOutput,
 788}
 789
 790#[derive(Clone, Debug, PartialEq, Eq)]
 791pub enum SessionStateEvent {
 792    Running,
 793    Shutdown,
 794    Restart,
 795    SpawnChildSession {
 796        request: StartDebuggingRequestArguments,
 797    },
 798}
 799
 800impl EventEmitter<SessionEvent> for Session {}
 801impl EventEmitter<SessionStateEvent> for Session {}
 802
 803// local session will send breakpoint updates to DAP for all new breakpoints
 804// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 805// BreakpointStore notifies session on breakpoint changes
 806impl Session {
 807    pub(crate) fn new(
 808        breakpoint_store: Entity<BreakpointStore>,
 809        session_id: SessionId,
 810        parent_session: Option<Entity<Session>>,
 811        label: Option<SharedString>,
 812        adapter: DebugAdapterName,
 813        task_context: TaskContext,
 814        quirks: SessionQuirks,
 815        cx: &mut App,
 816    ) -> Entity<Self> {
 817        cx.new::<Self>(|cx| {
 818            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 819                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 820                    if let Some(local) = (!this.ignore_breakpoints)
 821                        .then(|| this.as_running_mut())
 822                        .flatten()
 823                    {
 824                        local
 825                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 826                            .detach();
 827                    };
 828                }
 829                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 830                    if let Some(local) = (!this.ignore_breakpoints)
 831                        .then(|| this.as_running_mut())
 832                        .flatten()
 833                    {
 834                        local.unset_breakpoints_from_paths(paths, cx).detach();
 835                    }
 836                }
 837                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 838            })
 839            .detach();
 840
 841            Self {
 842                mode: SessionState::Booting(None),
 843                id: session_id,
 844                child_session_ids: HashSet::default(),
 845                parent_session,
 846                capabilities: Capabilities::default(),
 847                watchers: HashMap::default(),
 848                variables: Default::default(),
 849                stack_frames: Default::default(),
 850                thread_states: ThreadStates::default(),
 851                output_token: OutputToken(0),
 852                output: circular_buffer::CircularBuffer::boxed(),
 853                requests: HashMap::default(),
 854                modules: Vec::default(),
 855                loaded_sources: Vec::default(),
 856                threads: IndexMap::default(),
 857                background_tasks: Vec::default(),
 858                restart_task: None,
 859                locations: Default::default(),
 860                is_session_terminated: false,
 861                ignore_breakpoints: false,
 862                breakpoint_store,
 863                data_breakpoints: Default::default(),
 864                exception_breakpoints: Default::default(),
 865                label,
 866                adapter,
 867                task_context,
 868                memory: memory::Memory::new(),
 869                quirks,
 870            }
 871        })
 872    }
 873
 874    pub fn task_context(&self) -> &TaskContext {
 875        &self.task_context
 876    }
 877
 878    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 879        match &self.mode {
 880            SessionState::Booting(_) => None,
 881            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 882        }
 883    }
 884
 885    pub fn boot(
 886        &mut self,
 887        binary: DebugAdapterBinary,
 888        worktree: Entity<Worktree>,
 889        dap_store: WeakEntity<DapStore>,
 890        cx: &mut Context<Self>,
 891    ) -> Task<Result<()>> {
 892        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 893        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 894
 895        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 896            let mut initialized_tx = Some(initialized_tx);
 897            while let Some(message) = message_rx.next().await {
 898                if let Message::Event(event) = message {
 899                    if let Events::Initialized(_) = *event {
 900                        if let Some(tx) = initialized_tx.take() {
 901                            tx.send(()).ok();
 902                        }
 903                    } else {
 904                        let Ok(_) = this.update(cx, |session, cx| {
 905                            session.handle_dap_event(event, cx);
 906                        }) else {
 907                            break;
 908                        };
 909                    }
 910                } else if let Message::Request(request) = message {
 911                    let Ok(_) = this.update(cx, |this, cx| {
 912                        if request.command == StartDebugging::COMMAND {
 913                            this.handle_start_debugging_request(request, cx)
 914                                .detach_and_log_err(cx);
 915                        } else if request.command == RunInTerminal::COMMAND {
 916                            this.handle_run_in_terminal_request(request, cx)
 917                                .detach_and_log_err(cx);
 918                        }
 919                    }) else {
 920                        break;
 921                    };
 922                }
 923            }
 924        })];
 925        self.background_tasks = background_tasks;
 926        let id = self.id;
 927        let parent_session = self.parent_session.clone();
 928
 929        cx.spawn(async move |this, cx| {
 930            let mode = RunningMode::new(
 931                id,
 932                parent_session,
 933                worktree.downgrade(),
 934                binary.clone(),
 935                message_tx,
 936                cx,
 937            )
 938            .await?;
 939            this.update(cx, |this, cx| {
 940                match &mut this.mode {
 941                    SessionState::Booting(task) if task.is_some() => {
 942                        task.take().unwrap().detach_and_log_err(cx);
 943                    }
 944                    SessionState::Booting(_) => {}
 945                    SessionState::Running(_) => {
 946                        debug_panic!("Attempting to boot a session that is already running");
 947                    }
 948                };
 949                this.mode = SessionState::Running(mode);
 950                cx.emit(SessionStateEvent::Running);
 951            })?;
 952
 953            this.update(cx, |session, cx| session.request_initialize(cx))?
 954                .await?;
 955
 956            let result = this
 957                .update(cx, |session, cx| {
 958                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 959                })?
 960                .await;
 961
 962            if result.is_err() {
 963                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 964
 965                console
 966                    .send(format!(
 967                        "Tried to launch debugger with: {}",
 968                        serde_json::to_string_pretty(&binary.request_args.configuration)
 969                            .unwrap_or_default(),
 970                    ))
 971                    .await
 972                    .ok();
 973            }
 974
 975            result
 976        })
 977    }
 978
 979    pub fn session_id(&self) -> SessionId {
 980        self.id
 981    }
 982
 983    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 984        self.child_session_ids.clone()
 985    }
 986
 987    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 988        self.child_session_ids.insert(session_id);
 989    }
 990
 991    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 992        self.child_session_ids.remove(&session_id);
 993    }
 994
 995    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 996        self.parent_session
 997            .as_ref()
 998            .map(|session| session.read(cx).id)
 999    }
1000
1001    pub fn parent_session(&self) -> Option<&Entity<Self>> {
1002        self.parent_session.as_ref()
1003    }
1004
1005    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1006        let Some(client) = self.adapter_client() else {
1007            return Task::ready(());
1008        };
1009
1010        let supports_terminate = self
1011            .capabilities
1012            .support_terminate_debuggee
1013            .unwrap_or(false);
1014
1015        cx.background_spawn(async move {
1016            if supports_terminate {
1017                client
1018                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1019                        restart: Some(false),
1020                    })
1021                    .await
1022                    .ok();
1023            } else {
1024                client
1025                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1026                        restart: Some(false),
1027                        terminate_debuggee: Some(true),
1028                        suspend_debuggee: Some(false),
1029                    })
1030                    .await
1031                    .ok();
1032            }
1033        })
1034    }
1035
1036    pub fn capabilities(&self) -> &Capabilities {
1037        &self.capabilities
1038    }
1039
1040    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1041        match &self.mode {
1042            SessionState::Booting(_) => None,
1043            SessionState::Running(running_mode) => Some(&running_mode.binary),
1044        }
1045    }
1046
1047    pub fn adapter(&self) -> DebugAdapterName {
1048        self.adapter.clone()
1049    }
1050
1051    pub fn label(&self) -> Option<SharedString> {
1052        self.label.clone()
1053    }
1054
1055    pub fn is_terminated(&self) -> bool {
1056        self.is_session_terminated
1057    }
1058
1059    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1060        let (tx, mut rx) = mpsc::unbounded();
1061
1062        cx.spawn(async move |this, cx| {
1063            while let Some(output) = rx.next().await {
1064                this.update(cx, |this, _| {
1065                    let event = dap::OutputEvent {
1066                        category: None,
1067                        output,
1068                        group: None,
1069                        variables_reference: None,
1070                        source: None,
1071                        line: None,
1072                        column: None,
1073                        data: None,
1074                        location_reference: None,
1075                    };
1076                    this.push_output(event);
1077                })?;
1078            }
1079            anyhow::Ok(())
1080        })
1081        .detach();
1082
1083        tx
1084    }
1085
1086    pub fn is_started(&self) -> bool {
1087        match &self.mode {
1088            SessionState::Booting(_) => false,
1089            SessionState::Running(running) => running.is_started,
1090        }
1091    }
1092
1093    pub fn is_building(&self) -> bool {
1094        matches!(self.mode, SessionState::Booting(_))
1095    }
1096
1097    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1098        match &mut self.mode {
1099            SessionState::Running(local_mode) => Some(local_mode),
1100            SessionState::Booting(_) => None,
1101        }
1102    }
1103
1104    pub fn as_running(&self) -> Option<&RunningMode> {
1105        match &self.mode {
1106            SessionState::Running(local_mode) => Some(local_mode),
1107            SessionState::Booting(_) => None,
1108        }
1109    }
1110
1111    fn handle_start_debugging_request(
1112        &mut self,
1113        request: dap::messages::Request,
1114        cx: &mut Context<Self>,
1115    ) -> Task<Result<()>> {
1116        let request_seq = request.seq;
1117
1118        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1119            .arguments
1120            .as_ref()
1121            .map(|value| serde_json::from_value(value.clone()));
1122
1123        let mut success = true;
1124        if let Some(Ok(request)) = launch_request {
1125            cx.emit(SessionStateEvent::SpawnChildSession { request });
1126        } else {
1127            log::error!(
1128                "Failed to parse launch request arguments: {:?}",
1129                request.arguments
1130            );
1131            success = false;
1132        }
1133
1134        cx.spawn(async move |this, cx| {
1135            this.update(cx, |this, cx| {
1136                this.respond_to_client(
1137                    request_seq,
1138                    success,
1139                    StartDebugging::COMMAND.to_string(),
1140                    None,
1141                    cx,
1142                )
1143            })?
1144            .await
1145        })
1146    }
1147
1148    fn handle_run_in_terminal_request(
1149        &mut self,
1150        request: dap::messages::Request,
1151        cx: &mut Context<Self>,
1152    ) -> Task<Result<()>> {
1153        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1154            request.arguments.unwrap_or_default(),
1155        ) {
1156            Ok(args) => args,
1157            Err(error) => {
1158                return cx.spawn(async move |session, cx| {
1159                    let error = serde_json::to_value(dap::ErrorResponse {
1160                        error: Some(dap::Message {
1161                            id: request.seq,
1162                            format: error.to_string(),
1163                            variables: None,
1164                            send_telemetry: None,
1165                            show_user: None,
1166                            url: None,
1167                            url_label: None,
1168                        }),
1169                    })
1170                    .ok();
1171
1172                    session
1173                        .update(cx, |this, cx| {
1174                            this.respond_to_client(
1175                                request.seq,
1176                                false,
1177                                StartDebugging::COMMAND.to_string(),
1178                                error,
1179                                cx,
1180                            )
1181                        })?
1182                        .await?;
1183
1184                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1185                });
1186            }
1187        };
1188
1189        let seq = request.seq;
1190
1191        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1192        cx.emit(SessionEvent::RunInTerminal {
1193            request: request_args,
1194            sender: tx,
1195        });
1196        cx.notify();
1197
1198        cx.spawn(async move |session, cx| {
1199            let result = util::maybe!(async move {
1200                rx.next().await.ok_or_else(|| {
1201                    anyhow!("failed to receive response from spawn terminal".to_string())
1202                })?
1203            })
1204            .await;
1205            let (success, body) = match result {
1206                Ok(pid) => (
1207                    true,
1208                    serde_json::to_value(dap::RunInTerminalResponse {
1209                        process_id: None,
1210                        shell_process_id: Some(pid as u64),
1211                    })
1212                    .ok(),
1213                ),
1214                Err(error) => (
1215                    false,
1216                    serde_json::to_value(dap::ErrorResponse {
1217                        error: Some(dap::Message {
1218                            id: seq,
1219                            format: error.to_string(),
1220                            variables: None,
1221                            send_telemetry: None,
1222                            show_user: None,
1223                            url: None,
1224                            url_label: None,
1225                        }),
1226                    })
1227                    .ok(),
1228                ),
1229            };
1230
1231            session
1232                .update(cx, |session, cx| {
1233                    session.respond_to_client(
1234                        seq,
1235                        success,
1236                        RunInTerminal::COMMAND.to_string(),
1237                        body,
1238                        cx,
1239                    )
1240                })?
1241                .await
1242        })
1243    }
1244
1245    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1246        let adapter_id = self.adapter().to_string();
1247        let request = Initialize { adapter_id };
1248
1249        let SessionState::Running(running) = &self.mode else {
1250            return Task::ready(Err(anyhow!(
1251                "Cannot send initialize request, task still building"
1252            )));
1253        };
1254        let mut response = running.request(request.clone());
1255
1256        cx.spawn(async move |this, cx| {
1257            loop {
1258                let capabilities = response.await;
1259                match capabilities {
1260                    Err(e) => {
1261                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1262                            this.as_running()
1263                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1264                        }) else {
1265                            return Err(e);
1266                        };
1267                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1268                        reconnect.await?;
1269
1270                        let Ok(Some(r)) = this.update(cx, |this, _| {
1271                            this.as_running()
1272                                .map(|running| running.request(request.clone()))
1273                        }) else {
1274                            return Err(e);
1275                        };
1276                        response = r
1277                    }
1278                    Ok(capabilities) => {
1279                        this.update(cx, |session, cx| {
1280                            session.capabilities = capabilities;
1281
1282                            cx.emit(SessionEvent::CapabilitiesLoaded);
1283                        })?;
1284                        return Ok(());
1285                    }
1286                }
1287            }
1288        })
1289    }
1290
1291    pub(super) fn initialize_sequence(
1292        &mut self,
1293        initialize_rx: oneshot::Receiver<()>,
1294        dap_store: WeakEntity<DapStore>,
1295        cx: &mut Context<Self>,
1296    ) -> Task<Result<()>> {
1297        match &self.mode {
1298            SessionState::Running(local_mode) => {
1299                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1300            }
1301            SessionState::Booting(_) => {
1302                Task::ready(Err(anyhow!("cannot initialize, still building")))
1303            }
1304        }
1305    }
1306
1307    pub fn run_to_position(
1308        &mut self,
1309        breakpoint: SourceBreakpoint,
1310        active_thread_id: ThreadId,
1311        cx: &mut Context<Self>,
1312    ) {
1313        match &mut self.mode {
1314            SessionState::Running(local_mode) => {
1315                if !matches!(
1316                    self.thread_states.thread_state(active_thread_id),
1317                    Some(ThreadStatus::Stopped)
1318                ) {
1319                    return;
1320                };
1321                let path = breakpoint.path.clone();
1322                local_mode.tmp_breakpoint = Some(breakpoint);
1323                let task = local_mode.send_breakpoints_from_path(
1324                    path,
1325                    BreakpointUpdatedReason::Toggled,
1326                    &self.breakpoint_store,
1327                    cx,
1328                );
1329
1330                cx.spawn(async move |this, cx| {
1331                    task.await;
1332                    this.update(cx, |this, cx| {
1333                        this.continue_thread(active_thread_id, cx);
1334                    })
1335                })
1336                .detach();
1337            }
1338            SessionState::Booting(_) => {}
1339        }
1340    }
1341
1342    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1343        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1344    }
1345
1346    pub fn output(
1347        &self,
1348        since: OutputToken,
1349    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1350        if self.output_token.0 == 0 {
1351            return (self.output.range(0..0), OutputToken(0));
1352        };
1353
1354        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1355
1356        let clamped_events_since = events_since.clamp(0, self.output.len());
1357        (
1358            self.output
1359                .range(self.output.len() - clamped_events_since..),
1360            self.output_token,
1361        )
1362    }
1363
1364    pub fn respond_to_client(
1365        &self,
1366        request_seq: u64,
1367        success: bool,
1368        command: String,
1369        body: Option<serde_json::Value>,
1370        cx: &mut Context<Self>,
1371    ) -> Task<Result<()>> {
1372        let Some(local_session) = self.as_running() else {
1373            unreachable!("Cannot respond to remote client");
1374        };
1375        let client = local_session.client.clone();
1376
1377        cx.background_spawn(async move {
1378            client
1379                .send_message(Message::Response(Response {
1380                    body,
1381                    success,
1382                    command,
1383                    seq: request_seq + 1,
1384                    request_seq,
1385                    message: None,
1386                }))
1387                .await
1388        })
1389    }
1390
1391    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1392        self.mode.stopped();
1393        // todo(debugger): Find a clean way to get around the clone
1394        let breakpoint_store = self.breakpoint_store.clone();
1395        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1396            let breakpoint = local.tmp_breakpoint.take()?;
1397            let path = breakpoint.path;
1398            Some((local, path))
1399        }) {
1400            local
1401                .send_breakpoints_from_path(
1402                    path,
1403                    BreakpointUpdatedReason::Toggled,
1404                    &breakpoint_store,
1405                    cx,
1406                )
1407                .detach();
1408        };
1409
1410        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1411            self.thread_states.stop_all_threads();
1412            self.invalidate_command_type::<StackTraceCommand>();
1413        }
1414
1415        // Event if we stopped all threads we still need to insert the thread_id
1416        // to our own data
1417        if let Some(thread_id) = event.thread_id {
1418            self.thread_states.stop_thread(ThreadId(thread_id));
1419
1420            self.invalidate_state(
1421                &StackTraceCommand {
1422                    thread_id,
1423                    start_frame: None,
1424                    levels: None,
1425                }
1426                .into(),
1427            );
1428        }
1429
1430        self.invalidate_generic();
1431        self.threads.clear();
1432        self.variables.clear();
1433        cx.emit(SessionEvent::Stopped(
1434            event
1435                .thread_id
1436                .map(Into::into)
1437                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1438        ));
1439        cx.emit(SessionEvent::InvalidateInlineValue);
1440        cx.notify();
1441    }
1442
1443    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1444        match *event {
1445            Events::Initialized(_) => {
1446                debug_assert!(
1447                    false,
1448                    "Initialized event should have been handled in LocalMode"
1449                );
1450            }
1451            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1452            Events::Continued(event) => {
1453                if event.all_threads_continued.unwrap_or_default() {
1454                    self.thread_states.continue_all_threads();
1455                    self.breakpoint_store.update(cx, |store, cx| {
1456                        store.remove_active_position(Some(self.session_id()), cx)
1457                    });
1458                } else {
1459                    self.thread_states
1460                        .continue_thread(ThreadId(event.thread_id));
1461                }
1462                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1463                self.invalidate_generic();
1464            }
1465            Events::Exited(_event) => {
1466                self.clear_active_debug_line(cx);
1467            }
1468            Events::Terminated(_) => {
1469                self.shutdown(cx).detach();
1470            }
1471            Events::Thread(event) => {
1472                let thread_id = ThreadId(event.thread_id);
1473
1474                match event.reason {
1475                    dap::ThreadEventReason::Started => {
1476                        self.thread_states.continue_thread(thread_id);
1477                    }
1478                    dap::ThreadEventReason::Exited => {
1479                        self.thread_states.exit_thread(thread_id);
1480                    }
1481                    reason => {
1482                        log::error!("Unhandled thread event reason {:?}", reason);
1483                    }
1484                }
1485                self.invalidate_state(&ThreadsCommand.into());
1486                cx.notify();
1487            }
1488            Events::Output(event) => {
1489                if event
1490                    .category
1491                    .as_ref()
1492                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1493                {
1494                    return;
1495                }
1496
1497                self.push_output(event);
1498                cx.notify();
1499            }
1500            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1501                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1502            }),
1503            Events::Module(event) => {
1504                match event.reason {
1505                    dap::ModuleEventReason::New => {
1506                        self.modules.push(event.module);
1507                    }
1508                    dap::ModuleEventReason::Changed => {
1509                        if let Some(module) = self
1510                            .modules
1511                            .iter_mut()
1512                            .find(|other| event.module.id == other.id)
1513                        {
1514                            *module = event.module;
1515                        }
1516                    }
1517                    dap::ModuleEventReason::Removed => {
1518                        self.modules.retain(|other| event.module.id != other.id);
1519                    }
1520                }
1521
1522                // todo(debugger): We should only send the invalidate command to downstream clients.
1523                // self.invalidate_state(&ModulesCommand.into());
1524            }
1525            Events::LoadedSource(_) => {
1526                self.invalidate_state(&LoadedSourcesCommand.into());
1527            }
1528            Events::Capabilities(event) => {
1529                self.capabilities = self.capabilities.merge(event.capabilities);
1530
1531                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1532                let recent_filters = self
1533                    .capabilities
1534                    .exception_breakpoint_filters
1535                    .iter()
1536                    .flatten()
1537                    .map(|filter| (filter.filter.clone(), filter.clone()))
1538                    .collect::<BTreeMap<_, _>>();
1539                for filter in recent_filters.values() {
1540                    let default = filter.default.unwrap_or_default();
1541                    self.exception_breakpoints
1542                        .entry(filter.filter.clone())
1543                        .or_insert_with(|| (filter.clone(), default));
1544                }
1545                self.exception_breakpoints
1546                    .retain(|k, _| recent_filters.contains_key(k));
1547                if self.is_started() {
1548                    self.send_exception_breakpoints(cx);
1549                }
1550
1551                // Remove the ones that no longer exist.
1552                cx.notify();
1553            }
1554            Events::Memory(_) => {}
1555            Events::Process(_) => {}
1556            Events::ProgressEnd(_) => {}
1557            Events::ProgressStart(_) => {}
1558            Events::ProgressUpdate(_) => {}
1559            Events::Invalidated(_) => {}
1560            Events::Other(_) => {}
1561        }
1562    }
1563
1564    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1565    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1566        &mut self,
1567        request: T,
1568        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1569        cx: &mut Context<Self>,
1570    ) {
1571        const {
1572            assert!(
1573                T::CACHEABLE,
1574                "Only requests marked as cacheable should invoke `fetch`"
1575            );
1576        }
1577
1578        if !self.thread_states.any_stopped_thread()
1579            && request.type_id() != TypeId::of::<ThreadsCommand>()
1580            || self.is_session_terminated
1581        {
1582            return;
1583        }
1584
1585        let request_map = self
1586            .requests
1587            .entry(std::any::TypeId::of::<T>())
1588            .or_default();
1589
1590        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1591            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1592
1593            let task = Self::request_inner::<Arc<T>>(
1594                &self.capabilities,
1595                &self.mode,
1596                command,
1597                |this, result, cx| {
1598                    process_result(this, result, cx);
1599                    None
1600                },
1601                cx,
1602            );
1603            let task = cx
1604                .background_executor()
1605                .spawn(async move {
1606                    let _ = task.await?;
1607                    Some(())
1608                })
1609                .shared();
1610
1611            vacant.insert(task);
1612            cx.notify();
1613        }
1614    }
1615
1616    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1617        capabilities: &Capabilities,
1618        mode: &SessionState,
1619        request: T,
1620        process_result: impl FnOnce(
1621            &mut Self,
1622            Result<T::Response>,
1623            &mut Context<Self>,
1624        ) -> Option<T::Response>
1625        + 'static,
1626        cx: &mut Context<Self>,
1627    ) -> Task<Option<T::Response>> {
1628        if !T::is_supported(capabilities) {
1629            log::warn!(
1630                "Attempted to send a DAP request that isn't supported: {:?}",
1631                request
1632            );
1633            let error = Err(anyhow::Error::msg(
1634                "Couldn't complete request because it's not supported",
1635            ));
1636            return cx.spawn(async move |this, cx| {
1637                this.update(cx, |this, cx| process_result(this, error, cx))
1638                    .ok()
1639                    .flatten()
1640            });
1641        }
1642
1643        let request = mode.request_dap(request);
1644        cx.spawn(async move |this, cx| {
1645            let result = request.await;
1646            this.update(cx, |this, cx| process_result(this, result, cx))
1647                .ok()
1648                .flatten()
1649        })
1650    }
1651
1652    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
1653        &self,
1654        request: T,
1655        process_result: impl FnOnce(
1656            &mut Self,
1657            Result<T::Response>,
1658            &mut Context<Self>,
1659        ) -> Option<T::Response>
1660        + 'static,
1661        cx: &mut Context<Self>,
1662    ) -> Task<Option<T::Response>> {
1663        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1664    }
1665
1666    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
1667        self.requests.remove(&std::any::TypeId::of::<Command>());
1668    }
1669
1670    fn invalidate_generic(&mut self) {
1671        self.invalidate_command_type::<ModulesCommand>();
1672        self.invalidate_command_type::<LoadedSourcesCommand>();
1673        self.invalidate_command_type::<ThreadsCommand>();
1674        self.invalidate_command_type::<DataBreakpointInfoCommand>();
1675        self.invalidate_command_type::<ReadMemory>();
1676        let executor = self.as_running().map(|running| running.executor.clone());
1677        if let Some(executor) = executor {
1678            self.memory.clear(&executor);
1679        }
1680    }
1681
1682    fn invalidate_state(&mut self, key: &RequestSlot) {
1683        self.requests
1684            .entry((&*key.0 as &dyn Any).type_id())
1685            .and_modify(|request_map| {
1686                request_map.remove(key);
1687            });
1688    }
1689
1690    fn push_output(&mut self, event: OutputEvent) {
1691        self.output.push_back(event);
1692        self.output_token.0 += 1;
1693    }
1694
1695    pub fn any_stopped_thread(&self) -> bool {
1696        self.thread_states.any_stopped_thread()
1697    }
1698
1699    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1700        self.thread_states.thread_status(thread_id)
1701    }
1702
1703    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1704        self.fetch(
1705            dap_command::ThreadsCommand,
1706            |this, result, cx| {
1707                let Some(result) = result.log_err() else {
1708                    return;
1709                };
1710
1711                this.threads = result
1712                    .into_iter()
1713                    .map(|thread| (ThreadId(thread.id), Thread::from(thread)))
1714                    .collect();
1715
1716                this.invalidate_command_type::<StackTraceCommand>();
1717                cx.emit(SessionEvent::Threads);
1718                cx.notify();
1719            },
1720            cx,
1721        );
1722
1723        self.threads
1724            .values()
1725            .map(|thread| {
1726                (
1727                    thread.dap.clone(),
1728                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1729                )
1730            })
1731            .collect()
1732    }
1733
1734    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1735        self.fetch(
1736            dap_command::ModulesCommand,
1737            |this, result, cx| {
1738                let Some(result) = result.log_err() else {
1739                    return;
1740                };
1741
1742                this.modules = result;
1743                cx.emit(SessionEvent::Modules);
1744                cx.notify();
1745            },
1746            cx,
1747        );
1748
1749        &self.modules
1750    }
1751
1752    // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1753    pub fn data_access_size(
1754        &mut self,
1755        frame_id: Option<u64>,
1756        evaluate_name: &str,
1757        cx: &mut Context<Self>,
1758    ) -> Task<Option<u64>> {
1759        let request = self.request(
1760            EvaluateCommand {
1761                expression: format!("?${{sizeof({evaluate_name})}}"),
1762                frame_id,
1763
1764                context: Some(EvaluateArgumentsContext::Repl),
1765                source: None,
1766            },
1767            |_, response, _| response.ok(),
1768            cx,
1769        );
1770        cx.background_spawn(async move {
1771            let result = request.await?;
1772            result.result.parse().ok()
1773        })
1774    }
1775
1776    pub fn memory_reference_of_expr(
1777        &mut self,
1778        frame_id: Option<u64>,
1779        expression: String,
1780        cx: &mut Context<Self>,
1781    ) -> Task<Option<(String, Option<String>)>> {
1782        let request = self.request(
1783            EvaluateCommand {
1784                expression,
1785                frame_id,
1786
1787                context: Some(EvaluateArgumentsContext::Repl),
1788                source: None,
1789            },
1790            |_, response, _| response.ok(),
1791            cx,
1792        );
1793        cx.background_spawn(async move {
1794            let result = request.await?;
1795            result
1796                .memory_reference
1797                .map(|reference| (reference, result.type_))
1798        })
1799    }
1800
1801    pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1802        let data = base64::engine::general_purpose::STANDARD.encode(data);
1803        self.request(
1804            WriteMemoryArguments {
1805                memory_reference: address.to_string(),
1806                data,
1807                allow_partial: None,
1808                offset: None,
1809            },
1810            |this, response, cx| {
1811                this.memory.clear(cx.background_executor());
1812                this.invalidate_command_type::<ReadMemory>();
1813                this.invalidate_command_type::<VariablesCommand>();
1814                cx.emit(SessionEvent::Variables);
1815                response.ok()
1816            },
1817            cx,
1818        )
1819        .detach();
1820    }
1821    pub fn read_memory(
1822        &mut self,
1823        range: RangeInclusive<u64>,
1824        cx: &mut Context<Self>,
1825    ) -> MemoryIterator {
1826        // This function is a bit more involved when it comes to fetching data.
1827        // Since we attempt to read memory in pages, we need to account for some parts
1828        // of memory being unreadable. Therefore, we start off by fetching a page per request.
1829        // In case that fails, we try to re-fetch smaller regions until we have the full range.
1830        let page_range = Memory::memory_range_to_page_range(range.clone());
1831        for page_address in PageAddress::iter_range(page_range) {
1832            self.read_single_page_memory(page_address, cx);
1833        }
1834        self.memory.memory_range(range)
1835    }
1836
1837    fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1838        _ = maybe!({
1839            let builder = self.memory.build_page(page_start)?;
1840
1841            self.memory_read_fetch_page_recursive(builder, cx);
1842            Some(())
1843        });
1844    }
1845    fn memory_read_fetch_page_recursive(
1846        &mut self,
1847        mut builder: MemoryPageBuilder,
1848        cx: &mut Context<Self>,
1849    ) {
1850        let Some(next_request) = builder.next_request() else {
1851            // We're done fetching. Let's grab the page and insert it into our memory store.
1852            let (address, contents) = builder.build();
1853            self.memory.insert_page(address, contents);
1854
1855            return;
1856        };
1857        let size = next_request.size;
1858        self.fetch(
1859            ReadMemory {
1860                memory_reference: format!("0x{:X}", next_request.address),
1861                offset: Some(0),
1862                count: next_request.size,
1863            },
1864            move |this, memory, cx| {
1865                if let Ok(memory) = memory {
1866                    builder.known(memory.content);
1867                    if let Some(unknown) = memory.unreadable_bytes {
1868                        builder.unknown(unknown);
1869                    }
1870                    // This is the recursive bit: if we're not yet done with
1871                    // the whole page, we'll kick off a new request with smaller range.
1872                    // Note that this function is recursive only conceptually;
1873                    // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1874                    this.memory_read_fetch_page_recursive(builder, cx);
1875                } else {
1876                    builder.unknown(size);
1877                }
1878            },
1879            cx,
1880        );
1881    }
1882
1883    pub fn ignore_breakpoints(&self) -> bool {
1884        self.ignore_breakpoints
1885    }
1886
1887    pub fn toggle_ignore_breakpoints(
1888        &mut self,
1889        cx: &mut App,
1890    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1891        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1892    }
1893
1894    pub(crate) fn set_ignore_breakpoints(
1895        &mut self,
1896        ignore: bool,
1897        cx: &mut App,
1898    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1899        if self.ignore_breakpoints == ignore {
1900            return Task::ready(HashMap::default());
1901        }
1902
1903        self.ignore_breakpoints = ignore;
1904
1905        if let Some(local) = self.as_running() {
1906            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1907        } else {
1908            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1909            unimplemented!()
1910        }
1911    }
1912
1913    pub fn data_breakpoints(&self) -> impl Iterator<Item = &DataBreakpointState> {
1914        self.data_breakpoints.values()
1915    }
1916
1917    pub fn exception_breakpoints(
1918        &self,
1919    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1920        self.exception_breakpoints.values()
1921    }
1922
1923    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1924        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1925            *is_enabled = !*is_enabled;
1926            self.send_exception_breakpoints(cx);
1927        }
1928    }
1929
1930    fn send_exception_breakpoints(&mut self, cx: &App) {
1931        if let Some(local) = self.as_running() {
1932            let exception_filters = self
1933                .exception_breakpoints
1934                .values()
1935                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1936                .collect();
1937
1938            let supports_exception_filters = self
1939                .capabilities
1940                .supports_exception_filter_options
1941                .unwrap_or_default();
1942            local
1943                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1944                .detach_and_log_err(cx);
1945        } else {
1946            debug_assert!(false, "Not implemented");
1947        }
1948    }
1949
1950    pub fn toggle_data_breakpoint(&mut self, id: &str, cx: &mut Context<'_, Session>) {
1951        if let Some(state) = self.data_breakpoints.get_mut(id) {
1952            state.is_enabled = !state.is_enabled;
1953            self.send_exception_breakpoints(cx);
1954        }
1955    }
1956
1957    fn send_data_breakpoints(&mut self, cx: &mut Context<Self>) {
1958        if let Some(mode) = self.as_running() {
1959            let breakpoints = self
1960                .data_breakpoints
1961                .values()
1962                .filter_map(|state| state.is_enabled.then(|| state.dap.clone()))
1963                .collect();
1964            let command = SetDataBreakpointsCommand { breakpoints };
1965            mode.request(command).detach_and_log_err(cx);
1966        }
1967    }
1968
1969    pub fn create_data_breakpoint(
1970        &mut self,
1971        context: Arc<DataBreakpointContext>,
1972        data_id: String,
1973        dap: dap::DataBreakpoint,
1974        cx: &mut Context<Self>,
1975    ) {
1976        if self.data_breakpoints.remove(&data_id).is_none() {
1977            self.data_breakpoints.insert(
1978                data_id,
1979                DataBreakpointState {
1980                    dap,
1981                    is_enabled: true,
1982                    context,
1983                },
1984            );
1985        }
1986        self.send_data_breakpoints(cx);
1987    }
1988
1989    pub fn breakpoints_enabled(&self) -> bool {
1990        self.ignore_breakpoints
1991    }
1992
1993    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1994        self.fetch(
1995            dap_command::LoadedSourcesCommand,
1996            |this, result, cx| {
1997                let Some(result) = result.log_err() else {
1998                    return;
1999                };
2000                this.loaded_sources = result;
2001                cx.emit(SessionEvent::LoadedSources);
2002                cx.notify();
2003            },
2004            cx,
2005        );
2006
2007        &self.loaded_sources
2008    }
2009
2010    fn fallback_to_manual_restart(
2011        &mut self,
2012        res: Result<()>,
2013        cx: &mut Context<Self>,
2014    ) -> Option<()> {
2015        if res.log_err().is_none() {
2016            cx.emit(SessionStateEvent::Restart);
2017            return None;
2018        }
2019        Some(())
2020    }
2021
2022    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
2023        res.log_err()?;
2024        Some(())
2025    }
2026
2027    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
2028        thread_id: ThreadId,
2029    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
2030    {
2031        move |this, response, cx| match response.log_err() {
2032            Some(response) => {
2033                this.breakpoint_store.update(cx, |store, cx| {
2034                    store.remove_active_position(Some(this.session_id()), cx)
2035                });
2036                Some(response)
2037            }
2038            None => {
2039                this.thread_states.stop_thread(thread_id);
2040                cx.notify();
2041                None
2042            }
2043        }
2044    }
2045
2046    fn clear_active_debug_line_response(
2047        &mut self,
2048        response: Result<()>,
2049        cx: &mut Context<Session>,
2050    ) -> Option<()> {
2051        response.log_err()?;
2052        self.clear_active_debug_line(cx);
2053        Some(())
2054    }
2055
2056    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2057        self.breakpoint_store.update(cx, |store, cx| {
2058            store.remove_active_position(Some(self.id), cx)
2059        });
2060    }
2061
2062    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2063        self.request(
2064            PauseCommand {
2065                thread_id: thread_id.0,
2066            },
2067            Self::empty_response,
2068            cx,
2069        )
2070        .detach();
2071    }
2072
2073    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2074        self.request(
2075            RestartStackFrameCommand { stack_frame_id },
2076            Self::empty_response,
2077            cx,
2078        )
2079        .detach();
2080    }
2081
2082    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2083        if self.restart_task.is_some() || self.as_running().is_none() {
2084            return;
2085        }
2086
2087        let supports_dap_restart =
2088            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2089
2090        self.restart_task = Some(cx.spawn(async move |this, cx| {
2091            let _ = this.update(cx, |session, cx| {
2092                if supports_dap_restart {
2093                    session
2094                        .request(
2095                            RestartCommand {
2096                                raw: args.unwrap_or(Value::Null),
2097                            },
2098                            Self::fallback_to_manual_restart,
2099                            cx,
2100                        )
2101                        .detach();
2102                } else {
2103                    cx.emit(SessionStateEvent::Restart);
2104                }
2105            });
2106        }));
2107    }
2108
2109    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2110        if self.is_session_terminated {
2111            return Task::ready(());
2112        }
2113
2114        self.is_session_terminated = true;
2115        self.thread_states.exit_all_threads();
2116        cx.notify();
2117
2118        let task = match &mut self.mode {
2119            SessionState::Running(_) => {
2120                if self
2121                    .capabilities
2122                    .supports_terminate_request
2123                    .unwrap_or_default()
2124                {
2125                    self.request(
2126                        TerminateCommand {
2127                            restart: Some(false),
2128                        },
2129                        Self::clear_active_debug_line_response,
2130                        cx,
2131                    )
2132                } else {
2133                    self.request(
2134                        DisconnectCommand {
2135                            restart: Some(false),
2136                            terminate_debuggee: Some(true),
2137                            suspend_debuggee: Some(false),
2138                        },
2139                        Self::clear_active_debug_line_response,
2140                        cx,
2141                    )
2142                }
2143            }
2144            SessionState::Booting(build_task) => {
2145                build_task.take();
2146                Task::ready(Some(()))
2147            }
2148        };
2149
2150        cx.emit(SessionStateEvent::Shutdown);
2151
2152        cx.spawn(async move |this, cx| {
2153            task.await;
2154            let _ = this.update(cx, |this, _| {
2155                if let Some(adapter_client) = this.adapter_client() {
2156                    adapter_client.kill();
2157                }
2158            });
2159        })
2160    }
2161
2162    pub fn completions(
2163        &mut self,
2164        query: CompletionsQuery,
2165        cx: &mut Context<Self>,
2166    ) -> Task<Result<Vec<dap::CompletionItem>>> {
2167        let task = self.request(query, |_, result, _| result.log_err(), cx);
2168
2169        cx.background_executor().spawn(async move {
2170            anyhow::Ok(
2171                task.await
2172                    .map(|response| response.targets)
2173                    .context("failed to fetch completions")?,
2174            )
2175        })
2176    }
2177
2178    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2179        let supports_single_thread_execution_requests =
2180            self.capabilities.supports_single_thread_execution_requests;
2181        self.thread_states.continue_thread(thread_id);
2182        self.request(
2183            ContinueCommand {
2184                args: ContinueArguments {
2185                    thread_id: thread_id.0,
2186                    single_thread: supports_single_thread_execution_requests,
2187                },
2188            },
2189            Self::on_step_response::<ContinueCommand>(thread_id),
2190            cx,
2191        )
2192        .detach();
2193    }
2194
2195    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2196        match self.mode {
2197            SessionState::Running(ref local) => Some(local.client.clone()),
2198            SessionState::Booting(_) => None,
2199        }
2200    }
2201
2202    pub fn has_ever_stopped(&self) -> bool {
2203        self.mode.has_ever_stopped()
2204    }
2205    pub fn step_over(
2206        &mut self,
2207        thread_id: ThreadId,
2208        granularity: SteppingGranularity,
2209        cx: &mut Context<Self>,
2210    ) {
2211        let supports_single_thread_execution_requests =
2212            self.capabilities.supports_single_thread_execution_requests;
2213        let supports_stepping_granularity = self
2214            .capabilities
2215            .supports_stepping_granularity
2216            .unwrap_or_default();
2217
2218        let command = NextCommand {
2219            inner: StepCommand {
2220                thread_id: thread_id.0,
2221                granularity: supports_stepping_granularity.then(|| granularity),
2222                single_thread: supports_single_thread_execution_requests,
2223            },
2224        };
2225
2226        self.thread_states.process_step(thread_id);
2227        self.request(
2228            command,
2229            Self::on_step_response::<NextCommand>(thread_id),
2230            cx,
2231        )
2232        .detach();
2233    }
2234
2235    pub fn step_in(
2236        &mut self,
2237        thread_id: ThreadId,
2238        granularity: SteppingGranularity,
2239        cx: &mut Context<Self>,
2240    ) {
2241        let supports_single_thread_execution_requests =
2242            self.capabilities.supports_single_thread_execution_requests;
2243        let supports_stepping_granularity = self
2244            .capabilities
2245            .supports_stepping_granularity
2246            .unwrap_or_default();
2247
2248        let command = StepInCommand {
2249            inner: StepCommand {
2250                thread_id: thread_id.0,
2251                granularity: supports_stepping_granularity.then(|| granularity),
2252                single_thread: supports_single_thread_execution_requests,
2253            },
2254        };
2255
2256        self.thread_states.process_step(thread_id);
2257        self.request(
2258            command,
2259            Self::on_step_response::<StepInCommand>(thread_id),
2260            cx,
2261        )
2262        .detach();
2263    }
2264
2265    pub fn step_out(
2266        &mut self,
2267        thread_id: ThreadId,
2268        granularity: SteppingGranularity,
2269        cx: &mut Context<Self>,
2270    ) {
2271        let supports_single_thread_execution_requests =
2272            self.capabilities.supports_single_thread_execution_requests;
2273        let supports_stepping_granularity = self
2274            .capabilities
2275            .supports_stepping_granularity
2276            .unwrap_or_default();
2277
2278        let command = StepOutCommand {
2279            inner: StepCommand {
2280                thread_id: thread_id.0,
2281                granularity: supports_stepping_granularity.then(|| granularity),
2282                single_thread: supports_single_thread_execution_requests,
2283            },
2284        };
2285
2286        self.thread_states.process_step(thread_id);
2287        self.request(
2288            command,
2289            Self::on_step_response::<StepOutCommand>(thread_id),
2290            cx,
2291        )
2292        .detach();
2293    }
2294
2295    pub fn step_back(
2296        &mut self,
2297        thread_id: ThreadId,
2298        granularity: SteppingGranularity,
2299        cx: &mut Context<Self>,
2300    ) {
2301        let supports_single_thread_execution_requests =
2302            self.capabilities.supports_single_thread_execution_requests;
2303        let supports_stepping_granularity = self
2304            .capabilities
2305            .supports_stepping_granularity
2306            .unwrap_or_default();
2307
2308        let command = StepBackCommand {
2309            inner: StepCommand {
2310                thread_id: thread_id.0,
2311                granularity: supports_stepping_granularity.then(|| granularity),
2312                single_thread: supports_single_thread_execution_requests,
2313            },
2314        };
2315
2316        self.thread_states.process_step(thread_id);
2317
2318        self.request(
2319            command,
2320            Self::on_step_response::<StepBackCommand>(thread_id),
2321            cx,
2322        )
2323        .detach();
2324    }
2325
2326    pub fn stack_frames(
2327        &mut self,
2328        thread_id: ThreadId,
2329        cx: &mut Context<Self>,
2330    ) -> Result<Vec<StackFrame>> {
2331        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2332            && self.requests.contains_key(&ThreadsCommand.type_id())
2333            && self.threads.contains_key(&thread_id)
2334        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2335        // We could still be using an old thread id and have sent a new thread's request
2336        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2337        // But it very well could cause a minor bug in the future that is hard to track down
2338        {
2339            self.fetch(
2340                super::dap_command::StackTraceCommand {
2341                    thread_id: thread_id.0,
2342                    start_frame: None,
2343                    levels: None,
2344                },
2345                move |this, stack_frames, cx| {
2346                    let entry =
2347                        this.threads
2348                            .entry(thread_id)
2349                            .and_modify(|thread| match &stack_frames {
2350                                Ok(stack_frames) => {
2351                                    thread.stack_frames = stack_frames
2352                                        .iter()
2353                                        .cloned()
2354                                        .map(StackFrame::from)
2355                                        .collect();
2356                                    thread.stack_frames_error = None;
2357                                }
2358                                Err(error) => {
2359                                    thread.stack_frames.clear();
2360                                    thread.stack_frames_error = Some(error.cloned());
2361                                }
2362                            });
2363                    debug_assert!(
2364                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2365                        "Sent request for thread_id that doesn't exist"
2366                    );
2367                    if let Ok(stack_frames) = stack_frames {
2368                        this.stack_frames.extend(
2369                            stack_frames
2370                                .into_iter()
2371                                .filter(|frame| {
2372                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2373                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2374                                    !(frame.id == 0
2375                                        && frame.line == 0
2376                                        && frame.column == 0
2377                                        && frame.presentation_hint
2378                                            == Some(StackFramePresentationHint::Label))
2379                                })
2380                                .map(|frame| (frame.id, StackFrame::from(frame))),
2381                        );
2382                    }
2383
2384                    this.invalidate_command_type::<ScopesCommand>();
2385                    this.invalidate_command_type::<VariablesCommand>();
2386
2387                    cx.emit(SessionEvent::StackTrace);
2388                },
2389                cx,
2390            );
2391        }
2392
2393        match self.threads.get(&thread_id) {
2394            Some(thread) => {
2395                if let Some(error) = &thread.stack_frames_error {
2396                    Err(error.cloned())
2397                } else {
2398                    Ok(thread.stack_frames.clone())
2399                }
2400            }
2401            None => Ok(Vec::new()),
2402        }
2403    }
2404
2405    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2406        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2407            && self
2408                .requests
2409                .contains_key(&TypeId::of::<StackTraceCommand>())
2410        {
2411            self.fetch(
2412                ScopesCommand { stack_frame_id },
2413                move |this, scopes, cx| {
2414                    let Some(scopes) = scopes.log_err() else {
2415                        return
2416                    };
2417
2418                    for scope in scopes.iter() {
2419                        this.variables(scope.variables_reference, cx);
2420                    }
2421
2422                    let entry = this
2423                        .stack_frames
2424                        .entry(stack_frame_id)
2425                        .and_modify(|stack_frame| {
2426                            stack_frame.scopes = scopes;
2427                        });
2428
2429                    cx.emit(SessionEvent::Variables);
2430
2431                    debug_assert!(
2432                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2433                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2434                    );
2435                },
2436                cx,
2437            );
2438        }
2439
2440        self.stack_frames
2441            .get(&stack_frame_id)
2442            .map(|frame| frame.scopes.as_slice())
2443            .unwrap_or_default()
2444    }
2445
2446    pub fn variables_by_stack_frame_id(
2447        &self,
2448        stack_frame_id: StackFrameId,
2449        globals: bool,
2450        locals: bool,
2451    ) -> Vec<dap::Variable> {
2452        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2453            return Vec::new();
2454        };
2455
2456        stack_frame
2457            .scopes
2458            .iter()
2459            .filter(|scope| {
2460                (scope.name.to_lowercase().contains("local") && locals)
2461                    || (scope.name.to_lowercase().contains("global") && globals)
2462            })
2463            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2464            .flatten()
2465            .cloned()
2466            .collect()
2467    }
2468
2469    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2470        &self.watchers
2471    }
2472
2473    pub fn add_watcher(
2474        &mut self,
2475        expression: SharedString,
2476        frame_id: u64,
2477        cx: &mut Context<Self>,
2478    ) -> Task<Result<()>> {
2479        let request = self.mode.request_dap(EvaluateCommand {
2480            expression: expression.to_string(),
2481            context: Some(EvaluateArgumentsContext::Watch),
2482            frame_id: Some(frame_id),
2483            source: None,
2484        });
2485
2486        cx.spawn(async move |this, cx| {
2487            let response = request.await?;
2488
2489            this.update(cx, |session, cx| {
2490                session.watchers.insert(
2491                    expression.clone(),
2492                    Watcher {
2493                        expression,
2494                        value: response.result.into(),
2495                        variables_reference: response.variables_reference,
2496                        presentation_hint: response.presentation_hint,
2497                    },
2498                );
2499                cx.emit(SessionEvent::Watchers);
2500            })
2501        })
2502    }
2503
2504    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2505        let watches = self.watchers.clone();
2506        for (_, watch) in watches.into_iter() {
2507            self.add_watcher(watch.expression.clone(), frame_id, cx)
2508                .detach();
2509        }
2510    }
2511
2512    pub fn remove_watcher(&mut self, expression: SharedString) {
2513        self.watchers.remove(&expression);
2514    }
2515
2516    pub fn variables(
2517        &mut self,
2518        variables_reference: VariableReference,
2519        cx: &mut Context<Self>,
2520    ) -> Vec<dap::Variable> {
2521        let command = VariablesCommand {
2522            variables_reference,
2523            filter: None,
2524            start: None,
2525            count: None,
2526            format: None,
2527        };
2528
2529        self.fetch(
2530            command,
2531            move |this, variables, cx| {
2532                let Some(variables) = variables.log_err() else {
2533                    return;
2534                };
2535
2536                this.variables.insert(variables_reference, variables);
2537
2538                cx.emit(SessionEvent::Variables);
2539                cx.emit(SessionEvent::InvalidateInlineValue);
2540            },
2541            cx,
2542        );
2543
2544        self.variables
2545            .get(&variables_reference)
2546            .cloned()
2547            .unwrap_or_default()
2548    }
2549
2550    pub fn data_breakpoint_info(
2551        &mut self,
2552        context: Arc<DataBreakpointContext>,
2553        mode: Option<String>,
2554        cx: &mut Context<Self>,
2555    ) -> Task<Option<dap::DataBreakpointInfoResponse>> {
2556        let command = DataBreakpointInfoCommand { context, mode };
2557
2558        self.request(command, |_, response, _| response.ok(), cx)
2559    }
2560
2561    pub fn set_variable_value(
2562        &mut self,
2563        stack_frame_id: u64,
2564        variables_reference: u64,
2565        name: String,
2566        value: String,
2567        cx: &mut Context<Self>,
2568    ) {
2569        if self.capabilities.supports_set_variable.unwrap_or_default() {
2570            self.request(
2571                SetVariableValueCommand {
2572                    name,
2573                    value,
2574                    variables_reference,
2575                },
2576                move |this, response, cx| {
2577                    let response = response.log_err()?;
2578                    this.invalidate_command_type::<VariablesCommand>();
2579                    this.invalidate_command_type::<ReadMemory>();
2580                    this.memory.clear(cx.background_executor());
2581                    this.refresh_watchers(stack_frame_id, cx);
2582                    cx.emit(SessionEvent::Variables);
2583                    Some(response)
2584                },
2585                cx,
2586            )
2587            .detach();
2588        }
2589    }
2590
2591    pub fn evaluate(
2592        &mut self,
2593        expression: String,
2594        context: Option<EvaluateArgumentsContext>,
2595        frame_id: Option<u64>,
2596        source: Option<Source>,
2597        cx: &mut Context<Self>,
2598    ) -> Task<()> {
2599        let event = dap::OutputEvent {
2600            category: None,
2601            output: format!("> {expression}"),
2602            group: None,
2603            variables_reference: None,
2604            source: None,
2605            line: None,
2606            column: None,
2607            data: None,
2608            location_reference: None,
2609        };
2610        self.push_output(event);
2611        let request = self.mode.request_dap(EvaluateCommand {
2612            expression,
2613            context,
2614            frame_id,
2615            source,
2616        });
2617        cx.spawn(async move |this, cx| {
2618            let response = request.await;
2619            this.update(cx, |this, cx| {
2620                this.memory.clear(cx.background_executor());
2621                this.invalidate_command_type::<ReadMemory>();
2622                match response {
2623                    Ok(response) => {
2624                        let event = dap::OutputEvent {
2625                            category: None,
2626                            output: format!("< {}", &response.result),
2627                            group: None,
2628                            variables_reference: Some(response.variables_reference),
2629                            source: None,
2630                            line: None,
2631                            column: None,
2632                            data: None,
2633                            location_reference: None,
2634                        };
2635                        this.push_output(event);
2636                    }
2637                    Err(e) => {
2638                        let event = dap::OutputEvent {
2639                            category: None,
2640                            output: format!("{}", e),
2641                            group: None,
2642                            variables_reference: None,
2643                            source: None,
2644                            line: None,
2645                            column: None,
2646                            data: None,
2647                            location_reference: None,
2648                        };
2649                        this.push_output(event);
2650                    }
2651                };
2652                cx.notify();
2653            })
2654            .ok();
2655        })
2656    }
2657
2658    pub fn location(
2659        &mut self,
2660        reference: u64,
2661        cx: &mut Context<Self>,
2662    ) -> Option<dap::LocationsResponse> {
2663        self.fetch(
2664            LocationsCommand { reference },
2665            move |this, response, _| {
2666                let Some(response) = response.log_err() else {
2667                    return;
2668                };
2669                this.locations.insert(reference, response);
2670            },
2671            cx,
2672        );
2673        self.locations.get(&reference).cloned()
2674    }
2675
2676    pub fn is_attached(&self) -> bool {
2677        let SessionState::Running(local_mode) = &self.mode else {
2678            return false;
2679        };
2680        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2681    }
2682
2683    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2684        let command = DisconnectCommand {
2685            restart: Some(false),
2686            terminate_debuggee: Some(false),
2687            suspend_debuggee: Some(false),
2688        };
2689
2690        self.request(command, Self::empty_response, cx).detach()
2691    }
2692
2693    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2694        if self
2695            .capabilities
2696            .supports_terminate_threads_request
2697            .unwrap_or_default()
2698        {
2699            self.request(
2700                TerminateThreadsCommand {
2701                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2702                },
2703                Self::clear_active_debug_line_response,
2704                cx,
2705            )
2706            .detach();
2707        } else {
2708            self.shutdown(cx).detach();
2709        }
2710    }
2711
2712    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2713        self.thread_states.thread_state(thread_id)
2714    }
2715
2716    pub fn quirks(&self) -> SessionQuirks {
2717        self.quirks
2718    }
2719}