session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{anyhow, Result};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::OutputEventCategory;
  18use dap::{
  19    adapters::{DapDelegate, DapStatus},
  20    client::{DebugAdapterClient, SessionId},
  21    messages::{Events, Message},
  22    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  23    SteppingGranularity, StoppedEvent, VariableReference,
  24};
  25use dap_adapters::build_adapter;
  26use futures::channel::oneshot;
  27use futures::{future::Shared, FutureExt};
  28use gpui::{
  29    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  30};
  31use rpc::AnyProtoClient;
  32use serde_json::{json, Value};
  33use settings::Settings;
  34use smol::stream::StreamExt;
  35use std::any::TypeId;
  36use std::path::PathBuf;
  37use std::u64;
  38use std::{
  39    any::Any,
  40    collections::hash_map::Entry,
  41    hash::{Hash, Hasher},
  42    path::Path,
  43    sync::Arc,
  44};
  45use task::DebugAdapterConfig;
  46use text::{PointUtf16, ToPointUtf16};
  47use util::{merge_json_value_into, ResultExt};
  48
  49#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  50#[repr(transparent)]
  51pub struct ThreadId(pub u64);
  52
  53impl ThreadId {
  54    pub const MIN: ThreadId = ThreadId(u64::MIN);
  55    pub const MAX: ThreadId = ThreadId(u64::MAX);
  56}
  57
  58impl From<u64> for ThreadId {
  59    fn from(id: u64) -> Self {
  60        Self(id)
  61    }
  62}
  63
  64#[derive(Clone, Debug)]
  65pub struct StackFrame {
  66    pub dap: dap::StackFrame,
  67    pub scopes: Vec<dap::Scope>,
  68}
  69
  70impl From<dap::StackFrame> for StackFrame {
  71    fn from(stack_frame: dap::StackFrame) -> Self {
  72        Self {
  73            scopes: vec![],
  74            dap: stack_frame,
  75        }
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  80pub enum ThreadStatus {
  81    #[default]
  82    Running,
  83    Stopped,
  84    Stepping,
  85    Exited,
  86    Ended,
  87}
  88
  89impl ThreadStatus {
  90    pub fn label(&self) -> &'static str {
  91        match self {
  92            ThreadStatus::Running => "Running",
  93            ThreadStatus::Stopped => "Stopped",
  94            ThreadStatus::Stepping => "Stepping",
  95            ThreadStatus::Exited => "Exited",
  96            ThreadStatus::Ended => "Ended",
  97        }
  98    }
  99}
 100
 101#[derive(Debug)]
 102pub struct Thread {
 103    dap: dap::Thread,
 104    stack_frame_ids: IndexSet<StackFrameId>,
 105    _has_stopped: bool,
 106}
 107
 108impl From<dap::Thread> for Thread {
 109    fn from(dap: dap::Thread) -> Self {
 110        Self {
 111            dap,
 112            stack_frame_ids: Default::default(),
 113            _has_stopped: false,
 114        }
 115    }
 116}
 117
 118type UpstreamProjectId = u64;
 119
 120struct RemoteConnection {
 121    _client: AnyProtoClient,
 122    _upstream_project_id: UpstreamProjectId,
 123}
 124
 125impl RemoteConnection {
 126    fn send_proto_client_request<R: DapCommand>(
 127        &self,
 128        _request: R,
 129        _session_id: SessionId,
 130        cx: &mut App,
 131    ) -> Task<Result<R::Response>> {
 132        // let message = request.to_proto(session_id, self.upstream_project_id);
 133        // let upstream_client = self.client.clone();
 134        cx.background_executor().spawn(async move {
 135            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 136            // let response = upstream_client.request(message).await?;
 137            // request.response_from_proto(response)
 138            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 139        })
 140    }
 141
 142    fn request<R: DapCommand>(
 143        &self,
 144        request: R,
 145        session_id: SessionId,
 146        cx: &mut App,
 147    ) -> Task<Result<R::Response>>
 148    where
 149        <R::DapRequest as dap::requests::Request>::Response: 'static,
 150        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 151    {
 152        return self.send_proto_client_request::<R>(request, session_id, cx);
 153    }
 154}
 155
/// How a session reaches its debug adapter: through local state or a remote connection.
enum Mode {
    /// Requests are sent through the `LocalMode`'s adapter client.
    Local(LocalMode),
    /// Requests are routed through a `RemoteConnection`.
    Remote(RemoteConnection),
}
 160
/// State needed to drive a debug adapter from this process.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests and messages to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration the session was created with.
    config: DebugAdapterConfig,
    /// Adapter implementation; supplies the launch/attach request arguments.
    adapter: Arc<dyn DebugAdapter>,
    /// Store that is read for the breakpoints sent to the adapter.
    breakpoint_store: Entity<BreakpointStore>,
}
 168
 169fn client_source(abs_path: &Path) -> dap::Source {
 170    dap::Source {
 171        name: abs_path
 172            .file_name()
 173            .map(|filename| filename.to_string_lossy().to_string()),
 174        path: Some(abs_path.to_string_lossy().to_string()),
 175        source_reference: None,
 176        presentation_hint: None,
 177        origin: None,
 178        sources: None,
 179        adapter_data: None,
 180        checksums: None,
 181    }
 182}
 183
 184impl LocalMode {
 185    fn new(
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        breakpoint_store: Entity<BreakpointStore>,
 189        config: DebugAdapterConfig,
 190        delegate: DapAdapterDelegate,
 191        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 192        cx: AsyncApp,
 193    ) -> Task<Result<(Self, Capabilities)>> {
 194        cx.spawn(async move |cx| {
 195            let (adapter, binary) = Self::get_adapter_binary(&config, &delegate,  cx).await?;
 196
 197            let message_handler = Box::new(move |message| {
 198                messages_tx.unbounded_send(message).ok();
 199            });
 200
 201            let client = Arc::new(
 202                if let Some(client) = parent_session
 203                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 204                    .flatten()
 205                {
 206                    client
 207                        .reconnect(session_id, binary, message_handler, cx.clone())
 208                        .await?
 209                } else {
 210                    DebugAdapterClient::start(
 211                        session_id,
 212                        adapter.name(),
 213                        binary,
 214                        message_handler,
 215                        cx.clone(),
 216                    )
 217                    .await?
 218                },
 219            );
 220
 221            let adapter_id = adapter.name().to_string().to_owned();
 222            let session = Self {
 223                client,
 224                adapter,
 225                breakpoint_store,
 226                config: config.clone(),
 227            };
 228
 229            #[cfg(any(test, feature = "test-support"))]
 230            {
 231                let dap::DebugAdapterKind::Fake((fail, caps)) = session.config.kind.clone() else {
 232                    panic!("Only fake debug adapter configs should be used in tests");
 233                };
 234
 235                session
 236                    .client
 237                    .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 238                    .await;
 239
 240                match config.request.clone() {
 241                    dap::DebugRequestType::Launch if fail => {
 242                        session
 243                            .client
 244                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 245                                Err(dap::ErrorResponse {
 246                                    error: Some(dap::Message {
 247                                        id: 1,
 248                                        format: "error".into(),
 249                                        variables: None,
 250                                        send_telemetry: None,
 251                                        show_user: None,
 252                                        url: None,
 253                                        url_label: None,
 254                                    }),
 255                                })
 256                            })
 257                            .await;
 258                    }
 259                    dap::DebugRequestType::Launch => {
 260                        session
 261                            .client
 262                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 263                            .await;
 264                    }
 265                    dap::DebugRequestType::Attach(_) if fail => {
 266                        session
 267                            .client
 268                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 269                                Err(dap::ErrorResponse {
 270                                    error: Some(dap::Message {
 271                                        id: 1,
 272                                        format: "error".into(),
 273                                        variables: None,
 274                                        send_telemetry: None,
 275                                        show_user: None,
 276                                        url: None,
 277                                        url_label: None,
 278                                    }),
 279                                })
 280                            })
 281                            .await;
 282                    }
 283                    dap::DebugRequestType::Attach(attach_config) => {
 284                        session
 285                            .client
 286                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 287                                assert_eq!(
 288                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 289                                    args.raw
 290                                );
 291
 292                                Ok(())
 293                            })
 294                            .await;
 295                    }
 296                }
 297
 298                session.client.on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(())).await;
 299                session.client.fake_event(Events::Initialized(None)).await;
 300            }
 301
 302            let capabilities = session
 303                .request(Initialize { adapter_id }, cx.background_executor().clone())
 304                .await?;
 305
 306            Ok((session, capabilities))
 307        })
 308    }
 309
 310    fn send_breakpoints_from_path(
 311        &self,
 312        abs_path: Arc<Path>,
 313        reason: BreakpointUpdatedReason,
 314        cx: &mut App,
 315    ) -> Task<()> {
 316        let breakpoints = self
 317            .breakpoint_store
 318            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 319            .into_iter()
 320            .map(Into::into)
 321            .collect();
 322
 323        let task = self.request(
 324            dap_command::SetBreakpoints {
 325                source: client_source(&abs_path),
 326                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 327                breakpoints,
 328            },
 329            cx.background_executor().clone(),
 330        );
 331
 332        cx.background_spawn(async move {
 333            match task.await {
 334                Ok(_) => {}
 335                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 336            }
 337        })
 338    }
 339
 340    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 341        let mut breakpoint_tasks = Vec::new();
 342        let breakpoints = self
 343            .breakpoint_store
 344            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 345
 346        for (path, breakpoints) in breakpoints {
 347            let breakpoints = if ignore_breakpoints {
 348                vec![]
 349            } else {
 350                breakpoints.into_iter().map(Into::into).collect()
 351            };
 352
 353            breakpoint_tasks.push(self.request(
 354                dap_command::SetBreakpoints {
 355                    source: client_source(&path),
 356                    source_modified: Some(false),
 357                    breakpoints,
 358                },
 359                cx.background_executor().clone(),
 360            ));
 361        }
 362
 363        cx.background_spawn(async move {
 364            futures::future::join_all(breakpoint_tasks)
 365                .await
 366                .iter()
 367                .for_each(|res| match res {
 368                    Ok(_) => {}
 369                    Err(err) => {
 370                        log::warn!("Set breakpoints request failed: {}", err);
 371                    }
 372                });
 373        })
 374    }
 375
 376    async fn get_adapter_binary(
 377        config: &DebugAdapterConfig,
 378        delegate: &DapAdapterDelegate,
 379        cx: &mut AsyncApp,
 380    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 381        let adapter = build_adapter(&config.kind).await?;
 382
 383        let binary = cx.update(|cx| {
 384            ProjectSettings::get_global(cx)
 385                .dap
 386                .get(&adapter.name())
 387                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 388        })?;
 389
 390        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 391            Err(error) => {
 392                delegate.update_status(
 393                    adapter.name(),
 394                    DapStatus::Failed {
 395                        error: error.to_string(),
 396                    },
 397                );
 398
 399                return Err(error);
 400            }
 401            Ok(mut binary) => {
 402                delegate.update_status(adapter.name(), DapStatus::None);
 403
 404                let shell_env = delegate.shell_env().await;
 405                let mut envs = binary.envs.unwrap_or_default();
 406                envs.extend(shell_env);
 407                binary.envs = Some(envs);
 408
 409                binary
 410            }
 411        };
 412
 413        Ok((adapter, binary))
 414    }
 415
 416    pub fn initialize_sequence(
 417        &self,
 418        capabilities: &Capabilities,
 419        initialized_rx: oneshot::Receiver<()>,
 420        cx: &App,
 421    ) -> Task<Result<()>> {
 422        let mut raw = self.adapter.request_args(&self.config);
 423        merge_json_value_into(
 424            self.config.initialize_args.clone().unwrap_or(json!({})),
 425            &mut raw,
 426        );
 427
 428        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 429        let launch = match &self.config.request {
 430            dap::DebugRequestType::Launch => {
 431                self.request(Launch { raw }, cx.background_executor().clone())
 432            }
 433            dap::DebugRequestType::Attach(_) => {
 434                self.request(Attach { raw }, cx.background_executor().clone())
 435            }
 436        };
 437
 438        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 439
 440        let configuration_sequence = cx.spawn({
 441            let this = self.clone();
 442            async move |cx| {
 443                initialized_rx.await?;
 444                // todo(debugger) figure out if we want to handle a breakpoint response error
 445                // This will probably consist of letting a user know that breakpoints failed to be set
 446                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 447
 448                if configuration_done_supported {
 449                    this.request(ConfigurationDone, cx.background_executor().clone())
 450                } else {
 451                    Task::ready(Ok(()))
 452                }
 453                .await
 454            }
 455        });
 456
 457        cx.background_spawn(async move {
 458            futures::future::try_join(launch, configuration_sequence).await?;
 459            Ok(())
 460        })
 461    }
 462
 463    fn request<R: LocalDapCommand>(
 464        &self,
 465        request: R,
 466        executor: BackgroundExecutor,
 467    ) -> Task<Result<R::Response>>
 468    where
 469        <R::DapRequest as dap::requests::Request>::Response: 'static,
 470        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 471    {
 472        let request = Arc::new(request);
 473
 474        let request_clone = request.clone();
 475        let connection = self.client.clone();
 476        let request_task = executor.spawn(async move {
 477            let args = request_clone.to_dap();
 478            connection.request::<R::DapRequest>(args).await
 479        });
 480
 481        executor.spawn(async move {
 482            let response = request.response_from_dap(request_task.await?);
 483            response
 484        })
 485    }
 486}
 487impl From<RemoteConnection> for Mode {
 488    fn from(value: RemoteConnection) -> Self {
 489        Self::Remote(value)
 490    }
 491}
 492
 493impl Mode {
 494    fn request_dap<R: DapCommand>(
 495        &self,
 496        session_id: SessionId,
 497        request: R,
 498        cx: &mut Context<Session>,
 499    ) -> Task<Result<R::Response>>
 500    where
 501        <R::DapRequest as dap::requests::Request>::Response: 'static,
 502        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 503    {
 504        match self {
 505            Mode::Local(debug_adapter_client) => {
 506                debug_adapter_client.request(request, cx.background_executor().clone())
 507            }
 508            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 509        }
 510    }
 511}
 512
 513#[derive(Default)]
 514struct ThreadStates {
 515    global_state: Option<ThreadStatus>,
 516    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 517}
 518
 519impl ThreadStates {
 520    fn stop_all_threads(&mut self) {
 521        self.global_state = Some(ThreadStatus::Stopped);
 522        self.known_thread_states.clear();
 523    }
 524
 525    fn exit_all_threads(&mut self) {
 526        self.global_state = Some(ThreadStatus::Exited);
 527        self.known_thread_states.clear();
 528    }
 529
 530    fn continue_all_threads(&mut self) {
 531        self.global_state = Some(ThreadStatus::Running);
 532        self.known_thread_states.clear();
 533    }
 534
 535    fn stop_thread(&mut self, thread_id: ThreadId) {
 536        self.known_thread_states
 537            .insert(thread_id, ThreadStatus::Stopped);
 538    }
 539
 540    fn continue_thread(&mut self, thread_id: ThreadId) {
 541        self.known_thread_states
 542            .insert(thread_id, ThreadStatus::Running);
 543    }
 544
 545    fn process_step(&mut self, thread_id: ThreadId) {
 546        self.known_thread_states
 547            .insert(thread_id, ThreadStatus::Stepping);
 548    }
 549
 550    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 551        self.thread_state(thread_id)
 552            .unwrap_or(ThreadStatus::Running)
 553    }
 554
 555    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 556        self.known_thread_states
 557            .get(&thread_id)
 558            .copied()
 559            .or(self.global_state)
 560    }
 561
 562    fn exit_thread(&mut self, thread_id: ThreadId) {
 563        self.known_thread_states
 564            .insert(thread_id, ThreadStatus::Exited);
 565    }
 566
 567    fn any_stopped_thread(&self) -> bool {
 568        self.global_state
 569            .is_some_and(|state| state == ThreadStatus::Stopped)
 570            || self
 571                .known_thread_states
 572                .values()
 573                .any(|status| *status == ThreadStatus::Stopped)
 574    }
 575}
