session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{anyhow, Result};
  14use collections::{HashMap, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::OutputEventCategory;
  18use dap::{
  19    adapters::{DapDelegate, DapStatus},
  20    client::{DebugAdapterClient, SessionId},
  21    messages::{Events, Message},
  22    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  23    SteppingGranularity, StoppedEvent, VariableReference,
  24};
  25use dap_adapters::build_adapter;
  26use futures::channel::oneshot;
  27use futures::{future::Shared, FutureExt};
  28use gpui::{
  29    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  30};
  31use rpc::AnyProtoClient;
  32use serde_json::{json, Value};
  33use settings::Settings;
  34use smol::stream::StreamExt;
  35use std::any::TypeId;
  36use std::path::PathBuf;
  37use std::u64;
  38use std::{
  39    any::Any,
  40    collections::hash_map::Entry,
  41    hash::{Hash, Hasher},
  42    path::Path,
  43    sync::Arc,
  44};
  45use task::DebugAdapterConfig;
  46use text::{PointUtf16, ToPointUtf16};
  47use util::{merge_json_value_into, ResultExt};
  48
  49#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  50#[repr(transparent)]
  51pub struct ThreadId(pub u64);
  52
  53impl ThreadId {
  54    pub const MIN: ThreadId = ThreadId(u64::MIN);
  55    pub const MAX: ThreadId = ThreadId(u64::MAX);
  56}
  57
  58impl From<u64> for ThreadId {
  59    fn from(id: u64) -> Self {
  60        Self(id)
  61    }
  62}
  63
  64#[derive(Clone, Debug)]
  65pub struct StackFrame {
  66    pub dap: dap::StackFrame,
  67    pub scopes: Vec<dap::Scope>,
  68}
  69
  70impl From<dap::StackFrame> for StackFrame {
  71    fn from(stack_frame: dap::StackFrame) -> Self {
  72        Self {
  73            scopes: vec![],
  74            dap: stack_frame,
  75        }
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  80pub enum ThreadStatus {
  81    #[default]
  82    Running,
  83    Stopped,
  84    Stepping,
  85    Exited,
  86    Ended,
  87}
  88
  89impl ThreadStatus {
  90    pub fn label(&self) -> &'static str {
  91        match self {
  92            ThreadStatus::Running => "Running",
  93            ThreadStatus::Stopped => "Stopped",
  94            ThreadStatus::Stepping => "Stepping",
  95            ThreadStatus::Exited => "Exited",
  96            ThreadStatus::Ended => "Ended",
  97        }
  98    }
  99}
 100
 101#[derive(Debug)]
 102pub struct Thread {
 103    dap: dap::Thread,
 104    stack_frame_ids: IndexSet<StackFrameId>,
 105    _has_stopped: bool,
 106}
 107
 108impl From<dap::Thread> for Thread {
 109    fn from(dap: dap::Thread) -> Self {
 110        Self {
 111            dap,
 112            stack_frame_ids: Default::default(),
 113            _has_stopped: false,
 114        }
 115    }
 116}
 117
 118type UpstreamProjectId = u64;
 119
 120struct RemoteConnection {
 121    _client: AnyProtoClient,
 122    _upstream_project_id: UpstreamProjectId,
 123}
 124
 125impl RemoteConnection {
 126    fn send_proto_client_request<R: DapCommand>(
 127        &self,
 128        _request: R,
 129        _session_id: SessionId,
 130        cx: &mut App,
 131    ) -> Task<Result<R::Response>> {
 132        // let message = request.to_proto(session_id, self.upstream_project_id);
 133        // let upstream_client = self.client.clone();
 134        cx.background_executor().spawn(async move {
 135            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 136            // let response = upstream_client.request(message).await?;
 137            // request.response_from_proto(response)
 138            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 139        })
 140    }
 141
 142    fn request<R: DapCommand>(
 143        &self,
 144        request: R,
 145        session_id: SessionId,
 146        cx: &mut App,
 147    ) -> Task<Result<R::Response>>
 148    where
 149        <R::DapRequest as dap::requests::Request>::Response: 'static,
 150        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 151    {
 152        return self.send_proto_client_request::<R>(request, session_id, cx);
 153    }
 154}
 155
 156enum Mode {
 157    Local(LocalMode),
 158    Remote(RemoteConnection),
 159}
 160
 161#[derive(Clone)]
 162pub struct LocalMode {
 163    client: Arc<DebugAdapterClient>,
 164    config: DebugAdapterConfig,
 165    adapter: Arc<dyn DebugAdapter>,
 166    breakpoint_store: Entity<BreakpointStore>,
 167}
 168
 169fn client_source(abs_path: &Path) -> dap::Source {
 170    dap::Source {
 171        name: abs_path
 172            .file_name()
 173            .map(|filename| filename.to_string_lossy().to_string()),
 174        path: Some(abs_path.to_string_lossy().to_string()),
 175        source_reference: None,
 176        presentation_hint: None,
 177        origin: None,
 178        sources: None,
 179        adapter_data: None,
 180        checksums: None,
 181    }
 182}
 183
 184impl LocalMode {
 185    fn new(
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        breakpoint_store: Entity<BreakpointStore>,
 189        config: DebugAdapterConfig,
 190        delegate: DapAdapterDelegate,
 191        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 192        cx: AsyncApp,
 193    ) -> Task<Result<(Self, Capabilities)>> {
 194        cx.spawn(move |mut cx| async move {
 195            let (adapter, binary) = Self::get_adapter_binary(&config, &delegate, &mut cx).await?;
 196
 197            let message_handler = Box::new(move |message| {
 198                messages_tx.unbounded_send(message).ok();
 199            });
 200
 201            let client = Arc::new(
 202                if let Some(client) = parent_session
 203                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 204                    .flatten()
 205                {
 206                    client
 207                        .reconnect(session_id, binary, message_handler, cx.clone())
 208                        .await?
 209                } else {
 210                    DebugAdapterClient::start(
 211                        session_id,
 212                        adapter.name(),
 213                        binary,
 214                        message_handler,
 215                        cx.clone(),
 216                    )
 217                    .await?
 218                },
 219            );
 220
 221            let adapter_id = adapter.name().to_string().to_owned();
 222            let session = Self {
 223                client,
 224                adapter,
 225                breakpoint_store,
 226                config: config.clone(),
 227            };
 228
 229            #[cfg(any(test, feature = "test-support"))]
 230            {
 231                let dap::DebugAdapterKind::Fake((fail, caps)) = session.config.kind.clone() else {
 232                    panic!("Only fake debug adapter configs should be used in tests");
 233                };
 234
 235                session
 236                    .client
 237                    .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 238                    .await;
 239
 240                match config.request.clone() {
 241                    dap::DebugRequestType::Launch if fail => {
 242                        session
 243                            .client
 244                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 245                                Err(dap::ErrorResponse {
 246                                    error: Some(dap::Message {
 247                                        id: 1,
 248                                        format: "error".into(),
 249                                        variables: None,
 250                                        send_telemetry: None,
 251                                        show_user: None,
 252                                        url: None,
 253                                        url_label: None,
 254                                    }),
 255                                })
 256                            })
 257                            .await;
 258                    }
 259                    dap::DebugRequestType::Launch => {
 260                        session
 261                            .client
 262                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 263                            .await;
 264                    }
 265                    dap::DebugRequestType::Attach(_) if fail => {
 266                        session
 267                            .client
 268                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 269                                Err(dap::ErrorResponse {
 270                                    error: Some(dap::Message {
 271                                        id: 1,
 272                                        format: "error".into(),
 273                                        variables: None,
 274                                        send_telemetry: None,
 275                                        show_user: None,
 276                                        url: None,
 277                                        url_label: None,
 278                                    }),
 279                                })
 280                            })
 281                            .await;
 282                    }
 283                    dap::DebugRequestType::Attach(attach_config) => {
 284                        session
 285                            .client
 286                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 287                                assert_eq!(
 288                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 289                                    args.raw
 290                                );
 291
 292                                Ok(())
 293                            })
 294                            .await;
 295                    }
 296                }
 297
 298                session.client.on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(())).await;
 299                session.client.fake_event(Events::Initialized(None)).await;
 300            }
 301
 302            let capabilities = session
 303                .request(Initialize { adapter_id }, cx.background_executor().clone())
 304                .await?;
 305
 306            Ok((session, capabilities))
 307        })
 308    }
 309
 310    fn send_breakpoints_from_path(
 311        &self,
 312        abs_path: Arc<Path>,
 313        reason: BreakpointUpdatedReason,
 314        cx: &mut App,
 315    ) -> Task<()> {
 316        let breakpoints = self
 317            .breakpoint_store
 318            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 319            .into_iter()
 320            .map(Into::into)
 321            .collect();
 322
 323        let task = self.request(
 324            dap_command::SetBreakpoints {
 325                source: client_source(&abs_path),
 326                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 327                breakpoints,
 328            },
 329            cx.background_executor().clone(),
 330        );
 331
 332        cx.background_spawn(async move {
 333            match task.await {
 334                Ok(_) => {}
 335                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 336            }
 337        })
 338    }
 339
 340    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 341        let mut breakpoint_tasks = Vec::new();
 342        let breakpoints = self
 343            .breakpoint_store
 344            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 345
 346        for (path, breakpoints) in breakpoints {
 347            let breakpoints = if ignore_breakpoints {
 348                vec![]
 349            } else {
 350                breakpoints.into_iter().map(Into::into).collect()
 351            };
 352
 353            breakpoint_tasks.push(self.request(
 354                dap_command::SetBreakpoints {
 355                    source: client_source(&path),
 356                    source_modified: Some(false),
 357                    breakpoints,
 358                },
 359                cx.background_executor().clone(),
 360            ));
 361        }
 362
 363        cx.background_spawn(async move {
 364            futures::future::join_all(breakpoint_tasks)
 365                .await
 366                .iter()
 367                .for_each(|res| match res {
 368                    Ok(_) => {}
 369                    Err(err) => {
 370                        log::warn!("Set breakpoints request failed: {}", err);
 371                    }
 372                });
 373        })
 374    }
 375
 376    async fn get_adapter_binary(
 377        config: &DebugAdapterConfig,
 378        delegate: &DapAdapterDelegate,
 379        cx: &mut AsyncApp,
 380    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 381        let adapter = build_adapter(&config.kind).await?;
 382
 383        let binary = cx.update(|cx| {
 384            ProjectSettings::get_global(cx)
 385                .dap
 386                .get(&adapter.name())
 387                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 388        })?;
 389
 390        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 391            Err(error) => {
 392                delegate.update_status(
 393                    adapter.name(),
 394                    DapStatus::Failed {
 395                        error: error.to_string(),
 396                    },
 397                );
 398
 399                return Err(error);
 400            }
 401            Ok(mut binary) => {
 402                delegate.update_status(adapter.name(), DapStatus::None);
 403
 404                let shell_env = delegate.shell_env().await;
 405                let mut envs = binary.envs.unwrap_or_default();
 406                envs.extend(shell_env);
 407                binary.envs = Some(envs);
 408
 409                binary
 410            }
 411        };
 412
 413        Ok((adapter, binary))
 414    }
 415
 416    pub fn initialize_sequence(
 417        &self,
 418        capabilities: &Capabilities,
 419        initialized_rx: oneshot::Receiver<()>,
 420        cx: &App,
 421    ) -> Task<Result<()>> {
 422        let mut raw = self.adapter.request_args(&self.config);
 423        merge_json_value_into(
 424            self.config.initialize_args.clone().unwrap_or(json!({})),
 425            &mut raw,
 426        );
 427
 428        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 429        let launch = match &self.config.request {
 430            dap::DebugRequestType::Launch => {
 431                self.request(Launch { raw }, cx.background_executor().clone())
 432            }
 433            dap::DebugRequestType::Attach(_) => {
 434                self.request(Attach { raw }, cx.background_executor().clone())
 435            }
 436        };
 437
 438        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 439
 440        let configuration_sequence = cx.spawn({
 441            let this = self.clone();
 442            move |cx| async move {
 443                initialized_rx.await?;
 444                // todo(debugger) figure out if we want to handle a breakpoint response error
 445                // This will probably consist of letting a user know that breakpoints failed to be set
 446                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 447
 448                if configuration_done_supported {
 449                    this.request(ConfigurationDone, cx.background_executor().clone())
 450                } else {
 451                    Task::ready(Ok(()))
 452                }
 453                .await
 454            }
 455        });
 456
 457        cx.background_spawn(async move {
 458            futures::future::try_join(launch, configuration_sequence).await?;
 459            Ok(())
 460        })
 461    }
 462
 463    fn request<R: LocalDapCommand>(
 464        &self,
 465        request: R,
 466        executor: BackgroundExecutor,
 467    ) -> Task<Result<R::Response>>
 468    where
 469        <R::DapRequest as dap::requests::Request>::Response: 'static,
 470        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 471    {
 472        let request = Arc::new(request);
 473
 474        let request_clone = request.clone();
 475        let connection = self.client.clone();
 476        let request_task = executor.spawn(async move {
 477            let args = request_clone.to_dap();
 478            connection.request::<R::DapRequest>(args).await
 479        });
 480
 481        executor.spawn(async move {
 482            let response = request.response_from_dap(request_task.await?);
 483            response
 484        })
 485    }
 486}
 487impl From<RemoteConnection> for Mode {
 488    fn from(value: RemoteConnection) -> Self {
 489        Self::Remote(value)
 490    }
 491}
 492
 493impl Mode {
 494    fn request_dap<R: DapCommand>(
 495        &self,
 496        session_id: SessionId,
 497        request: R,
 498        cx: &mut Context<Session>,
 499    ) -> Task<Result<R::Response>>
 500    where
 501        <R::DapRequest as dap::requests::Request>::Response: 'static,
 502        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 503    {
 504        match self {
 505            Mode::Local(debug_adapter_client) => {
 506                debug_adapter_client.request(request, cx.background_executor().clone())
 507            }
 508            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 509        }
 510    }
 511}
 512
 513#[derive(Default)]
 514struct ThreadStates {
 515    global_state: Option<ThreadStatus>,
 516    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 517}
 518
 519impl ThreadStates {
 520    fn stop_all_threads(&mut self) {
 521        self.global_state = Some(ThreadStatus::Stopped);
 522        self.known_thread_states.clear();
 523    }
 524
 525    fn continue_all_threads(&mut self) {
 526        self.global_state = Some(ThreadStatus::Running);
 527        self.known_thread_states.clear();
 528    }
 529
 530    fn stop_thread(&mut self, thread_id: ThreadId) {
 531        self.known_thread_states
 532            .insert(thread_id, ThreadStatus::Stopped);
 533    }
 534
 535    fn continue_thread(&mut self, thread_id: ThreadId) {
 536        self.known_thread_states
 537            .insert(thread_id, ThreadStatus::Running);
 538    }
 539
 540    fn process_step(&mut self, thread_id: ThreadId) {
 541        self.known_thread_states
 542            .insert(thread_id, ThreadStatus::Stepping);
 543    }
 544
 545    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 546        self.thread_state(thread_id)
 547            .unwrap_or(ThreadStatus::Running)
 548    }
 549
 550    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 551        self.known_thread_states
 552            .get(&thread_id)
 553            .copied()
 554            .or(self.global_state)
 555    }
 556
 557    fn exit_thread(&mut self, thread_id: ThreadId) {
 558        self.known_thread_states
 559            .insert(thread_id, ThreadStatus::Exited);
 560    }
 561
 562    fn any_stopped_thread(&self) -> bool {
 563        self.global_state
 564            .is_some_and(|state| state == ThreadStatus::Stopped)
 565            || self
 566                .known_thread_states
 567                .values()
 568                .any(|status| *status == ThreadStatus::Stopped)
 569    }
 570}
 571const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 572
 573#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 574pub struct OutputToken(pub usize);
 575/// Represents a current state of a single debug adapter and provides ways to mutate it.
 576pub struct Session {
 577    mode: Mode,
 578    pub(super) capabilities: Capabilities,
 579    id: SessionId,
 580    parent_id: Option<SessionId>,
 581    ignore_breakpoints: bool,
 582    modules: Vec<dap::Module>,
 583    loaded_sources: Vec<dap::Source>,
 584    output_token: OutputToken,
 585    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 586    threads: IndexMap<ThreadId, Thread>,
 587    thread_states: ThreadStates,
 588    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 589    stack_frames: IndexMap<StackFrameId, StackFrame>,
 590    locations: HashMap<u64, dap::LocationsResponse>,
 591    is_session_terminated: bool,
 592    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 593    _background_tasks: Vec<Task<()>>,
 594}
 595
 596trait CacheableCommand: 'static + Send + Sync {
 597    fn as_any(&self) -> &dyn Any;
 598    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 599    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 600    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 601}
 602
 603impl<T> CacheableCommand for T
 604where
 605    T: DapCommand + PartialEq + Eq + Hash,
 606{
 607    fn as_any(&self) -> &dyn Any {
 608        self
 609    }
 610
 611    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 612        rhs.as_any()
 613            .downcast_ref::<Self>()
 614            .map_or(false, |rhs| self == rhs)
 615    }
 616
 617    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 618        T::hash(self, &mut hasher);
 619    }
 620
 621    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 622        self
 623    }
 624}
 625
 626pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 627
 628impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 629    fn from(request: T) -> Self {
 630        Self(Arc::new(request))
 631    }
 632}
 633
 634impl PartialEq for RequestSlot {
 635    fn eq(&self, other: &Self) -> bool {
 636        self.0.dyn_eq(other.0.as_ref())
 637    }
 638}
 639
 640impl Eq for RequestSlot {}
 641
 642impl Hash for RequestSlot {
 643    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 644        self.0.dyn_hash(state);
 645        self.0.as_any().type_id().hash(state)
 646    }
 647}
 648
 649#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 650pub struct CompletionsQuery {
 651    pub query: String,
 652    pub column: u64,
 653    pub line: Option<u64>,
 654    pub frame_id: Option<u64>,
 655}
 656
 657impl CompletionsQuery {
 658    pub fn new(
 659        buffer: &language::Buffer,
 660        cursor_position: language::Anchor,
 661        frame_id: Option<u64>,
 662    ) -> Self {
 663        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 664        Self {
 665            query: buffer.text(),
 666            column: column as u64,
 667            frame_id,
 668            line: Some(row as u64),
 669        }
 670    }
 671}
 672
 673pub enum SessionEvent {
 674    Modules,
 675    LoadedSources,
 676    Stopped(Option<ThreadId>),
 677    StackTrace,
 678    Variables,
 679    Threads,
 680}
 681
 682impl EventEmitter<SessionEvent> for Session {}
 683
 684// local session will send breakpoint updates to DAP for all new breakpoints
 685// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 686// BreakpointStore notifies session on breakpoint changes
 687impl Session {
 688    pub(crate) fn local(
 689        breakpoint_store: Entity<BreakpointStore>,
 690        session_id: SessionId,
 691        parent_session: Option<Entity<Session>>,
 692        delegate: DapAdapterDelegate,
 693        config: DebugAdapterConfig,
 694        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 695        initialized_tx: oneshot::Sender<()>,
 696        cx: &mut App,
 697    ) -> Task<Result<Entity<Self>>> {
 698        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 699
 700        cx.spawn(move |mut cx| async move {
 701            let (mode, capabilities) = LocalMode::new(
 702                session_id,
 703                parent_session.clone(),
 704                breakpoint_store.clone(),
 705                config.clone(),
 706                delegate,
 707                message_tx,
 708                cx.clone(),
 709            )
 710            .await?;
 711
 712            cx.new(|cx| {
 713                let _background_tasks =
 714                    vec![cx.spawn(move |this: WeakEntity<Self>, mut cx| async move {
 715                        let mut initialized_tx = Some(initialized_tx);
 716                        while let Some(message) = message_rx.next().await {
 717                            if let Message::Event(event) = message {
 718                                if let Events::Initialized(_) = *event {
 719                                    if let Some(tx) = initialized_tx.take() {
 720                                        tx.send(()).ok();
 721                                    }
 722                                } else {
 723                                    let Ok(_) = this.update(&mut cx, |session, cx| {
 724                                        session.handle_dap_event(event, cx);
 725                                    }) else {
 726                                        break;
 727                                    };
 728                                }
 729                            } else {
 730                                let Ok(_) = start_debugging_requests_tx
 731                                    .unbounded_send((session_id, message))
 732                                else {
 733                                    break;
 734                                };
 735                            }
 736                        }
 737                    })];
 738
 739                cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
 740                    BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 741                        if let Some(local) = (!this.ignore_breakpoints)
 742                            .then(|| this.as_local_mut())
 743                            .flatten()
 744                        {
 745                            local
 746                                .send_breakpoints_from_path(path.clone(), *reason, cx)
 747                                .detach();
 748                        };
 749                    }
 750                    BreakpointStoreEvent::ActiveDebugLineChanged => {}
 751                })
 752                .detach();
 753
 754                Self {
 755                    mode: Mode::Local(mode),
 756                    id: session_id,
 757                    parent_id: parent_session.map(|session| session.read(cx).id),
 758                    variables: Default::default(),
 759                    capabilities,
 760                    thread_states: ThreadStates::default(),
 761                    output_token: OutputToken(0),
 762                    ignore_breakpoints: false,
 763                    output: circular_buffer::CircularBuffer::boxed(),
 764                    requests: HashMap::default(),
 765                    modules: Vec::default(),
 766                    loaded_sources: Vec::default(),
 767                    threads: IndexMap::default(),
 768                    stack_frames: IndexMap::default(),
 769                    locations: Default::default(),
 770                    _background_tasks,
 771                    is_session_terminated: false,
 772                }
 773            })
 774        })
 775    }
 776
 777    pub(crate) fn remote(
 778        session_id: SessionId,
 779        client: AnyProtoClient,
 780        upstream_project_id: u64,
 781        ignore_breakpoints: bool,
 782    ) -> Self {
 783        Self {
 784            mode: Mode::Remote(RemoteConnection {
 785                _client: client,
 786                _upstream_project_id: upstream_project_id,
 787            }),
 788            id: session_id,
 789            parent_id: None,
 790            capabilities: Capabilities::default(),
 791            ignore_breakpoints,
 792            variables: Default::default(),
 793            stack_frames: Default::default(),
 794            thread_states: ThreadStates::default(),
 795
 796            output_token: OutputToken(0),
 797            output: circular_buffer::CircularBuffer::boxed(),
 798            requests: HashMap::default(),
 799            modules: Vec::default(),
 800            loaded_sources: Vec::default(),
 801            threads: IndexMap::default(),
 802            _background_tasks: Vec::default(),
 803            locations: Default::default(),
 804            is_session_terminated: false,
 805        }
 806    }
 807
 808    pub fn session_id(&self) -> SessionId {
 809        self.id
 810    }
 811
 812    pub fn parent_id(&self) -> Option<SessionId> {
 813        self.parent_id
 814    }
 815
 816    pub fn capabilities(&self) -> &Capabilities {
 817        &self.capabilities
 818    }
 819
 820    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 821        if let Mode::Local(local_mode) = &self.mode {
 822            Some(local_mode.config.clone())
 823        } else {
 824            None
 825        }
 826    }
 827
 828    pub fn is_terminated(&self) -> bool {
 829        self.is_session_terminated
 830    }
 831
 832    pub fn is_local(&self) -> bool {
 833        matches!(self.mode, Mode::Local(_))
 834    }
 835
 836    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 837        match &mut self.mode {
 838            Mode::Local(local_mode) => Some(local_mode),
 839            Mode::Remote(_) => None,
 840        }
 841    }
 842
 843    pub fn as_local(&self) -> Option<&LocalMode> {
 844        match &self.mode {
 845            Mode::Local(local_mode) => Some(local_mode),
 846            Mode::Remote(_) => None,
 847        }
 848    }
 849
 850    pub(super) fn initialize_sequence(
 851        &mut self,
 852        initialize_rx: oneshot::Receiver<()>,
 853        cx: &mut Context<Self>,
 854    ) -> Task<Result<()>> {
 855        match &self.mode {
 856            Mode::Local(local_mode) => {
 857                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
 858            }
 859            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
 860        }
 861    }
 862
 863    pub fn output(
 864        &self,
 865        since: OutputToken,
 866    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 867        if self.output_token.0 == 0 {
 868            return (self.output.range(0..0), OutputToken(0));
 869        };
 870
 871        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 872
 873        let clamped_events_since = events_since.clamp(0, self.output.len());
 874        (
 875            self.output
 876                .range(self.output.len() - clamped_events_since..),
 877            self.output_token,
 878        )
 879    }
 880
 881    pub fn respond_to_client(
 882        &self,
 883        request_seq: u64,
 884        success: bool,
 885        command: String,
 886        body: Option<serde_json::Value>,
 887        cx: &mut Context<Self>,
 888    ) -> Task<Result<()>> {
 889        let Some(local_session) = self.as_local().cloned() else {
 890            unreachable!("Cannot respond to remote client");
 891        };
 892
 893        cx.background_spawn(async move {
 894            local_session
 895                .client
 896                .send_message(Message::Response(Response {
 897                    body,
 898                    success,
 899                    command,
 900                    seq: request_seq + 1,
 901                    request_seq,
 902                    message: None,
 903                }))
 904                .await
 905        })
 906    }
 907
 908    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
 909        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
 910            self.thread_states.stop_all_threads();
 911
 912            self.invalidate_command_type::<StackTraceCommand>();
 913        }
 914
 915        // Event if we stopped all threads we still need to insert the thread_id
 916        // to our own data
 917        if let Some(thread_id) = event.thread_id {
 918            self.thread_states.stop_thread(ThreadId(thread_id));
 919
 920            self.invalidate_state(
 921                &StackTraceCommand {
 922                    thread_id,
 923                    start_frame: None,
 924                    levels: None,
 925                }
 926                .into(),
 927            );
 928        }
 929
 930        self.invalidate_generic();
 931        self.threads.clear();
 932        self.variables.clear();
 933        cx.emit(SessionEvent::Stopped(
 934            event
 935                .thread_id
 936                .map(Into::into)
 937                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
 938        ));
 939        cx.notify();
 940    }
 941
 942    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
 943        match *event {
 944            Events::Initialized(_) => {
 945                debug_assert!(
 946                    false,
 947                    "Initialized event should have been handled in LocalMode"
 948                );
 949            }
 950            Events::Stopped(event) => self.handle_stopped_event(event, cx),
 951            Events::Continued(event) => {
 952                if event.all_threads_continued.unwrap_or_default() {
 953                    self.thread_states.continue_all_threads();
 954                } else {
 955                    self.thread_states
 956                        .continue_thread(ThreadId(event.thread_id));
 957                }
 958                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
 959                self.invalidate_generic();
 960            }
 961            Events::Exited(_event) => {
 962                self.clear_active_debug_line(cx);
 963            }
 964            Events::Terminated(_) => {
 965                self.is_session_terminated = true;
 966                self.clear_active_debug_line(cx);
 967            }
 968            Events::Thread(event) => {
 969                let thread_id = ThreadId(event.thread_id);
 970
 971                match event.reason {
 972                    dap::ThreadEventReason::Started => {
 973                        self.thread_states.continue_thread(thread_id);
 974                    }
 975                    dap::ThreadEventReason::Exited => {
 976                        self.thread_states.exit_thread(thread_id);
 977                    }
 978                    reason => {
 979                        log::error!("Unhandled thread event reason {:?}", reason);
 980                    }
 981                }
 982                self.invalidate_state(&ThreadsCommand.into());
 983                cx.notify();
 984            }
 985            Events::Output(event) => {
 986                if event
 987                    .category
 988                    .as_ref()
 989                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
 990                {
 991                    return;
 992                }
 993
 994                self.output.push_back(event);
 995                self.output_token.0 += 1;
 996                cx.notify();
 997            }
 998            Events::Breakpoint(_) => {}
 999            Events::Module(event) => {
1000                match event.reason {
1001                    dap::ModuleEventReason::New => {
1002                        self.modules.push(event.module);
1003                    }
1004                    dap::ModuleEventReason::Changed => {
1005                        if let Some(module) = self
1006                            .modules
1007                            .iter_mut()
1008                            .find(|other| event.module.id == other.id)
1009                        {
1010                            *module = event.module;
1011                        }
1012                    }
1013                    dap::ModuleEventReason::Removed => {
1014                        self.modules.retain(|other| event.module.id != other.id);
1015                    }
1016                }
1017
1018                // todo(debugger): We should only send the invalidate command to downstream clients.
1019                // self.invalidate_state(&ModulesCommand.into());
1020            }
1021            Events::LoadedSource(_) => {
1022                self.invalidate_state(&LoadedSourcesCommand.into());
1023            }
1024            Events::Capabilities(event) => {
1025                self.capabilities = self.capabilities.merge(event.capabilities);
1026                cx.notify();
1027            }
1028            Events::Memory(_) => {}
1029            Events::Process(_) => {}
1030            Events::ProgressEnd(_) => {}
1031            Events::ProgressStart(_) => {}
1032            Events::ProgressUpdate(_) => {}
1033            Events::Invalidated(_) => {}
1034            Events::Other(_) => {}
1035        }
1036    }
1037
1038    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1039    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1040        &mut self,
1041        request: T,
1042        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1043            + 'static,
1044        cx: &mut Context<Self>,
1045    ) {
1046        const {
1047            assert!(
1048                T::CACHEABLE,
1049                "Only requests marked as cacheable should invoke `fetch`"
1050            );
1051        }
1052
1053        if !self.thread_states.any_stopped_thread()
1054            && request.type_id() != TypeId::of::<ThreadsCommand>()
1055        {
1056            return;
1057        }
1058
1059        let request_map = self
1060            .requests
1061            .entry(std::any::TypeId::of::<T>())
1062            .or_default();
1063
1064        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1065            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1066
1067            let task = Self::request_inner::<Arc<T>>(
1068                &self.capabilities,
1069                self.id,
1070                &self.mode,
1071                command,
1072                process_result,
1073                cx,
1074            );
1075            let task = cx
1076                .background_executor()
1077                .spawn(async move {
1078                    let _ = task.await?;
1079                    Some(())
1080                })
1081                .shared();
1082
1083            vacant.insert(task);
1084            cx.notify();
1085        }
1086    }
1087
1088    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1089        capabilities: &Capabilities,
1090        session_id: SessionId,
1091        mode: &Mode,
1092        request: T,
1093        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1094            + 'static,
1095        cx: &mut Context<Self>,
1096    ) -> Task<Option<T::Response>> {
1097        if !T::is_supported(&capabilities) {
1098            log::warn!(
1099                "Attempted to send a DAP request that isn't supported: {:?}",
1100                request
1101            );
1102            let error = Err(anyhow::Error::msg(
1103                "Couldn't complete request because it's not supported",
1104            ));
1105            return cx.spawn(|this, mut cx| async move {
1106                this.update(&mut cx, |this, cx| process_result(this, error, cx))
1107                    .log_err()
1108                    .flatten()
1109            });
1110        }
1111
1112        let request = mode.request_dap(session_id, request, cx);
1113        cx.spawn(|this, mut cx| async move {
1114            let result = request.await;
1115            this.update(&mut cx, |this, cx| process_result(this, result, cx))
1116                .log_err()
1117                .flatten()
1118        })
1119    }
1120
    /// Sends `request` for this session without consulting or populating the request cache.
    ///
    /// This forwards the session's capabilities, id and mode to `request_inner`;
    /// `process_result` receives the outcome and the returned task resolves to its
    /// return value.
    fn request<T: DapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
            + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(
            &self.capabilities,
            self.id,
            &self.mode,
            request,
            process_result,
            cx,
        )
    }
