session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2use crate::debugger::dap_command::{DataBreakpointContext, ReadMemory};
   3use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
   4
   5use super::breakpoint_store::{
   6    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   7};
   8use super::dap_command::{
   9    self, Attach, ConfigurationDone, ContinueCommand, DataBreakpointInfoCommand, DisconnectCommand,
  10    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
  11    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  12    ScopesCommand, SetDataBreakpointsCommand, SetExceptionBreakpoints, SetVariableValueCommand,
  13    StackTraceCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  14    TerminateCommand, TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  15};
  16use super::dap_store::DapStore;
  17use anyhow::{Context as _, Result, anyhow};
  18use base64::Engine;
  19use collections::{HashMap, HashSet, IndexMap};
  20use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  21use dap::messages::Response;
  22use dap::requests::{Request, RunInTerminal, StartDebugging};
  23use dap::{
  24    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  25    SteppingGranularity, StoppedEvent, VariableReference,
  26    client::{DebugAdapterClient, SessionId},
  27    messages::{Events, Message},
  28};
  29use dap::{
  30    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  31    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  32    StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
  33};
  34use futures::SinkExt;
  35use futures::channel::mpsc::UnboundedSender;
  36use futures::channel::{mpsc, oneshot};
  37use futures::{FutureExt, future::Shared};
  38use gpui::{
  39    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  40    Task, WeakEntity,
  41};
  42
  43use rpc::ErrorExt;
  44use serde_json::Value;
  45use smol::stream::StreamExt;
  46use std::any::TypeId;
  47use std::collections::BTreeMap;
  48use std::ops::RangeInclusive;
  49use std::u64;
  50use std::{
  51    any::Any,
  52    collections::hash_map::Entry,
  53    hash::{Hash, Hasher},
  54    path::Path,
  55    sync::Arc,
  56};
  57use task::TaskContext;
  58use text::{PointUtf16, ToPointUtf16};
  59use util::{ResultExt, maybe};
  60use worktree::Worktree;
  61
  62#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  63#[repr(transparent)]
  64pub struct ThreadId(pub u64);
  65
  66impl ThreadId {
  67    pub const MIN: ThreadId = ThreadId(u64::MIN);
  68    pub const MAX: ThreadId = ThreadId(u64::MAX);
  69}
  70
  71impl From<u64> for ThreadId {
  72    fn from(id: u64) -> Self {
  73        Self(id)
  74    }
  75}
  76
  77#[derive(Clone, Debug)]
  78pub struct StackFrame {
  79    pub dap: dap::StackFrame,
  80    pub scopes: Vec<dap::Scope>,
  81}
  82
  83impl From<dap::StackFrame> for StackFrame {
  84    fn from(stack_frame: dap::StackFrame) -> Self {
  85        Self {
  86            scopes: vec![],
  87            dap: stack_frame,
  88        }
  89    }
  90}
  91
  92#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  93pub enum ThreadStatus {
  94    #[default]
  95    Running,
  96    Stopped,
  97    Stepping,
  98    Exited,
  99    Ended,
 100}
 101
 102impl ThreadStatus {
 103    pub fn label(&self) -> &'static str {
 104        match self {
 105            ThreadStatus::Running => "Running",
 106            ThreadStatus::Stopped => "Stopped",
 107            ThreadStatus::Stepping => "Stepping",
 108            ThreadStatus::Exited => "Exited",
 109            ThreadStatus::Ended => "Ended",
 110        }
 111    }
 112}
 113
 114#[derive(Debug)]
 115pub struct Thread {
 116    dap: dap::Thread,
 117    stack_frames: Vec<StackFrame>,
 118    stack_frames_error: Option<anyhow::Error>,
 119    _has_stopped: bool,
 120}
 121
 122impl From<dap::Thread> for Thread {
 123    fn from(dap: dap::Thread) -> Self {
 124        Self {
 125            dap,
 126            stack_frames: Default::default(),
 127            stack_frames_error: None,
 128            _has_stopped: false,
 129        }
 130    }
 131}
 132
 133#[derive(Debug, Clone, PartialEq)]
 134pub struct Watcher {
 135    pub expression: SharedString,
 136    pub value: SharedString,
 137    pub variables_reference: u64,
 138    pub presentation_hint: Option<VariablePresentationHint>,
 139}
 140
 141#[derive(Debug, Clone, PartialEq)]
 142pub struct DataBreakpointState {
 143    pub dap: dap::DataBreakpoint,
 144    pub is_enabled: bool,
 145    pub context: Arc<DataBreakpointContext>,
 146}
 147
 148pub enum SessionState {
 149    Building(Option<Task<Result<()>>>),
 150    Running(RunningMode),
 151}
 152
 153#[derive(Clone)]
 154pub struct RunningMode {
 155    client: Arc<DebugAdapterClient>,
 156    binary: DebugAdapterBinary,
 157    tmp_breakpoint: Option<SourceBreakpoint>,
 158    worktree: WeakEntity<Worktree>,
 159    executor: BackgroundExecutor,
 160    is_started: bool,
 161    has_ever_stopped: bool,
 162    messages_tx: UnboundedSender<Message>,
 163}
 164
 165#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
 166pub struct SessionQuirks {
 167    pub compact: bool,
 168    pub prefer_thread_name: bool,
 169}
 170
 171fn client_source(abs_path: &Path) -> dap::Source {
 172    dap::Source {
 173        name: abs_path
 174            .file_name()
 175            .map(|filename| filename.to_string_lossy().to_string()),
 176        path: Some(abs_path.to_string_lossy().to_string()),
 177        source_reference: None,
 178        presentation_hint: None,
 179        origin: None,
 180        sources: None,
 181        adapter_data: None,
 182        checksums: None,
 183    }
 184}
 185
 186impl RunningMode {
 187    async fn new(
 188        session_id: SessionId,
 189        parent_session: Option<Entity<Session>>,
 190        worktree: WeakEntity<Worktree>,
 191        binary: DebugAdapterBinary,
 192        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 193        cx: &mut AsyncApp,
 194    ) -> Result<Self> {
 195        let message_handler = Box::new({
 196            let messages_tx = messages_tx.clone();
 197            move |message| {
 198                messages_tx.unbounded_send(message).ok();
 199            }
 200        });
 201
 202        let client = if let Some(client) = parent_session
 203            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 204            .flatten()
 205        {
 206            client
 207                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 208                .await?
 209        } else {
 210            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 211        };
 212
 213        Ok(Self {
 214            client: Arc::new(client),
 215            worktree,
 216            tmp_breakpoint: None,
 217            binary,
 218            executor: cx.background_executor().clone(),
 219            is_started: false,
 220            has_ever_stopped: false,
 221            messages_tx,
 222        })
 223    }
 224
 225    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 226        &self.worktree
 227    }
 228
 229    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 230        let tasks: Vec<_> = paths
 231            .into_iter()
 232            .map(|path| {
 233                self.request(dap_command::SetBreakpoints {
 234                    source: client_source(path),
 235                    source_modified: None,
 236                    breakpoints: vec![],
 237                })
 238            })
 239            .collect();
 240
 241        cx.background_spawn(async move {
 242            futures::future::join_all(tasks)
 243                .await
 244                .iter()
 245                .for_each(|res| match res {
 246                    Ok(_) => {}
 247                    Err(err) => {
 248                        log::warn!("Set breakpoints request failed: {}", err);
 249                    }
 250                });
 251        })
 252    }
 253
 254    fn send_breakpoints_from_path(
 255        &self,
 256        abs_path: Arc<Path>,
 257        reason: BreakpointUpdatedReason,
 258        breakpoint_store: &Entity<BreakpointStore>,
 259        cx: &mut App,
 260    ) -> Task<()> {
 261        let breakpoints =
 262            breakpoint_store
 263                .read(cx)
 264                .source_breakpoints_from_path(&abs_path, cx)
 265                .into_iter()
 266                .filter(|bp| bp.state.is_enabled())
 267                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 268                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 269                }))
 270                .map(Into::into)
 271                .collect();
 272
 273        let raw_breakpoints = breakpoint_store
 274            .read(cx)
 275            .breakpoints_from_path(&abs_path)
 276            .into_iter()
 277            .filter(|bp| bp.bp.state.is_enabled())
 278            .collect::<Vec<_>>();
 279
 280        let task = self.request(dap_command::SetBreakpoints {
 281            source: client_source(&abs_path),
 282            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 283            breakpoints,
 284        });
 285        let session_id = self.client.id();
 286        let breakpoint_store = breakpoint_store.downgrade();
 287        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 288            Ok(breakpoints) => {
 289                let breakpoints =
 290                    breakpoints
 291                        .into_iter()
 292                        .zip(raw_breakpoints)
 293                        .filter_map(|(dap_bp, zed_bp)| {
 294                            Some((
 295                                zed_bp,
 296                                BreakpointSessionState {
 297                                    id: dap_bp.id?,
 298                                    verified: dap_bp.verified,
 299                                },
 300                            ))
 301                        });
 302                breakpoint_store
 303                    .update(cx, |this, _| {
 304                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 305                    })
 306                    .ok();
 307            }
 308            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 309        })
 310    }
 311
 312    fn send_exception_breakpoints(
 313        &self,
 314        filters: Vec<ExceptionBreakpointsFilter>,
 315        supports_filter_options: bool,
 316    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 317        let arg = if supports_filter_options {
 318            SetExceptionBreakpoints::WithOptions {
 319                filters: filters
 320                    .into_iter()
 321                    .map(|filter| ExceptionFilterOptions {
 322                        filter_id: filter.filter,
 323                        condition: None,
 324                        mode: None,
 325                    })
 326                    .collect(),
 327            }
 328        } else {
 329            SetExceptionBreakpoints::Plain {
 330                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 331            }
 332        };
 333        self.request(arg)
 334    }
 335
 336    fn send_source_breakpoints(
 337        &self,
 338        ignore_breakpoints: bool,
 339        breakpoint_store: &Entity<BreakpointStore>,
 340        cx: &App,
 341    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 342        let mut breakpoint_tasks = Vec::new();
 343        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 344        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 345        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 346        let session_id = self.client.id();
 347        for (path, breakpoints) in breakpoints {
 348            let breakpoints = if ignore_breakpoints {
 349                vec![]
 350            } else {
 351                breakpoints
 352                    .into_iter()
 353                    .filter(|bp| bp.state.is_enabled())
 354                    .map(Into::into)
 355                    .collect()
 356            };
 357
 358            let raw_breakpoints = raw_breakpoints
 359                .remove(&path)
 360                .unwrap_or_default()
 361                .into_iter()
 362                .filter(|bp| bp.bp.state.is_enabled());
 363            let error_path = path.clone();
 364            let send_request = self
 365                .request(dap_command::SetBreakpoints {
 366                    source: client_source(&path),
 367                    source_modified: Some(false),
 368                    breakpoints,
 369                })
 370                .map(|result| result.map_err(move |e| (error_path, e)));
 371
 372            let task = cx.spawn({
 373                let breakpoint_store = breakpoint_store.downgrade();
 374                async move |cx| {
 375                    let breakpoints = cx.background_spawn(send_request).await?;
 376
 377                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 378                        |(dap_bp, zed_bp)| {
 379                            Some((
 380                                zed_bp,
 381                                BreakpointSessionState {
 382                                    id: dap_bp.id?,
 383                                    verified: dap_bp.verified,
 384                                },
 385                            ))
 386                        },
 387                    );
 388                    breakpoint_store
 389                        .update(cx, |this, _| {
 390                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 391                        })
 392                        .ok();
 393
 394                    Ok(())
 395                }
 396            });
 397            breakpoint_tasks.push(task);
 398        }
 399
 400        cx.background_spawn(async move {
 401            futures::future::join_all(breakpoint_tasks)
 402                .await
 403                .into_iter()
 404                .filter_map(Result::err)
 405                .collect::<HashMap<_, _>>()
 406        })
 407    }
 408
 409    fn initialize_sequence(
 410        &self,
 411        capabilities: &Capabilities,
 412        initialized_rx: oneshot::Receiver<()>,
 413        dap_store: WeakEntity<DapStore>,
 414        cx: &mut Context<Session>,
 415    ) -> Task<Result<()>> {
 416        let raw = self.binary.request_args.clone();
 417
 418        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 419        let launch = match raw.request {
 420            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 421                raw: raw.configuration,
 422            }),
 423            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 424                raw: raw.configuration,
 425            }),
 426        };
 427
 428        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 429        // From spec (on initialization sequence):
 430        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 431        //
 432        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 433        let should_send_exception_breakpoints = capabilities
 434            .exception_breakpoint_filters
 435            .as_ref()
 436            .map_or(false, |filters| !filters.is_empty())
 437            || !configuration_done_supported;
 438        let supports_exception_filters = capabilities
 439            .supports_exception_filter_options
 440            .unwrap_or_default();
 441        let this = self.clone();
 442        let worktree = self.worktree().clone();
 443        let mut filters = capabilities
 444            .exception_breakpoint_filters
 445            .clone()
 446            .unwrap_or_default();
 447        let configuration_sequence = cx.spawn({
 448            async move |session, cx| {
 449                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 450                let (breakpoint_store, adapter_defaults) =
 451                    dap_store.read_with(cx, |dap_store, _| {
 452                        (
 453                            dap_store.breakpoint_store().clone(),
 454                            dap_store.adapter_options(&adapter_name),
 455                        )
 456                    })?;
 457                initialized_rx.await?;
 458                let errors_by_path = cx
 459                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 460                    .await;
 461
 462                dap_store.update(cx, |_, cx| {
 463                    let Some(worktree) = worktree.upgrade() else {
 464                        return;
 465                    };
 466
 467                    for (path, error) in &errors_by_path {
 468                        log::error!("failed to set breakpoints for {path:?}: {error}");
 469                    }
 470
 471                    if let Some(failed_path) = errors_by_path.keys().next() {
 472                        let failed_path = failed_path
 473                            .strip_prefix(worktree.read(cx).abs_path())
 474                            .unwrap_or(failed_path)
 475                            .display();
 476                        let message = format!(
 477                            "Failed to set breakpoints for {failed_path}{}",
 478                            match errors_by_path.len() {
 479                                0 => unreachable!(),
 480                                1 => "".into(),
 481                                2 => " and 1 other path".into(),
 482                                n => format!(" and {} other paths", n - 1),
 483                            }
 484                        );
 485                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 486                    }
 487                })?;
 488
 489                if should_send_exception_breakpoints {
 490                    _ = session.update(cx, |this, _| {
 491                        filters.retain(|filter| {
 492                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 493                                defaults
 494                                    .exception_breakpoints
 495                                    .get(&filter.filter)
 496                                    .map(|options| options.enabled)
 497                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 498                            } else {
 499                                filter.default.unwrap_or_default()
 500                            };
 501                            this.exception_breakpoints
 502                                .entry(filter.filter.clone())
 503                                .or_insert_with(|| (filter.clone(), is_enabled));
 504                            is_enabled
 505                        });
 506                    });
 507
 508                    this.send_exception_breakpoints(filters, supports_exception_filters)
 509                        .await
 510                        .ok();
 511                }
 512
 513                let ret = if configuration_done_supported {
 514                    this.request(ConfigurationDone {})
 515                } else {
 516                    Task::ready(Ok(()))
 517                }
 518                .await;
 519                ret
 520            }
 521        });
 522
 523        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 524
 525        cx.spawn(async move |this, cx| {
 526            let result = task.await;
 527
 528            this.update(cx, |this, cx| {
 529                if let Some(this) = this.as_running_mut() {
 530                    this.is_started = true;
 531                    cx.notify();
 532                }
 533            })
 534            .ok();
 535
 536            result?;
 537            anyhow::Ok(())
 538        })
 539    }
 540
 541    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 542        let client = self.client.clone();
 543        let messages_tx = self.messages_tx.clone();
 544        let message_handler = Box::new(move |message| {
 545            messages_tx.unbounded_send(message).ok();
 546        });
 547        if client.should_reconnect_for_ssh() {
 548            Some(cx.spawn(async move |cx| {
 549                client.connect(message_handler, cx).await?;
 550                anyhow::Ok(())
 551            }))
 552        } else {
 553            None
 554        }
 555    }
 556
 557    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 558    where
 559        <R::DapRequest as dap::requests::Request>::Response: 'static,
 560        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 561    {
 562        let request = Arc::new(request);
 563
 564        let request_clone = request.clone();
 565        let connection = self.client.clone();
 566        self.executor.spawn(async move {
 567            let args = request_clone.to_dap();
 568            let response = connection.request::<R::DapRequest>(args).await?;
 569            request.response_from_dap(response)
 570        })
 571    }
 572}
 573
 574impl SessionState {
 575    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 576    where
 577        <R::DapRequest as dap::requests::Request>::Response: 'static,
 578        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 579    {
 580        match self {
 581            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 582            SessionState::Building(_) => Task::ready(Err(anyhow!(
 583                "no adapter running to send request: {request:?}"
 584            ))),
 585        }
 586    }
 587
 588    /// Did this debug session stop at least once?
 589    pub(crate) fn has_ever_stopped(&self) -> bool {
 590        match self {
 591            SessionState::Building(_) => false,
 592            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 593        }
 594    }
 595
 596    fn stopped(&mut self) {
 597        if let SessionState::Running(running) = self {
 598            running.has_ever_stopped = true;
 599        }
 600    }
 601}
 602
 603#[derive(Default)]
 604struct ThreadStates {
 605    global_state: Option<ThreadStatus>,
 606    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 607}
 608
 609impl ThreadStates {
 610    fn stop_all_threads(&mut self) {
 611        self.global_state = Some(ThreadStatus::Stopped);
 612        self.known_thread_states.clear();
 613    }
 614
 615    fn exit_all_threads(&mut self) {
 616        self.global_state = Some(ThreadStatus::Exited);
 617        self.known_thread_states.clear();
 618    }
 619
 620    fn continue_all_threads(&mut self) {
 621        self.global_state = Some(ThreadStatus::Running);
 622        self.known_thread_states.clear();
 623    }
 624
 625    fn stop_thread(&mut self, thread_id: ThreadId) {
 626        self.known_thread_states
 627            .insert(thread_id, ThreadStatus::Stopped);
 628    }
 629
 630    fn continue_thread(&mut self, thread_id: ThreadId) {
 631        self.known_thread_states
 632            .insert(thread_id, ThreadStatus::Running);
 633    }
 634
 635    fn process_step(&mut self, thread_id: ThreadId) {
 636        self.known_thread_states
 637            .insert(thread_id, ThreadStatus::Stepping);
 638    }
 639
 640    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 641        self.thread_state(thread_id)
 642            .unwrap_or(ThreadStatus::Running)
 643    }
 644
 645    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 646        self.known_thread_states
 647            .get(&thread_id)
 648            .copied()
 649            .or(self.global_state)
 650    }
 651
 652    fn exit_thread(&mut self, thread_id: ThreadId) {
 653        self.known_thread_states
 654            .insert(thread_id, ThreadStatus::Exited);
 655    }
 656
 657    fn any_stopped_thread(&self) -> bool {
 658        self.global_state
 659            .is_some_and(|state| state == ThreadStatus::Stopped)
 660            || self
 661                .known_thread_states
 662                .values()
 663                .any(|status| *status == ThreadStatus::Stopped)
 664    }
 665}
 666const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 667
 668type IsEnabled = bool;
 669
 670#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 671pub struct OutputToken(pub usize);
 672/// Represents a current state of a single debug adapter and provides ways to mutate it.
 673pub struct Session {
 674    pub mode: SessionState,
 675    id: SessionId,
 676    label: Option<SharedString>,
 677    adapter: DebugAdapterName,
 678    pub(super) capabilities: Capabilities,
 679    child_session_ids: HashSet<SessionId>,
 680    parent_session: Option<Entity<Session>>,
 681    modules: Vec<dap::Module>,
 682    loaded_sources: Vec<dap::Source>,
 683    output_token: OutputToken,
 684    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 685    threads: IndexMap<ThreadId, Thread>,
 686    thread_states: ThreadStates,
 687    watchers: HashMap<SharedString, Watcher>,
 688    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 689    stack_frames: IndexMap<StackFrameId, StackFrame>,
 690    locations: HashMap<u64, dap::LocationsResponse>,
 691    is_session_terminated: bool,
 692    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 693    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 694    ignore_breakpoints: bool,
 695    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 696    data_breakpoints: BTreeMap<String, DataBreakpointState>,
 697    background_tasks: Vec<Task<()>>,
 698    restart_task: Option<Task<()>>,
 699    task_context: TaskContext,
 700    memory: memory::Memory,
 701    quirks: SessionQuirks,
 702}
 703
 704trait CacheableCommand: Any + Send + Sync {
 705    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 706    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 707    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 708}
 709
 710impl<T> CacheableCommand for T
 711where
 712    T: LocalDapCommand + PartialEq + Eq + Hash,
 713{
 714    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 715        (rhs as &dyn Any)
 716            .downcast_ref::<Self>()
 717            .map_or(false, |rhs| self == rhs)
 718    }
 719
 720    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 721        T::hash(self, &mut hasher);
 722    }
 723
 724    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 725        self
 726    }
 727}
 728
 729pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 730
 731impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 732    fn from(request: T) -> Self {
 733        Self(Arc::new(request))
 734    }
 735}
 736
 737impl PartialEq for RequestSlot {
 738    fn eq(&self, other: &Self) -> bool {
 739        self.0.dyn_eq(other.0.as_ref())
 740    }
 741}
 742
 743impl Eq for RequestSlot {}
 744
 745impl Hash for RequestSlot {
 746    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 747        self.0.dyn_hash(state);
 748        (&*self.0 as &dyn Any).type_id().hash(state)
 749    }
 750}
 751
 752#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 753pub struct CompletionsQuery {
 754    pub query: String,
 755    pub column: u64,
 756    pub line: Option<u64>,
 757    pub frame_id: Option<u64>,
 758}
 759
 760impl CompletionsQuery {
 761    pub fn new(
 762        buffer: &language::Buffer,
 763        cursor_position: language::Anchor,
 764        frame_id: Option<u64>,
 765    ) -> Self {
 766        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 767        Self {
 768            query: buffer.text(),
 769            column: column as u64,
 770            frame_id,
 771            line: Some(row as u64),
 772        }
 773    }
 774}
 775
 776#[derive(Debug)]
 777pub enum SessionEvent {
 778    Modules,
 779    LoadedSources,
 780    Stopped(Option<ThreadId>),
 781    StackTrace,
 782    Variables,
 783    Watchers,
 784    Threads,
 785    InvalidateInlineValue,
 786    CapabilitiesLoaded,
 787    RunInTerminal {
 788        request: RunInTerminalRequestArguments,
 789        sender: mpsc::Sender<Result<u32>>,
 790    },
 791    DataBreakpointInfo,
 792    ConsoleOutput,
 793}
 794
 795#[derive(Clone, Debug, PartialEq, Eq)]
 796pub enum SessionStateEvent {
 797    Running,
 798    Shutdown,
 799    Restart,
 800    SpawnChildSession {
 801        request: StartDebuggingRequestArguments,
 802    },
 803}
 804
 805impl EventEmitter<SessionEvent> for Session {}
 806impl EventEmitter<SessionStateEvent> for Session {}
 807
 808// local session will send breakpoint updates to DAP for all new breakpoints
 809// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 810// BreakpointStore notifies session on breakpoint changes
 811impl Session {
 812    pub(crate) fn new(
 813        breakpoint_store: Entity<BreakpointStore>,
 814        session_id: SessionId,
 815        parent_session: Option<Entity<Session>>,
 816        label: Option<SharedString>,
 817        adapter: DebugAdapterName,
 818        task_context: TaskContext,
 819        quirks: SessionQuirks,
 820        cx: &mut App,
 821    ) -> Entity<Self> {
 822        cx.new::<Self>(|cx| {
 823            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 824                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 825                    if let Some(local) = (!this.ignore_breakpoints)
 826                        .then(|| this.as_running_mut())
 827                        .flatten()
 828                    {
 829                        local
 830                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 831                            .detach();
 832                    };
 833                }
 834                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 835                    if let Some(local) = (!this.ignore_breakpoints)
 836                        .then(|| this.as_running_mut())
 837                        .flatten()
 838                    {
 839                        local.unset_breakpoints_from_paths(paths, cx).detach();
 840                    }
 841                }
 842                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 843            })
 844            .detach();
 845
 846            let this = Self {
 847                mode: SessionState::Building(None),
 848                id: session_id,
 849                child_session_ids: HashSet::default(),
 850                parent_session,
 851                capabilities: Capabilities::default(),
 852                watchers: HashMap::default(),
 853                variables: Default::default(),
 854                stack_frames: Default::default(),
 855                thread_states: ThreadStates::default(),
 856                output_token: OutputToken(0),
 857                output: circular_buffer::CircularBuffer::boxed(),
 858                requests: HashMap::default(),
 859                modules: Vec::default(),
 860                loaded_sources: Vec::default(),
 861                threads: IndexMap::default(),
 862                background_tasks: Vec::default(),
 863                restart_task: None,
 864                locations: Default::default(),
 865                is_session_terminated: false,
 866                ignore_breakpoints: false,
 867                breakpoint_store,
 868                data_breakpoints: Default::default(),
 869                exception_breakpoints: Default::default(),
 870                label,
 871                adapter,
 872                task_context,
 873                memory: memory::Memory::new(),
 874                quirks,
 875            };
 876
 877            this
 878        })
 879    }
 880
 881    pub fn task_context(&self) -> &TaskContext {
 882        &self.task_context
 883    }
 884
 885    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 886        match &self.mode {
 887            SessionState::Building(_) => None,
 888            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 889        }
 890    }
 891
 892    pub fn boot(
 893        &mut self,
 894        binary: DebugAdapterBinary,
 895        worktree: Entity<Worktree>,
 896        dap_store: WeakEntity<DapStore>,
 897        cx: &mut Context<Self>,
 898    ) -> Task<Result<()>> {
 899        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 900        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 901
 902        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 903            let mut initialized_tx = Some(initialized_tx);
 904            while let Some(message) = message_rx.next().await {
 905                if let Message::Event(event) = message {
 906                    if let Events::Initialized(_) = *event {
 907                        if let Some(tx) = initialized_tx.take() {
 908                            tx.send(()).ok();
 909                        }
 910                    } else {
 911                        let Ok(_) = this.update(cx, |session, cx| {
 912                            session.handle_dap_event(event, cx);
 913                        }) else {
 914                            break;
 915                        };
 916                    }
 917                } else if let Message::Request(request) = message {
 918                    let Ok(_) = this.update(cx, |this, cx| {
 919                        if request.command == StartDebugging::COMMAND {
 920                            this.handle_start_debugging_request(request, cx)
 921                                .detach_and_log_err(cx);
 922                        } else if request.command == RunInTerminal::COMMAND {
 923                            this.handle_run_in_terminal_request(request, cx)
 924                                .detach_and_log_err(cx);
 925                        }
 926                    }) else {
 927                        break;
 928                    };
 929                }
 930            }
 931        })];
 932        self.background_tasks = background_tasks;
 933        let id = self.id;
 934        let parent_session = self.parent_session.clone();
 935
 936        cx.spawn(async move |this, cx| {
 937            let mode = RunningMode::new(
 938                id,
 939                parent_session,
 940                worktree.downgrade(),
 941                binary.clone(),
 942                message_tx,
 943                cx,
 944            )
 945            .await?;
 946            this.update(cx, |this, cx| {
 947                match &mut this.mode {
 948                    SessionState::Building(task) if task.is_some() => {
 949                        task.take().unwrap().detach_and_log_err(cx);
 950                    }
 951                    _ => {
 952                        debug_assert!(
 953                            this.parent_session.is_some(),
 954                            "Booting a root debug session without a boot task"
 955                        );
 956                    }
 957                };
 958                this.mode = SessionState::Running(mode);
 959                cx.emit(SessionStateEvent::Running);
 960            })?;
 961
 962            this.update(cx, |session, cx| session.request_initialize(cx))?
 963                .await?;
 964
 965            let result = this
 966                .update(cx, |session, cx| {
 967                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 968                })?
 969                .await;
 970
 971            if result.is_err() {
 972                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 973
 974                console
 975                    .send(format!(
 976                        "Tried to launch debugger with: {}",
 977                        serde_json::to_string_pretty(&binary.request_args.configuration)
 978                            .unwrap_or_default(),
 979                    ))
 980                    .await
 981                    .ok();
 982            }
 983
 984            result
 985        })
 986    }
 987
 988    pub fn session_id(&self) -> SessionId {
 989        self.id
 990    }
 991
 992    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 993        self.child_session_ids.clone()
 994    }
 995
 996    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 997        self.child_session_ids.insert(session_id);
 998    }
 999