/// Capacity of the circular buffer in which a `Session` stores adapter output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Counter value passed to `Session::output` so that only events past it are returned.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether the adapter is driven locally or reached through a remote connection.
    mode: Mode,
    /// Adapter capabilities; local sessions store the result of the `Initialize` request,
    /// remote sessions start with the default value.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    /// While set, breakpoint-store updates are not forwarded to the adapter.
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Compared with the caller's token in `output` to find how many events are new.
    output_token: OutputToken,
    /// Bounded buffer of output events, holding at most `MAX_TRACKED_OUTPUT_EVENTS` entries.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    // NOTE(review): keyed by command type, then by the command itself; this looks like a
    // cache of issued requests — confirm against the code that populates it.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Tasks owned by the session; local sessions store their adapter message loop here.
    _background_tasks: Vec<Task<()>>,
}
 601
/// Object-safe view of a command, so commands of different concrete types can be compared
/// and hashed through a trait object (used by `RequestSlot`).
trait CacheableCommand: 'static + Send + Sync {
    /// Returns the command as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any;
    /// Compares this command with another type-erased command.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds the command into `hasher`.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts a shared command into a shared `dyn Any`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 608
 609impl<T> CacheableCommand for T
 610where
 611    T: DapCommand + PartialEq + Eq + Hash,
 612{
 613    fn as_any(&self) -> &dyn Any {
 614        self
 615    }
 616
 617    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 618        rhs.as_any()
 619            .downcast_ref::<Self>()
 620            .map_or(false, |rhs| self == rhs)
 621    }
 622
 623    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 624        T::hash(self, &mut hasher);
 625    }
 626
 627    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 628        self
 629    }
 630}
 631
 632pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 633
 634impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 635    fn from(request: T) -> Self {
 636        Self(Arc::new(request))
 637    }
 638}
 639
 640impl PartialEq for RequestSlot {
 641    fn eq(&self, other: &Self) -> bool {
 642        self.0.dyn_eq(other.0.as_ref())
 643    }
 644}
 645
 646impl Eq for RequestSlot {}
 647
 648impl Hash for RequestSlot {
 649    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 650        self.0.dyn_hash(state);
 651        self.0.as_any().type_id().hash(state)
 652    }
 653}
 654
 655#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 656pub struct CompletionsQuery {
 657    pub query: String,
 658    pub column: u64,
 659    pub line: Option<u64>,
 660    pub frame_id: Option<u64>,
 661}
 662
 663impl CompletionsQuery {
 664    pub fn new(
 665        buffer: &language::Buffer,
 666        cursor_position: language::Anchor,
 667        frame_id: Option<u64>,
 668    ) -> Self {
 669        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 670        Self {
 671            query: buffer.text(),
 672            column: column as u64,
 673            frame_id,
 674            line: Some(row as u64),
 675        }
 676    }
 677}
 678
