session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{anyhow, Result};
  14use collections::{HashMap, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::OutputEventCategory;
  18use dap::{
  19    adapters::{DapDelegate, DapStatus},
  20    client::{DebugAdapterClient, SessionId},
  21    messages::{Events, Message},
  22    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  23    SteppingGranularity, StoppedEvent, VariableReference,
  24};
  25use dap_adapters::build_adapter;
  26use futures::channel::oneshot;
  27use futures::{future::Shared, FutureExt};
  28use gpui::{
  29    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  30};
  31use rpc::AnyProtoClient;
  32use serde_json::{json, Value};
  33use settings::Settings;
  34use smol::stream::StreamExt;
  35use std::any::TypeId;
  36use std::path::PathBuf;
  37use std::u64;
  38use std::{
  39    any::Any,
  40    collections::hash_map::Entry,
  41    hash::{Hash, Hasher},
  42    path::Path,
  43    sync::Arc,
  44};
  45use task::DebugAdapterConfig;
  46use text::{PointUtf16, ToPointUtf16};
  47use util::{merge_json_value_into, ResultExt};
  48
  49#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  50#[repr(transparent)]
  51pub struct ThreadId(pub u64);
  52
  53impl ThreadId {
  54    pub const MIN: ThreadId = ThreadId(u64::MIN);
  55    pub const MAX: ThreadId = ThreadId(u64::MAX);
  56}
  57
  58impl From<u64> for ThreadId {
  59    fn from(id: u64) -> Self {
  60        Self(id)
  61    }
  62}
  63
  64#[derive(Clone, Debug)]
  65pub struct StackFrame {
  66    pub dap: dap::StackFrame,
  67    pub scopes: Vec<dap::Scope>,
  68}
  69
  70impl From<dap::StackFrame> for StackFrame {
  71    fn from(stack_frame: dap::StackFrame) -> Self {
  72        Self {
  73            scopes: vec![],
  74            dap: stack_frame,
  75        }
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  80pub enum ThreadStatus {
  81    #[default]
  82    Running,
  83    Stopped,
  84    Stepping,
  85    Exited,
  86    Ended,
  87}
  88
  89impl ThreadStatus {
  90    pub fn label(&self) -> &'static str {
  91        match self {
  92            ThreadStatus::Running => "Running",
  93            ThreadStatus::Stopped => "Stopped",
  94            ThreadStatus::Stepping => "Stepping",
  95            ThreadStatus::Exited => "Exited",
  96            ThreadStatus::Ended => "Ended",
  97        }
  98    }
  99}
 100
 101#[derive(Debug)]
 102pub struct Thread {
 103    dap: dap::Thread,
 104    stack_frame_ids: IndexSet<StackFrameId>,
 105    _has_stopped: bool,
 106}
 107
 108impl From<dap::Thread> for Thread {
 109    fn from(dap: dap::Thread) -> Self {
 110        Self {
 111            dap,
 112            stack_frame_ids: Default::default(),
 113            _has_stopped: false,
 114        }
 115    }
 116}
 117
 118type UpstreamProjectId = u64;
 119
 120struct RemoteConnection {
 121    _client: AnyProtoClient,
 122    _upstream_project_id: UpstreamProjectId,
 123}
 124
 125impl RemoteConnection {
 126    fn send_proto_client_request<R: DapCommand>(
 127        &self,
 128        _request: R,
 129        _session_id: SessionId,
 130        cx: &mut App,
 131    ) -> Task<Result<R::Response>> {
 132        // let message = request.to_proto(session_id, self.upstream_project_id);
 133        // let upstream_client = self.client.clone();
 134        cx.background_executor().spawn(async move {
 135            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 136            // let response = upstream_client.request(message).await?;
 137            // request.response_from_proto(response)
 138            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 139        })
 140    }
 141
 142    fn request<R: DapCommand>(
 143        &self,
 144        request: R,
 145        session_id: SessionId,
 146        cx: &mut App,
 147    ) -> Task<Result<R::Response>>
 148    where
 149        <R::DapRequest as dap::requests::Request>::Response: 'static,
 150        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 151    {
 152        return self.send_proto_client_request::<R>(request, session_id, cx);
 153    }
 154}
 155
/// How a session reaches its debug adapter.
enum Mode {
    /// The adapter is driven directly through a [`LocalMode`].
    Local(LocalMode),
    /// Requests are routed through a [`RemoteConnection`].
    Remote(RemoteConnection),
}
 160
/// State needed to talk to a debug adapter running on this machine.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests and messages to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration this session was started with.
    config: DebugAdapterConfig,
    /// Adapter implementation; supplies the launch/attach request arguments.
    adapter: Arc<dyn DebugAdapter>,
    /// Store the breakpoints are read from when they are sent to the adapter.
    breakpoint_store: Entity<BreakpointStore>,
}
 168
 169fn client_source(abs_path: &Path) -> dap::Source {
 170    dap::Source {
 171        name: abs_path
 172            .file_name()
 173            .map(|filename| filename.to_string_lossy().to_string()),
 174        path: Some(abs_path.to_string_lossy().to_string()),
 175        source_reference: None,
 176        presentation_hint: None,
 177        origin: None,
 178        sources: None,
 179        adapter_data: None,
 180        checksums: None,
 181    }
 182}
 183
 184impl LocalMode {
 185    fn new(
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        breakpoint_store: Entity<BreakpointStore>,
 189        config: DebugAdapterConfig,
 190        delegate: DapAdapterDelegate,
 191        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 192        cx: AsyncApp,
 193    ) -> Task<Result<(Self, Capabilities)>> {
 194        cx.spawn(async move |cx| {
 195            let (adapter, binary) = Self::get_adapter_binary(&config, &delegate,  cx).await?;
 196
 197            let message_handler = Box::new(move |message| {
 198                messages_tx.unbounded_send(message).ok();
 199            });
 200
 201            let client = Arc::new(
 202                if let Some(client) = parent_session
 203                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 204                    .flatten()
 205                {
 206                    client
 207                        .reconnect(session_id, binary, message_handler, cx.clone())
 208                        .await?
 209                } else {
 210                    DebugAdapterClient::start(
 211                        session_id,
 212                        adapter.name(),
 213                        binary,
 214                        message_handler,
 215                        cx.clone(),
 216                    )
 217                    .await?
 218                },
 219            );
 220
 221            let adapter_id = adapter.name().to_string().to_owned();
 222            let session = Self {
 223                client,
 224                adapter,
 225                breakpoint_store,
 226                config: config.clone(),
 227            };
 228
 229            #[cfg(any(test, feature = "test-support"))]
 230            {
 231                let dap::DebugAdapterKind::Fake((fail, caps)) = session.config.kind.clone() else {
 232                    panic!("Only fake debug adapter configs should be used in tests");
 233                };
 234
 235                session
 236                    .client
 237                    .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 238                    .await;
 239
 240                match config.request.clone() {
 241                    dap::DebugRequestType::Launch if fail => {
 242                        session
 243                            .client
 244                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 245                                Err(dap::ErrorResponse {
 246                                    error: Some(dap::Message {
 247                                        id: 1,
 248                                        format: "error".into(),
 249                                        variables: None,
 250                                        send_telemetry: None,
 251                                        show_user: None,
 252                                        url: None,
 253                                        url_label: None,
 254                                    }),
 255                                })
 256                            })
 257                            .await;
 258                    }
 259                    dap::DebugRequestType::Launch => {
 260                        session
 261                            .client
 262                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 263                            .await;
 264                    }
 265                    dap::DebugRequestType::Attach(_) if fail => {
 266                        session
 267                            .client
 268                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 269                                Err(dap::ErrorResponse {
 270                                    error: Some(dap::Message {
 271                                        id: 1,
 272                                        format: "error".into(),
 273                                        variables: None,
 274                                        send_telemetry: None,
 275                                        show_user: None,
 276                                        url: None,
 277                                        url_label: None,
 278                                    }),
 279                                })
 280                            })
 281                            .await;
 282                    }
 283                    dap::DebugRequestType::Attach(attach_config) => {
 284                        session
 285                            .client
 286                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 287                                assert_eq!(
 288                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 289                                    args.raw
 290                                );
 291
 292                                Ok(())
 293                            })
 294                            .await;
 295                    }
 296                }
 297
 298                session.client.on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(())).await;
 299                session.client.fake_event(Events::Initialized(None)).await;
 300            }
 301
 302            let capabilities = session
 303                .request(Initialize { adapter_id }, cx.background_executor().clone())
 304                .await?;
 305
 306            Ok((session, capabilities))
 307        })
 308    }
 309
 310    fn send_breakpoints_from_path(
 311        &self,
 312        abs_path: Arc<Path>,
 313        reason: BreakpointUpdatedReason,
 314        cx: &mut App,
 315    ) -> Task<()> {
 316        let breakpoints = self
 317            .breakpoint_store
 318            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 319            .into_iter()
 320            .map(Into::into)
 321            .collect();
 322
 323        let task = self.request(
 324            dap_command::SetBreakpoints {
 325                source: client_source(&abs_path),
 326                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 327                breakpoints,
 328            },
 329            cx.background_executor().clone(),
 330        );
 331
 332        cx.background_spawn(async move {
 333            match task.await {
 334                Ok(_) => {}
 335                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 336            }
 337        })
 338    }
 339
 340    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 341        let mut breakpoint_tasks = Vec::new();
 342        let breakpoints = self
 343            .breakpoint_store
 344            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 345
 346        for (path, breakpoints) in breakpoints {
 347            let breakpoints = if ignore_breakpoints {
 348                vec![]
 349            } else {
 350                breakpoints.into_iter().map(Into::into).collect()
 351            };
 352
 353            breakpoint_tasks.push(self.request(
 354                dap_command::SetBreakpoints {
 355                    source: client_source(&path),
 356                    source_modified: Some(false),
 357                    breakpoints,
 358                },
 359                cx.background_executor().clone(),
 360            ));
 361        }
 362
 363        cx.background_spawn(async move {
 364            futures::future::join_all(breakpoint_tasks)
 365                .await
 366                .iter()
 367                .for_each(|res| match res {
 368                    Ok(_) => {}
 369                    Err(err) => {
 370                        log::warn!("Set breakpoints request failed: {}", err);
 371                    }
 372                });
 373        })
 374    }
 375
 376    async fn get_adapter_binary(
 377        config: &DebugAdapterConfig,
 378        delegate: &DapAdapterDelegate,
 379        cx: &mut AsyncApp,
 380    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 381        let adapter = build_adapter(&config.kind).await?;
 382
 383        let binary = cx.update(|cx| {
 384            ProjectSettings::get_global(cx)
 385                .dap
 386                .get(&adapter.name())
 387                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 388        })?;
 389
 390        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 391            Err(error) => {
 392                delegate.update_status(
 393                    adapter.name(),
 394                    DapStatus::Failed {
 395                        error: error.to_string(),
 396                    },
 397                );
 398
 399                return Err(error);
 400            }
 401            Ok(mut binary) => {
 402                delegate.update_status(adapter.name(), DapStatus::None);
 403
 404                let shell_env = delegate.shell_env().await;
 405                let mut envs = binary.envs.unwrap_or_default();
 406                envs.extend(shell_env);
 407                binary.envs = Some(envs);
 408
 409                binary
 410            }
 411        };
 412
 413        Ok((adapter, binary))
 414    }
 415
 416    pub fn initialize_sequence(
 417        &self,
 418        capabilities: &Capabilities,
 419        initialized_rx: oneshot::Receiver<()>,
 420        cx: &App,
 421    ) -> Task<Result<()>> {
 422        let mut raw = self.adapter.request_args(&self.config);
 423        merge_json_value_into(
 424            self.config.initialize_args.clone().unwrap_or(json!({})),
 425            &mut raw,
 426        );
 427
 428        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 429        let launch = match &self.config.request {
 430            dap::DebugRequestType::Launch => {
 431                self.request(Launch { raw }, cx.background_executor().clone())
 432            }
 433            dap::DebugRequestType::Attach(_) => {
 434                self.request(Attach { raw }, cx.background_executor().clone())
 435            }
 436        };
 437
 438        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 439
 440        let configuration_sequence = cx.spawn({
 441            let this = self.clone();
 442            async move |cx| {
 443                initialized_rx.await?;
 444                // todo(debugger) figure out if we want to handle a breakpoint response error
 445                // This will probably consist of letting a user know that breakpoints failed to be set
 446                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 447
 448                if configuration_done_supported {
 449                    this.request(ConfigurationDone, cx.background_executor().clone())
 450                } else {
 451                    Task::ready(Ok(()))
 452                }
 453                .await
 454            }
 455        });
 456
 457        cx.background_spawn(async move {
 458            futures::future::try_join(launch, configuration_sequence).await?;
 459            Ok(())
 460        })
 461    }
 462
 463    fn request<R: LocalDapCommand>(
 464        &self,
 465        request: R,
 466        executor: BackgroundExecutor,
 467    ) -> Task<Result<R::Response>>
 468    where
 469        <R::DapRequest as dap::requests::Request>::Response: 'static,
 470        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 471    {
 472        let request = Arc::new(request);
 473
 474        let request_clone = request.clone();
 475        let connection = self.client.clone();
 476        let request_task = executor.spawn(async move {
 477            let args = request_clone.to_dap();
 478            connection.request::<R::DapRequest>(args).await
 479        });
 480
 481        executor.spawn(async move {
 482            let response = request.response_from_dap(request_task.await?);
 483            response
 484        })
 485    }
 486}
 487impl From<RemoteConnection> for Mode {
 488    fn from(value: RemoteConnection) -> Self {
 489        Self::Remote(value)
 490    }
 491}
 492
 493impl Mode {
 494    fn request_dap<R: DapCommand>(
 495        &self,
 496        session_id: SessionId,
 497        request: R,
 498        cx: &mut Context<Session>,
 499    ) -> Task<Result<R::Response>>
 500    where
 501        <R::DapRequest as dap::requests::Request>::Response: 'static,
 502        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 503    {
 504        match self {
 505            Mode::Local(debug_adapter_client) => {
 506                debug_adapter_client.request(request, cx.background_executor().clone())
 507            }
 508            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 509        }
 510    }
 511}
 512
 513#[derive(Default)]
 514struct ThreadStates {
 515    global_state: Option<ThreadStatus>,
 516    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 517}
 518
 519impl ThreadStates {
 520    fn stop_all_threads(&mut self) {
 521        self.global_state = Some(ThreadStatus::Stopped);
 522        self.known_thread_states.clear();
 523    }
 524
 525    fn continue_all_threads(&mut self) {
 526        self.global_state = Some(ThreadStatus::Running);
 527        self.known_thread_states.clear();
 528    }
 529
 530    fn stop_thread(&mut self, thread_id: ThreadId) {
 531        self.known_thread_states
 532            .insert(thread_id, ThreadStatus::Stopped);
 533    }
 534
 535    fn continue_thread(&mut self, thread_id: ThreadId) {
 536        self.known_thread_states
 537            .insert(thread_id, ThreadStatus::Running);
 538    }
 539
 540    fn process_step(&mut self, thread_id: ThreadId) {
 541        self.known_thread_states
 542            .insert(thread_id, ThreadStatus::Stepping);
 543    }
 544
 545    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 546        self.thread_state(thread_id)
 547            .unwrap_or(ThreadStatus::Running)
 548    }
 549
 550    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 551        self.known_thread_states
 552            .get(&thread_id)
 553            .copied()
 554            .or(self.global_state)
 555    }
 556
 557    fn exit_thread(&mut self, thread_id: ThreadId) {
 558        self.known_thread_states
 559            .insert(thread_id, ThreadStatus::Exited);
 560    }
 561
 562    fn any_stopped_thread(&self) -> bool {
 563        self.global_state
 564            .is_some_and(|state| state == ThreadStatus::Stopped)
 565            || self
 566                .known_thread_states
 567                .values()
 568                .any(|status| *status == ThreadStatus::Stopped)
 569    }
 570}