1000    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
1001        self.child_session_ids.remove(&session_id);
1002    }
1003
1004    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
1005        self.parent_session
1006            .as_ref()
1007            .map(|session| session.read(cx).id)
1008    }
1009
1010    pub fn parent_session(&self) -> Option<&Entity<Self>> {
1011        self.parent_session.as_ref()
1012    }
1013
1014    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1015        let Some(client) = self.adapter_client() else {
1016            return Task::ready(());
1017        };
1018
1019        let supports_terminate = self
1020            .capabilities
1021            .support_terminate_debuggee
1022            .unwrap_or(false);
1023
1024        cx.background_spawn(async move {
1025            if supports_terminate {
1026                client
1027                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1028                        restart: Some(false),
1029                    })
1030                    .await
1031                    .ok();
1032            } else {
1033                client
1034                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1035                        restart: Some(false),
1036                        terminate_debuggee: Some(true),
1037                        suspend_debuggee: Some(false),
1038                    })
1039                    .await
1040                    .ok();
1041            }
1042        })
1043    }
1044
1045    pub fn capabilities(&self) -> &Capabilities {
1046        &self.capabilities
1047    }
1048
1049    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1050        match &self.mode {
1051            SessionState::Building(_) => None,
1052            SessionState::Running(running_mode) => Some(&running_mode.binary),
1053        }
1054    }
1055
1056    pub fn adapter(&self) -> DebugAdapterName {
1057        self.adapter.clone()
1058    }
1059
1060    pub fn label(&self) -> Option<SharedString> {
1061        self.label.clone()
1062    }
1063
1064    pub fn is_terminated(&self) -> bool {
1065        self.is_session_terminated
1066    }
1067
1068    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1069        let (tx, mut rx) = mpsc::unbounded();
1070
1071        cx.spawn(async move |this, cx| {
1072            while let Some(output) = rx.next().await {
1073                this.update(cx, |this, _| {
1074                    let event = dap::OutputEvent {
1075                        category: None,
1076                        output,
1077                        group: None,
1078                        variables_reference: None,
1079                        source: None,
1080                        line: None,
1081                        column: None,
1082                        data: None,
1083                        location_reference: None,
1084                    };
1085                    this.push_output(event);
1086                })?;
1087            }
1088            anyhow::Ok(())
1089        })
1090        .detach();
1091
1092        return tx;
1093    }
1094
1095    pub fn is_started(&self) -> bool {
1096        match &self.mode {
1097            SessionState::Building(_) => false,
1098            SessionState::Running(running) => running.is_started,
1099        }
1100    }
1101
1102    pub fn is_building(&self) -> bool {
1103        matches!(self.mode, SessionState::Building(_))
1104    }
1105
1106    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1107        match &mut self.mode {
1108            SessionState::Running(local_mode) => Some(local_mode),
1109            SessionState::Building(_) => None,
1110        }
1111    }
1112
1113    pub fn as_running(&self) -> Option<&RunningMode> {
1114        match &self.mode {
1115            SessionState::Running(local_mode) => Some(local_mode),
1116            SessionState::Building(_) => None,
1117        }
1118    }
1119
1120    fn handle_start_debugging_request(
1121        &mut self,
1122        request: dap::messages::Request,
1123        cx: &mut Context<Self>,
1124    ) -> Task<Result<()>> {
1125        let request_seq = request.seq;
1126
1127        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1128            .arguments
1129            .as_ref()
1130            .map(|value| serde_json::from_value(value.clone()));
1131
1132        let mut success = true;
1133        if let Some(Ok(request)) = launch_request {
1134            cx.emit(SessionStateEvent::SpawnChildSession { request });
1135        } else {
1136            log::error!(
1137                "Failed to parse launch request arguments: {:?}",
1138                request.arguments
1139            );
1140            success = false;
1141        }
1142
1143        cx.spawn(async move |this, cx| {
1144            this.update(cx, |this, cx| {
1145                this.respond_to_client(
1146                    request_seq,
1147                    success,
1148                    StartDebugging::COMMAND.to_string(),
1149                    None,
1150                    cx,
1151                )
1152            })?
1153            .await
1154        })
1155    }
1156
1157    fn handle_run_in_terminal_request(
1158        &mut self,
1159        request: dap::messages::Request,
1160        cx: &mut Context<Self>,
1161    ) -> Task<Result<()>> {
1162        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1163            request.arguments.unwrap_or_default(),
1164        ) {
1165            Ok(args) => args,
1166            Err(error) => {
1167                return cx.spawn(async move |session, cx| {
1168                    let error = serde_json::to_value(dap::ErrorResponse {
1169                        error: Some(dap::Message {
1170                            id: request.seq,
1171                            format: error.to_string(),
1172                            variables: None,
1173                            send_telemetry: None,
1174                            show_user: None,
1175                            url: None,
1176                            url_label: None,
1177                        }),
1178                    })
1179                    .ok();
1180
1181                    session
1182                        .update(cx, |this, cx| {
1183                            this.respond_to_client(
1184                                request.seq,
1185                                false,
1186                                StartDebugging::COMMAND.to_string(),
1187                                error,
1188                                cx,
1189                            )
1190                        })?
1191                        .await?;
1192
1193                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1194                });
1195            }
1196        };
1197
1198        let seq = request.seq;
1199
1200        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1201        cx.emit(SessionEvent::RunInTerminal {
1202            request: request_args,
1203            sender: tx,
1204        });
1205        cx.notify();
1206
1207        cx.spawn(async move |session, cx| {
1208            let result = util::maybe!(async move {
1209                rx.next().await.ok_or_else(|| {
1210                    anyhow!("failed to receive response from spawn terminal".to_string())
1211                })?
1212            })
1213            .await;
1214            let (success, body) = match result {
1215                Ok(pid) => (
1216                    true,
1217                    serde_json::to_value(dap::RunInTerminalResponse {
1218                        process_id: None,
1219                        shell_process_id: Some(pid as u64),
1220                    })
1221                    .ok(),
1222                ),
1223                Err(error) => (
1224                    false,
1225                    serde_json::to_value(dap::ErrorResponse {
1226                        error: Some(dap::Message {
1227                            id: seq,
1228                            format: error.to_string(),
1229                            variables: None,
1230                            send_telemetry: None,
1231                            show_user: None,
1232                            url: None,
1233                            url_label: None,
1234                        }),
1235                    })
1236                    .ok(),
1237                ),
1238            };
1239
1240            session
1241                .update(cx, |session, cx| {
1242                    session.respond_to_client(
1243                        seq,
1244                        success,
1245                        RunInTerminal::COMMAND.to_string(),
1246                        body,
1247                        cx,
1248                    )
1249                })?
1250                .await
1251        })
1252    }
1253
1254    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1255        let adapter_id = self.adapter().to_string();
1256        let request = Initialize { adapter_id };
1257
1258        let SessionState::Running(running) = &self.mode else {
1259            return Task::ready(Err(anyhow!(
1260                "Cannot send initialize request, task still building"
1261            )));
1262        };
1263        let mut response = running.request(request.clone());
1264
1265        cx.spawn(async move |this, cx| {
1266            loop {
1267                let capabilities = response.await;
1268                match capabilities {
1269                    Err(e) => {
1270                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1271                            this.as_running()
1272                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1273                        }) else {
1274                            return Err(e);
1275                        };
1276                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1277                        reconnect.await?;
1278
1279                        let Ok(Some(r)) = this.update(cx, |this, _| {
1280                            this.as_running()
1281                                .map(|running| running.request(request.clone()))
1282                        }) else {
1283                            return Err(e);
1284                        };
1285                        response = r
1286                    }
1287                    Ok(capabilities) => {
1288                        this.update(cx, |session, cx| {
1289                            session.capabilities = capabilities;
1290
1291                            cx.emit(SessionEvent::CapabilitiesLoaded);
1292                        })?;
1293                        return Ok(());
1294                    }
1295                }
1296            }
1297        })
1298    }
1299
1300    pub(super) fn initialize_sequence(
1301        &mut self,
1302        initialize_rx: oneshot::Receiver<()>,
1303        dap_store: WeakEntity<DapStore>,
1304        cx: &mut Context<Self>,
1305    ) -> Task<Result<()>> {
1306        match &self.mode {
1307            SessionState::Running(local_mode) => {
1308                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1309            }
1310            SessionState::Building(_) => {
1311                Task::ready(Err(anyhow!("cannot initialize, still building")))
1312            }
1313        }
1314    }
1315
1316    pub fn run_to_position(
1317        &mut self,
1318        breakpoint: SourceBreakpoint,
1319        active_thread_id: ThreadId,
1320        cx: &mut Context<Self>,
1321    ) {
1322        match &mut self.mode {
1323            SessionState::Running(local_mode) => {
1324                if !matches!(
1325                    self.thread_states.thread_state(active_thread_id),
1326                    Some(ThreadStatus::Stopped)
1327                ) {
1328                    return;
1329                };
1330                let path = breakpoint.path.clone();
1331                local_mode.tmp_breakpoint = Some(breakpoint);
1332                let task = local_mode.send_breakpoints_from_path(
1333                    path,
1334                    BreakpointUpdatedReason::Toggled,
1335                    &self.breakpoint_store,
1336                    cx,
1337                );
1338
1339                cx.spawn(async move |this, cx| {
1340                    task.await;
1341                    this.update(cx, |this, cx| {
1342                        this.continue_thread(active_thread_id, cx);
1343                    })
1344                })
1345                .detach();
1346            }
1347            SessionState::Building(_) => {}
1348        }
1349    }
1350
1351    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1352        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1353    }
1354
1355    pub fn output(
1356        &self,
1357        since: OutputToken,
1358    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1359        if self.output_token.0 == 0 {
1360            return (self.output.range(0..0), OutputToken(0));
1361        };
1362
1363        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1364
1365        let clamped_events_since = events_since.clamp(0, self.output.len());
1366        (
1367            self.output
1368                .range(self.output.len() - clamped_events_since..),
1369            self.output_token,
1370        )
1371    }
1372
1373    pub fn respond_to_client(
1374        &self,
1375        request_seq: u64,
1376        success: bool,
1377        command: String,
1378        body: Option<serde_json::Value>,
1379        cx: &mut Context<Self>,
1380    ) -> Task<Result<()>> {
1381        let Some(local_session) = self.as_running() else {
1382            unreachable!("Cannot respond to remote client");
1383        };
1384        let client = local_session.client.clone();
1385
1386        cx.background_spawn(async move {
1387            client
1388                .send_message(Message::Response(Response {
1389                    body,
1390                    success,
1391                    command,
1392                    seq: request_seq + 1,
1393                    request_seq,
1394                    message: None,
1395                }))
1396                .await
1397        })
1398    }
1399
1400    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1401        self.mode.stopped();
1402        // todo(debugger): Find a clean way to get around the clone
1403        let breakpoint_store = self.breakpoint_store.clone();
1404        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1405            let breakpoint = local.tmp_breakpoint.take()?;
1406            let path = breakpoint.path.clone();
1407            Some((local, path))
1408        }) {
1409            local
1410                .send_breakpoints_from_path(
1411                    path,
1412                    BreakpointUpdatedReason::Toggled,
1413                    &breakpoint_store,
1414                    cx,
1415                )
1416                .detach();
1417        };
1418
1419        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1420            self.thread_states.stop_all_threads();
1421            self.invalidate_command_type::<StackTraceCommand>();
1422        }
1423
1424        // Event if we stopped all threads we still need to insert the thread_id
1425        // to our own data
1426        if let Some(thread_id) = event.thread_id {
1427            self.thread_states.stop_thread(ThreadId(thread_id));
1428
1429            self.invalidate_state(
1430                &StackTraceCommand {
1431                    thread_id,
1432                    start_frame: None,
1433                    levels: None,
1434                }
1435                .into(),
1436            );
1437        }
1438
1439        self.invalidate_generic();
1440        self.threads.clear();
1441        self.variables.clear();
1442        cx.emit(SessionEvent::Stopped(
1443            event
1444                .thread_id
1445                .map(Into::into)
1446                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1447        ));
1448        cx.emit(SessionEvent::InvalidateInlineValue);
1449        cx.notify();
1450    }
1451
1452    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1453        match *event {
1454            Events::Initialized(_) => {
1455                debug_assert!(
1456                    false,
1457                    "Initialized event should have been handled in LocalMode"
1458                );
1459            }
1460            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1461            Events::Continued(event) => {
1462                if event.all_threads_continued.unwrap_or_default() {
1463                    self.thread_states.continue_all_threads();
1464                    self.breakpoint_store.update(cx, |store, cx| {
1465                        store.remove_active_position(Some(self.session_id()), cx)
1466                    });
1467                } else {
1468                    self.thread_states
1469                        .continue_thread(ThreadId(event.thread_id));
1470                }
1471                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1472                self.invalidate_generic();
1473            }
1474            Events::Exited(_event) => {
1475                self.clear_active_debug_line(cx);
1476            }
1477            Events::Terminated(_) => {
1478                self.shutdown(cx).detach();
1479            }
1480            Events::Thread(event) => {
1481                let thread_id = ThreadId(event.thread_id);
1482
1483                match event.reason {
1484                    dap::ThreadEventReason::Started => {
1485                        self.thread_states.continue_thread(thread_id);
1486                    }
1487                    dap::ThreadEventReason::Exited => {
1488                        self.thread_states.exit_thread(thread_id);
1489                    }
1490                    reason => {
1491                        log::error!("Unhandled thread event reason {:?}", reason);
1492                    }
1493                }
1494                self.invalidate_state(&ThreadsCommand.into());
1495                cx.notify();
1496            }
1497            Events::Output(event) => {
1498                if event
1499                    .category
1500                    .as_ref()
1501                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1502                {
1503                    return;
1504                }
1505
1506                self.push_output(event);
1507                cx.notify();
1508            }
1509            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1510                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1511            }),
1512            Events::Module(event) => {
1513                match event.reason {
1514                    dap::ModuleEventReason::New => {
1515                        self.modules.push(event.module);
1516                    }
1517                    dap::ModuleEventReason::Changed => {
1518                        if let Some(module) = self
1519                            .modules
1520                            .iter_mut()
1521                            .find(|other| event.module.id == other.id)
1522                        {
1523                            *module = event.module;
1524                        }
1525                    }
1526                    dap::ModuleEventReason::Removed => {
1527                        self.modules.retain(|other| event.module.id != other.id);
1528                    }
1529                }
1530
1531                // todo(debugger): We should only send the invalidate command to downstream clients.
1532                // self.invalidate_state(&ModulesCommand.into());
1533            }
1534            Events::LoadedSource(_) => {
1535                self.invalidate_state(&LoadedSourcesCommand.into());
1536            }
1537            Events::Capabilities(event) => {
1538                self.capabilities = self.capabilities.merge(event.capabilities);
1539
1540                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1541                let recent_filters = self
1542                    .capabilities
1543                    .exception_breakpoint_filters
1544                    .iter()
1545                    .flatten()
1546                    .map(|filter| (filter.filter.clone(), filter.clone()))
1547                    .collect::<BTreeMap<_, _>>();
1548                for filter in recent_filters.values() {
1549                    let default = filter.default.unwrap_or_default();
1550                    self.exception_breakpoints
1551                        .entry(filter.filter.clone())
1552                        .or_insert_with(|| (filter.clone(), default));
1553                }
1554                self.exception_breakpoints
1555                    .retain(|k, _| recent_filters.contains_key(k));
1556                if self.is_started() {
1557                    self.send_exception_breakpoints(cx);
1558                }
1559
1560                // Remove the ones that no longer exist.
1561                cx.notify();
1562            }
1563            Events::Memory(_) => {}
1564            Events::Process(_) => {}
1565            Events::ProgressEnd(_) => {}
1566            Events::ProgressStart(_) => {}
1567            Events::ProgressUpdate(_) => {}
1568            Events::Invalidated(_) => {}
1569            Events::Other(_) => {}
1570        }
1571    }
1572
1573    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1574    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1575        &mut self,
1576        request: T,
1577        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1578        cx: &mut Context<Self>,
1579    ) {
1580        const {
1581            assert!(
1582                T::CACHEABLE,
1583                "Only requests marked as cacheable should invoke `fetch`"
1584            );
1585        }
1586
1587        if !self.thread_states.any_stopped_thread()
1588            && request.type_id() != TypeId::of::<ThreadsCommand>()
1589            || self.is_session_terminated
1590        {
1591            return;
1592        }
1593
1594        let request_map = self
1595            .requests
1596            .entry(std::any::TypeId::of::<T>())
1597            .or_default();
1598
1599        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1600            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1601
1602            let task = Self::request_inner::<Arc<T>>(
1603                &self.capabilities,
1604                &self.mode,
1605                command,
1606                |this, result, cx| {
1607                    process_result(this, result, cx);
1608                    None
1609                },
1610                cx,
1611            );
1612            let task = cx
1613                .background_executor()
1614                .spawn(async move {
1615                    let _ = task.await?;
1616                    Some(())
1617                })
1618                .shared();
1619
1620            vacant.insert(task);
1621            cx.notify();
1622        }
1623    }
1624
1625    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1626        capabilities: &Capabilities,
1627        mode: &SessionState,
1628        request: T,
1629        process_result: impl FnOnce(
1630            &mut Self,
1631            Result<T::Response>,
1632            &mut Context<Self>,
1633        ) -> Option<T::Response>
1634        + 'static,
1635        cx: &mut Context<Self>,
1636    ) -> Task<Option<T::Response>> {
1637        if !T::is_supported(&capabilities) {
1638            log::warn!(
1639                "Attempted to send a DAP request that isn't supported: {:?}",
1640                request
1641            );
1642            let error = Err(anyhow::Error::msg(
1643                "Couldn't complete request because it's not supported",
1644            ));
1645            return cx.spawn(async move |this, cx| {
1646                this.update(cx, |this, cx| process_result(this, error, cx))
1647                    .ok()
1648                    .flatten()
1649            });
1650        }
1651
1652        let request = mode.request_dap(request);
1653        cx.spawn(async move |this, cx| {
1654            let result = request.await;
1655            this.update(cx, |this, cx| process_result(this, result, cx))
1656                .ok()
1657                .flatten()
1658        })
1659    }
1660
1661    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
1662        &self,
1663        request: T,
1664        process_result: impl FnOnce(
1665            &mut Self,
1666            Result<T::Response>,
1667            &mut Context<Self>,
1668        ) -> Option<T::Response>
1669        + 'static,
1670        cx: &mut Context<Self>,
1671    ) -> Task<Option<T::Response>> {
1672        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1673    }
1674
1675    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
1676        self.requests.remove(&std::any::TypeId::of::<Command>());
1677    }
1678
1679    fn invalidate_generic(&mut self) {
1680        self.invalidate_command_type::<ModulesCommand>();
1681        self.invalidate_command_type::<LoadedSourcesCommand>();
1682        self.invalidate_command_type::<ThreadsCommand>();
1683        self.invalidate_command_type::<DataBreakpointInfoCommand>();
1684        self.invalidate_command_type::<ReadMemory>();
1685        let executor = self.as_running().map(|running| running.executor.clone());
1686        if let Some(executor) = executor {
1687            self.memory.clear(&executor);
1688        }
1689    }
1690
1691    fn invalidate_state(&mut self, key: &RequestSlot) {
1692        self.requests
1693            .entry((&*key.0 as &dyn Any).type_id())
1694            .and_modify(|request_map| {
1695                request_map.remove(&key);
1696            });
1697    }
1698
1699    fn push_output(&mut self, event: OutputEvent) {
1700        self.output.push_back(event);
1701        self.output_token.0 += 1;
1702    }
1703
1704    pub fn any_stopped_thread(&self) -> bool {
1705        self.thread_states.any_stopped_thread()
1706    }
1707
1708    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1709        self.thread_states.thread_status(thread_id)
1710    }
1711
1712    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1713        self.fetch(
1714            dap_command::ThreadsCommand,
1715            |this, result, cx| {
1716                let Some(result) = result.log_err() else {
1717                    return;
1718                };
1719
1720                this.threads = result
1721                    .into_iter()
1722                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1723                    .collect();
1724
1725                this.invalidate_command_type::<StackTraceCommand>();
1726                cx.emit(SessionEvent::Threads);
1727                cx.notify();
1728            },
1729            cx,
1730        );
1731
1732        self.threads
1733            .values()
1734            .map(|thread| {
1735                (
1736                    thread.dap.clone(),
1737                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1738                )
1739            })
1740            .collect()
1741    }
1742
1743    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1744        self.fetch(
1745            dap_command::ModulesCommand,
1746            |this, result, cx| {
1747                let Some(result) = result.log_err() else {
1748                    return;
1749                };
1750
1751                this.modules = result;
1752                cx.emit(SessionEvent::Modules);
1753                cx.notify();
1754            },
1755            cx,
1756        );
1757
1758        &self.modules
1759    }
1760
1761    // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1762    pub fn data_access_size(
1763        &mut self,
1764        frame_id: Option<u64>,
1765        evaluate_name: &str,
1766        cx: &mut Context<Self>,
1767    ) -> Task<Option<u64>> {
1768        let request = self.request(
1769            EvaluateCommand {
1770                expression: format!("?${{sizeof({evaluate_name})}}"),
1771                frame_id,
1772
1773                context: Some(EvaluateArgumentsContext::Repl),
1774                source: None,
1775            },
1776            |_, response, _| response.ok(),
1777            cx,
1778        );
1779        cx.background_spawn(async move {
1780            let result = request.await?;
1781            result.result.parse().ok()
1782        })
1783    }
1784
1785    pub fn memory_reference_of_expr(
1786        &mut self,
1787        frame_id: Option<u64>,
1788        expression: String,
1789        cx: &mut Context<Self>,
1790    ) -> Task<Option<(String, Option<String>)>> {
1791        let request = self.request(
1792            EvaluateCommand {
1793                expression,
1794                frame_id,
1795
1796                context: Some(EvaluateArgumentsContext::Repl),
1797                source: None,
1798            },
1799            |_, response, _| response.ok(),
1800            cx,
1801        );
1802        cx.background_spawn(async move {
1803            let result = request.await?;
1804            result
1805                .memory_reference
1806                .map(|reference| (reference, result.type_))
1807        })
1808    }
1809
1810    pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1811        let data = base64::engine::general_purpose::STANDARD.encode(data);
1812        self.request(
1813            WriteMemoryArguments {
1814                memory_reference: address.to_string(),
1815                data,
1816                allow_partial: None,
1817                offset: None,
1818            },
1819            |this, response, cx| {
1820                this.memory.clear(cx.background_executor());
1821                this.invalidate_command_type::<ReadMemory>();
1822                this.invalidate_command_type::<VariablesCommand>();
1823                cx.emit(SessionEvent::Variables);
1824                response.ok()
1825            },
1826            cx,
1827        )
1828        .detach();
1829    }
1830    pub fn read_memory(
1831        &mut self,
1832        range: RangeInclusive<u64>,
1833        cx: &mut Context<Self>,
1834    ) -> MemoryIterator {
1835        // This function is a bit more involved when it comes to fetching data.
1836        // Since we attempt to read memory in pages, we need to account for some parts
1837        // of memory being unreadable. Therefore, we start off by fetching a page per request.
1838        // In case that fails, we try to re-fetch smaller regions until we have the full range.
1839        let page_range = Memory::memory_range_to_page_range(range.clone());
1840        for page_address in PageAddress::iter_range(page_range) {
1841            self.read_single_page_memory(page_address, cx);
1842        }
1843        self.memory.memory_range(range)
1844    }
1845
1846    fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1847        _ = maybe!({
1848            let builder = self.memory.build_page(page_start)?;
1849
1850            self.memory_read_fetch_page_recursive(builder, cx);
1851            Some(())
1852        });
1853    }
1854    fn memory_read_fetch_page_recursive(
1855        &mut self,
1856        mut builder: MemoryPageBuilder,
1857        cx: &mut Context<Self>,
1858    ) {
1859        let Some(next_request) = builder.next_request() else {
1860            // We're done fetching. Let's grab the page and insert it into our memory store.
1861            let (address, contents) = builder.build();
1862            self.memory.insert_page(address, contents);
1863
1864            return;
1865        };
1866        let size = next_request.size;
1867        self.fetch(
1868            ReadMemory {
1869                memory_reference: format!("0x{:X}", next_request.address),
1870                offset: Some(0),
1871                count: next_request.size,
1872            },
1873            move |this, memory, cx| {
1874                if let Ok(memory) = memory {
1875                    builder.known(memory.content);
1876                    if let Some(unknown) = memory.unreadable_bytes {
1877                        builder.unknown(unknown);
1878                    }
1879                    // This is the recursive bit: if we're not yet done with
1880                    // the whole page, we'll kick off a new request with smaller range.
1881                    // Note that this function is recursive only conceptually;
1882                    // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1883                    this.memory_read_fetch_page_recursive(builder, cx);
1884                } else {
1885                    builder.unknown(size);
1886                }
1887            },
1888            cx,
1889        );
1890    }
1891
1892    pub fn ignore_breakpoints(&self) -> bool {
1893        self.ignore_breakpoints
1894    }
1895
1896    pub fn toggle_ignore_breakpoints(
1897        &mut self,
1898        cx: &mut App,
1899    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1900        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1901    }
1902
1903    pub(crate) fn set_ignore_breakpoints(
1904        &mut self,
1905        ignore: bool,
1906        cx: &mut App,
1907    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1908        if self.ignore_breakpoints == ignore {
1909            return Task::ready(HashMap::default());
1910        }
1911
1912        self.ignore_breakpoints = ignore;
1913
1914        if let Some(local) = self.as_running() {
1915            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1916        } else {
1917            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1918            unimplemented!()
1919        }
1920    }
1921
1922    pub fn data_breakpoints(&self) -> impl Iterator<Item = &DataBreakpointState> {
1923        self.data_breakpoints.values()
1924    }
1925
1926    pub fn exception_breakpoints(
1927        &self,
1928    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1929        self.exception_breakpoints.values()
1930    }
1931
1932    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1933        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1934            *is_enabled = !*is_enabled;
1935            self.send_exception_breakpoints(cx);
1936        }
1937    }
1938
1939    fn send_exception_breakpoints(&mut self, cx: &App) {
1940        if let Some(local) = self.as_running() {
1941            let exception_filters = self
1942                .exception_breakpoints
1943                .values()
1944                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1945                .collect();
1946
1947            let supports_exception_filters = self
1948                .capabilities
1949                .supports_exception_filter_options
1950                .unwrap_or_default();
1951            local
1952                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1953                .detach_and_log_err(cx);
1954        } else {
1955            debug_assert!(false, "Not implemented");
1956        }
1957    }
1958
1959    pub fn toggle_data_breakpoint(&mut self, id: &str, cx: &mut Context<'_, Session>) {
1960        if let Some(state) = self.data_breakpoints.get_mut(id) {
1961            state.is_enabled = !state.is_enabled;
1962            self.send_exception_breakpoints(cx);
1963        }
1964    }
1965
1966    fn send_data_breakpoints(&mut self, cx: &mut Context<Self>) {
1967        if let Some(mode) = self.as_running() {
1968            let breakpoints = self
1969                .data_breakpoints
1970                .values()
1971                .filter_map(|state| state.is_enabled.then(|| state.dap.clone()))
1972                .collect();
1973            let command = SetDataBreakpointsCommand { breakpoints };
1974            mode.request(command).detach_and_log_err(cx);
1975        }
1976    }
1977
1978    pub fn create_data_breakpoint(
1979        &mut self,
1980        context: Arc<DataBreakpointContext>,
1981        data_id: String,
1982        dap: dap::DataBreakpoint,
1983        cx: &mut Context<Self>,
1984    ) {
1985        if self.data_breakpoints.remove(&data_id).is_none() {
1986            self.data_breakpoints.insert(
1987                data_id,
1988                DataBreakpointState {
1989                    dap,
1990                    is_enabled: true,
1991                    context,
1992                },
1993            );
1994        }
1995        self.send_data_breakpoints(cx);
1996    }
1997
1998    pub fn breakpoints_enabled(&self) -> bool {
1999        self.ignore_breakpoints
2000    }
2001
2002    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
2003        self.fetch(
2004            dap_command::LoadedSourcesCommand,
2005            |this, result, cx| {
2006                let Some(result) = result.log_err() else {
2007                    return;
2008                };
2009                this.loaded_sources = result;
2010                cx.emit(SessionEvent::LoadedSources);
2011                cx.notify();
2012            },
2013            cx,
2014        );
2015
2016        &self.loaded_sources
2017    }
2018
2019    fn fallback_to_manual_restart(
2020        &mut self,
2021        res: Result<()>,
2022        cx: &mut Context<Self>,
2023    ) -> Option<()> {
2024        if res.log_err().is_none() {
2025            cx.emit(SessionStateEvent::Restart);
2026            return None;
2027        }
2028        Some(())
2029    }
2030
2031    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
2032        res.log_err()?;
2033        Some(())
2034    }
2035
2036    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
2037        thread_id: ThreadId,
2038    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
2039    {
2040        move |this, response, cx| match response.log_err() {
2041            Some(response) => {
2042                this.breakpoint_store.update(cx, |store, cx| {
2043                    store.remove_active_position(Some(this.session_id()), cx)
2044                });
2045                Some(response)
2046            }
2047            None => {
2048                this.thread_states.stop_thread(thread_id);
2049                cx.notify();
2050                None
2051            }
2052        }
2053    }
2054
2055    fn clear_active_debug_line_response(
2056        &mut self,
2057        response: Result<()>,
2058        cx: &mut Context<Session>,
2059    ) -> Option<()> {
2060        response.log_err()?;
2061        self.clear_active_debug_line(cx);
2062        Some(())
2063    }
2064
2065    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2066        self.breakpoint_store.update(cx, |store, cx| {
2067            store.remove_active_position(Some(self.id), cx)
2068        });
2069    }
2070
2071    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2072        self.request(
2073            PauseCommand {
2074                thread_id: thread_id.0,
2075            },
2076            Self::empty_response,
2077            cx,
2078        )
2079        .detach();
2080    }
2081
2082    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2083        self.request(
2084            RestartStackFrameCommand { stack_frame_id },
2085            Self::empty_response,
2086            cx,
2087        )
2088        .detach();
2089    }
2090
2091    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2092        if self.restart_task.is_some() || self.as_running().is_none() {
2093            return;
2094        }
2095
2096        let supports_dap_restart =
2097            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2098
2099        self.restart_task = Some(cx.spawn(async move |this, cx| {
2100            let _ = this.update(cx, |session, cx| {
2101                if supports_dap_restart {
2102                    session
2103                        .request(
2104                            RestartCommand {
2105                                raw: args.unwrap_or(Value::Null),
2106                            },
2107                            Self::fallback_to_manual_restart,
2108                            cx,
2109                        )
2110                        .detach();
2111                } else {
2112                    cx.emit(SessionStateEvent::Restart);
2113                }
2114            });
2115        }));
2116    }
2117
2118    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2119        if self.is_session_terminated {
2120            return Task::ready(());
2121        }
2122
2123        self.is_session_terminated = true;
2124        self.thread_states.exit_all_threads();
2125        cx.notify();
2126
2127        let task = match &mut self.mode {
2128            SessionState::Running(_) => {
2129                if self
2130                    .capabilities
2131                    .supports_terminate_request
2132                    .unwrap_or_default()
2133                {
2134                    self.request(
2135                        TerminateCommand {
2136                            restart: Some(false),
2137                        },
2138                        Self::clear_active_debug_line_response,
2139                        cx,
2140                    )
2141                } else {
2142                    self.request(
2143                        DisconnectCommand {
2144                            restart: Some(false),
2145                            terminate_debuggee: Some(true),
2146                            suspend_debuggee: Some(false),
2147                        },
2148                        Self::clear_active_debug_line_response,
2149                        cx,
2150                    )
2151                }
2152            }
2153            SessionState::Building(build_task) => {
2154                build_task.take();
2155                Task::ready(Some(()))
2156            }
2157        };
2158
2159        cx.emit(SessionStateEvent::Shutdown);
2160
2161        cx.spawn(async move |this, cx| {
2162            task.await;
2163            let _ = this.update(cx, |this, _| {
2164                if let Some(adapter_client) = this.adapter_client() {
2165                    adapter_client.kill();
2166                }
2167            });
2168        })
2169    }
2170
2171    pub fn completions(
2172        &mut self,
2173        query: CompletionsQuery,
2174        cx: &mut Context<Self>,
2175    ) -> Task<Result<Vec<dap::CompletionItem>>> {
2176        let task = self.request(query, |_, result, _| result.log_err(), cx);
2177
2178        cx.background_executor().spawn(async move {
2179            anyhow::Ok(
2180                task.await
2181                    .map(|response| response.targets)
2182                    .context("failed to fetch completions")?,
2183            )
2184        })
2185    }
2186
2187    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2188        let supports_single_thread_execution_requests =
2189            self.capabilities.supports_single_thread_execution_requests;
2190        self.thread_states.continue_thread(thread_id);
2191        self.request(
2192            ContinueCommand {
2193                args: ContinueArguments {
2194                    thread_id: thread_id.0,
2195                    single_thread: supports_single_thread_execution_requests,
2196                },
2197            },
2198            Self::on_step_response::<ContinueCommand>(thread_id),
2199            cx,
2200        )
2201        .detach();
2202    }
2203
2204    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2205        match self.mode {
2206            SessionState::Running(ref local) => Some(local.client.clone()),
2207            SessionState::Building(_) => None,
2208        }
2209    }
2210
2211    pub fn has_ever_stopped(&self) -> bool {
2212        self.mode.has_ever_stopped()
2213    }
2214    pub fn step_over(
2215        &mut self,
2216        thread_id: ThreadId,
2217        granularity: SteppingGranularity,
2218        cx: &mut Context<Self>,
2219    ) {
2220        let supports_single_thread_execution_requests =
2221            self.capabilities.supports_single_thread_execution_requests;
2222        let supports_stepping_granularity = self
2223            .capabilities
2224            .supports_stepping_granularity
2225            .unwrap_or_default();
2226
2227        let command = NextCommand {
2228            inner: StepCommand {
2229                thread_id: thread_id.0,
2230                granularity: supports_stepping_granularity.then(|| granularity),
2231                single_thread: supports_single_thread_execution_requests,
2232            },
2233        };
2234
2235        self.thread_states.process_step(thread_id);
2236        self.request(
2237            command,
2238            Self::on_step_response::<NextCommand>(thread_id),
2239            cx,
2240        )
2241        .detach();
2242    }
2243
2244    pub fn step_in(
2245        &mut self,
2246        thread_id: ThreadId,
2247        granularity: SteppingGranularity,
2248        cx: &mut Context<Self>,
2249    ) {
2250        let supports_single_thread_execution_requests =
2251            self.capabilities.supports_single_thread_execution_requests;
2252        let supports_stepping_granularity = self
2253            .capabilities
2254            .supports_stepping_granularity
2255            .unwrap_or_default();
2256
2257        let command = StepInCommand {
2258            inner: StepCommand {
2259                thread_id: thread_id.0,
2260                granularity: supports_stepping_granularity.then(|| granularity),
2261                single_thread: supports_single_thread_execution_requests,
2262            },
2263        };
2264
2265        self.thread_states.process_step(thread_id);
2266        self.request(
2267            command,
2268            Self::on_step_response::<StepInCommand>(thread_id),
2269            cx,
2270        )
2271        .detach();
2272    }
2273
2274    pub fn step_out(
2275        &mut self,
2276        thread_id: ThreadId,
2277        granularity: SteppingGranularity,
2278        cx: &mut Context<Self>,
2279    ) {
2280        let supports_single_thread_execution_requests =
2281            self.capabilities.supports_single_thread_execution_requests;
2282        let supports_stepping_granularity = self
2283            .capabilities
2284            .supports_stepping_granularity
2285            .unwrap_or_default();
2286
2287        let command = StepOutCommand {
2288            inner: StepCommand {
2289                thread_id: thread_id.0,
2290                granularity: supports_stepping_granularity.then(|| granularity),
2291                single_thread: supports_single_thread_execution_requests,
2292            },
2293        };
2294
2295        self.thread_states.process_step(thread_id);
2296        self.request(
2297            command,
2298            Self::on_step_response::<StepOutCommand>(thread_id),
2299            cx,
2300        )
2301        .detach();
2302    }
2303
2304    pub fn step_back(
2305        &mut self,
2306        thread_id: ThreadId,
2307        granularity: SteppingGranularity,
2308        cx: &mut Context<Self>,
2309    ) {
2310        let supports_single_thread_execution_requests =
2311            self.capabilities.supports_single_thread_execution_requests;
2312        let supports_stepping_granularity = self
2313            .capabilities
2314            .supports_stepping_granularity
2315            .unwrap_or_default();
2316
2317        let command = StepBackCommand {
2318            inner: StepCommand {
2319                thread_id: thread_id.0,
2320                granularity: supports_stepping_granularity.then(|| granularity),
2321                single_thread: supports_single_thread_execution_requests,
2322            },
2323        };
2324
2325        self.thread_states.process_step(thread_id);
2326
2327        self.request(
2328            command,
2329            Self::on_step_response::<StepBackCommand>(thread_id),
2330            cx,
2331        )
2332        .detach();
2333    }
2334
2335    pub fn stack_frames(
2336        &mut self,
2337        thread_id: ThreadId,
2338        cx: &mut Context<Self>,
2339    ) -> Result<Vec<StackFrame>> {
2340        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2341            && self.requests.contains_key(&ThreadsCommand.type_id())
2342            && self.threads.contains_key(&thread_id)
2343        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2344        // We could still be using an old thread id and have sent a new thread's request
2345        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2346        // But it very well could cause a minor bug in the future that is hard to track down
2347        {
2348            self.fetch(
2349                super::dap_command::StackTraceCommand {
2350                    thread_id: thread_id.0,
2351                    start_frame: None,
2352                    levels: None,
2353                },
2354                move |this, stack_frames, cx| {
2355                    let entry =
2356                        this.threads
2357                            .entry(thread_id)
2358                            .and_modify(|thread| match &stack_frames {
2359                                Ok(stack_frames) => {
2360                                    thread.stack_frames = stack_frames
2361                                        .iter()
2362                                        .cloned()
2363                                        .map(StackFrame::from)
2364                                        .collect();
2365                                    thread.stack_frames_error = None;
2366                                }
2367                                Err(error) => {
2368                                    thread.stack_frames.clear();
2369                                    thread.stack_frames_error = Some(error.cloned());
2370                                }
2371                            });
2372                    debug_assert!(
2373                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2374                        "Sent request for thread_id that doesn't exist"
2375                    );
2376                    if let Ok(stack_frames) = stack_frames {
2377                        this.stack_frames.extend(
2378                            stack_frames
2379                                .into_iter()
2380                                .filter(|frame| {
2381                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2382                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2383                                    !(frame.id == 0
2384                                        && frame.line == 0
2385                                        && frame.column == 0
2386                                        && frame.presentation_hint
2387                                            == Some(StackFramePresentationHint::Label))
2388                                })
2389                                .map(|frame| (frame.id, StackFrame::from(frame))),
2390                        );
2391                    }
2392
2393                    this.invalidate_command_type::<ScopesCommand>();
2394                    this.invalidate_command_type::<VariablesCommand>();
2395
2396                    cx.emit(SessionEvent::StackTrace);
2397                },
2398                cx,
2399            );
2400        }
2401
2402        match self.threads.get(&thread_id) {
2403            Some(thread) => {
2404                if let Some(error) = &thread.stack_frames_error {
2405                    Err(error.cloned())
2406                } else {
2407                    Ok(thread.stack_frames.clone())
2408                }
2409            }
2410            None => Ok(Vec::new()),
2411        }
2412    }
2413
2414    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2415        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2416            && self
2417                .requests
2418                .contains_key(&TypeId::of::<StackTraceCommand>())
2419        {
2420            self.fetch(
2421                ScopesCommand { stack_frame_id },
2422                move |this, scopes, cx| {
2423                    let Some(scopes) = scopes.log_err() else {
2424                        return
2425                    };
2426
2427                    for scope in scopes.iter() {
2428                        this.variables(scope.variables_reference, cx);
2429                    }
2430
2431                    let entry = this
2432                        .stack_frames
2433                        .entry(stack_frame_id)
2434                        .and_modify(|stack_frame| {
2435                            stack_frame.scopes = scopes;
2436                        });
2437
2438                    cx.emit(SessionEvent::Variables);
2439
2440                    debug_assert!(
2441                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2442                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2443                    );
2444                },
2445                cx,
2446            );
2447        }
2448
2449        self.stack_frames
2450            .get(&stack_frame_id)
2451            .map(|frame| frame.scopes.as_slice())
2452            .unwrap_or_default()
2453    }
2454
2455    pub fn variables_by_stack_frame_id(
2456        &self,
2457        stack_frame_id: StackFrameId,
2458        globals: bool,
2459        locals: bool,
2460    ) -> Vec<dap::Variable> {
2461        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2462            return Vec::new();
2463        };
2464
2465        stack_frame
2466            .scopes
2467            .iter()
2468            .filter(|scope| {
2469                (scope.name.to_lowercase().contains("local") && locals)
2470                    || (scope.name.to_lowercase().contains("global") && globals)
2471            })
2472            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2473            .flatten()
2474            .cloned()
2475            .collect()
2476    }
2477
2478    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2479        &self.watchers
2480    }
2481
2482    pub fn add_watcher(
2483        &mut self,
2484        expression: SharedString,
2485        frame_id: u64,
2486        cx: &mut Context<Self>,
2487    ) -> Task<Result<()>> {
2488        let request = self.mode.request_dap(EvaluateCommand {
2489            expression: expression.to_string(),
2490            context: Some(EvaluateArgumentsContext::Watch),
2491            frame_id: Some(frame_id),
2492            source: None,
2493        });
2494
2495        cx.spawn(async move |this, cx| {
2496            let response = request.await?;
2497
2498            this.update(cx, |session, cx| {
2499                session.watchers.insert(
2500                    expression.clone(),
2501                    Watcher {
2502                        expression,
2503                        value: response.result.into(),
2504                        variables_reference: response.variables_reference,
2505                        presentation_hint: response.presentation_hint,
2506                    },
2507                );
2508                cx.emit(SessionEvent::Watchers);
2509            })
2510        })
2511    }
2512
2513    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2514        let watches = self.watchers.clone();
2515        for (_, watch) in watches.into_iter() {
2516            self.add_watcher(watch.expression.clone(), frame_id, cx)
2517                .detach();
2518        }
2519    }
2520
2521    pub fn remove_watcher(&mut self, expression: SharedString) {
2522        self.watchers.remove(&expression);
2523    }
2524
2525    pub fn variables(
2526        &mut self,
2527        variables_reference: VariableReference,
2528        cx: &mut Context<Self>,
2529    ) -> Vec<dap::Variable> {
2530        let command = VariablesCommand {
2531            variables_reference,
2532            filter: None,
2533            start: None,
2534            count: None,
2535            format: None,
2536        };
2537
2538        self.fetch(
2539            command,
2540            move |this, variables, cx| {
2541                let Some(variables) = variables.log_err() else {
2542                    return;
2543                };
2544
2545                this.variables.insert(variables_reference, variables);
2546
2547                cx.emit(SessionEvent::Variables);
2548                cx.emit(SessionEvent::InvalidateInlineValue);
2549            },
2550            cx,
2551        );
2552
2553        self.variables
2554            .get(&variables_reference)
2555            .cloned()
2556            .unwrap_or_default()
2557    }
2558
2559    pub fn data_breakpoint_info(
2560        &mut self,
2561        context: Arc<DataBreakpointContext>,
2562        mode: Option<String>,
2563        cx: &mut Context<Self>,
2564    ) -> Task<Option<dap::DataBreakpointInfoResponse>> {
2565        let command = DataBreakpointInfoCommand {
2566            context: context.clone(),
2567            mode,
2568        };
2569
2570        self.request(command, |_, response, _| response.ok(), cx)
2571    }
2572
2573    pub fn set_variable_value(
2574        &mut self,
2575        stack_frame_id: u64,
2576        variables_reference: u64,
2577        name: String,
2578        value: String,
2579        cx: &mut Context<Self>,
2580    ) {
2581        if self.capabilities.supports_set_variable.unwrap_or_default() {
2582            self.request(
2583                SetVariableValueCommand {
2584                    name,
2585                    value,
2586                    variables_reference,
2587                },
2588                move |this, response, cx| {
2589                    let response = response.log_err()?;
2590                    this.invalidate_command_type::<VariablesCommand>();
2591                    this.invalidate_command_type::<ReadMemory>();
2592                    this.memory.clear(cx.background_executor());
2593                    this.refresh_watchers(stack_frame_id, cx);
2594                    cx.emit(SessionEvent::Variables);
2595                    Some(response)
2596                },
2597                cx,
2598            )
2599            .detach();
2600        }
2601    }
2602
2603    pub fn evaluate(
2604        &mut self,
2605        expression: String,
2606        context: Option<EvaluateArgumentsContext>,
2607        frame_id: Option<u64>,
2608        source: Option<Source>,
2609        cx: &mut Context<Self>,
2610    ) -> Task<()> {
2611        let event = dap::OutputEvent {
2612            category: None,
2613            output: format!("> {expression}"),
2614            group: None,
2615            variables_reference: None,
2616            source: None,
2617            line: None,
2618            column: None,
2619            data: None,
2620            location_reference: None,
2621        };
2622        self.push_output(event);
2623        let request = self.mode.request_dap(EvaluateCommand {
2624            expression,
2625            context,
2626            frame_id,
2627            source,
2628        });
2629        cx.spawn(async move |this, cx| {
2630            let response = request.await;
2631            this.update(cx, |this, cx| {
2632                this.memory.clear(cx.background_executor());
2633                this.invalidate_command_type::<ReadMemory>();
2634                match response {
2635                    Ok(response) => {
2636                        let event = dap::OutputEvent {
2637                            category: None,
2638                            output: format!("< {}", &response.result),
2639                            group: None,
2640                            variables_reference: Some(response.variables_reference),
2641                            source: None,
2642                            line: None,
2643                            column: None,
2644                            data: None,
2645                            location_reference: None,
2646                        };
2647                        this.push_output(event);
2648                    }
2649                    Err(e) => {
2650                        let event = dap::OutputEvent {
2651                            category: None,
2652                            output: format!("{}", e),
2653                            group: None,
2654                            variables_reference: None,
2655                            source: None,
2656                            line: None,
2657                            column: None,
2658                            data: None,
2659                            location_reference: None,
2660                        };
2661                        this.push_output(event);
2662                    }
2663                };
2664                cx.notify();
2665            })
2666            .ok();
2667        })
2668    }
2669
2670    pub fn location(
2671        &mut self,
2672        reference: u64,
2673        cx: &mut Context<Self>,
2674    ) -> Option<dap::LocationsResponse> {
2675        self.fetch(
2676            LocationsCommand { reference },
2677            move |this, response, _| {
2678                let Some(response) = response.log_err() else {
2679                    return;
2680                };
2681                this.locations.insert(reference, response);
2682            },
2683            cx,
2684        );
2685        self.locations.get(&reference).cloned()
2686    }
2687
2688    pub fn is_attached(&self) -> bool {
2689        let SessionState::Running(local_mode) = &self.mode else {
2690            return false;
2691        };
2692        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2693    }
2694
2695    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2696        let command = DisconnectCommand {
2697            restart: Some(false),
2698            terminate_debuggee: Some(false),
2699            suspend_debuggee: Some(false),
2700        };
2701
2702        self.request(command, Self::empty_response, cx).detach()
2703    }
2704
2705    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2706        if self
2707            .capabilities
2708            .supports_terminate_threads_request
2709            .unwrap_or_default()
2710        {
2711            self.request(
2712                TerminateThreadsCommand {
2713                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2714                },
2715                Self::clear_active_debug_line_response,
2716                cx,
2717            )
2718            .detach();
2719        } else {
2720            self.shutdown(cx).detach();
2721        }
2722    }
2723
2724    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2725        self.thread_states.thread_state(thread_id)
2726    }
2727
2728    pub fn quirks(&self) -> SessionQuirks {
2729        self.quirks
2730    }
2731}