/// Events emitted by a `Session`.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

// Registers `Session` as an emitter of `SessionEvent`.
impl EventEmitter<SessionEvent> for Session {}
 689
 690// local session will send breakpoint updates to DAP for all new breakpoints
 691// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 692// BreakpointStore notifies session on breakpoint changes
 693impl Session {
 694    pub(crate) fn local(
 695        breakpoint_store: Entity<BreakpointStore>,
 696        session_id: SessionId,
 697        parent_session: Option<Entity<Session>>,
 698        delegate: DapAdapterDelegate,
 699        config: DebugAdapterConfig,
 700        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 701        initialized_tx: oneshot::Sender<()>,
 702        cx: &mut App,
 703    ) -> Task<Result<Entity<Self>>> {
 704        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 705
 706        cx.spawn(async move |cx| {
 707            let (mode, capabilities) = LocalMode::new(
 708                session_id,
 709                parent_session.clone(),
 710                breakpoint_store.clone(),
 711                config.clone(),
 712                delegate,
 713                message_tx,
 714                cx.clone(),
 715            )
 716            .await?;
 717
 718            cx.new(|cx| {
 719                let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Self>, cx| {
 720                    let mut initialized_tx = Some(initialized_tx);
 721                    while let Some(message) = message_rx.next().await {
 722                        if let Message::Event(event) = message {
 723                            if let Events::Initialized(_) = *event {
 724                                if let Some(tx) = initialized_tx.take() {
 725                                    tx.send(()).ok();
 726                                }
 727                            } else {
 728                                let Ok(_) = this.update(cx, |session, cx| {
 729                                    session.handle_dap_event(event, cx);
 730                                }) else {
 731                                    break;
 732                                };
 733                            }
 734                        } else {
 735                            let Ok(_) =
 736                                start_debugging_requests_tx.unbounded_send((session_id, message))
 737                            else {
 738                                break;
 739                            };
 740                        }
 741                    }
 742                })];
 743
 744                cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
 745                    BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 746                        if let Some(local) = (!this.ignore_breakpoints)
 747                            .then(|| this.as_local_mut())
 748                            .flatten()
 749                        {
 750                            local
 751                                .send_breakpoints_from_path(path.clone(), *reason, cx)
 752                                .detach();
 753                        };
 754                    }
 755                    BreakpointStoreEvent::ActiveDebugLineChanged => {}
 756                })
 757                .detach();
 758
 759                Self {
 760                    mode: Mode::Local(mode),
 761                    id: session_id,
 762                    child_session_ids: HashSet::default(),
 763                    parent_id: parent_session.map(|session| session.read(cx).id),
 764                    variables: Default::default(),
 765                    capabilities,
 766                    thread_states: ThreadStates::default(),
 767                    output_token: OutputToken(0),
 768                    ignore_breakpoints: false,
 769                    output: circular_buffer::CircularBuffer::boxed(),
 770                    requests: HashMap::default(),
 771                    modules: Vec::default(),
 772                    loaded_sources: Vec::default(),
 773                    threads: IndexMap::default(),
 774                    stack_frames: IndexMap::default(),
 775                    locations: Default::default(),
 776                    _background_tasks,
 777                    is_session_terminated: false,
 778                }
 779            })
 780        })
 781    }
 782
 783    pub(crate) fn remote(
 784        session_id: SessionId,
 785        client: AnyProtoClient,
 786        upstream_project_id: u64,
 787        ignore_breakpoints: bool,
 788    ) -> Self {
 789        Self {
 790            mode: Mode::Remote(RemoteConnection {
 791                _client: client,
 792                _upstream_project_id: upstream_project_id,
 793            }),
 794            id: session_id,
 795            child_session_ids: HashSet::default(),
 796            parent_id: None,
 797            capabilities: Capabilities::default(),
 798            ignore_breakpoints,
 799            variables: Default::default(),
 800            stack_frames: Default::default(),
 801            thread_states: ThreadStates::default(),
 802            output_token: OutputToken(0),
 803            output: circular_buffer::CircularBuffer::boxed(),
 804            requests: HashMap::default(),
 805            modules: Vec::default(),
 806            loaded_sources: Vec::default(),
 807            threads: IndexMap::default(),
 808            _background_tasks: Vec::default(),
 809            locations: Default::default(),
 810            is_session_terminated: false,
 811        }
 812    }
 813
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 817
    /// Returns a copy of the set of child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 821
    /// Adds `session_id` to the set of child session ids.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 825
    /// Removes `session_id` from the set of child session ids, if present.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 829
    /// Returns the id of the parent session, if this session has one.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
 833
    /// Returns the capabilities stored for this session's adapter.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 837
 838    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 839        if let Mode::Local(local_mode) = &self.mode {
 840            Some(local_mode.config.clone())
 841        } else {
 842            None
 843        }
 844    }
 845
    /// Returns the session's terminated flag.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 849
 850    pub fn is_local(&self) -> bool {
 851        matches!(self.mode, Mode::Local(_))
 852    }
 853
 854    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 855        match &mut self.mode {
 856            Mode::Local(local_mode) => Some(local_mode),
 857            Mode::Remote(_) => None,
 858        }
 859    }
 860
 861    pub fn as_local(&self) -> Option<&LocalMode> {
 862        match &self.mode {
 863            Mode::Local(local_mode) => Some(local_mode),
 864            Mode::Remote(_) => None,
 865        }
 866    }
 867
 868    pub(super) fn initialize_sequence(
 869        &mut self,
 870        initialize_rx: oneshot::Receiver<()>,
 871        cx: &mut Context<Self>,
 872    ) -> Task<Result<()>> {
 873        match &self.mode {
 874            Mode::Local(local_mode) => {
 875                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
 876            }
 877            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
 878        }
 879    }
 880
 881    pub fn output(
 882        &self,
 883        since: OutputToken,
 884    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 885        if self.output_token.0 == 0 {
 886            return (self.output.range(0..0), OutputToken(0));
 887        };
 888
 889        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 890
 891        let clamped_events_since = events_since.clamp(0, self.output.len());
 892        (
 893            self.output
 894                .range(self.output.len() - clamped_events_since..),
 895            self.output_token,
 896        )
 897    }
 898
 899    pub fn respond_to_client(
 900        &self,
 901        request_seq: u64,
 902        success: bool,
 903        command: String,
 904        body: Option<serde_json::Value>,
 905        cx: &mut Context<Self>,
 906    ) -> Task<Result<()>> {
 907        let Some(local_session) = self.as_local().cloned() else {
 908            unreachable!("Cannot respond to remote client");
 909        };
 910
 911        cx.background_spawn(async move {
 912            local_session
 913                .client
 914                .send_message(Message::Response(Response {
 915                    body,
 916                    success,
 917                    command,
 918                    seq: request_seq + 1,
 919                    request_seq,
 920                    message: None,
 921                }))
 922                .await
 923        })
 924    }
 925
    /// Updates session state in response to a DAP `stopped` event.
    ///
    /// Marks the affected thread(s) as stopped, drops the cached requests and
    /// thread/variable data that the stop makes stale, then emits
    /// `SessionEvent::Stopped` and notifies observers.
    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
        // Without a thread id, or when the adapter says every thread stopped,
        // all threads are treated as stopped and every cached stack trace is dropped.
        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
            self.thread_states.stop_all_threads();

            self.invalidate_command_type::<StackTraceCommand>();
        }

        // Even if we stopped all threads we still need to insert the thread_id
        // to our own data
        if let Some(thread_id) = event.thread_id {
            self.thread_states.stop_thread(ThreadId(thread_id));

            self.invalidate_state(
                &StackTraceCommand {
                    thread_id,
                    start_frame: None,
                    levels: None,
                }
                .into(),
            );
        }

        self.invalidate_generic();
        self.threads.clear();
        self.variables.clear();
        // The thread id is only forwarded when the adapter did not ask to
        // preserve the current focus.
        cx.emit(SessionEvent::Stopped(
            event
                .thread_id
                .map(Into::into)
                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
        ));
        cx.notify();
    }
 959
    /// Applies an event received from the debug adapter to this session's state.
    ///
    /// Event kinds without a handler below are accepted and ignored.
    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
        match *event {
            Events::Initialized(_) => {
                debug_assert!(
                    false,
                    "Initialized event should have been handled in LocalMode"
                );
            }
            Events::Stopped(event) => self.handle_stopped_event(event, cx),
            Events::Continued(event) => {
                if event.all_threads_continued.unwrap_or_default() {
                    self.thread_states.continue_all_threads();
                } else {
                    self.thread_states
                        .continue_thread(ThreadId(event.thread_id));
                }
                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
                self.invalidate_generic();
            }
            Events::Exited(_event) => {
                self.clear_active_debug_line(cx);
            }
            Events::Terminated(_) => {
                self.is_session_terminated = true;
                self.clear_active_debug_line(cx);
            }
            Events::Thread(event) => {
                let thread_id = ThreadId(event.thread_id);

                match event.reason {
                    dap::ThreadEventReason::Started => {
                        self.thread_states.continue_thread(thread_id);
                    }
                    dap::ThreadEventReason::Exited => {
                        self.thread_states.exit_thread(thread_id);
                    }
                    reason => {
                        log::error!("Unhandled thread event reason {:?}", reason);
                    }
                }
                // Drop the cached threads request so the thread list is fetched again.
                self.invalidate_state(&ThreadsCommand.into());
                cx.notify();
            }
            Events::Output(event) => {
                // Telemetry output is dropped and never reaches the output buffer.
                if event
                    .category
                    .as_ref()
                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
                {
                    return;
                }

                self.output.push_back(event);
                self.output_token.0 += 1;
                cx.notify();
            }
            Events::Breakpoint(_) => {}
            Events::Module(event) => {
                // The module list is patched in place rather than re-fetched.
                match event.reason {
                    dap::ModuleEventReason::New => {
                        self.modules.push(event.module);
                    }
                    dap::ModuleEventReason::Changed => {
                        if let Some(module) = self
                            .modules
                            .iter_mut()
                            .find(|other| event.module.id == other.id)
                        {
                            *module = event.module;
                        }
                    }
                    dap::ModuleEventReason::Removed => {
                        self.modules.retain(|other| event.module.id != other.id);
                    }
                }

                // todo(debugger): We should only send the invalidate command to downstream clients.
                // self.invalidate_state(&ModulesCommand.into());
            }
            Events::LoadedSource(_) => {
                self.invalidate_state(&LoadedSourcesCommand.into());
            }
            Events::Capabilities(event) => {
                self.capabilities = self.capabilities.merge(event.capabilities);
                cx.notify();
            }
            Events::Memory(_) => {}
            Events::Process(_) => {}
            Events::ProgressEnd(_) => {}
            Events::ProgressStart(_) => {}
            Events::ProgressUpdate(_) => {}
            Events::Invalidated(_) => {}
            Events::Other(_) => {}
        }
    }