1137
    /// Removes every cached request of type `Command` from the request cache.
    fn invalidate_command_type<Command: DapCommand>(&mut self) {
        self.requests.remove(&std::any::TypeId::of::<Command>());
    }
1141
1142    fn invalidate_generic(&mut self) {
1143        self.invalidate_command_type::<ModulesCommand>();
1144        self.invalidate_command_type::<LoadedSourcesCommand>();
1145        self.invalidate_command_type::<ThreadsCommand>();
1146    }
1147
1148    fn invalidate_state(&mut self, key: &RequestSlot) {
1149        self.requests
1150            .entry(key.0.as_any().type_id())
1151            .and_modify(|request_map| {
1152                request_map.remove(&key);
1153            });
1154    }
1155
    /// Returns the status that the session's thread-state tracker reports for `thread_id`.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1159
1160    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1161        self.fetch(
1162            dap_command::ThreadsCommand,
1163            |this, result, cx| {
1164                let result = result.log_err()?;
1165
1166                this.threads = result
1167                    .iter()
1168                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1169                    .collect();
1170
1171                this.invalidate_command_type::<StackTraceCommand>();
1172                cx.emit(SessionEvent::Threads);
1173                cx.notify();
1174
1175                Some(result)
1176            },
1177            cx,
1178        );
1179
1180        self.threads
1181            .values()
1182            .map(|thread| {
1183                (
1184                    thread.dap.clone(),
1185                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1186                )
1187            })
1188            .collect()
1189    }
1190
1191    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1192        self.fetch(
1193            dap_command::ModulesCommand,
1194            |this, result, cx| {
1195                let result = result.log_err()?;
1196
1197                this.modules = result.iter().cloned().collect();
1198                cx.emit(SessionEvent::Modules);
1199                cx.notify();
1200
1201                Some(result)
1202            },
1203            cx,
1204        );
1205
1206        &self.modules
1207    }
1208
    /// Returns the session's ignore-breakpoints flag.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1212
    /// Flips the ignore-breakpoints flag by calling `set_ignore_breakpoints` with the
    /// negated current value, and returns the task it produces.
    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
    }