/// Capacity of the circular buffer that holds a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Position marker passed to and returned from `Session::output`.
// NOTE(review): presumably a running count of output events seen so far; the code that
// advances it is outside this part of the file.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether requests go to a local adapter client or over a remote connection.
    mode: Mode,
    /// Adapter capabilities; for local sessions these come from the `Initialize` response.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    /// Id of the session this one was started from, if any.
    parent_id: Option<SessionId>,
    /// While set, breakpoint-store updates are not forwarded to the adapter.
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Token returned to callers of `output`; a value of 0 makes `output` yield nothing.
    output_token: OutputToken,
    /// Most recent output events, bounded by `MAX_TRACKED_OUTPUT_EVENTS`.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Tasks keyed first by command type and then by the specific command value.
    // NOTE(review): looks like a cache of in-flight/completed requests — confirm against the
    // code that populates it.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Tasks owned by the session; for local sessions this holds the adapter message loop.
    _background_tasks: Vec<Task<()>>,
}
 595
 596trait CacheableCommand: 'static + Send + Sync {
 597    fn as_any(&self) -> &dyn Any;
 598    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 599    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 600    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 601}
 602
 603impl<T> CacheableCommand for T
 604where
 605    T: DapCommand + PartialEq + Eq + Hash,
 606{
 607    fn as_any(&self) -> &dyn Any {
 608        self
 609    }
 610
 611    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 612        rhs.as_any()
 613            .downcast_ref::<Self>()
 614            .map_or(false, |rhs| self == rhs)
 615    }
 616
 617    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 618        T::hash(self, &mut hasher);
 619    }
 620
 621    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 622        self
 623    }
 624}
 625
 626pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 627
 628impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 629    fn from(request: T) -> Self {
 630        Self(Arc::new(request))
 631    }
 632}
 633
 634impl PartialEq for RequestSlot {
 635    fn eq(&self, other: &Self) -> bool {
 636        self.0.dyn_eq(other.0.as_ref())
 637    }
 638}
 639
 640impl Eq for RequestSlot {}
 641
 642impl Hash for RequestSlot {
 643    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 644        self.0.dyn_hash(state);
 645        self.0.as_any().type_id().hash(state)
 646    }
 647}
 648
 649#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 650pub struct CompletionsQuery {
 651    pub query: String,
 652    pub column: u64,
 653    pub line: Option<u64>,
 654    pub frame_id: Option<u64>,
 655}
 656
 657impl CompletionsQuery {
 658    pub fn new(
 659        buffer: &language::Buffer,
 660        cursor_position: language::Anchor,
 661        frame_id: Option<u64>,
 662    ) -> Self {
 663        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 664        Self {
 665            query: buffer.text(),
 666            column: column as u64,
 667            frame_id,
 668            line: Some(row as u64),
 669        }
 670    }
 671}
 672