1055
1056    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1057    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1058        &mut self,
1059        request: T,
1060        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1061            + 'static,
1062        cx: &mut Context<Self>,
1063    ) {
1064        const {
1065            assert!(
1066                T::CACHEABLE,
1067                "Only requests marked as cacheable should invoke `fetch`"
1068            );
1069        }
1070
1071        if !self.thread_states.any_stopped_thread()
1072            && request.type_id() != TypeId::of::<ThreadsCommand>()
1073            || self.is_session_terminated
1074        {
1075            return;
1076        }
1077
1078        let request_map = self
1079            .requests
1080            .entry(std::any::TypeId::of::<T>())
1081            .or_default();
1082
1083        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1084            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1085
1086            let task = Self::request_inner::<Arc<T>>(
1087                &self.capabilities,
1088                self.id,
1089                &self.mode,
1090                command,
1091                process_result,
1092                cx,
1093            );
1094            let task = cx
1095                .background_executor()
1096                .spawn(async move {
1097                    let _ = task.await?;
1098                    Some(())
1099                })
1100                .shared();
1101
1102            vacant.insert(task);
1103            cx.notify();
1104        }
1105    }
1106
1107    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1108        capabilities: &Capabilities,
1109        session_id: SessionId,
1110        mode: &Mode,
1111        request: T,
1112        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1113            + 'static,
1114        cx: &mut Context<Self>,
1115    ) -> Task<Option<T::Response>> {
1116        if !T::is_supported(&capabilities) {
1117            log::warn!(
1118                "Attempted to send a DAP request that isn't supported: {:?}",
1119                request
1120            );
1121            let error = Err(anyhow::Error::msg(
1122                "Couldn't complete request because it's not supported",
1123            ));
1124            return cx.spawn(async move |this, cx| {
1125                this.update(cx, |this, cx| process_result(this, error, cx))
1126                    .log_err()
1127                    .flatten()
1128            });
1129        }
1130
1131        let request = mode.request_dap(session_id, request, cx);
1132        cx.spawn(async move |this, cx| {
1133            let result = request.await;
1134            this.update(cx, |this, cx| process_result(this, result, cx))
1135                .log_err()
1136                .flatten()
1137        })
1138    }
1139
    /// Sends `request` using this session's capabilities, id and mode, without
    /// consulting or updating the request cache.
    ///
    /// `process_result` is invoked with the outcome; the returned task resolves
    /// to its return value.
    fn request<T: DapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
            + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(
            &self.capabilities,
            self.id,
            &self.mode,
            request,
            process_result,
            cx,
        )
    }
