session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2use crate::debugger::dap_command::{DataBreakpointContext, ReadMemory};
   3use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
   4
   5use super::breakpoint_store::{
   6    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   7};
   8use super::dap_command::{
   9    self, Attach, ConfigurationDone, ContinueCommand, DataBreakpointInfoCommand, DisconnectCommand,
  10    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
  11    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  12    ScopesCommand, SetDataBreakpointsCommand, SetExceptionBreakpoints, SetVariableValueCommand,
  13    StackTraceCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  14    TerminateCommand, TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  15};
  16use super::dap_store::DapStore;
  17use anyhow::{Context as _, Result, anyhow};
  18use base64::Engine;
  19use collections::{HashMap, HashSet, IndexMap};
  20use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  21use dap::messages::Response;
  22use dap::requests::{Request, RunInTerminal, StartDebugging};
  23use dap::{
  24    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  25    SteppingGranularity, StoppedEvent, VariableReference,
  26    client::{DebugAdapterClient, SessionId},
  27    messages::{Events, Message},
  28};
  29use dap::{
  30    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  31    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  32    StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
  33};
  34use futures::SinkExt;
  35use futures::channel::mpsc::UnboundedSender;
  36use futures::channel::{mpsc, oneshot};
  37use futures::{FutureExt, future::Shared};
  38use gpui::{
  39    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  40    Task, WeakEntity,
  41};
  42
  43use rpc::ErrorExt;
  44use serde_json::Value;
  45use smol::stream::StreamExt;
  46use std::any::TypeId;
  47use std::collections::BTreeMap;
  48use std::ops::RangeInclusive;
  49use std::u64;
  50use std::{
  51    any::Any,
  52    collections::hash_map::Entry,
  53    hash::{Hash, Hasher},
  54    path::Path,
  55    sync::Arc,
  56};
  57use task::TaskContext;
  58use text::{PointUtf16, ToPointUtf16};
  59use util::{ResultExt, debug_panic, maybe};
  60use worktree::Worktree;
  61
  62#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  63#[repr(transparent)]
  64pub struct ThreadId(pub i64);
  65
  66impl From<i64> for ThreadId {
  67    fn from(id: i64) -> Self {
  68        Self(id)
  69    }
  70}
  71
  72#[derive(Clone, Debug)]
  73pub struct StackFrame {
  74    pub dap: dap::StackFrame,
  75    pub scopes: Vec<dap::Scope>,
  76}
  77
  78impl From<dap::StackFrame> for StackFrame {
  79    fn from(stack_frame: dap::StackFrame) -> Self {
  80        Self {
  81            scopes: vec![],
  82            dap: stack_frame,
  83        }
  84    }
  85}
  86
  87#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  88pub enum ThreadStatus {
  89    #[default]
  90    Running,
  91    Stopped,
  92    Stepping,
  93    Exited,
  94    Ended,
  95}
  96
  97impl ThreadStatus {
  98    pub fn label(&self) -> &'static str {
  99        match self {
 100            ThreadStatus::Running => "Running",
 101            ThreadStatus::Stopped => "Stopped",
 102            ThreadStatus::Stepping => "Stepping",
 103            ThreadStatus::Exited => "Exited",
 104            ThreadStatus::Ended => "Ended",
 105        }
 106    }
 107}
 108
 109#[derive(Debug)]
 110pub struct Thread {
 111    dap: dap::Thread,
 112    stack_frames: Vec<StackFrame>,
 113    stack_frames_error: Option<anyhow::Error>,
 114    _has_stopped: bool,
 115}
 116
 117impl From<dap::Thread> for Thread {
 118    fn from(dap: dap::Thread) -> Self {
 119        Self {
 120            dap,
 121            stack_frames: Default::default(),
 122            stack_frames_error: None,
 123            _has_stopped: false,
 124        }
 125    }
 126}
 127
 128#[derive(Debug, Clone, PartialEq)]
 129pub struct Watcher {
 130    pub expression: SharedString,
 131    pub value: SharedString,
 132    pub variables_reference: u64,
 133    pub presentation_hint: Option<VariablePresentationHint>,
 134}
 135
 136#[derive(Debug, Clone, PartialEq)]
 137pub struct DataBreakpointState {
 138    pub dap: dap::DataBreakpoint,
 139    pub is_enabled: bool,
 140    pub context: Arc<DataBreakpointContext>,
 141}
 142
 143pub enum SessionState {
 144    /// Represents a session that is building/initializing
 145    /// even if a session doesn't have a pre build task this state
 146    /// is used to run all the async tasks that are required to start the session
 147    Booting(Option<Task<Result<()>>>),
 148    Running(RunningMode),
 149}
 150
 151#[derive(Clone)]
 152pub struct RunningMode {
 153    client: Arc<DebugAdapterClient>,
 154    binary: DebugAdapterBinary,
 155    tmp_breakpoint: Option<SourceBreakpoint>,
 156    worktree: WeakEntity<Worktree>,
 157    executor: BackgroundExecutor,
 158    is_started: bool,
 159    has_ever_stopped: bool,
 160    messages_tx: UnboundedSender<Message>,
 161}
 162
 163#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
 164pub struct SessionQuirks {
 165    pub compact: bool,
 166    pub prefer_thread_name: bool,
 167}
 168
 169fn client_source(abs_path: &Path) -> dap::Source {
 170    dap::Source {
 171        name: abs_path
 172            .file_name()
 173            .map(|filename| filename.to_string_lossy().to_string()),
 174        path: Some(abs_path.to_string_lossy().to_string()),
 175        source_reference: None,
 176        presentation_hint: None,
 177        origin: None,
 178        sources: None,
 179        adapter_data: None,
 180        checksums: None,
 181    }
 182}
 183
 184impl RunningMode {
 185    async fn new(
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        worktree: WeakEntity<Worktree>,
 189        binary: DebugAdapterBinary,
 190        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 191        cx: &mut AsyncApp,
 192    ) -> Result<Self> {
 193        let message_handler = Box::new({
 194            let messages_tx = messages_tx.clone();
 195            move |message| {
 196                messages_tx.unbounded_send(message).ok();
 197            }
 198        });
 199
 200        let client = if let Some(client) = parent_session
 201            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 202            .flatten()
 203        {
 204            client
 205                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 206                .await?
 207        } else {
 208            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 209        };
 210
 211        Ok(Self {
 212            client: Arc::new(client),
 213            worktree,
 214            tmp_breakpoint: None,
 215            binary,
 216            executor: cx.background_executor().clone(),
 217            is_started: false,
 218            has_ever_stopped: false,
 219            messages_tx,
 220        })
 221    }
 222
 223    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 224        &self.worktree
 225    }
 226
 227    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 228        let tasks: Vec<_> = paths
 229            .into_iter()
 230            .map(|path| {
 231                self.request(dap_command::SetBreakpoints {
 232                    source: client_source(path),
 233                    source_modified: None,
 234                    breakpoints: vec![],
 235                })
 236            })
 237            .collect();
 238
 239        cx.background_spawn(async move {
 240            futures::future::join_all(tasks)
 241                .await
 242                .iter()
 243                .for_each(|res| match res {
 244                    Ok(_) => {}
 245                    Err(err) => {
 246                        log::warn!("Set breakpoints request failed: {}", err);
 247                    }
 248                });
 249        })
 250    }
 251
 252    fn send_breakpoints_from_path(
 253        &self,
 254        abs_path: Arc<Path>,
 255        reason: BreakpointUpdatedReason,
 256        breakpoint_store: &Entity<BreakpointStore>,
 257        cx: &mut App,
 258    ) -> Task<()> {
 259        let breakpoints =
 260            breakpoint_store
 261                .read(cx)
 262                .source_breakpoints_from_path(&abs_path, cx)
 263                .into_iter()
 264                .filter(|bp| bp.state.is_enabled())
 265                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 266                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 267                }))
 268                .map(Into::into)
 269                .collect();
 270
 271        let raw_breakpoints = breakpoint_store
 272            .read(cx)
 273            .breakpoints_from_path(&abs_path)
 274            .into_iter()
 275            .filter(|bp| bp.bp.state.is_enabled())
 276            .collect::<Vec<_>>();
 277
 278        let task = self.request(dap_command::SetBreakpoints {
 279            source: client_source(&abs_path),
 280            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 281            breakpoints,
 282        });
 283        let session_id = self.client.id();
 284        let breakpoint_store = breakpoint_store.downgrade();
 285        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 286            Ok(breakpoints) => {
 287                let breakpoints =
 288                    breakpoints
 289                        .into_iter()
 290                        .zip(raw_breakpoints)
 291                        .filter_map(|(dap_bp, zed_bp)| {
 292                            Some((
 293                                zed_bp,
 294                                BreakpointSessionState {
 295                                    id: dap_bp.id?,
 296                                    verified: dap_bp.verified,
 297                                },
 298                            ))
 299                        });
 300                breakpoint_store
 301                    .update(cx, |this, _| {
 302                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 303                    })
 304                    .ok();
 305            }
 306            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 307        })
 308    }
 309
 310    fn send_exception_breakpoints(
 311        &self,
 312        filters: Vec<ExceptionBreakpointsFilter>,
 313        supports_filter_options: bool,
 314    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 315        let arg = if supports_filter_options {
 316            SetExceptionBreakpoints::WithOptions {
 317                filters: filters
 318                    .into_iter()
 319                    .map(|filter| ExceptionFilterOptions {
 320                        filter_id: filter.filter,
 321                        condition: None,
 322                        mode: None,
 323                    })
 324                    .collect(),
 325            }
 326        } else {
 327            SetExceptionBreakpoints::Plain {
 328                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 329            }
 330        };
 331        self.request(arg)
 332    }
 333
 334    fn send_source_breakpoints(
 335        &self,
 336        ignore_breakpoints: bool,
 337        breakpoint_store: &Entity<BreakpointStore>,
 338        cx: &App,
 339    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 340        let mut breakpoint_tasks = Vec::new();
 341        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 342        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 343        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 344        let session_id = self.client.id();
 345        for (path, breakpoints) in breakpoints {
 346            let breakpoints = if ignore_breakpoints {
 347                vec![]
 348            } else {
 349                breakpoints
 350                    .into_iter()
 351                    .filter(|bp| bp.state.is_enabled())
 352                    .map(Into::into)
 353                    .collect()
 354            };
 355
 356            let raw_breakpoints = raw_breakpoints
 357                .remove(&path)
 358                .unwrap_or_default()
 359                .into_iter()
 360                .filter(|bp| bp.bp.state.is_enabled());
 361            let error_path = path.clone();
 362            let send_request = self
 363                .request(dap_command::SetBreakpoints {
 364                    source: client_source(&path),
 365                    source_modified: Some(false),
 366                    breakpoints,
 367                })
 368                .map(|result| result.map_err(move |e| (error_path, e)));
 369
 370            let task = cx.spawn({
 371                let breakpoint_store = breakpoint_store.downgrade();
 372                async move |cx| {
 373                    let breakpoints = cx.background_spawn(send_request).await?;
 374
 375                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 376                        |(dap_bp, zed_bp)| {
 377                            Some((
 378                                zed_bp,
 379                                BreakpointSessionState {
 380                                    id: dap_bp.id?,
 381                                    verified: dap_bp.verified,
 382                                },
 383                            ))
 384                        },
 385                    );
 386                    breakpoint_store
 387                        .update(cx, |this, _| {
 388                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 389                        })
 390                        .ok();
 391
 392                    Ok(())
 393                }
 394            });
 395            breakpoint_tasks.push(task);
 396        }
 397
 398        cx.background_spawn(async move {
 399            futures::future::join_all(breakpoint_tasks)
 400                .await
 401                .into_iter()
 402                .filter_map(Result::err)
 403                .collect::<HashMap<_, _>>()
 404        })
 405    }
 406
 407    fn initialize_sequence(
 408        &self,
 409        capabilities: &Capabilities,
 410        initialized_rx: oneshot::Receiver<()>,
 411        dap_store: WeakEntity<DapStore>,
 412        cx: &mut Context<Session>,
 413    ) -> Task<Result<()>> {
 414        let raw = self.binary.request_args.clone();
 415
 416        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 417        let launch = match raw.request {
 418            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 419                raw: raw.configuration,
 420            }),
 421            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 422                raw: raw.configuration,
 423            }),
 424        };
 425
 426        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 427        // From spec (on initialization sequence):
 428        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 429        //
 430        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 431        let should_send_exception_breakpoints = capabilities
 432            .exception_breakpoint_filters
 433            .as_ref()
 434            .map_or(false, |filters| !filters.is_empty())
 435            || !configuration_done_supported;
 436        let supports_exception_filters = capabilities
 437            .supports_exception_filter_options
 438            .unwrap_or_default();
 439        let this = self.clone();
 440        let worktree = self.worktree().clone();
 441        let mut filters = capabilities
 442            .exception_breakpoint_filters
 443            .clone()
 444            .unwrap_or_default();
 445        let configuration_sequence = cx.spawn({
 446            async move |session, cx| {
 447                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 448                let (breakpoint_store, adapter_defaults) =
 449                    dap_store.read_with(cx, |dap_store, _| {
 450                        (
 451                            dap_store.breakpoint_store().clone(),
 452                            dap_store.adapter_options(&adapter_name),
 453                        )
 454                    })?;
 455                initialized_rx.await?;
 456                let errors_by_path = cx
 457                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 458                    .await;
 459
 460                dap_store.update(cx, |_, cx| {
 461                    let Some(worktree) = worktree.upgrade() else {
 462                        return;
 463                    };
 464
 465                    for (path, error) in &errors_by_path {
 466                        log::error!("failed to set breakpoints for {path:?}: {error}");
 467                    }
 468
 469                    if let Some(failed_path) = errors_by_path.keys().next() {
 470                        let failed_path = failed_path
 471                            .strip_prefix(worktree.read(cx).abs_path())
 472                            .unwrap_or(failed_path)
 473                            .display();
 474                        let message = format!(
 475                            "Failed to set breakpoints for {failed_path}{}",
 476                            match errors_by_path.len() {
 477                                0 => unreachable!(),
 478                                1 => "".into(),
 479                                2 => " and 1 other path".into(),
 480                                n => format!(" and {} other paths", n - 1),
 481                            }
 482                        );
 483                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 484                    }
 485                })?;
 486
 487                if should_send_exception_breakpoints {
 488                    _ = session.update(cx, |this, _| {
 489                        filters.retain(|filter| {
 490                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 491                                defaults
 492                                    .exception_breakpoints
 493                                    .get(&filter.filter)
 494                                    .map(|options| options.enabled)
 495                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 496                            } else {
 497                                filter.default.unwrap_or_default()
 498                            };
 499                            this.exception_breakpoints
 500                                .entry(filter.filter.clone())
 501                                .or_insert_with(|| (filter.clone(), is_enabled));
 502                            is_enabled
 503                        });
 504                    });
 505
 506                    this.send_exception_breakpoints(filters, supports_exception_filters)
 507                        .await
 508                        .ok();
 509                }
 510
 511                let ret = if configuration_done_supported {
 512                    this.request(ConfigurationDone {})
 513                } else {
 514                    Task::ready(Ok(()))
 515                }
 516                .await;
 517                ret
 518            }
 519        });
 520
 521        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 522
 523        cx.spawn(async move |this, cx| {
 524            let result = task.await;
 525
 526            this.update(cx, |this, cx| {
 527                if let Some(this) = this.as_running_mut() {
 528                    this.is_started = true;
 529                    cx.notify();
 530                }
 531            })
 532            .ok();
 533
 534            result?;
 535            anyhow::Ok(())
 536        })
 537    }
 538
 539    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 540        let client = self.client.clone();
 541        let messages_tx = self.messages_tx.clone();
 542        let message_handler = Box::new(move |message| {
 543            messages_tx.unbounded_send(message).ok();
 544        });
 545        if client.should_reconnect_for_ssh() {
 546            Some(cx.spawn(async move |cx| {
 547                client.connect(message_handler, cx).await?;
 548                anyhow::Ok(())
 549            }))
 550        } else {
 551            None
 552        }
 553    }
 554
 555    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 556    where
 557        <R::DapRequest as dap::requests::Request>::Response: 'static,
 558        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 559    {
 560        let request = Arc::new(request);
 561
 562        let request_clone = request.clone();
 563        let connection = self.client.clone();
 564        self.executor.spawn(async move {
 565            let args = request_clone.to_dap();
 566            let response = connection.request::<R::DapRequest>(args).await?;
 567            request.response_from_dap(response)
 568        })
 569    }
 570}
 571
 572impl SessionState {
 573    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 574    where
 575        <R::DapRequest as dap::requests::Request>::Response: 'static,
 576        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 577    {
 578        match self {
 579            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 580            SessionState::Booting(_) => Task::ready(Err(anyhow!(
 581                "no adapter running to send request: {request:?}"
 582            ))),
 583        }
 584    }
 585
 586    /// Did this debug session stop at least once?
 587    pub(crate) fn has_ever_stopped(&self) -> bool {
 588        match self {
 589            SessionState::Booting(_) => false,
 590            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 591        }
 592    }
 593
 594    fn stopped(&mut self) {
 595        if let SessionState::Running(running) = self {
 596            running.has_ever_stopped = true;
 597        }
 598    }
 599}
 600
 601#[derive(Default)]
 602struct ThreadStates {
 603    global_state: Option<ThreadStatus>,
 604    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 605}
 606
 607impl ThreadStates {
 608    fn stop_all_threads(&mut self) {
 609        self.global_state = Some(ThreadStatus::Stopped);
 610        self.known_thread_states.clear();
 611    }
 612
 613    fn exit_all_threads(&mut self) {
 614        self.global_state = Some(ThreadStatus::Exited);
 615        self.known_thread_states.clear();
 616    }
 617
 618    fn continue_all_threads(&mut self) {
 619        self.global_state = Some(ThreadStatus::Running);
 620        self.known_thread_states.clear();
 621    }
 622
 623    fn stop_thread(&mut self, thread_id: ThreadId) {
 624        self.known_thread_states
 625            .insert(thread_id, ThreadStatus::Stopped);
 626    }
 627
 628    fn continue_thread(&mut self, thread_id: ThreadId) {
 629        self.known_thread_states
 630            .insert(thread_id, ThreadStatus::Running);
 631    }
 632
 633    fn process_step(&mut self, thread_id: ThreadId) {
 634        self.known_thread_states
 635            .insert(thread_id, ThreadStatus::Stepping);
 636    }
 637
 638    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 639        self.thread_state(thread_id)
 640            .unwrap_or(ThreadStatus::Running)
 641    }
 642
 643    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 644        self.known_thread_states
 645            .get(&thread_id)
 646            .copied()
 647            .or(self.global_state)
 648    }
 649
 650    fn exit_thread(&mut self, thread_id: ThreadId) {
 651        self.known_thread_states
 652            .insert(thread_id, ThreadStatus::Exited);
 653    }
 654
 655    fn any_stopped_thread(&self) -> bool {
 656        self.global_state
 657            .is_some_and(|state| state == ThreadStatus::Stopped)
 658            || self
 659                .known_thread_states
 660                .values()
 661                .any(|status| *status == ThreadStatus::Stopped)
 662    }
 663}
 664const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 665
 666type IsEnabled = bool;
 667
 668#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 669pub struct OutputToken(pub usize);
 670/// Represents a current state of a single debug adapter and provides ways to mutate it.
 671pub struct Session {
 672    pub mode: SessionState,
 673    id: SessionId,
 674    label: Option<SharedString>,
 675    adapter: DebugAdapterName,
 676    pub(super) capabilities: Capabilities,
 677    child_session_ids: HashSet<SessionId>,
 678    parent_session: Option<Entity<Session>>,
 679    modules: Vec<dap::Module>,
 680    loaded_sources: Vec<dap::Source>,
 681    output_token: OutputToken,
 682    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 683    threads: IndexMap<ThreadId, Thread>,
 684    thread_states: ThreadStates,
 685    watchers: HashMap<SharedString, Watcher>,
 686    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 687    stack_frames: IndexMap<StackFrameId, StackFrame>,
 688    locations: HashMap<u64, dap::LocationsResponse>,
 689    is_session_terminated: bool,
 690    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 691    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 692    ignore_breakpoints: bool,
 693    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 694    data_breakpoints: BTreeMap<String, DataBreakpointState>,
 695    background_tasks: Vec<Task<()>>,
 696    restart_task: Option<Task<()>>,
 697    task_context: TaskContext,
 698    memory: memory::Memory,
 699    quirks: SessionQuirks,
 700}
 701
 702trait CacheableCommand: Any + Send + Sync {
 703    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 704    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 705    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 706}
 707
 708impl<T> CacheableCommand for T
 709where
 710    T: LocalDapCommand + PartialEq + Eq + Hash,
 711{
 712    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 713        (rhs as &dyn Any)
 714            .downcast_ref::<Self>()
 715            .map_or(false, |rhs| self == rhs)
 716    }
 717
 718    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 719        T::hash(self, &mut hasher);
 720    }
 721
 722    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 723        self
 724    }
 725}
 726
 727pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 728
 729impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 730    fn from(request: T) -> Self {
 731        Self(Arc::new(request))
 732    }
 733}
 734
 735impl PartialEq for RequestSlot {
 736    fn eq(&self, other: &Self) -> bool {
 737        self.0.dyn_eq(other.0.as_ref())
 738    }
 739}
 740
 741impl Eq for RequestSlot {}
 742
 743impl Hash for RequestSlot {
 744    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 745        self.0.dyn_hash(state);
 746        (&*self.0 as &dyn Any).type_id().hash(state)
 747    }
 748}
 749
 750#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 751pub struct CompletionsQuery {
 752    pub query: String,
 753    pub column: u64,
 754    pub line: Option<u64>,
 755    pub frame_id: Option<u64>,
 756}
 757
 758impl CompletionsQuery {
 759    pub fn new(
 760        buffer: &language::Buffer,
 761        cursor_position: language::Anchor,
 762        frame_id: Option<u64>,
 763    ) -> Self {
 764        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 765        Self {
 766            query: buffer.text(),
 767            column: column as u64,
 768            frame_id,
 769            line: Some(row as u64),
 770        }
 771    }
 772}
 773
 774#[derive(Debug)]
 775pub enum SessionEvent {
 776    Modules,
 777    LoadedSources,
 778    Stopped(Option<ThreadId>),
 779    StackTrace,
 780    Variables,
 781    Watchers,
 782    Threads,
 783    InvalidateInlineValue,
 784    CapabilitiesLoaded,
 785    RunInTerminal {
 786        request: RunInTerminalRequestArguments,
 787        sender: mpsc::Sender<Result<u32>>,
 788    },
 789    DataBreakpointInfo,
 790    ConsoleOutput,
 791}
 792
 793#[derive(Clone, Debug, PartialEq, Eq)]
 794pub enum SessionStateEvent {
 795    Running,
 796    Shutdown,
 797    Restart,
 798    SpawnChildSession {
 799        request: StartDebuggingRequestArguments,
 800    },
 801}
 802
 803impl EventEmitter<SessionEvent> for Session {}
 804impl EventEmitter<SessionStateEvent> for Session {}
 805
 806// local session will send breakpoint updates to DAP for all new breakpoints
 807// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 808// BreakpointStore notifies session on breakpoint changes
 809impl Session {
 810    pub(crate) fn new(
 811        breakpoint_store: Entity<BreakpointStore>,
 812        session_id: SessionId,
 813        parent_session: Option<Entity<Session>>,
 814        label: Option<SharedString>,
 815        adapter: DebugAdapterName,
 816        task_context: TaskContext,
 817        quirks: SessionQuirks,
 818        cx: &mut App,
 819    ) -> Entity<Self> {
 820        cx.new::<Self>(|cx| {
 821            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 822                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 823                    if let Some(local) = (!this.ignore_breakpoints)
 824                        .then(|| this.as_running_mut())
 825                        .flatten()
 826                    {
 827                        local
 828                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 829                            .detach();
 830                    };
 831                }
 832                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 833                    if let Some(local) = (!this.ignore_breakpoints)
 834                        .then(|| this.as_running_mut())
 835                        .flatten()
 836                    {
 837                        local.unset_breakpoints_from_paths(paths, cx).detach();
 838                    }
 839                }
 840                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 841            })
 842            .detach();
 843
 844            let this = Self {
 845                mode: SessionState::Booting(None),
 846                id: session_id,
 847                child_session_ids: HashSet::default(),
 848                parent_session,
 849                capabilities: Capabilities::default(),
 850                watchers: HashMap::default(),
 851                variables: Default::default(),
 852                stack_frames: Default::default(),
 853                thread_states: ThreadStates::default(),
 854                output_token: OutputToken(0),
 855                output: circular_buffer::CircularBuffer::boxed(),
 856                requests: HashMap::default(),
 857                modules: Vec::default(),
 858                loaded_sources: Vec::default(),
 859                threads: IndexMap::default(),
 860                background_tasks: Vec::default(),
 861                restart_task: None,
 862                locations: Default::default(),
 863                is_session_terminated: false,
 864                ignore_breakpoints: false,
 865                breakpoint_store,
 866                data_breakpoints: Default::default(),
 867                exception_breakpoints: Default::default(),
 868                label,
 869                adapter,
 870                task_context,
 871                memory: memory::Memory::new(),
 872                quirks,
 873            };
 874
 875            this
 876        })
 877    }
 878
 879    pub fn task_context(&self) -> &TaskContext {
 880        &self.task_context
 881    }
 882
 883    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 884        match &self.mode {
 885            SessionState::Booting(_) => None,
 886            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 887        }
 888    }
 889
 890    pub fn boot(
 891        &mut self,
 892        binary: DebugAdapterBinary,
 893        worktree: Entity<Worktree>,
 894        dap_store: WeakEntity<DapStore>,
 895        cx: &mut Context<Self>,
 896    ) -> Task<Result<()>> {
 897        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 898        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 899
 900        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 901            let mut initialized_tx = Some(initialized_tx);
 902            while let Some(message) = message_rx.next().await {
 903                if let Message::Event(event) = message {
 904                    if let Events::Initialized(_) = *event {
 905                        if let Some(tx) = initialized_tx.take() {
 906                            tx.send(()).ok();
 907                        }
 908                    } else {
 909                        let Ok(_) = this.update(cx, |session, cx| {
 910                            session.handle_dap_event(event, cx);
 911                        }) else {
 912                            break;
 913                        };
 914                    }
 915                } else if let Message::Request(request) = message {
 916                    let Ok(_) = this.update(cx, |this, cx| {
 917                        if request.command == StartDebugging::COMMAND {
 918                            this.handle_start_debugging_request(request, cx)
 919                                .detach_and_log_err(cx);
 920                        } else if request.command == RunInTerminal::COMMAND {
 921                            this.handle_run_in_terminal_request(request, cx)
 922                                .detach_and_log_err(cx);
 923                        }
 924                    }) else {
 925                        break;
 926                    };
 927                }
 928            }
 929        })];
 930        self.background_tasks = background_tasks;
 931        let id = self.id;
 932        let parent_session = self.parent_session.clone();
 933
 934        cx.spawn(async move |this, cx| {
 935            let mode = RunningMode::new(
 936                id,
 937                parent_session,
 938                worktree.downgrade(),
 939                binary.clone(),
 940                message_tx,
 941                cx,
 942            )
 943            .await?;
 944            this.update(cx, |this, cx| {
 945                match &mut this.mode {
 946                    SessionState::Booting(task) if task.is_some() => {
 947                        task.take().unwrap().detach_and_log_err(cx);
 948                    }
 949                    SessionState::Booting(_) => {}
 950                    SessionState::Running(_) => {
 951                        debug_panic!("Attempting to boot a session that is already running");
 952                    }
 953                };
 954                this.mode = SessionState::Running(mode);
 955                cx.emit(SessionStateEvent::Running);
 956            })?;
 957
 958            this.update(cx, |session, cx| session.request_initialize(cx))?
 959                .await?;
 960
 961            let result = this
 962                .update(cx, |session, cx| {
 963                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 964                })?
 965                .await;
 966
 967            if result.is_err() {
 968                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 969
 970                console
 971                    .send(format!(
 972                        "Tried to launch debugger with: {}",
 973                        serde_json::to_string_pretty(&binary.request_args.configuration)
 974                            .unwrap_or_default(),
 975                    ))
 976                    .await
 977                    .ok();
 978            }
 979
 980            result
 981        })
 982    }
 983
 984    pub fn session_id(&self) -> SessionId {
 985        self.id
 986    }
 987
 988    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 989        self.child_session_ids.clone()
 990    }
 991
 992    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 993        self.child_session_ids.insert(session_id);
 994    }
 995
 996    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 997        self.child_session_ids.remove(&session_id);
 998    }
 999