1216
1217    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1218        if self.ignore_breakpoints == ignore {
1219            return Task::ready(());
1220        }
1221
1222        self.ignore_breakpoints = ignore;
1223
1224        if let Some(local) = self.as_local() {
1225            local.send_all_breakpoints(ignore, cx)
1226        } else {
1227            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1228            unimplemented!()
1229        }
1230    }
1231
1232    pub fn breakpoints_enabled(&self) -> bool {
1233        self.ignore_breakpoints
1234    }
1235
1236    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1237        self.fetch(
1238            dap_command::LoadedSourcesCommand,
1239            |this, result, cx| {
1240                let result = result.log_err()?;
1241                this.loaded_sources = result.iter().cloned().collect();
1242                cx.emit(SessionEvent::LoadedSources);
1243                cx.notify();
1244                Some(result)
1245            },
1246            cx,
1247        );
1248
1249        &self.loaded_sources
1250    }
1251
1252    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1253        res.log_err()?;
1254        Some(())
1255    }
1256
1257    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1258        thread_id: ThreadId,
1259    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1260    {
1261        move |this, response, cx| match response.log_err() {
1262            Some(response) => Some(response),
1263            None => {
1264                this.thread_states.stop_thread(thread_id);
1265                cx.notify();
1266                None
1267            }
1268        }
1269    }
1270
1271    fn clear_active_debug_line_response(
1272        &mut self,
1273        response: Result<()>,
1274        cx: &mut Context<'_, Session>,
1275    ) -> Option<()> {
1276        response.log_err()?;
1277        self.clear_active_debug_line(cx);
1278        Some(())
1279    }
1280
1281    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1282        self.as_local()
1283            .expect("Message handler will only run in local mode")
1284            .breakpoint_store
1285            .update(cx, |store, cx| {
1286                store.remove_active_position(Some(self.id), cx)
1287            });
1288    }
1289
1290    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1291        self.request(
1292            PauseCommand {
1293                thread_id: thread_id.0,
1294            },
1295            Self::empty_response,
1296            cx,
1297        )
1298        .detach();
1299    }
1300
1301    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1302        self.request(
1303            RestartStackFrameCommand { stack_frame_id },
1304            Self::empty_response,
1305            cx,
1306        )
1307        .detach();
1308    }
1309
1310    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1311        if self.capabilities.supports_restart_request.unwrap_or(false) {
1312            self.request(
1313                RestartCommand {
1314                    raw: args.unwrap_or(Value::Null),
1315                },
1316                Self::empty_response,
1317                cx,
1318            )
1319            .detach();
1320        } else {
1321            self.request(
1322                DisconnectCommand {
1323                    restart: Some(false),
1324                    terminate_debuggee: Some(true),
1325                    suspend_debuggee: Some(false),
1326                },
1327                Self::empty_response,
1328                cx,
1329            )
1330            .detach();
1331        }
1332    }
1333
1334    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1335        let task = if self
1336            .capabilities
1337            .supports_terminate_request
1338            .unwrap_or_default()
1339        {
1340            self.request(
1341                TerminateCommand {
1342                    restart: Some(false),
1343                },
1344                Self::clear_active_debug_line_response,
1345                cx,
1346            )
1347        } else {
1348            self.request(
1349                DisconnectCommand {
1350                    restart: Some(false),
1351                    terminate_debuggee: Some(true),
1352                    suspend_debuggee: Some(false),
1353                },
1354                Self::clear_active_debug_line_response,
1355                cx,
1356            )
1357        };
1358
1359        cx.background_spawn(async move {
1360            let _ = task.await;
1361        })
1362    }
1363
1364    pub fn completions(
1365        &mut self,
1366        query: CompletionsQuery,
1367        cx: &mut Context<Self>,
1368    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1369        let task = self.request(query, |_, result, _| result.log_err(), cx);
1370
1371        cx.background_executor().spawn(async move {
1372            anyhow::Ok(
1373                task.await
1374                    .map(|response| response.targets)
1375                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1376            )
1377        })
1378    }
1379
1380    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1381        self.thread_states.continue_thread(thread_id);
1382        self.request(
1383            ContinueCommand {
1384                args: ContinueArguments {
1385                    thread_id: thread_id.0,
1386                    single_thread: Some(true),
1387                },
1388            },
1389            Self::on_step_response::<ContinueCommand>(thread_id),
1390            cx,
1391        )
1392        .detach();
1393    }
1394
1395    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1396        match self.mode {
1397            Mode::Local(ref local) => Some(local.client.clone()),
1398            Mode::Remote(_) => None,
1399        }
1400    }
1401
1402    pub fn step_over(
1403        &mut self,
1404        thread_id: ThreadId,
1405        granularity: SteppingGranularity,
1406        cx: &mut Context<Self>,
1407    ) {
1408        let supports_single_thread_execution_requests =
1409            self.capabilities.supports_single_thread_execution_requests;
1410        let supports_stepping_granularity = self
1411            .capabilities
1412            .supports_stepping_granularity
1413            .unwrap_or_default();
1414
1415        let command = NextCommand {
1416            inner: StepCommand {
1417                thread_id: thread_id.0,
1418                granularity: supports_stepping_granularity.then(|| granularity),
1419                single_thread: supports_single_thread_execution_requests,
1420            },
1421        };
1422
1423        self.thread_states.process_step(thread_id);
1424        self.request(
1425            command,
1426            Self::on_step_response::<NextCommand>(thread_id),
1427            cx,
1428        )
1429        .detach();
1430    }
1431
1432    pub fn step_in(
1433        &mut self,
1434        thread_id: ThreadId,
1435        granularity: SteppingGranularity,
1436        cx: &mut Context<Self>,
1437    ) {
1438        let supports_single_thread_execution_requests =
1439            self.capabilities.supports_single_thread_execution_requests;
1440        let supports_stepping_granularity = self
1441            .capabilities
1442            .supports_stepping_granularity
1443            .unwrap_or_default();
1444
1445        let command = StepInCommand {
1446            inner: StepCommand {
1447                thread_id: thread_id.0,
1448                granularity: supports_stepping_granularity.then(|| granularity),
1449                single_thread: supports_single_thread_execution_requests,
1450            },
1451        };
1452
1453        self.thread_states.process_step(thread_id);
1454        self.request(
1455            command,
1456            Self::on_step_response::<StepInCommand>(thread_id),
1457            cx,
1458        )
1459        .detach();
1460    }
1461
1462    pub fn step_out(
1463        &mut self,
1464        thread_id: ThreadId,
1465        granularity: SteppingGranularity,
1466        cx: &mut Context<Self>,
1467    ) {
1468        let supports_single_thread_execution_requests =
1469            self.capabilities.supports_single_thread_execution_requests;
1470        let supports_stepping_granularity = self
1471            .capabilities
1472            .supports_stepping_granularity
1473            .unwrap_or_default();
1474
1475        let command = StepOutCommand {
1476            inner: StepCommand {
1477                thread_id: thread_id.0,
1478                granularity: supports_stepping_granularity.then(|| granularity),
1479                single_thread: supports_single_thread_execution_requests,
1480            },
1481        };
1482
1483        self.thread_states.process_step(thread_id);
1484        self.request(
1485            command,
1486            Self::on_step_response::<StepOutCommand>(thread_id),
1487            cx,
1488        )
1489        .detach();
1490    }
1491
1492    pub fn step_back(
1493        &mut self,
1494        thread_id: ThreadId,
1495        granularity: SteppingGranularity,
1496        cx: &mut Context<Self>,
1497    ) {
1498        let supports_single_thread_execution_requests =
1499            self.capabilities.supports_single_thread_execution_requests;
1500        let supports_stepping_granularity = self
1501            .capabilities
1502            .supports_stepping_granularity
1503            .unwrap_or_default();
1504
1505        let command = StepBackCommand {
1506            inner: StepCommand {
1507                thread_id: thread_id.0,
1508                granularity: supports_stepping_granularity.then(|| granularity),
1509                single_thread: supports_single_thread_execution_requests,
1510            },
1511        };
1512
1513        self.thread_states.process_step(thread_id);
1514
1515        self.request(
1516            command,
1517            Self::on_step_response::<StepBackCommand>(thread_id),
1518            cx,
1519        )
1520        .detach();
1521    }
1522
1523    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1524        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1525            && self.requests.contains_key(&ThreadsCommand.type_id())
1526            && self.threads.contains_key(&thread_id)
1527        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1528        // We could still be using an old thread id and have sent a new thread's request
1529        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1530        // But it very well could cause a minor bug in the future that is hard to track down
1531        {
1532            self.fetch(
1533                super::dap_command::StackTraceCommand {
1534                    thread_id: thread_id.0,
1535                    start_frame: None,
1536                    levels: None,
1537                },
1538                move |this, stack_frames, cx| {
1539                    let stack_frames = stack_frames.log_err()?;
1540
1541                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1542                        thread.stack_frame_ids =
1543                            stack_frames.iter().map(|frame| frame.id).collect();
1544                    });
1545                    debug_assert!(
1546                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1547                        "Sent request for thread_id that doesn't exist"
1548                    );
1549
1550                    this.stack_frames.extend(
1551                        stack_frames
1552                            .iter()
1553                            .cloned()
1554                            .map(|frame| (frame.id, StackFrame::from(frame))),
1555                    );
1556
1557                    this.invalidate_command_type::<ScopesCommand>();
1558                    this.invalidate_command_type::<VariablesCommand>();
1559
1560                    cx.emit(SessionEvent::StackTrace);
1561                    cx.notify();
1562                    Some(stack_frames)
1563                },
1564                cx,
1565            );
1566        }
1567
1568        self.threads
1569            .get(&thread_id)
1570            .map(|thread| {
1571                thread
1572                    .stack_frame_ids
1573                    .iter()
1574                    .filter_map(|id| self.stack_frames.get(id))
1575                    .cloned()
1576                    .collect()
1577            })
1578            .unwrap_or_default()
1579    }
1580
1581    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1582        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1583            && self
1584                .requests
1585                .contains_key(&TypeId::of::<StackTraceCommand>())
1586        {
1587            self.fetch(
1588                ScopesCommand { stack_frame_id },
1589                move |this, scopes, cx| {
1590                    let scopes = scopes.log_err()?;
1591
1592                    for scope in scopes .iter(){
1593                        this.variables(scope.variables_reference, cx);
1594                    }
1595
1596                    let entry = this
1597                        .stack_frames
1598                        .entry(stack_frame_id)
1599                        .and_modify(|stack_frame| {
1600                            stack_frame.scopes = scopes.clone();
1601                        });
1602
1603                    cx.emit(SessionEvent::Variables);
1604
1605                    debug_assert!(
1606                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1607                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1608                    );
1609
1610                    Some(scopes)
1611                },
1612                cx,
1613            );
1614        }
1615
1616        self.stack_frames
1617            .get(&stack_frame_id)
1618            .map(|frame| frame.scopes.as_slice())
1619            .unwrap_or_default()
1620    }
1621
1622    pub fn variables(
1623        &mut self,
1624        variables_reference: VariableReference,
1625        cx: &mut Context<Self>,
1626    ) -> Vec<dap::Variable> {
1627        let command = VariablesCommand {
1628            variables_reference,
1629            filter: None,
1630            start: None,
1631            count: None,
1632            format: None,
1633        };
1634
1635        self.fetch(
1636            command,
1637            move |this, variables, cx| {
1638                let variables = variables.log_err()?;
1639                this.variables
1640                    .insert(variables_reference, variables.clone());
1641
1642                cx.emit(SessionEvent::Variables);
1643                Some(variables)
1644            },
1645            cx,
1646        );
1647
1648        self.variables
1649            .get(&variables_reference)
1650            .cloned()
1651            .unwrap_or_default()
1652    }
1653
1654    pub fn set_variable_value(
1655        &mut self,
1656        variables_reference: u64,
1657        name: String,
1658        value: String,
1659        cx: &mut Context<Self>,
1660    ) {
1661        if self.capabilities.supports_set_variable.unwrap_or_default() {
1662            self.request(
1663                SetVariableValueCommand {
1664                    name,
1665                    value,
1666                    variables_reference,
1667                },
1668                move |this, response, cx| {
1669                    let response = response.log_err()?;
1670                    this.invalidate_command_type::<VariablesCommand>();
1671                    cx.notify();
1672                    Some(response)
1673                },
1674                cx,
1675            )
1676            .detach()
1677        }
1678    }
1679
1680    pub fn evaluate(
1681        &mut self,
1682        expression: String,
1683        context: Option<EvaluateArgumentsContext>,
1684        frame_id: Option<u64>,
1685        source: Option<Source>,
1686        cx: &mut Context<Self>,
1687    ) {
1688        self.request(
1689            EvaluateCommand {
1690                expression,
1691                context,
1692                frame_id,
1693                source,
1694            },
1695            |this, response, cx| {
1696                let response = response.log_err()?;
1697                this.output.push_back(dap::OutputEvent {
1698                    category: None,
1699                    output: response.result.clone(),
1700                    group: None,
1701                    variables_reference: Some(response.variables_reference),
1702                    source: None,
1703                    line: None,
1704                    column: None,
1705                    data: None,
1706                    location_reference: None,
1707                });
1708
1709                this.invalidate_command_type::<ScopesCommand>();
1710                cx.notify();
1711                Some(response)
1712            },
1713            cx,
1714        )
1715        .detach();
1716    }
1717
1718    pub fn location(
1719        &mut self,
1720        reference: u64,
1721        cx: &mut Context<Self>,
1722    ) -> Option<dap::LocationsResponse> {
1723        self.fetch(
1724            LocationsCommand { reference },
1725            move |this, response, _| {
1726                let response = response.log_err()?;
1727                this.locations.insert(reference, response.clone());
1728                Some(response)
1729            },
1730            cx,
1731        );
1732        self.locations.get(&reference).cloned()
1733    }
1734    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1735        let command = DisconnectCommand {
1736            restart: Some(false),
1737            terminate_debuggee: Some(true),
1738            suspend_debuggee: Some(false),
1739        };
1740
1741        self.request(command, Self::empty_response, cx).detach()
1742    }
1743
1744    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1745        if self
1746            .capabilities
1747            .supports_terminate_threads_request
1748            .unwrap_or_default()
1749        {
1750            self.request(
1751                TerminateThreadsCommand {
1752                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1753                },
1754                Self::clear_active_debug_line_response,
1755                cx,
1756            )
1757            .detach();
1758        } else {
1759            self.shutdown(cx).detach();
1760        }
1761    }
1762}