/// Events a [`Session`] emits to its subscribers.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries the id of the stopped thread when one is known.
    // NOTE(review): presumably `None` means no specific thread was reported — confirm at the
    // emit site.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

impl EventEmitter<SessionEvent> for Session {}
 683
 684// local session will send breakpoint updates to DAP for all new breakpoints
 685// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 686// BreakpointStore notifies session on breakpoint changes
 687impl Session {
 688    pub(crate) fn local(
 689        breakpoint_store: Entity<BreakpointStore>,
 690        session_id: SessionId,
 691        parent_session: Option<Entity<Session>>,
 692        delegate: DapAdapterDelegate,
 693        config: DebugAdapterConfig,
 694        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 695        initialized_tx: oneshot::Sender<()>,
 696        cx: &mut App,
 697    ) -> Task<Result<Entity<Self>>> {
 698        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 699
 700        cx.spawn(async move |cx| {
 701            let (mode, capabilities) = LocalMode::new(
 702                session_id,
 703                parent_session.clone(),
 704                breakpoint_store.clone(),
 705                config.clone(),
 706                delegate,
 707                message_tx,
 708                cx.clone(),
 709            )
 710            .await?;
 711
 712            cx.new(|cx| {
 713                let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Self>, cx| {
 714                    let mut initialized_tx = Some(initialized_tx);
 715                    while let Some(message) = message_rx.next().await {
 716                        if let Message::Event(event) = message {
 717                            if let Events::Initialized(_) = *event {
 718                                if let Some(tx) = initialized_tx.take() {
 719                                    tx.send(()).ok();
 720                                }
 721                            } else {
 722                                let Ok(_) = this.update(cx, |session, cx| {
 723                                    session.handle_dap_event(event, cx);
 724                                }) else {
 725                                    break;
 726                                };
 727                            }
 728                        } else {
 729                            let Ok(_) =
 730                                start_debugging_requests_tx.unbounded_send((session_id, message))
 731                            else {
 732                                break;
 733                            };
 734                        }
 735                    }
 736                })];
 737
 738                cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
 739                    BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 740                        if let Some(local) = (!this.ignore_breakpoints)
 741                            .then(|| this.as_local_mut())
 742                            .flatten()
 743                        {
 744                            local
 745                                .send_breakpoints_from_path(path.clone(), *reason, cx)
 746                                .detach();
 747                        };
 748                    }
 749                    BreakpointStoreEvent::ActiveDebugLineChanged => {}
 750                })
 751                .detach();
 752
 753                Self {
 754                    mode: Mode::Local(mode),
 755                    id: session_id,
 756                    parent_id: parent_session.map(|session| session.read(cx).id),
 757                    variables: Default::default(),
 758                    capabilities,
 759                    thread_states: ThreadStates::default(),
 760                    output_token: OutputToken(0),
 761                    ignore_breakpoints: false,
 762                    output: circular_buffer::CircularBuffer::boxed(),
 763                    requests: HashMap::default(),
 764                    modules: Vec::default(),
 765                    loaded_sources: Vec::default(),
 766                    threads: IndexMap::default(),
 767                    stack_frames: IndexMap::default(),
 768                    locations: Default::default(),
 769                    _background_tasks,
 770                    is_session_terminated: false,
 771                }
 772            })
 773        })
 774    }
 775
 776    pub(crate) fn remote(
 777        session_id: SessionId,
 778        client: AnyProtoClient,
 779        upstream_project_id: u64,
 780        ignore_breakpoints: bool,
 781    ) -> Self {
 782        Self {
 783            mode: Mode::Remote(RemoteConnection {
 784                _client: client,
 785                _upstream_project_id: upstream_project_id,
 786            }),
 787            id: session_id,
 788            parent_id: None,
 789            capabilities: Capabilities::default(),
 790            ignore_breakpoints,
 791            variables: Default::default(),
 792            stack_frames: Default::default(),
 793            thread_states: ThreadStates::default(),
 794
 795            output_token: OutputToken(0),
 796            output: circular_buffer::CircularBuffer::boxed(),
 797            requests: HashMap::default(),
 798            modules: Vec::default(),
 799            loaded_sources: Vec::default(),
 800            threads: IndexMap::default(),
 801            _background_tasks: Vec::default(),
 802            locations: Default::default(),
 803            is_session_terminated: false,
 804        }
 805    }
 806
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 810
    /// Returns the id of the parent session, or `None` when there is no parent.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
 814
    /// Borrows the adapter capabilities stored on this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 818
 819    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 820        if let Mode::Local(local_mode) = &self.mode {
 821            Some(local_mode.config.clone())
 822        } else {
 823            None
 824        }
 825    }
 826
    /// Returns the session's terminated flag.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 830
 831    pub fn is_local(&self) -> bool {
 832        matches!(self.mode, Mode::Local(_))
 833    }
 834
 835    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 836        match &mut self.mode {
 837            Mode::Local(local_mode) => Some(local_mode),
 838            Mode::Remote(_) => None,
 839        }
 840    }
 841
 842    pub fn as_local(&self) -> Option<&LocalMode> {
 843        match &self.mode {
 844            Mode::Local(local_mode) => Some(local_mode),
 845            Mode::Remote(_) => None,
 846        }
 847    }
 848
 849    pub(super) fn initialize_sequence(
 850        &mut self,
 851        initialize_rx: oneshot::Receiver<()>,
 852        cx: &mut Context<Self>,
 853    ) -> Task<Result<()>> {
 854        match &self.mode {
 855            Mode::Local(local_mode) => {
 856                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
 857            }
 858            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
 859        }
 860    }
 861
 862    pub fn output(
 863        &self,
 864        since: OutputToken,
 865    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 866        if self.output_token.0 == 0 {
 867            return (self.output.range(0..0), OutputToken(0));
 868        };
 869
 870        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 871
 872        let clamped_events_since = events_since.clamp(0, self.output.len());
 873        (
 874            self.output
 875                .range(self.output.len() - clamped_events_since..),
 876            self.output_token,
 877        )
 878    }
 879
 880    pub fn respond_to_client(
 881        &self,
 882        request_seq: u64,
 883        success: bool,
 884        command: String,
 885        body: Option<serde_json::Value>,
 886        cx: &mut Context<Self>,
 887    ) -> Task<Result<()>> {
 888        let Some(local_session) = self.as_local().cloned() else {
 889            unreachable!("Cannot respond to remote client");
 890        };
 891
 892        cx.background_spawn(async move {
 893            local_session
 894                .client
 895                .send_message(Message::Response(Response {
 896                    body,
 897                    success,
 898                    command,
 899                    seq: request_seq + 1,
 900                    request_seq,
 901                    message: None,
 902                }))
 903                .await
 904        })
 905    }
 906
 907    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
 908        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
 909            self.thread_states.stop_all_threads();
 910
 911            self.invalidate_command_type::<StackTraceCommand>();
 912        }
 913
 914        // Event if we stopped all threads we still need to insert the thread_id
 915        // to our own data
 916        if let Some(thread_id) = event.thread_id {
 917            self.thread_states.stop_thread(ThreadId(thread_id));
 918
 919            self.invalidate_state(
 920                &StackTraceCommand {
 921                    thread_id,
 922                    start_frame: None,
 923                    levels: None,
 924                }
 925                .into(),
 926            );
 927        }
 928
 929        self.invalidate_generic();
 930        self.threads.clear();
 931        self.variables.clear();
 932        cx.emit(SessionEvent::Stopped(
 933            event
 934                .thread_id
 935                .map(Into::into)
 936                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
 937        ));
 938        cx.notify();
 939    }
 940
 941    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
 942        match *event {
 943            Events::Initialized(_) => {
 944                debug_assert!(
 945                    false,
 946                    "Initialized event should have been handled in LocalMode"
 947                );
 948            }
 949            Events::Stopped(event) => self.handle_stopped_event(event, cx),
 950            Events::Continued(event) => {
 951                if event.all_threads_continued.unwrap_or_default() {
 952                    self.thread_states.continue_all_threads();
 953                } else {
 954                    self.thread_states
 955                        .continue_thread(ThreadId(event.thread_id));
 956                }
 957                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
 958                self.invalidate_generic();
 959            }
 960            Events::Exited(_event) => {
 961                self.clear_active_debug_line(cx);
 962            }
 963            Events::Terminated(_) => {
 964                self.is_session_terminated = true;
 965                self.clear_active_debug_line(cx);
 966            }
 967            Events::Thread(event) => {
 968                let thread_id = ThreadId(event.thread_id);
 969
 970                match event.reason {
 971                    dap::ThreadEventReason::Started => {
 972                        self.thread_states.continue_thread(thread_id);
 973                    }
 974                    dap::ThreadEventReason::Exited => {
 975                        self.thread_states.exit_thread(thread_id);
 976                    }
 977                    reason => {
 978                        log::error!("Unhandled thread event reason {:?}", reason);
 979                    }
 980                }
 981                self.invalidate_state(&ThreadsCommand.into());
 982                cx.notify();
 983            }
 984            Events::Output(event) => {
 985                if event
 986                    .category
 987                    .as_ref()
 988                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
 989                {
 990                    return;
 991                }
 992
 993                self.output.push_back(event);
 994                self.output_token.0 += 1;
 995                cx.notify();
 996            }
 997            Events::Breakpoint(_) => {}
 998            Events::Module(event) => {
 999                match event.reason {
1000                    dap::ModuleEventReason::New => {
1001                        self.modules.push(event.module);
1002                    }
1003                    dap::ModuleEventReason::Changed => {
1004                        if let Some(module) = self
1005                            .modules
1006                            .iter_mut()
1007                            .find(|other| event.module.id == other.id)
1008                        {
1009                            *module = event.module;
1010                        }
1011                    }
1012                    dap::ModuleEventReason::Removed => {
1013                        self.modules.retain(|other| event.module.id != other.id);
1014                    }
1015                }
1016
1017                // todo(debugger): We should only send the invalidate command to downstream clients.
1018                // self.invalidate_state(&ModulesCommand.into());
1019            }
1020            Events::LoadedSource(_) => {
1021                self.invalidate_state(&LoadedSourcesCommand.into());
1022            }
1023            Events::Capabilities(event) => {
1024                self.capabilities = self.capabilities.merge(event.capabilities);
1025                cx.notify();
1026            }
1027            Events::Memory(_) => {}
1028            Events::Process(_) => {}
1029            Events::ProgressEnd(_) => {}
1030            Events::ProgressStart(_) => {}
1031            Events::ProgressUpdate(_) => {}
1032            Events::Invalidated(_) => {}
1033            Events::Other(_) => {}
1034        }
1035    }
1036
1037    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1038    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1039        &mut self,
1040        request: T,
1041        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1042            + 'static,
1043        cx: &mut Context<Self>,
1044    ) {
1045        const {
1046            assert!(
1047                T::CACHEABLE,
1048                "Only requests marked as cacheable should invoke `fetch`"
1049            );
1050        }
1051
1052        if !self.thread_states.any_stopped_thread()
1053            && request.type_id() != TypeId::of::<ThreadsCommand>()
1054        {
1055            return;
1056        }
1057
1058        let request_map = self
1059            .requests
1060            .entry(std::any::TypeId::of::<T>())
1061            .or_default();
1062
1063        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1064            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1065
1066            let task = Self::request_inner::<Arc<T>>(
1067                &self.capabilities,
1068                self.id,
1069                &self.mode,
1070                command,
1071                process_result,
1072                cx,
1073            );
1074            let task = cx
1075                .background_executor()
1076                .spawn(async move {
1077                    let _ = task.await?;
1078                    Some(())
1079                })
1080                .shared();
1081
1082            vacant.insert(task);
1083            cx.notify();
1084        }
1085    }
1086
1087    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1088        capabilities: &Capabilities,
1089        session_id: SessionId,
1090        mode: &Mode,
1091        request: T,
1092        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1093            + 'static,
1094        cx: &mut Context<Self>,
1095    ) -> Task<Option<T::Response>> {
1096        if !T::is_supported(&capabilities) {
1097            log::warn!(
1098                "Attempted to send a DAP request that isn't supported: {:?}",
1099                request
1100            );
1101            let error = Err(anyhow::Error::msg(
1102                "Couldn't complete request because it's not supported",
1103            ));
1104            return cx.spawn(async move |this, cx| {
1105                this.update(cx, |this, cx| process_result(this, error, cx))
1106                    .log_err()
1107                    .flatten()
1108            });
1109        }
1110
1111        let request = mode.request_dap(session_id, request, cx);
1112        cx.spawn(async move |this, cx| {
1113            let result = request.await;
1114            this.update(cx, |this, cx| process_result(this, result, cx))
1115                .log_err()
1116                .flatten()
1117        })
1118    }
1119
1120    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1121        &self,
1122        request: T,
1123        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1124            + 'static,
1125        cx: &mut Context<Self>,
1126    ) -> Task<Option<T::Response>> {
1127        Self::request_inner(
1128            &self.capabilities,
1129            self.id,
1130            &self.mode,
1131            request,
1132            process_result,
1133            cx,
1134        )
1135    }
1136
1137    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1138        self.requests.remove(&std::any::TypeId::of::<Command>());
1139    }
1140
1141    fn invalidate_generic(&mut self) {
1142        self.invalidate_command_type::<ModulesCommand>();
1143        self.invalidate_command_type::<LoadedSourcesCommand>();
1144        self.invalidate_command_type::<ThreadsCommand>();
1145    }
1146
1147    fn invalidate_state(&mut self, key: &RequestSlot) {
1148        self.requests
1149            .entry(key.0.as_any().type_id())
1150            .and_modify(|request_map| {
1151                request_map.remove(&key);
1152            });
1153    }
1154
1155    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1156        self.thread_states.thread_status(thread_id)
1157    }
1158
1159    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1160        self.fetch(
1161            dap_command::ThreadsCommand,
1162            |this, result, cx| {
1163                let result = result.log_err()?;
1164
1165                this.threads = result
1166                    .iter()
1167                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1168                    .collect();
1169
1170                this.invalidate_command_type::<StackTraceCommand>();
1171                cx.emit(SessionEvent::Threads);
1172                cx.notify();
1173
1174                Some(result)
1175            },
1176            cx,
1177        );
1178
1179        self.threads
1180            .values()
1181            .map(|thread| {
1182                (
1183                    thread.dap.clone(),
1184                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1185                )
1186            })
1187            .collect()
1188    }
1189
1190    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1191        self.fetch(
1192            dap_command::ModulesCommand,
1193            |this, result, cx| {
1194                let result = result.log_err()?;
1195
1196                this.modules = result.iter().cloned().collect();
1197                cx.emit(SessionEvent::Modules);
1198                cx.notify();
1199
1200                Some(result)
1201            },
1202            cx,
1203        );
1204
1205        &self.modules
1206    }
1207
1208    pub fn ignore_breakpoints(&self) -> bool {
1209        self.ignore_breakpoints
1210    }
1211
1212    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1213        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1214    }
1215
1216    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1217        if self.ignore_breakpoints == ignore {
1218            return Task::ready(());
1219        }
1220
1221        self.ignore_breakpoints = ignore;
1222
1223        if let Some(local) = self.as_local() {
1224            local.send_all_breakpoints(ignore, cx)
1225        } else {
1226            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1227            unimplemented!()
1228        }
1229    }
1230
1231    pub fn breakpoints_enabled(&self) -> bool {
1232        self.ignore_breakpoints
1233    }
1234
1235    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1236        self.fetch(
1237            dap_command::LoadedSourcesCommand,
1238            |this, result, cx| {
1239                let result = result.log_err()?;
1240                this.loaded_sources = result.iter().cloned().collect();
1241                cx.emit(SessionEvent::LoadedSources);
1242                cx.notify();
1243                Some(result)
1244            },
1245            cx,
1246        );
1247
1248        &self.loaded_sources
1249    }
1250
1251    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1252        res.log_err()?;
1253        Some(())
1254    }
1255
1256    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1257        thread_id: ThreadId,
1258    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1259    {
1260        move |this, response, cx| match response.log_err() {
1261            Some(response) => Some(response),
1262            None => {
1263                this.thread_states.stop_thread(thread_id);
1264                cx.notify();
1265                None
1266            }
1267        }
1268    }
1269
1270    fn clear_active_debug_line_response(
1271        &mut self,
1272        response: Result<()>,
1273        cx: &mut Context<'_, Session>,
1274    ) -> Option<()> {
1275        response.log_err()?;
1276        self.clear_active_debug_line(cx);
1277        Some(())
1278    }
1279
1280    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1281        self.as_local()
1282            .expect("Message handler will only run in local mode")
1283            .breakpoint_store
1284            .update(cx, |store, cx| {
1285                store.remove_active_position(Some(self.id), cx)
1286            });
1287    }
1288
1289    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1290        self.request(
1291            PauseCommand {
1292                thread_id: thread_id.0,
1293            },
1294            Self::empty_response,
1295            cx,
1296        )
1297        .detach();
1298    }
1299
1300    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1301        self.request(
1302            RestartStackFrameCommand { stack_frame_id },
1303            Self::empty_response,
1304            cx,
1305        )
1306        .detach();
1307    }
1308
1309    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1310        if self.capabilities.supports_restart_request.unwrap_or(false) {
1311            self.request(
1312                RestartCommand {
1313                    raw: args.unwrap_or(Value::Null),
1314                },
1315                Self::empty_response,
1316                cx,
1317            )
1318            .detach();
1319        } else {
1320            self.request(
1321                DisconnectCommand {
1322                    restart: Some(false),
1323                    terminate_debuggee: Some(true),
1324                    suspend_debuggee: Some(false),
1325                },
1326                Self::empty_response,
1327                cx,
1328            )
1329            .detach();
1330        }
1331    }
1332
1333    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1334        let task = if self
1335            .capabilities
1336            .supports_terminate_request
1337            .unwrap_or_default()
1338        {
1339            self.request(
1340                TerminateCommand {
1341                    restart: Some(false),
1342                },
1343                Self::clear_active_debug_line_response,
1344                cx,
1345            )
1346        } else {
1347            self.request(
1348                DisconnectCommand {
1349                    restart: Some(false),
1350                    terminate_debuggee: Some(true),
1351                    suspend_debuggee: Some(false),
1352                },
1353                Self::clear_active_debug_line_response,
1354                cx,
1355            )
1356        };
1357
1358        cx.background_spawn(async move {
1359            let _ = task.await;
1360        })
1361    }
1362
1363    pub fn completions(
1364        &mut self,
1365        query: CompletionsQuery,
1366        cx: &mut Context<Self>,
1367    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1368        let task = self.request(query, |_, result, _| result.log_err(), cx);
1369
1370        cx.background_executor().spawn(async move {
1371            anyhow::Ok(
1372                task.await
1373                    .map(|response| response.targets)
1374                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1375            )
1376        })
1377    }
1378
1379    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1380        self.thread_states.continue_thread(thread_id);
1381        self.request(
1382            ContinueCommand {
1383                args: ContinueArguments {
1384                    thread_id: thread_id.0,
1385                    single_thread: Some(true),
1386                },
1387            },
1388            Self::on_step_response::<ContinueCommand>(thread_id),
1389            cx,
1390        )
1391        .detach();
1392    }
1393
1394    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1395        match self.mode {
1396            Mode::Local(ref local) => Some(local.client.clone()),
1397            Mode::Remote(_) => None,
1398        }
1399    }
1400
1401    pub fn step_over(
1402        &mut self,
1403        thread_id: ThreadId,
1404        granularity: SteppingGranularity,
1405        cx: &mut Context<Self>,
1406    ) {
1407        let supports_single_thread_execution_requests =
1408            self.capabilities.supports_single_thread_execution_requests;
1409        let supports_stepping_granularity = self
1410            .capabilities
1411            .supports_stepping_granularity
1412            .unwrap_or_default();
1413
1414        let command = NextCommand {
1415            inner: StepCommand {
1416                thread_id: thread_id.0,
1417                granularity: supports_stepping_granularity.then(|| granularity),
1418                single_thread: supports_single_thread_execution_requests,
1419            },
1420        };
1421
1422        self.thread_states.process_step(thread_id);
1423        self.request(
1424            command,
1425            Self::on_step_response::<NextCommand>(thread_id),
1426            cx,
1427        )
1428        .detach();
1429    }
1430
1431    pub fn step_in(
1432        &mut self,
1433        thread_id: ThreadId,
1434        granularity: SteppingGranularity,
1435        cx: &mut Context<Self>,
1436    ) {
1437        let supports_single_thread_execution_requests =
1438            self.capabilities.supports_single_thread_execution_requests;
1439        let supports_stepping_granularity = self
1440            .capabilities
1441            .supports_stepping_granularity
1442            .unwrap_or_default();
1443
1444        let command = StepInCommand {
1445            inner: StepCommand {
1446                thread_id: thread_id.0,
1447                granularity: supports_stepping_granularity.then(|| granularity),
1448                single_thread: supports_single_thread_execution_requests,
1449            },
1450        };
1451
1452        self.thread_states.process_step(thread_id);
1453        self.request(
1454            command,
1455            Self::on_step_response::<StepInCommand>(thread_id),
1456            cx,
1457        )
1458        .detach();
1459    }
1460
1461    pub fn step_out(
1462        &mut self,
1463        thread_id: ThreadId,
1464        granularity: SteppingGranularity,
1465        cx: &mut Context<Self>,
1466    ) {
1467        let supports_single_thread_execution_requests =
1468            self.capabilities.supports_single_thread_execution_requests;
1469        let supports_stepping_granularity = self
1470            .capabilities
1471            .supports_stepping_granularity
1472            .unwrap_or_default();
1473
1474        let command = StepOutCommand {
1475            inner: StepCommand {
1476                thread_id: thread_id.0,
1477                granularity: supports_stepping_granularity.then(|| granularity),
1478                single_thread: supports_single_thread_execution_requests,
1479            },
1480        };
1481
1482        self.thread_states.process_step(thread_id);
1483        self.request(
1484            command,
1485            Self::on_step_response::<StepOutCommand>(thread_id),
1486            cx,
1487        )
1488        .detach();
1489    }
1490
1491    pub fn step_back(
1492        &mut self,
1493        thread_id: ThreadId,
1494        granularity: SteppingGranularity,
1495        cx: &mut Context<Self>,
1496    ) {
1497        let supports_single_thread_execution_requests =
1498            self.capabilities.supports_single_thread_execution_requests;
1499        let supports_stepping_granularity = self
1500            .capabilities
1501            .supports_stepping_granularity
1502            .unwrap_or_default();
1503
1504        let command = StepBackCommand {
1505            inner: StepCommand {
1506                thread_id: thread_id.0,
1507                granularity: supports_stepping_granularity.then(|| granularity),
1508                single_thread: supports_single_thread_execution_requests,
1509            },
1510        };
1511
1512        self.thread_states.process_step(thread_id);
1513
1514        self.request(
1515            command,
1516            Self::on_step_response::<StepBackCommand>(thread_id),
1517            cx,
1518        )
1519        .detach();
1520    }
1521
1522    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1523        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1524            && self.requests.contains_key(&ThreadsCommand.type_id())
1525            && self.threads.contains_key(&thread_id)
1526        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1527        // We could still be using an old thread id and have sent a new thread's request
1528        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1529        // But it very well could cause a minor bug in the future that is hard to track down
1530        {
1531            self.fetch(
1532                super::dap_command::StackTraceCommand {
1533                    thread_id: thread_id.0,
1534                    start_frame: None,
1535                    levels: None,
1536                },
1537                move |this, stack_frames, cx| {
1538                    let stack_frames = stack_frames.log_err()?;
1539
1540                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1541                        thread.stack_frame_ids =
1542                            stack_frames.iter().map(|frame| frame.id).collect();
1543                    });
1544                    debug_assert!(
1545                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1546                        "Sent request for thread_id that doesn't exist"
1547                    );
1548
1549                    this.stack_frames.extend(
1550                        stack_frames
1551                            .iter()
1552                            .cloned()
1553                            .map(|frame| (frame.id, StackFrame::from(frame))),
1554                    );
1555
1556                    this.invalidate_command_type::<ScopesCommand>();
1557                    this.invalidate_command_type::<VariablesCommand>();
1558
1559                    cx.emit(SessionEvent::StackTrace);
1560                    cx.notify();
1561                    Some(stack_frames)
1562                },
1563                cx,
1564            );
1565        }
1566
1567        self.threads
1568            .get(&thread_id)
1569            .map(|thread| {
1570                thread
1571                    .stack_frame_ids
1572                    .iter()
1573                    .filter_map(|id| self.stack_frames.get(id))
1574                    .cloned()
1575                    .collect()
1576            })
1577            .unwrap_or_default()
1578    }
1579
1580    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1581        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1582            && self
1583                .requests
1584                .contains_key(&TypeId::of::<StackTraceCommand>())
1585        {
1586            self.fetch(
1587                ScopesCommand { stack_frame_id },
1588                move |this, scopes, cx| {
1589                    let scopes = scopes.log_err()?;
1590
1591                    for scope in scopes .iter(){
1592                        this.variables(scope.variables_reference, cx);
1593                    }
1594
1595                    let entry = this
1596                        .stack_frames
1597                        .entry(stack_frame_id)
1598                        .and_modify(|stack_frame| {
1599                            stack_frame.scopes = scopes.clone();
1600                        });
1601
1602                    cx.emit(SessionEvent::Variables);
1603
1604                    debug_assert!(
1605                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1606                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1607                    );
1608
1609                    Some(scopes)
1610                },
1611                cx,
1612            );
1613        }
1614
1615        self.stack_frames
1616            .get(&stack_frame_id)
1617            .map(|frame| frame.scopes.as_slice())
1618            .unwrap_or_default()
1619    }
1620
1621    pub fn variables(
1622        &mut self,
1623        variables_reference: VariableReference,
1624        cx: &mut Context<Self>,
1625    ) -> Vec<dap::Variable> {
1626        let command = VariablesCommand {
1627            variables_reference,
1628            filter: None,
1629            start: None,
1630            count: None,
1631            format: None,
1632        };
1633
1634        self.fetch(
1635            command,
1636            move |this, variables, cx| {
1637                let variables = variables.log_err()?;
1638                this.variables
1639                    .insert(variables_reference, variables.clone());
1640
1641                cx.emit(SessionEvent::Variables);
1642                Some(variables)
1643            },
1644            cx,
1645        );
1646
1647        self.variables
1648            .get(&variables_reference)
1649            .cloned()
1650            .unwrap_or_default()
1651    }
1652
1653    pub fn set_variable_value(
1654        &mut self,
1655        variables_reference: u64,
1656        name: String,
1657        value: String,
1658        cx: &mut Context<Self>,
1659    ) {
1660        if self.capabilities.supports_set_variable.unwrap_or_default() {
1661            self.request(
1662                SetVariableValueCommand {
1663                    name,
1664                    value,
1665                    variables_reference,
1666                },
1667                move |this, response, cx| {
1668                    let response = response.log_err()?;
1669                    this.invalidate_command_type::<VariablesCommand>();
1670                    cx.notify();
1671                    Some(response)
1672                },
1673                cx,
1674            )
1675            .detach()
1676        }
1677    }
1678
1679    pub fn evaluate(
1680        &mut self,
1681        expression: String,
1682        context: Option<EvaluateArgumentsContext>,
1683        frame_id: Option<u64>,
1684        source: Option<Source>,
1685        cx: &mut Context<Self>,
1686    ) {
1687        self.request(
1688            EvaluateCommand {
1689                expression,
1690                context,
1691                frame_id,
1692                source,
1693            },
1694            |this, response, cx| {
1695                let response = response.log_err()?;
1696                this.output_token.0 += 1;
1697                this.output.push_back(dap::OutputEvent {
1698                    category: None,
1699                    output: response.result.clone(),
1700                    group: None,
1701                    variables_reference: Some(response.variables_reference),
1702                    source: None,
1703                    line: None,
1704                    column: None,
1705                    data: None,
1706                    location_reference: None,
1707                });
1708
1709                this.invalidate_command_type::<ScopesCommand>();
1710                cx.notify();
1711                Some(response)
1712            },
1713            cx,
1714        )
1715        .detach();
1716    }
1717
1718    pub fn location(
1719        &mut self,
1720        reference: u64,
1721        cx: &mut Context<Self>,
1722    ) -> Option<dap::LocationsResponse> {
1723        self.fetch(
1724            LocationsCommand { reference },
1725            move |this, response, _| {
1726                let response = response.log_err()?;
1727                this.locations.insert(reference, response.clone());
1728                Some(response)
1729            },
1730            cx,
1731        );
1732        self.locations.get(&reference).cloned()
1733    }
1734    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1735        let command = DisconnectCommand {
1736            restart: Some(false),
1737            terminate_debuggee: Some(true),
1738            suspend_debuggee: Some(false),
1739        };
1740
1741        self.request(command, Self::empty_response, cx).detach()
1742    }
1743
1744    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1745        if self
1746            .capabilities
1747            .supports_terminate_threads_request
1748            .unwrap_or_default()
1749        {
1750            self.request(
1751                TerminateThreadsCommand {
1752                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1753                },
1754                Self::clear_active_debug_line_response,
1755                cx,
1756            )
1757            .detach();
1758        } else {
1759            self.shutdown(cx).detach();
1760        }
1761    }
1762}