1156
1157    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1158        self.requests.remove(&std::any::TypeId::of::<Command>());
1159    }
1160
    /// Drops the cached modules, loaded sources and threads requests.
    fn invalidate_generic(&mut self) {
        self.invalidate_command_type::<ModulesCommand>();
        self.invalidate_command_type::<LoadedSourcesCommand>();
        self.invalidate_command_type::<ThreadsCommand>();
    }
1166
1167    fn invalidate_state(&mut self, key: &RequestSlot) {
1168        self.requests
1169            .entry(key.0.as_any().type_id())
1170            .and_modify(|request_map| {
1171                request_map.remove(&key);
1172            });
1173    }
1174
    /// Returns the status tracked for `thread_id`.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1178
1179    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1180        self.fetch(
1181            dap_command::ThreadsCommand,
1182            |this, result, cx| {
1183                let result = result.log_err()?;
1184
1185                this.threads = result
1186                    .iter()
1187                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1188                    .collect();
1189
1190                this.invalidate_command_type::<StackTraceCommand>();
1191                cx.emit(SessionEvent::Threads);
1192                cx.notify();
1193
1194                Some(result)
1195            },
1196            cx,
1197        );
1198
1199        self.threads
1200            .values()
1201            .map(|thread| {
1202                (
1203                    thread.dap.clone(),
1204                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1205                )
1206            })
1207            .collect()
1208    }
1209
1210    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1211        self.fetch(
1212            dap_command::ModulesCommand,
1213            |this, result, cx| {
1214                let result = result.log_err()?;
1215
1216                this.modules = result.iter().cloned().collect();
1217                cx.emit(SessionEvent::Modules);
1218                cx.notify();
1219
1220                Some(result)
1221            },
1222            cx,
1223        );
1224
1225        &self.modules
1226    }
1227
    /// Returns whether this session is currently set to ignore breakpoints.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1231