1000    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
1001        self.parent_session
1002            .as_ref()
1003            .map(|session| session.read(cx).id)
1004    }
1005
1006    pub fn parent_session(&self) -> Option<&Entity<Self>> {
1007        self.parent_session.as_ref()
1008    }
1009
1010    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1011        let Some(client) = self.adapter_client() else {
1012            return Task::ready(());
1013        };
1014
1015        let supports_terminate = self
1016            .capabilities
1017            .support_terminate_debuggee
1018            .unwrap_or(false);
1019
1020        cx.background_spawn(async move {
1021            if supports_terminate {
1022                client
1023                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1024                        restart: Some(false),
1025                    })
1026                    .await
1027                    .ok();
1028            } else {
1029                client
1030                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1031                        restart: Some(false),
1032                        terminate_debuggee: Some(true),
1033                        suspend_debuggee: Some(false),
1034                    })
1035                    .await
1036                    .ok();
1037            }
1038        })
1039    }
1040
1041    pub fn capabilities(&self) -> &Capabilities {
1042        &self.capabilities
1043    }
1044
1045    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1046        match &self.mode {
1047            SessionState::Booting(_) => None,
1048            SessionState::Running(running_mode) => Some(&running_mode.binary),
1049        }
1050    }
1051
1052    pub fn adapter(&self) -> DebugAdapterName {
1053        self.adapter.clone()
1054    }
1055
1056    pub fn label(&self) -> Option<SharedString> {
1057        self.label.clone()
1058    }
1059
1060    pub fn is_terminated(&self) -> bool {
1061        self.is_session_terminated
1062    }
1063
1064    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1065        let (tx, mut rx) = mpsc::unbounded();
1066
1067        cx.spawn(async move |this, cx| {
1068            while let Some(output) = rx.next().await {
1069                this.update(cx, |this, _| {
1070                    let event = dap::OutputEvent {
1071                        category: None,
1072                        output,
1073                        group: None,
1074                        variables_reference: None,
1075                        source: None,
1076                        line: None,
1077                        column: None,
1078                        data: None,
1079                        location_reference: None,
1080                    };
1081                    this.push_output(event);
1082                })?;
1083            }
1084            anyhow::Ok(())
1085        })
1086        .detach();
1087
1088        return tx;
1089    }
1090
1091    pub fn is_started(&self) -> bool {
1092        match &self.mode {
1093            SessionState::Booting(_) => false,
1094            SessionState::Running(running) => running.is_started,
1095        }
1096    }
1097
1098    pub fn is_building(&self) -> bool {
1099        matches!(self.mode, SessionState::Booting(_))
1100    }
1101
1102    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1103        match &mut self.mode {
1104            SessionState::Running(local_mode) => Some(local_mode),
1105            SessionState::Booting(_) => None,
1106        }
1107    }
1108
1109    pub fn as_running(&self) -> Option<&RunningMode> {
1110        match &self.mode {
1111            SessionState::Running(local_mode) => Some(local_mode),
1112            SessionState::Booting(_) => None,
1113        }
1114    }
1115
1116    fn handle_start_debugging_request(
1117        &mut self,
1118        request: dap::messages::Request,
1119        cx: &mut Context<Self>,
1120    ) -> Task<Result<()>> {
1121        let request_seq = request.seq;
1122
1123        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1124            .arguments
1125            .as_ref()
1126            .map(|value| serde_json::from_value(value.clone()));
1127
1128        let mut success = true;
1129        if let Some(Ok(request)) = launch_request {
1130            cx.emit(SessionStateEvent::SpawnChildSession { request });
1131        } else {
1132            log::error!(
1133                "Failed to parse launch request arguments: {:?}",
1134                request.arguments
1135            );
1136            success = false;
1137        }
1138
1139        cx.spawn(async move |this, cx| {
1140            this.update(cx, |this, cx| {
1141                this.respond_to_client(
1142                    request_seq,
1143                    success,
1144                    StartDebugging::COMMAND.to_string(),
1145                    None,
1146                    cx,
1147                )
1148            })?
1149            .await
1150        })
1151    }
1152
1153    fn handle_run_in_terminal_request(
1154        &mut self,
1155        request: dap::messages::Request,
1156        cx: &mut Context<Self>,
1157    ) -> Task<Result<()>> {
1158        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1159            request.arguments.unwrap_or_default(),
1160        ) {
1161            Ok(args) => args,
1162            Err(error) => {
1163                return cx.spawn(async move |session, cx| {
1164                    let error = serde_json::to_value(dap::ErrorResponse {
1165                        error: Some(dap::Message {
1166                            id: request.seq,
1167                            format: error.to_string(),
1168                            variables: None,
1169                            send_telemetry: None,
1170                            show_user: None,
1171                            url: None,
1172                            url_label: None,
1173                        }),
1174                    })
1175                    .ok();
1176
1177                    session
1178                        .update(cx, |this, cx| {
1179                            this.respond_to_client(
1180                                request.seq,
1181                                false,
1182                                StartDebugging::COMMAND.to_string(),
1183                                error,
1184                                cx,
1185                            )
1186                        })?
1187                        .await?;
1188
1189                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1190                });
1191            }
1192        };
1193
1194        let seq = request.seq;
1195
1196        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1197        cx.emit(SessionEvent::RunInTerminal {
1198            request: request_args,
1199            sender: tx,
1200        });
1201        cx.notify();
1202
1203        cx.spawn(async move |session, cx| {
1204            let result = util::maybe!(async move {
1205                rx.next().await.ok_or_else(|| {
1206                    anyhow!("failed to receive response from spawn terminal".to_string())
1207                })?
1208            })
1209            .await;
1210            let (success, body) = match result {
1211                Ok(pid) => (
1212                    true,
1213                    serde_json::to_value(dap::RunInTerminalResponse {
1214                        process_id: None,
1215                        shell_process_id: Some(pid as u64),
1216                    })
1217                    .ok(),
1218                ),
1219                Err(error) => (
1220                    false,
1221                    serde_json::to_value(dap::ErrorResponse {
1222                        error: Some(dap::Message {
1223                            id: seq,
1224                            format: error.to_string(),
1225                            variables: None,
1226                            send_telemetry: None,
1227                            show_user: None,
1228                            url: None,
1229                            url_label: None,
1230                        }),
1231                    })
1232                    .ok(),
1233                ),
1234            };
1235
1236            session
1237                .update(cx, |session, cx| {
1238                    session.respond_to_client(
1239                        seq,
1240                        success,
1241                        RunInTerminal::COMMAND.to_string(),
1242                        body,
1243                        cx,
1244                    )
1245                })?
1246                .await
1247        })
1248    }
1249
1250    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1251        let adapter_id = self.adapter().to_string();
1252        let request = Initialize { adapter_id };
1253
1254        let SessionState::Running(running) = &self.mode else {
1255            return Task::ready(Err(anyhow!(
1256                "Cannot send initialize request, task still building"
1257            )));
1258        };
1259        let mut response = running.request(request.clone());
1260
1261        cx.spawn(async move |this, cx| {
1262            loop {
1263                let capabilities = response.await;
1264                match capabilities {
1265                    Err(e) => {
1266                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1267                            this.as_running()
1268                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1269                        }) else {
1270                            return Err(e);
1271                        };
1272                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1273                        reconnect.await?;
1274
1275                        let Ok(Some(r)) = this.update(cx, |this, _| {
1276                            this.as_running()
1277                                .map(|running| running.request(request.clone()))
1278                        }) else {
1279                            return Err(e);
1280                        };
1281                        response = r
1282                    }
1283                    Ok(capabilities) => {
1284                        this.update(cx, |session, cx| {
1285                            session.capabilities = capabilities;
1286
1287                            cx.emit(SessionEvent::CapabilitiesLoaded);
1288                        })?;
1289                        return Ok(());
1290                    }
1291                }
1292            }
1293        })
1294    }
1295
1296    pub(super) fn initialize_sequence(
1297        &mut self,
1298        initialize_rx: oneshot::Receiver<()>,
1299        dap_store: WeakEntity<DapStore>,
1300        cx: &mut Context<Self>,
1301    ) -> Task<Result<()>> {
1302        match &self.mode {
1303            SessionState::Running(local_mode) => {
1304                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1305            }
1306            SessionState::Booting(_) => {
1307                Task::ready(Err(anyhow!("cannot initialize, still building")))
1308            }
1309        }
1310    }
1311
1312    pub fn run_to_position(
1313        &mut self,
1314        breakpoint: SourceBreakpoint,
1315        active_thread_id: ThreadId,
1316        cx: &mut Context<Self>,
1317    ) {
1318        match &mut self.mode {
1319            SessionState::Running(local_mode) => {
1320                if !matches!(
1321                    self.thread_states.thread_state(active_thread_id),
1322                    Some(ThreadStatus::Stopped)
1323                ) {
1324                    return;
1325                };
1326                let path = breakpoint.path.clone();
1327                local_mode.tmp_breakpoint = Some(breakpoint);
1328                let task = local_mode.send_breakpoints_from_path(
1329                    path,
1330                    BreakpointUpdatedReason::Toggled,
1331                    &self.breakpoint_store,
1332                    cx,
1333                );
1334
1335                cx.spawn(async move |this, cx| {
1336                    task.await;
1337                    this.update(cx, |this, cx| {
1338                        this.continue_thread(active_thread_id, cx);
1339                    })
1340                })
1341                .detach();
1342            }
1343            SessionState::Booting(_) => {}
1344        }
1345    }
1346
1347    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1348        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1349    }
1350
1351    pub fn output(
1352        &self,
1353        since: OutputToken,
1354    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1355        if self.output_token.0 == 0 {
1356            return (self.output.range(0..0), OutputToken(0));
1357        };
1358
1359        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1360
1361        let clamped_events_since = events_since.clamp(0, self.output.len());
1362        (
1363            self.output
1364                .range(self.output.len() - clamped_events_since..),
1365            self.output_token,
1366        )
1367    }
1368
1369    pub fn respond_to_client(
1370        &self,
1371        request_seq: u64,
1372        success: bool,
1373        command: String,
1374        body: Option<serde_json::Value>,
1375        cx: &mut Context<Self>,
1376    ) -> Task<Result<()>> {
1377        let Some(local_session) = self.as_running() else {
1378            unreachable!("Cannot respond to remote client");
1379        };
1380        let client = local_session.client.clone();
1381
1382        cx.background_spawn(async move {
1383            client
1384                .send_message(Message::Response(Response {
1385                    body,
1386                    success,
1387                    command,
1388                    seq: request_seq + 1,
1389                    request_seq,
1390                    message: None,
1391                }))
1392                .await
1393        })
1394    }
1395
1396    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1397        self.mode.stopped();
1398        // todo(debugger): Find a clean way to get around the clone
1399        let breakpoint_store = self.breakpoint_store.clone();
1400        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1401            let breakpoint = local.tmp_breakpoint.take()?;
1402            let path = breakpoint.path.clone();
1403            Some((local, path))
1404        }) {
1405            local
1406                .send_breakpoints_from_path(
1407                    path,
1408                    BreakpointUpdatedReason::Toggled,
1409                    &breakpoint_store,
1410                    cx,
1411                )
1412                .detach();
1413        };
1414
1415        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1416            self.thread_states.stop_all_threads();
1417            self.invalidate_command_type::<StackTraceCommand>();
1418        }
1419
1420        // Event if we stopped all threads we still need to insert the thread_id
1421        // to our own data
1422        if let Some(thread_id) = event.thread_id {
1423            self.thread_states.stop_thread(ThreadId(thread_id));
1424
1425            self.invalidate_state(
1426                &StackTraceCommand {
1427                    thread_id,
1428                    start_frame: None,
1429                    levels: None,
1430                }
1431                .into(),
1432            );
1433        }
1434
1435        self.invalidate_generic();
1436        self.threads.clear();
1437        self.variables.clear();
1438        cx.emit(SessionEvent::Stopped(
1439            event
1440                .thread_id
1441                .map(Into::into)
1442                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1443        ));
1444        cx.emit(SessionEvent::InvalidateInlineValue);
1445        cx.notify();
1446    }
1447
1448    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1449        match *event {
1450            Events::Initialized(_) => {
1451                debug_assert!(
1452                    false,
1453                    "Initialized event should have been handled in LocalMode"
1454                );
1455            }
1456            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1457            Events::Continued(event) => {
1458                if event.all_threads_continued.unwrap_or_default() {
1459                    self.thread_states.continue_all_threads();
1460                    self.breakpoint_store.update(cx, |store, cx| {
1461                        store.remove_active_position(Some(self.session_id()), cx)
1462                    });
1463                } else {
1464                    self.thread_states
1465                        .continue_thread(ThreadId(event.thread_id));
1466                }
1467                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1468                self.invalidate_generic();
1469            }
1470            Events::Exited(_event) => {
1471                self.clear_active_debug_line(cx);
1472            }
1473            Events::Terminated(_) => {
1474                self.shutdown(cx).detach();
1475            }
1476            Events::Thread(event) => {
1477                let thread_id = ThreadId(event.thread_id);
1478
1479                match event.reason {
1480                    dap::ThreadEventReason::Started => {
1481                        self.thread_states.continue_thread(thread_id);
1482                    }
1483                    dap::ThreadEventReason::Exited => {
1484                        self.thread_states.exit_thread(thread_id);
1485                    }
1486                    reason => {
1487                        log::error!("Unhandled thread event reason {:?}", reason);
1488                    }
1489                }
1490                self.invalidate_state(&ThreadsCommand.into());
1491                cx.notify();
1492            }
1493            Events::Output(event) => {
1494                if event
1495                    .category
1496                    .as_ref()
1497                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1498                {
1499                    return;
1500                }
1501
1502                self.push_output(event);
1503                cx.notify();
1504            }
1505            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1506                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1507            }),
1508            Events::Module(event) => {
1509                match event.reason {
1510                    dap::ModuleEventReason::New => {
1511                        self.modules.push(event.module);
1512                    }
1513                    dap::ModuleEventReason::Changed => {
1514                        if let Some(module) = self
1515                            .modules
1516                            .iter_mut()
1517                            .find(|other| event.module.id == other.id)
1518                        {
1519                            *module = event.module;
1520                        }
1521                    }
1522                    dap::ModuleEventReason::Removed => {
1523                        self.modules.retain(|other| event.module.id != other.id);
1524                    }
1525                }
1526
1527                // todo(debugger): We should only send the invalidate command to downstream clients.
1528                // self.invalidate_state(&ModulesCommand.into());
1529            }
1530            Events::LoadedSource(_) => {
1531                self.invalidate_state(&LoadedSourcesCommand.into());
1532            }
1533            Events::Capabilities(event) => {
1534                self.capabilities = self.capabilities.merge(event.capabilities);
1535
1536                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1537                let recent_filters = self
1538                    .capabilities
1539                    .exception_breakpoint_filters
1540                    .iter()
1541                    .flatten()
1542                    .map(|filter| (filter.filter.clone(), filter.clone()))
1543                    .collect::<BTreeMap<_, _>>();
1544                for filter in recent_filters.values() {
1545                    let default = filter.default.unwrap_or_default();
1546                    self.exception_breakpoints
1547                        .entry(filter.filter.clone())
1548                        .or_insert_with(|| (filter.clone(), default));
1549                }
1550                self.exception_breakpoints
1551                    .retain(|k, _| recent_filters.contains_key(k));
1552                if self.is_started() {
1553                    self.send_exception_breakpoints(cx);
1554                }
1555
1556                // Remove the ones that no longer exist.
1557                cx.notify();
1558            }
1559            Events::Memory(_) => {}
1560            Events::Process(_) => {}
1561            Events::ProgressEnd(_) => {}
1562            Events::ProgressStart(_) => {}
1563            Events::ProgressUpdate(_) => {}
1564            Events::Invalidated(_) => {}
1565            Events::Other(_) => {}
1566        }
1567    }
1568
1569    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1570    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1571        &mut self,
1572        request: T,
1573        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1574        cx: &mut Context<Self>,
1575    ) {
1576        const {
1577            assert!(
1578                T::CACHEABLE,
1579                "Only requests marked as cacheable should invoke `fetch`"
1580            );
1581        }
1582
1583        if !self.thread_states.any_stopped_thread()
1584            && request.type_id() != TypeId::of::<ThreadsCommand>()
1585            || self.is_session_terminated
1586        {
1587            return;
1588        }
1589
1590        let request_map = self
1591            .requests
1592            .entry(std::any::TypeId::of::<T>())
1593            .or_default();
1594
1595        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1596            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1597
1598            let task = Self::request_inner::<Arc<T>>(
1599                &self.capabilities,
1600                &self.mode,
1601                command,
1602                |this, result, cx| {
1603                    process_result(this, result, cx);
1604                    None
1605                },
1606                cx,
1607            );
1608            let task = cx
1609                .background_executor()
1610                .spawn(async move {
1611                    let _ = task.await?;
1612                    Some(())
1613                })
1614                .shared();
1615
1616            vacant.insert(task);
1617            cx.notify();
1618        }
1619    }
1620
1621    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1622        capabilities: &Capabilities,
1623        mode: &SessionState,
1624        request: T,
1625        process_result: impl FnOnce(
1626            &mut Self,
1627            Result<T::Response>,
1628            &mut Context<Self>,
1629        ) -> Option<T::Response>
1630        + 'static,
1631        cx: &mut Context<Self>,
1632    ) -> Task<Option<T::Response>> {
1633        if !T::is_supported(capabilities) {
1634            log::warn!(
1635                "Attempted to send a DAP request that isn't supported: {:?}",
1636                request
1637            );
1638            let error = Err(anyhow::Error::msg(
1639                "Couldn't complete request because it's not supported",
1640            ));
1641            return cx.spawn(async move |this, cx| {
1642                this.update(cx, |this, cx| process_result(this, error, cx))
1643                    .ok()
1644                    .flatten()
1645            });
1646        }
1647
1648        let request = mode.request_dap(request);
1649        cx.spawn(async move |this, cx| {
1650            let result = request.await;
1651            this.update(cx, |this, cx| process_result(this, result, cx))
1652                .ok()
1653                .flatten()
1654        })
1655    }
1656
1657    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
1658        &self,
1659        request: T,
1660        process_result: impl FnOnce(
1661            &mut Self,
1662            Result<T::Response>,
1663            &mut Context<Self>,
1664        ) -> Option<T::Response>
1665        + 'static,
1666        cx: &mut Context<Self>,
1667    ) -> Task<Option<T::Response>> {
1668        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1669    }
1670
1671    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
1672        self.requests.remove(&std::any::TypeId::of::<Command>());
1673    }
1674
1675    fn invalidate_generic(&mut self) {
1676        self.invalidate_command_type::<ModulesCommand>();
1677        self.invalidate_command_type::<LoadedSourcesCommand>();
1678        self.invalidate_command_type::<ThreadsCommand>();
1679        self.invalidate_command_type::<DataBreakpointInfoCommand>();
1680        self.invalidate_command_type::<ReadMemory>();
1681        let executor = self.as_running().map(|running| running.executor.clone());
1682        if let Some(executor) = executor {
1683            self.memory.clear(&executor);
1684        }
1685    }
1686
1687    fn invalidate_state(&mut self, key: &RequestSlot) {
1688        self.requests
1689            .entry((&*key.0 as &dyn Any).type_id())
1690            .and_modify(|request_map| {
1691                request_map.remove(key);
1692            });
1693    }
1694
1695    fn push_output(&mut self, event: OutputEvent) {
1696        self.output.push_back(event);
1697        self.output_token.0 += 1;
1698    }
1699
1700    pub fn any_stopped_thread(&self) -> bool {
1701        self.thread_states.any_stopped_thread()
1702    }
1703
1704    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1705        self.thread_states.thread_status(thread_id)
1706    }
1707
1708    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1709        self.fetch(
1710            dap_command::ThreadsCommand,
1711            |this, result, cx| {
1712                let Some(result) = result.log_err() else {
1713                    return;
1714                };
1715
1716                this.threads = result
1717                    .into_iter()
1718                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1719                    .collect();
1720
1721                this.invalidate_command_type::<StackTraceCommand>();
1722                cx.emit(SessionEvent::Threads);
1723                cx.notify();
1724            },
1725            cx,
1726        );
1727
1728        self.threads
1729            .values()
1730            .map(|thread| {
1731                (
1732                    thread.dap.clone(),
1733                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1734                )
1735            })
1736            .collect()
1737    }
1738
1739    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1740        self.fetch(
1741            dap_command::ModulesCommand,
1742            |this, result, cx| {
1743                let Some(result) = result.log_err() else {
1744                    return;
1745                };
1746
1747                this.modules = result;
1748                cx.emit(SessionEvent::Modules);
1749                cx.notify();
1750            },
1751            cx,
1752        );
1753
1754        &self.modules
1755    }
1756
1757    // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1758    pub fn data_access_size(
1759        &mut self,
1760        frame_id: Option<u64>,
1761        evaluate_name: &str,
1762        cx: &mut Context<Self>,
1763    ) -> Task<Option<u64>> {
1764        let request = self.request(
1765            EvaluateCommand {
1766                expression: format!("?${{sizeof({evaluate_name})}}"),
1767                frame_id,
1768
1769                context: Some(EvaluateArgumentsContext::Repl),
1770                source: None,
1771            },
1772            |_, response, _| response.ok(),
1773            cx,
1774        );
1775        cx.background_spawn(async move {
1776            let result = request.await?;
1777            result.result.parse().ok()
1778        })
1779    }
1780
1781    pub fn memory_reference_of_expr(
1782        &mut self,
1783        frame_id: Option<u64>,
1784        expression: String,
1785        cx: &mut Context<Self>,
1786    ) -> Task<Option<(String, Option<String>)>> {
1787        let request = self.request(
1788            EvaluateCommand {
1789                expression,
1790                frame_id,
1791
1792                context: Some(EvaluateArgumentsContext::Repl),
1793                source: None,
1794            },
1795            |_, response, _| response.ok(),
1796            cx,
1797        );
1798        cx.background_spawn(async move {
1799            let result = request.await?;
1800            result
1801                .memory_reference
1802                .map(|reference| (reference, result.type_))
1803        })
1804    }
1805
1806    pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1807        let data = base64::engine::general_purpose::STANDARD.encode(data);
1808        self.request(
1809            WriteMemoryArguments {
1810                memory_reference: address.to_string(),
1811                data,
1812                allow_partial: None,
1813                offset: None,
1814            },
1815            |this, response, cx| {
1816                this.memory.clear(cx.background_executor());
1817                this.invalidate_command_type::<ReadMemory>();
1818                this.invalidate_command_type::<VariablesCommand>();
1819                cx.emit(SessionEvent::Variables);
1820                response.ok()
1821            },
1822            cx,
1823        )
1824        .detach();
1825    }
1826    pub fn read_memory(
1827        &mut self,
1828        range: RangeInclusive<u64>,
1829        cx: &mut Context<Self>,
1830    ) -> MemoryIterator {
1831        // This function is a bit more involved when it comes to fetching data.
1832        // Since we attempt to read memory in pages, we need to account for some parts
1833        // of memory being unreadable. Therefore, we start off by fetching a page per request.
1834        // In case that fails, we try to re-fetch smaller regions until we have the full range.
1835        let page_range = Memory::memory_range_to_page_range(range.clone());
1836        for page_address in PageAddress::iter_range(page_range) {
1837            self.read_single_page_memory(page_address, cx);
1838        }
1839        self.memory.memory_range(range)
1840    }
1841
1842    fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1843        _ = maybe!({
1844            let builder = self.memory.build_page(page_start)?;
1845
1846            self.memory_read_fetch_page_recursive(builder, cx);
1847            Some(())
1848        });
1849    }
1850    fn memory_read_fetch_page_recursive(
1851        &mut self,
1852        mut builder: MemoryPageBuilder,
1853        cx: &mut Context<Self>,
1854    ) {
1855        let Some(next_request) = builder.next_request() else {
1856            // We're done fetching. Let's grab the page and insert it into our memory store.
1857            let (address, contents) = builder.build();
1858            self.memory.insert_page(address, contents);
1859
1860            return;
1861        };
1862        let size = next_request.size;
1863        self.fetch(
1864            ReadMemory {
1865                memory_reference: format!("0x{:X}", next_request.address),
1866                offset: Some(0),
1867                count: next_request.size,
1868            },
1869            move |this, memory, cx| {
1870                if let Ok(memory) = memory {
1871                    builder.known(memory.content);
1872                    if let Some(unknown) = memory.unreadable_bytes {
1873                        builder.unknown(unknown);
1874                    }
1875                    // This is the recursive bit: if we're not yet done with
1876                    // the whole page, we'll kick off a new request with smaller range.
1877                    // Note that this function is recursive only conceptually;
1878                    // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1879                    this.memory_read_fetch_page_recursive(builder, cx);
1880                } else {
1881                    builder.unknown(size);
1882                }
1883            },
1884            cx,
1885        );
1886    }
1887
1888    pub fn ignore_breakpoints(&self) -> bool {
1889        self.ignore_breakpoints
1890    }
1891
1892    pub fn toggle_ignore_breakpoints(
1893        &mut self,
1894        cx: &mut App,
1895    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1896        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1897    }
1898
1899    pub(crate) fn set_ignore_breakpoints(
1900        &mut self,
1901        ignore: bool,
1902        cx: &mut App,
1903    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1904        if self.ignore_breakpoints == ignore {
1905            return Task::ready(HashMap::default());
1906        }
1907
1908        self.ignore_breakpoints = ignore;
1909
1910        if let Some(local) = self.as_running() {
1911            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1912        } else {
1913            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1914            unimplemented!()
1915        }
1916    }
1917
1918    pub fn data_breakpoints(&self) -> impl Iterator<Item = &DataBreakpointState> {
1919        self.data_breakpoints.values()
1920    }
1921
1922    pub fn exception_breakpoints(
1923        &self,
1924    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1925        self.exception_breakpoints.values()
1926    }
1927
1928    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1929        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1930            *is_enabled = !*is_enabled;
1931            self.send_exception_breakpoints(cx);
1932        }
1933    }
1934
1935    fn send_exception_breakpoints(&mut self, cx: &App) {
1936        if let Some(local) = self.as_running() {
1937            let exception_filters = self
1938                .exception_breakpoints
1939                .values()
1940                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1941                .collect();
1942
1943            let supports_exception_filters = self
1944                .capabilities
1945                .supports_exception_filter_options
1946                .unwrap_or_default();
1947            local
1948                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1949                .detach_and_log_err(cx);
1950        } else {
1951            debug_assert!(false, "Not implemented");
1952        }
1953    }
1954
1955    pub fn toggle_data_breakpoint(&mut self, id: &str, cx: &mut Context<'_, Session>) {
1956        if let Some(state) = self.data_breakpoints.get_mut(id) {
1957            state.is_enabled = !state.is_enabled;
1958            self.send_exception_breakpoints(cx);
1959        }
1960    }
1961
1962    fn send_data_breakpoints(&mut self, cx: &mut Context<Self>) {
1963        if let Some(mode) = self.as_running() {
1964            let breakpoints = self
1965                .data_breakpoints
1966                .values()
1967                .filter_map(|state| state.is_enabled.then(|| state.dap.clone()))
1968                .collect();
1969            let command = SetDataBreakpointsCommand { breakpoints };
1970            mode.request(command).detach_and_log_err(cx);
1971        }
1972    }
1973
1974    pub fn create_data_breakpoint(
1975        &mut self,
1976        context: Arc<DataBreakpointContext>,
1977        data_id: String,
1978        dap: dap::DataBreakpoint,
1979        cx: &mut Context<Self>,
1980    ) {
1981        if self.data_breakpoints.remove(&data_id).is_none() {
1982            self.data_breakpoints.insert(
1983                data_id,
1984                DataBreakpointState {
1985                    dap,
1986                    is_enabled: true,
1987                    context,
1988                },
1989            );
1990        }
1991        self.send_data_breakpoints(cx);
1992    }
1993
1994    pub fn breakpoints_enabled(&self) -> bool {
1995        self.ignore_breakpoints
1996    }
1997
1998    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1999        self.fetch(
2000            dap_command::LoadedSourcesCommand,
2001            |this, result, cx| {
2002                let Some(result) = result.log_err() else {
2003                    return;
2004                };
2005                this.loaded_sources = result;
2006                cx.emit(SessionEvent::LoadedSources);
2007                cx.notify();
2008            },
2009            cx,
2010        );
2011
2012        &self.loaded_sources
2013    }
2014
2015    fn fallback_to_manual_restart(
2016        &mut self,
2017        res: Result<()>,
2018        cx: &mut Context<Self>,
2019    ) -> Option<()> {
2020        if res.log_err().is_none() {
2021            cx.emit(SessionStateEvent::Restart);
2022            return None;
2023        }
2024        Some(())
2025    }
2026
2027    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
2028        res.log_err()?;
2029        Some(())
2030    }
2031
2032    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
2033        thread_id: ThreadId,
2034    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
2035    {
2036        move |this, response, cx| match response.log_err() {
2037            Some(response) => {
2038                this.breakpoint_store.update(cx, |store, cx| {
2039                    store.remove_active_position(Some(this.session_id()), cx)
2040                });
2041                Some(response)
2042            }
2043            None => {
2044                this.thread_states.stop_thread(thread_id);
2045                cx.notify();
2046                None
2047            }
2048        }
2049    }
2050
2051    fn clear_active_debug_line_response(
2052        &mut self,
2053        response: Result<()>,
2054        cx: &mut Context<Session>,
2055    ) -> Option<()> {
2056        response.log_err()?;
2057        self.clear_active_debug_line(cx);
2058        Some(())
2059    }
2060
2061    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2062        self.breakpoint_store.update(cx, |store, cx| {
2063            store.remove_active_position(Some(self.id), cx)
2064        });
2065    }
2066
2067    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2068        self.request(
2069            PauseCommand {
2070                thread_id: thread_id.0,
2071            },
2072            Self::empty_response,
2073            cx,
2074        )
2075        .detach();
2076    }
2077
2078    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2079        self.request(
2080            RestartStackFrameCommand { stack_frame_id },
2081            Self::empty_response,
2082            cx,
2083        )
2084        .detach();
2085    }
2086
2087    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2088        if self.restart_task.is_some() || self.as_running().is_none() {
2089            return;
2090        }
2091
2092        let supports_dap_restart =
2093            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2094
2095        self.restart_task = Some(cx.spawn(async move |this, cx| {
2096            let _ = this.update(cx, |session, cx| {
2097                if supports_dap_restart {
2098                    session
2099                        .request(
2100                            RestartCommand {
2101                                raw: args.unwrap_or(Value::Null),
2102                            },
2103                            Self::fallback_to_manual_restart,
2104                            cx,
2105                        )
2106                        .detach();
2107                } else {
2108                    cx.emit(SessionStateEvent::Restart);
2109                }
2110            });
2111        }));
2112    }
2113
2114    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2115        if self.is_session_terminated {
2116            return Task::ready(());
2117        }
2118
2119        self.is_session_terminated = true;
2120        self.thread_states.exit_all_threads();
2121        cx.notify();
2122
2123        let task = match &mut self.mode {
2124            SessionState::Running(_) => {
2125                if self
2126                    .capabilities
2127                    .supports_terminate_request
2128                    .unwrap_or_default()
2129                {
2130                    self.request(
2131                        TerminateCommand {
2132                            restart: Some(false),
2133                        },
2134                        Self::clear_active_debug_line_response,
2135                        cx,
2136                    )
2137                } else {
2138                    self.request(
2139                        DisconnectCommand {
2140                            restart: Some(false),
2141                            terminate_debuggee: Some(true),
2142                            suspend_debuggee: Some(false),
2143                        },
2144                        Self::clear_active_debug_line_response,
2145                        cx,
2146                    )
2147                }
2148            }
2149            SessionState::Booting(build_task) => {
2150                build_task.take();
2151                Task::ready(Some(()))
2152            }
2153        };
2154
2155        cx.emit(SessionStateEvent::Shutdown);
2156
2157        cx.spawn(async move |this, cx| {
2158            task.await;
2159            let _ = this.update(cx, |this, _| {
2160                if let Some(adapter_client) = this.adapter_client() {
2161                    adapter_client.kill();
2162                }
2163            });
2164        })
2165    }
2166
2167    pub fn completions(
2168        &mut self,
2169        query: CompletionsQuery,
2170        cx: &mut Context<Self>,
2171    ) -> Task<Result<Vec<dap::CompletionItem>>> {
2172        let task = self.request(query, |_, result, _| result.log_err(), cx);
2173
2174        cx.background_executor().spawn(async move {
2175            anyhow::Ok(
2176                task.await
2177                    .map(|response| response.targets)
2178                    .context("failed to fetch completions")?,
2179            )
2180        })
2181    }
2182
2183    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2184        let supports_single_thread_execution_requests =
2185            self.capabilities.supports_single_thread_execution_requests;
2186        self.thread_states.continue_thread(thread_id);
2187        self.request(
2188            ContinueCommand {
2189                args: ContinueArguments {
2190                    thread_id: thread_id.0,
2191                    single_thread: supports_single_thread_execution_requests,
2192                },
2193            },
2194            Self::on_step_response::<ContinueCommand>(thread_id),
2195            cx,
2196        )
2197        .detach();
2198    }
2199
2200    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2201        match self.mode {
2202            SessionState::Running(ref local) => Some(local.client.clone()),
2203            SessionState::Booting(_) => None,
2204        }
2205    }
2206
2207    pub fn has_ever_stopped(&self) -> bool {
2208        self.mode.has_ever_stopped()
2209    }
2210    pub fn step_over(
2211        &mut self,
2212        thread_id: ThreadId,
2213        granularity: SteppingGranularity,
2214        cx: &mut Context<Self>,
2215    ) {
2216        let supports_single_thread_execution_requests =
2217            self.capabilities.supports_single_thread_execution_requests;
2218        let supports_stepping_granularity = self
2219            .capabilities
2220            .supports_stepping_granularity
2221            .unwrap_or_default();
2222
2223        let command = NextCommand {
2224            inner: StepCommand {
2225                thread_id: thread_id.0,
2226                granularity: supports_stepping_granularity.then(|| granularity),
2227                single_thread: supports_single_thread_execution_requests,
2228            },
2229        };
2230
2231        self.thread_states.process_step(thread_id);
2232        self.request(
2233            command,
2234            Self::on_step_response::<NextCommand>(thread_id),
2235            cx,
2236        )
2237        .detach();
2238    }
2239
2240    pub fn step_in(
2241        &mut self,
2242        thread_id: ThreadId,
2243        granularity: SteppingGranularity,
2244        cx: &mut Context<Self>,
2245    ) {
2246        let supports_single_thread_execution_requests =
2247            self.capabilities.supports_single_thread_execution_requests;
2248        let supports_stepping_granularity = self
2249            .capabilities
2250            .supports_stepping_granularity
2251            .unwrap_or_default();
2252
2253        let command = StepInCommand {
2254            inner: StepCommand {
2255                thread_id: thread_id.0,
2256                granularity: supports_stepping_granularity.then(|| granularity),
2257                single_thread: supports_single_thread_execution_requests,
2258            },
2259        };
2260
2261        self.thread_states.process_step(thread_id);
2262        self.request(
2263            command,
2264            Self::on_step_response::<StepInCommand>(thread_id),
2265            cx,
2266        )
2267        .detach();
2268    }
2269
2270    pub fn step_out(
2271        &mut self,
2272        thread_id: ThreadId,
2273        granularity: SteppingGranularity,
2274        cx: &mut Context<Self>,
2275    ) {
2276        let supports_single_thread_execution_requests =
2277            self.capabilities.supports_single_thread_execution_requests;
2278        let supports_stepping_granularity = self
2279            .capabilities
2280            .supports_stepping_granularity
2281            .unwrap_or_default();
2282
2283        let command = StepOutCommand {
2284            inner: StepCommand {
2285                thread_id: thread_id.0,
2286                granularity: supports_stepping_granularity.then(|| granularity),
2287                single_thread: supports_single_thread_execution_requests,
2288            },
2289        };
2290
2291        self.thread_states.process_step(thread_id);
2292        self.request(
2293            command,
2294            Self::on_step_response::<StepOutCommand>(thread_id),
2295            cx,
2296        )
2297        .detach();
2298    }
2299
2300    pub fn step_back(
2301        &mut self,
2302        thread_id: ThreadId,
2303        granularity: SteppingGranularity,
2304        cx: &mut Context<Self>,
2305    ) {
2306        let supports_single_thread_execution_requests =
2307            self.capabilities.supports_single_thread_execution_requests;
2308        let supports_stepping_granularity = self
2309            .capabilities
2310            .supports_stepping_granularity
2311            .unwrap_or_default();
2312
2313        let command = StepBackCommand {
2314            inner: StepCommand {
2315                thread_id: thread_id.0,
2316                granularity: supports_stepping_granularity.then(|| granularity),
2317                single_thread: supports_single_thread_execution_requests,
2318            },
2319        };
2320
2321        self.thread_states.process_step(thread_id);
2322
2323        self.request(
2324            command,
2325            Self::on_step_response::<StepBackCommand>(thread_id),
2326            cx,
2327        )
2328        .detach();
2329    }
2330
2331    pub fn stack_frames(
2332        &mut self,
2333        thread_id: ThreadId,
2334        cx: &mut Context<Self>,
2335    ) -> Result<Vec<StackFrame>> {
2336        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2337            && self.requests.contains_key(&ThreadsCommand.type_id())
2338            && self.threads.contains_key(&thread_id)
2339        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2340        // We could still be using an old thread id and have sent a new thread's request
2341        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2342        // But it very well could cause a minor bug in the future that is hard to track down
2343        {
2344            self.fetch(
2345                super::dap_command::StackTraceCommand {
2346                    thread_id: thread_id.0,
2347                    start_frame: None,
2348                    levels: None,
2349                },
2350                move |this, stack_frames, cx| {
2351                    let entry =
2352                        this.threads
2353                            .entry(thread_id)
2354                            .and_modify(|thread| match &stack_frames {
2355                                Ok(stack_frames) => {
2356                                    thread.stack_frames = stack_frames
2357                                        .iter()
2358                                        .cloned()
2359                                        .map(StackFrame::from)
2360                                        .collect();
2361                                    thread.stack_frames_error = None;
2362                                }
2363                                Err(error) => {
2364                                    thread.stack_frames.clear();
2365                                    thread.stack_frames_error = Some(error.cloned());
2366                                }
2367                            });
2368                    debug_assert!(
2369                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2370                        "Sent request for thread_id that doesn't exist"
2371                    );
2372                    if let Ok(stack_frames) = stack_frames {
2373                        this.stack_frames.extend(
2374                            stack_frames
2375                                .into_iter()
2376                                .filter(|frame| {
2377                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2378                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2379                                    !(frame.id == 0
2380                                        && frame.line == 0
2381                                        && frame.column == 0
2382                                        && frame.presentation_hint
2383                                            == Some(StackFramePresentationHint::Label))
2384                                })
2385                                .map(|frame| (frame.id, StackFrame::from(frame))),
2386                        );
2387                    }
2388
2389                    this.invalidate_command_type::<ScopesCommand>();
2390                    this.invalidate_command_type::<VariablesCommand>();
2391
2392                    cx.emit(SessionEvent::StackTrace);
2393                },
2394                cx,
2395            );
2396        }
2397
2398        match self.threads.get(&thread_id) {
2399            Some(thread) => {
2400                if let Some(error) = &thread.stack_frames_error {
2401                    Err(error.cloned())
2402                } else {
2403                    Ok(thread.stack_frames.clone())
2404                }
2405            }
2406            None => Ok(Vec::new()),
2407        }
2408    }
2409
2410    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2411        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2412            && self
2413                .requests
2414                .contains_key(&TypeId::of::<StackTraceCommand>())
2415        {
2416            self.fetch(
2417                ScopesCommand { stack_frame_id },
2418                move |this, scopes, cx| {
2419                    let Some(scopes) = scopes.log_err() else {
2420                        return
2421                    };
2422
2423                    for scope in scopes.iter() {
2424                        this.variables(scope.variables_reference, cx);
2425                    }
2426
2427                    let entry = this
2428                        .stack_frames
2429                        .entry(stack_frame_id)
2430                        .and_modify(|stack_frame| {
2431                            stack_frame.scopes = scopes;
2432                        });
2433
2434                    cx.emit(SessionEvent::Variables);
2435
2436                    debug_assert!(
2437                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2438                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2439                    );
2440                },
2441                cx,
2442            );
2443        }
2444
2445        self.stack_frames
2446            .get(&stack_frame_id)
2447            .map(|frame| frame.scopes.as_slice())
2448            .unwrap_or_default()
2449    }
2450
2451    pub fn variables_by_stack_frame_id(
2452        &self,
2453        stack_frame_id: StackFrameId,
2454        globals: bool,
2455        locals: bool,
2456    ) -> Vec<dap::Variable> {
2457        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2458            return Vec::new();
2459        };
2460
2461        stack_frame
2462            .scopes
2463            .iter()
2464            .filter(|scope| {
2465                (scope.name.to_lowercase().contains("local") && locals)
2466                    || (scope.name.to_lowercase().contains("global") && globals)
2467            })
2468            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2469            .flatten()
2470            .cloned()
2471            .collect()
2472    }
2473
2474    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2475        &self.watchers
2476    }
2477
2478    pub fn add_watcher(
2479        &mut self,
2480        expression: SharedString,
2481        frame_id: u64,
2482        cx: &mut Context<Self>,
2483    ) -> Task<Result<()>> {
2484        let request = self.mode.request_dap(EvaluateCommand {
2485            expression: expression.to_string(),
2486            context: Some(EvaluateArgumentsContext::Watch),
2487            frame_id: Some(frame_id),
2488            source: None,
2489        });
2490
2491        cx.spawn(async move |this, cx| {
2492            let response = request.await?;
2493
2494            this.update(cx, |session, cx| {
2495                session.watchers.insert(
2496                    expression.clone(),
2497                    Watcher {
2498                        expression,
2499                        value: response.result.into(),
2500                        variables_reference: response.variables_reference,
2501                        presentation_hint: response.presentation_hint,
2502                    },
2503                );
2504                cx.emit(SessionEvent::Watchers);
2505            })
2506        })
2507    }
2508
2509    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2510        let watches = self.watchers.clone();
2511        for (_, watch) in watches.into_iter() {
2512            self.add_watcher(watch.expression.clone(), frame_id, cx)
2513                .detach();
2514        }
2515    }
2516
2517    pub fn remove_watcher(&mut self, expression: SharedString) {
2518        self.watchers.remove(&expression);
2519    }
2520
2521    pub fn variables(
2522        &mut self,
2523        variables_reference: VariableReference,
2524        cx: &mut Context<Self>,
2525    ) -> Vec<dap::Variable> {
2526        let command = VariablesCommand {
2527            variables_reference,
2528            filter: None,
2529            start: None,
2530            count: None,
2531            format: None,
2532        };
2533
2534        self.fetch(
2535            command,
2536            move |this, variables, cx| {
2537                let Some(variables) = variables.log_err() else {
2538                    return;
2539                };
2540
2541                this.variables.insert(variables_reference, variables);
2542
2543                cx.emit(SessionEvent::Variables);
2544                cx.emit(SessionEvent::InvalidateInlineValue);
2545            },
2546            cx,
2547        );
2548
2549        self.variables
2550            .get(&variables_reference)
2551            .cloned()
2552            .unwrap_or_default()
2553    }
2554
2555    pub fn data_breakpoint_info(
2556        &mut self,
2557        context: Arc<DataBreakpointContext>,
2558        mode: Option<String>,
2559        cx: &mut Context<Self>,
2560    ) -> Task<Option<dap::DataBreakpointInfoResponse>> {
2561        let command = DataBreakpointInfoCommand {
2562            context: context.clone(),
2563            mode,
2564        };
2565
2566        self.request(command, |_, response, _| response.ok(), cx)
2567    }
2568
2569    pub fn set_variable_value(
2570        &mut self,
2571        stack_frame_id: u64,
2572        variables_reference: u64,
2573        name: String,
2574        value: String,
2575        cx: &mut Context<Self>,
2576    ) {
2577        if self.capabilities.supports_set_variable.unwrap_or_default() {
2578            self.request(
2579                SetVariableValueCommand {
2580                    name,
2581                    value,
2582                    variables_reference,
2583                },
2584                move |this, response, cx| {
2585                    let response = response.log_err()?;
2586                    this.invalidate_command_type::<VariablesCommand>();
2587                    this.invalidate_command_type::<ReadMemory>();
2588                    this.memory.clear(cx.background_executor());
2589                    this.refresh_watchers(stack_frame_id, cx);
2590                    cx.emit(SessionEvent::Variables);
2591                    Some(response)
2592                },
2593                cx,
2594            )
2595            .detach();
2596        }
2597    }
2598
2599    pub fn evaluate(
2600        &mut self,
2601        expression: String,
2602        context: Option<EvaluateArgumentsContext>,
2603        frame_id: Option<u64>,
2604        source: Option<Source>,
2605        cx: &mut Context<Self>,
2606    ) -> Task<()> {
2607        let event = dap::OutputEvent {
2608            category: None,
2609            output: format!("> {expression}"),
2610            group: None,
2611            variables_reference: None,
2612            source: None,
2613            line: None,
2614            column: None,
2615            data: None,
2616            location_reference: None,
2617        };
2618        self.push_output(event);
2619        let request = self.mode.request_dap(EvaluateCommand {
2620            expression,
2621            context,
2622            frame_id,
2623            source,
2624        });
2625        cx.spawn(async move |this, cx| {
2626            let response = request.await;
2627            this.update(cx, |this, cx| {
2628                this.memory.clear(cx.background_executor());
2629                this.invalidate_command_type::<ReadMemory>();
2630                match response {
2631                    Ok(response) => {
2632                        let event = dap::OutputEvent {
2633                            category: None,
2634                            output: format!("< {}", &response.result),
2635                            group: None,
2636                            variables_reference: Some(response.variables_reference),
2637                            source: None,
2638                            line: None,
2639                            column: None,
2640                            data: None,
2641                            location_reference: None,
2642                        };
2643                        this.push_output(event);
2644                    }
2645                    Err(e) => {
2646                        let event = dap::OutputEvent {
2647                            category: None,
2648                            output: format!("{}", e),
2649                            group: None,
2650                            variables_reference: None,
2651                            source: None,
2652                            line: None,
2653                            column: None,
2654                            data: None,
2655                            location_reference: None,
2656                        };
2657                        this.push_output(event);
2658                    }
2659                };
2660                cx.notify();
2661            })
2662            .ok();
2663        })
2664    }
2665
2666    pub fn location(
2667        &mut self,
2668        reference: u64,
2669        cx: &mut Context<Self>,
2670    ) -> Option<dap::LocationsResponse> {
2671        self.fetch(
2672            LocationsCommand { reference },
2673            move |this, response, _| {
2674                let Some(response) = response.log_err() else {
2675                    return;
2676                };
2677                this.locations.insert(reference, response);
2678            },
2679            cx,
2680        );
2681        self.locations.get(&reference).cloned()
2682    }
2683
2684    pub fn is_attached(&self) -> bool {
2685        let SessionState::Running(local_mode) = &self.mode else {
2686            return false;
2687        };
2688        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2689    }
2690
2691    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2692        let command = DisconnectCommand {
2693            restart: Some(false),
2694            terminate_debuggee: Some(false),
2695            suspend_debuggee: Some(false),
2696        };
2697
2698        self.request(command, Self::empty_response, cx).detach()
2699    }
2700
2701    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2702        if self
2703            .capabilities
2704            .supports_terminate_threads_request
2705            .unwrap_or_default()
2706        {
2707            self.request(
2708                TerminateThreadsCommand {
2709                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2710                },
2711                Self::clear_active_debug_line_response,
2712                cx,
2713            )
2714            .detach();
2715        } else {
2716            self.shutdown(cx).detach();
2717        }
2718    }
2719
2720    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2721        self.thread_states.thread_state(thread_id)
2722    }
2723
2724    pub fn quirks(&self) -> SessionQuirks {
2725        self.quirks
2726    }
2727}