1232    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1233        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1234    }
1235
1236    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1237        if self.ignore_breakpoints == ignore {
1238            return Task::ready(());
1239        }
1240
1241        self.ignore_breakpoints = ignore;
1242
1243        if let Some(local) = self.as_local() {
1244            local.send_all_breakpoints(ignore, cx)
1245        } else {
1246            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1247            unimplemented!()
1248        }
1249    }
1250
1251    pub fn breakpoints_enabled(&self) -> bool {
1252        self.ignore_breakpoints
1253    }
1254
1255    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1256        self.fetch(
1257            dap_command::LoadedSourcesCommand,
1258            |this, result, cx| {
1259                let result = result.log_err()?;
1260                this.loaded_sources = result.iter().cloned().collect();
1261                cx.emit(SessionEvent::LoadedSources);
1262                cx.notify();
1263                Some(result)
1264            },
1265            cx,
1266        );
1267
1268        &self.loaded_sources
1269    }
1270
1271    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1272        res.log_err()?;
1273        Some(())
1274    }
1275
1276    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1277        thread_id: ThreadId,
1278    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1279    {
1280        move |this, response, cx| match response.log_err() {
1281            Some(response) => Some(response),
1282            None => {
1283                this.thread_states.stop_thread(thread_id);
1284                cx.notify();
1285                None
1286            }
1287        }
1288    }
1289
1290    fn clear_active_debug_line_response(
1291        &mut self,
1292        response: Result<()>,
1293        cx: &mut Context<'_, Session>,
1294    ) -> Option<()> {
1295        response.log_err()?;
1296        self.clear_active_debug_line(cx);
1297        Some(())
1298    }
1299
1300    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1301        self.as_local()
1302            .expect("Message handler will only run in local mode")
1303            .breakpoint_store
1304            .update(cx, |store, cx| {
1305                store.remove_active_position(Some(self.id), cx)
1306            });
1307    }
1308
1309    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1310        self.request(
1311            PauseCommand {
1312                thread_id: thread_id.0,
1313            },
1314            Self::empty_response,
1315            cx,
1316        )
1317        .detach();
1318    }
1319
1320    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1321        self.request(
1322            RestartStackFrameCommand { stack_frame_id },
1323            Self::empty_response,
1324            cx,
1325        )
1326        .detach();
1327    }
1328
1329    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1330        if self.capabilities.supports_restart_request.unwrap_or(false) {
1331            self.request(
1332                RestartCommand {
1333                    raw: args.unwrap_or(Value::Null),
1334                },
1335                Self::empty_response,
1336                cx,
1337            )
1338            .detach();
1339        } else {
1340            self.request(
1341                DisconnectCommand {
1342                    restart: Some(false),
1343                    terminate_debuggee: Some(true),
1344                    suspend_debuggee: Some(false),
1345                },
1346                Self::empty_response,
1347                cx,
1348            )
1349            .detach();
1350        }
1351    }
1352
1353    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1354        self.is_session_terminated = true;
1355        self.thread_states.exit_all_threads();
1356        cx.notify();
1357
1358        let task = if self
1359            .capabilities
1360            .supports_terminate_request
1361            .unwrap_or_default()
1362        {
1363            self.request(
1364                TerminateCommand {
1365                    restart: Some(false),
1366                },
1367                Self::clear_active_debug_line_response,
1368                cx,
1369            )
1370        } else {
1371            self.request(
1372                DisconnectCommand {
1373                    restart: Some(false),
1374                    terminate_debuggee: Some(true),
1375                    suspend_debuggee: Some(false),
1376                },
1377                Self::clear_active_debug_line_response,
1378                cx,
1379            )
1380        };
1381
1382        cx.background_spawn(async move {
1383            let _ = task.await;
1384        })
1385    }
1386
1387    pub fn completions(
1388        &mut self,
1389        query: CompletionsQuery,
1390        cx: &mut Context<Self>,
1391    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1392        let task = self.request(query, |_, result, _| result.log_err(), cx);
1393
1394        cx.background_executor().spawn(async move {
1395            anyhow::Ok(
1396                task.await
1397                    .map(|response| response.targets)
1398                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1399            )
1400        })
1401    }
1402
1403    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1404        self.thread_states.continue_thread(thread_id);
1405        self.request(
1406            ContinueCommand {
1407                args: ContinueArguments {
1408                    thread_id: thread_id.0,
1409                    single_thread: Some(true),
1410                },
1411            },
1412            Self::on_step_response::<ContinueCommand>(thread_id),
1413            cx,
1414        )
1415        .detach();
1416    }
1417
1418    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1419        match self.mode {
1420            Mode::Local(ref local) => Some(local.client.clone()),
1421            Mode::Remote(_) => None,
1422        }
1423    }
1424
1425    pub fn step_over(
1426        &mut self,
1427        thread_id: ThreadId,
1428        granularity: SteppingGranularity,
1429        cx: &mut Context<Self>,
1430    ) {
1431        let supports_single_thread_execution_requests =
1432            self.capabilities.supports_single_thread_execution_requests;
1433        let supports_stepping_granularity = self
1434            .capabilities
1435            .supports_stepping_granularity
1436            .unwrap_or_default();
1437
1438        let command = NextCommand {
1439            inner: StepCommand {
1440                thread_id: thread_id.0,
1441                granularity: supports_stepping_granularity.then(|| granularity),
1442                single_thread: supports_single_thread_execution_requests,
1443            },
1444        };
1445
1446        self.thread_states.process_step(thread_id);
1447        self.request(
1448            command,
1449            Self::on_step_response::<NextCommand>(thread_id),
1450            cx,
1451        )
1452        .detach();
1453    }
1454
1455    pub fn step_in(
1456        &mut self,
1457        thread_id: ThreadId,
1458        granularity: SteppingGranularity,
1459        cx: &mut Context<Self>,
1460    ) {
1461        let supports_single_thread_execution_requests =
1462            self.capabilities.supports_single_thread_execution_requests;
1463        let supports_stepping_granularity = self
1464            .capabilities
1465            .supports_stepping_granularity
1466            .unwrap_or_default();
1467
1468        let command = StepInCommand {
1469            inner: StepCommand {
1470                thread_id: thread_id.0,
1471                granularity: supports_stepping_granularity.then(|| granularity),
1472                single_thread: supports_single_thread_execution_requests,
1473            },
1474        };
1475
1476        self.thread_states.process_step(thread_id);
1477        self.request(
1478            command,
1479            Self::on_step_response::<StepInCommand>(thread_id),
1480            cx,
1481        )
1482        .detach();
1483    }
1484
1485    pub fn step_out(
1486        &mut self,
1487        thread_id: ThreadId,
1488        granularity: SteppingGranularity,
1489        cx: &mut Context<Self>,
1490    ) {
1491        let supports_single_thread_execution_requests =
1492            self.capabilities.supports_single_thread_execution_requests;
1493        let supports_stepping_granularity = self
1494            .capabilities
1495            .supports_stepping_granularity
1496            .unwrap_or_default();
1497
1498        let command = StepOutCommand {
1499            inner: StepCommand {
1500                thread_id: thread_id.0,
1501                granularity: supports_stepping_granularity.then(|| granularity),
1502                single_thread: supports_single_thread_execution_requests,
1503            },
1504        };
1505
1506        self.thread_states.process_step(thread_id);
1507        self.request(
1508            command,
1509            Self::on_step_response::<StepOutCommand>(thread_id),
1510            cx,
1511        )
1512        .detach();
1513    }
1514
1515    pub fn step_back(
1516        &mut self,
1517        thread_id: ThreadId,
1518        granularity: SteppingGranularity,
1519        cx: &mut Context<Self>,
1520    ) {
1521        let supports_single_thread_execution_requests =
1522            self.capabilities.supports_single_thread_execution_requests;
1523        let supports_stepping_granularity = self
1524            .capabilities
1525            .supports_stepping_granularity
1526            .unwrap_or_default();
1527
1528        let command = StepBackCommand {
1529            inner: StepCommand {
1530                thread_id: thread_id.0,
1531                granularity: supports_stepping_granularity.then(|| granularity),
1532                single_thread: supports_single_thread_execution_requests,
1533            },
1534        };
1535
1536        self.thread_states.process_step(thread_id);
1537
1538        self.request(
1539            command,
1540            Self::on_step_response::<StepBackCommand>(thread_id),
1541            cx,
1542        )
1543        .detach();
1544    }
1545
    /// Returns the cached stack frames of `thread_id`.
    ///
    /// A stack trace request is only issued when the thread is stopped, a
    /// threads request entry exists in the request cache and the thread is
    /// present in the cached thread list. When the response arrives, the
    /// thread's frame ids and the frame cache are updated, cached scopes and
    /// variables requests are dropped and `SessionEvent::StackTrace` is
    /// emitted.
    ///
    /// Returns an empty list when the thread is unknown.
    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
            && self.requests.contains_key(&ThreadsCommand.type_id())
            && self.threads.contains_key(&thread_id)
        // ^ todo(debugger): We need a better way to check that we're not querying stale data
        // We could still be using an old thread id and have sent a new thread's request
        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
        // But it very well could cause a minor bug in the future that is hard to track down
        {
            self.fetch(
                super::dap_command::StackTraceCommand {
                    thread_id: thread_id.0,
                    start_frame: None,
                    levels: None,
                },
                move |this, stack_frames, cx| {
                    let stack_frames = stack_frames.log_err()?;

                    // Remember which frames belong to this thread, in response order.
                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
                        thread.stack_frame_ids =
                            stack_frames.iter().map(|frame| frame.id).collect();
                    });
                    debug_assert!(
                        matches!(entry, indexmap::map::Entry::Occupied(_)),
                        "Sent request for thread_id that doesn't exist"
                    );

                    this.stack_frames.extend(
                        stack_frames
                            .iter()
                            .cloned()
                            .map(|frame| (frame.id, StackFrame::from(frame))),
                    );

                    this.invalidate_command_type::<ScopesCommand>();
                    this.invalidate_command_type::<VariablesCommand>();

                    cx.emit(SessionEvent::StackTrace);
                    cx.notify();
                    Some(stack_frames)
                },
                cx,
            );
        }

        // Frame ids without an entry in the frame cache are skipped.
        self.threads
            .get(&thread_id)
            .map(|thread| {
                thread
                    .stack_frame_ids
                    .iter()
                    .filter_map(|id| self.stack_frames.get(id))
                    .cloned()
                    .collect()
            })
            .unwrap_or_default()
    }
1603
1604    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1605        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1606            && self
1607                .requests
1608                .contains_key(&TypeId::of::<StackTraceCommand>())
1609        {
1610            self.fetch(
1611                ScopesCommand { stack_frame_id },
1612                move |this, scopes, cx| {
1613                    let scopes = scopes.log_err()?;
1614
1615                    for scope in scopes .iter(){
1616                        this.variables(scope.variables_reference, cx);
1617                    }
1618
1619                    let entry = this
1620                        .stack_frames
1621                        .entry(stack_frame_id)
1622                        .and_modify(|stack_frame| {
1623                            stack_frame.scopes = scopes.clone();
1624                        });
1625
1626                    cx.emit(SessionEvent::Variables);
1627
1628                    debug_assert!(
1629                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1630                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1631                    );
1632
1633                    Some(scopes)
1634                },
1635                cx,
1636            );
1637        }
1638
1639        self.stack_frames
1640            .get(&stack_frame_id)
1641            .map(|frame| frame.scopes.as_slice())
1642            .unwrap_or_default()
1643    }
1644
1645    pub fn variables(
1646        &mut self,
1647        variables_reference: VariableReference,
1648        cx: &mut Context<Self>,
1649    ) -> Vec<dap::Variable> {
1650        let command = VariablesCommand {
1651            variables_reference,
1652            filter: None,
1653            start: None,
1654            count: None,
1655            format: None,
1656        };
1657
1658        self.fetch(
1659            command,
1660            move |this, variables, cx| {
1661                let variables = variables.log_err()?;
1662                this.variables
1663                    .insert(variables_reference, variables.clone());
1664
1665                cx.emit(SessionEvent::Variables);
1666                Some(variables)
1667            },
1668            cx,
1669        );
1670
1671        self.variables
1672            .get(&variables_reference)
1673            .cloned()
1674            .unwrap_or_default()
1675    }
1676
1677    pub fn set_variable_value(
1678        &mut self,
1679        variables_reference: u64,
1680        name: String,
1681        value: String,
1682        cx: &mut Context<Self>,
1683    ) {
1684        if self.capabilities.supports_set_variable.unwrap_or_default() {
1685            self.request(
1686                SetVariableValueCommand {
1687                    name,
1688                    value,
1689                    variables_reference,
1690                },
1691                move |this, response, cx| {
1692                    let response = response.log_err()?;
1693                    this.invalidate_command_type::<VariablesCommand>();
1694                    cx.notify();
1695                    Some(response)
1696                },
1697                cx,
1698            )
1699            .detach()
1700        }
1701    }
1702
1703    pub fn evaluate(
1704        &mut self,
1705        expression: String,
1706        context: Option<EvaluateArgumentsContext>,
1707        frame_id: Option<u64>,
1708        source: Option<Source>,
1709        cx: &mut Context<Self>,
1710    ) {
1711        self.request(
1712            EvaluateCommand {
1713                expression,
1714                context,
1715                frame_id,
1716                source,
1717            },
1718            |this, response, cx| {
1719                let response = response.log_err()?;
1720                this.output_token.0 += 1;
1721                this.output.push_back(dap::OutputEvent {
1722                    category: None,
1723                    output: response.result.clone(),
1724                    group: None,
1725                    variables_reference: Some(response.variables_reference),
1726                    source: None,
1727                    line: None,
1728                    column: None,
1729                    data: None,
1730                    location_reference: None,
1731                });
1732
1733                this.invalidate_command_type::<ScopesCommand>();
1734                cx.notify();
1735                Some(response)
1736            },
1737            cx,
1738        )
1739        .detach();
1740    }
1741
1742    pub fn location(
1743        &mut self,
1744        reference: u64,
1745        cx: &mut Context<Self>,
1746    ) -> Option<dap::LocationsResponse> {
1747        self.fetch(
1748            LocationsCommand { reference },
1749            move |this, response, _| {
1750                let response = response.log_err()?;
1751                this.locations.insert(reference, response.clone());
1752                Some(response)
1753            },
1754            cx,
1755        );
1756        self.locations.get(&reference).cloned()
1757    }
1758    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1759        let command = DisconnectCommand {
1760            restart: Some(false),
1761            terminate_debuggee: Some(true),
1762            suspend_debuggee: Some(false),
1763        };
1764
1765        self.request(command, Self::empty_response, cx).detach()
1766    }
1767
1768    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1769        if self
1770            .capabilities
1771            .supports_terminate_threads_request
1772            .unwrap_or_default()
1773        {
1774            self.request(
1775                TerminateThreadsCommand {
1776                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1777                },
1778                Self::clear_active_debug_line_response,
1779                cx,
1780            )
1781            .detach();
1782        } else {
1783            self.shutdown(cx).detach();
1784        }
1785    }
1786}