session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{anyhow, Result};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::OutputEventCategory;
  18use dap::{
  19    adapters::{DapDelegate, DapStatus},
  20    client::{DebugAdapterClient, SessionId},
  21    messages::{Events, Message},
  22    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  23    SteppingGranularity, StoppedEvent, VariableReference,
  24};
  25use dap_adapters::build_adapter;
  26use futures::channel::oneshot;
  27use futures::{future::Shared, FutureExt};
  28use gpui::{
  29    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  30};
  31use rpc::AnyProtoClient;
  32use serde_json::{json, Value};
  33use settings::Settings;
  34use smol::stream::StreamExt;
  35use std::any::TypeId;
  36use std::path::PathBuf;
  37use std::u64;
  38use std::{
  39    any::Any,
  40    collections::hash_map::Entry,
  41    hash::{Hash, Hasher},
  42    path::Path,
  43    sync::Arc,
  44};
  45use task::DebugAdapterConfig;
  46use text::{PointUtf16, ToPointUtf16};
  47use util::{merge_json_value_into, ResultExt};
  48
  49#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  50#[repr(transparent)]
  51pub struct ThreadId(pub u64);
  52
  53impl ThreadId {
  54    pub const MIN: ThreadId = ThreadId(u64::MIN);
  55    pub const MAX: ThreadId = ThreadId(u64::MAX);
  56}
  57
  58impl From<u64> for ThreadId {
  59    fn from(id: u64) -> Self {
  60        Self(id)
  61    }
  62}
  63
  64#[derive(Clone, Debug)]
  65pub struct StackFrame {
  66    pub dap: dap::StackFrame,
  67    pub scopes: Vec<dap::Scope>,
  68}
  69
  70impl From<dap::StackFrame> for StackFrame {
  71    fn from(stack_frame: dap::StackFrame) -> Self {
  72        Self {
  73            scopes: vec![],
  74            dap: stack_frame,
  75        }
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  80pub enum ThreadStatus {
  81    #[default]
  82    Running,
  83    Stopped,
  84    Stepping,
  85    Exited,
  86    Ended,
  87}
  88
  89impl ThreadStatus {
  90    pub fn label(&self) -> &'static str {
  91        match self {
  92            ThreadStatus::Running => "Running",
  93            ThreadStatus::Stopped => "Stopped",
  94            ThreadStatus::Stepping => "Stepping",
  95            ThreadStatus::Exited => "Exited",
  96            ThreadStatus::Ended => "Ended",
  97        }
  98    }
  99}
 100
 101#[derive(Debug)]
 102pub struct Thread {
 103    dap: dap::Thread,
 104    stack_frame_ids: IndexSet<StackFrameId>,
 105    _has_stopped: bool,
 106}
 107
 108impl From<dap::Thread> for Thread {
 109    fn from(dap: dap::Thread) -> Self {
 110        Self {
 111            dap,
 112            stack_frame_ids: Default::default(),
 113            _has_stopped: false,
 114        }
 115    }
 116}
 117
/// Numeric id of the upstream project; a plain alias for `u64`.
type UpstreamProjectId = u64;

/// Connection data held by a session running in `Mode::Remote`.
///
/// Both fields carry a `_` prefix because the code that would use them to forward
/// requests is currently commented out.
struct RemoteConnection {
    _client: AnyProtoClient,
    _upstream_project_id: UpstreamProjectId,
}
 124
 125impl RemoteConnection {
 126    fn send_proto_client_request<R: DapCommand>(
 127        &self,
 128        _request: R,
 129        _session_id: SessionId,
 130        cx: &mut App,
 131    ) -> Task<Result<R::Response>> {
 132        // let message = request.to_proto(session_id, self.upstream_project_id);
 133        // let upstream_client = self.client.clone();
 134        cx.background_executor().spawn(async move {
 135            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 136            // let response = upstream_client.request(message).await?;
 137            // request.response_from_proto(response)
 138            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 139        })
 140    }
 141
 142    fn request<R: DapCommand>(
 143        &self,
 144        request: R,
 145        session_id: SessionId,
 146        cx: &mut App,
 147    ) -> Task<Result<R::Response>>
 148    where
 149        <R::DapRequest as dap::requests::Request>::Response: 'static,
 150        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 151    {
 152        return self.send_proto_client_request::<R>(request, session_id, cx);
 153    }
 154}
 155
/// How a session reaches its debug adapter: through a locally owned client or through a
/// remote connection.
enum Mode {
    Local(LocalMode),
    Remote(RemoteConnection),
}
 160
/// State needed to drive a debug adapter through a local `DebugAdapterClient`.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration the session was created with.
    config: DebugAdapterConfig,
    /// Adapter that supplies the launch/attach request arguments.
    adapter: Arc<dyn DebugAdapter>,
    /// Store the breakpoints sent to the adapter are read from.
    breakpoint_store: Entity<BreakpointStore>,
}
 168
 169fn client_source(abs_path: &Path) -> dap::Source {
 170    dap::Source {
 171        name: abs_path
 172            .file_name()
 173            .map(|filename| filename.to_string_lossy().to_string()),
 174        path: Some(abs_path.to_string_lossy().to_string()),
 175        source_reference: None,
 176        presentation_hint: None,
 177        origin: None,
 178        sources: None,
 179        adapter_data: None,
 180        checksums: None,
 181    }
 182}
 183
 184impl LocalMode {
    /// Connects to the debug adapter for `session_id` and sends the `Initialize` request.
    ///
    /// If `parent_session` exposes an adapter client, that client is reconnected for this
    /// session; otherwise a new `DebugAdapterClient` is started. Every message received
    /// from the adapter is forwarded to `messages_tx`.
    ///
    /// The returned task resolves to the constructed `LocalMode` and the capabilities
    /// returned by `Initialize`, or to the first error hit while resolving the adapter
    /// binary, connecting, or initializing.
    fn new(
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        breakpoint_store: Entity<BreakpointStore>,
        config: DebugAdapterConfig,
        delegate: DapAdapterDelegate,
        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
        cx: AsyncApp,
    ) -> Task<Result<(Self, Capabilities)>> {
        cx.spawn(async move |cx| {
            let (adapter, binary) = Self::get_adapter_binary(&config, &delegate,  cx).await?;

            // Forward adapter messages to the session; a failed send is deliberately ignored.
            let message_handler = Box::new(move |message| {
                messages_tx.unbounded_send(message).ok();
            });

            // Reuse the parent's client when there is one, otherwise start a fresh client.
            let client = Arc::new(
                if let Some(client) = parent_session
                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
                    .flatten()
                {
                    client
                        .reconnect(session_id, binary, message_handler, cx.clone())
                        .await?
                } else {
                    DebugAdapterClient::start(
                        session_id,
                        adapter.name(),
                        binary,
                        message_handler,
                        cx.clone(),
                    )
                    .await?
                },
            );

            // Capture the adapter name before `adapter` is moved into the struct below.
            let adapter_id = adapter.name().to_string().to_owned();
            let session = Self {
                client,
                adapter,
                breakpoint_store,
                config: config.clone(),
            };

            // Test builds only: register fake request handlers on the client so the
            // initialize/launch/attach/disconnect handshake can run against the fake adapter.
            #[cfg(any(test, feature = "test-support"))]
            {
                let dap::DebugAdapterKind::Fake((fail, caps)) = session.config.kind.clone() else {
                    panic!("Only fake debug adapter configs should be used in tests");
                };

                session
                    .client
                    .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
                    .await;

                let paths = cx.update(|cx| session.breakpoint_store.read(cx).breakpoint_paths()).expect("Breakpoint store should exist in all tests that start debuggers");

                // Panics if breakpoints are sent for a path the store does not know about.
                session.client.on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
                    if !paths.contains(&p) {
                        panic!("Sent breakpoints for path without any")
                    }

                    Ok(dap::SetBreakpointsResponse {
                        breakpoints: Vec::default(),
                    })
                }).await;

                // `fail` selects whether the fake launch/attach handler answers with an error.
                match config.request.clone() {
                    dap::DebugRequestType::Launch if fail => {
                        session
                            .client
                            .on_request::<dap::requests::Launch, _>(move |_, _| {
                                Err(dap::ErrorResponse {
                                    error: Some(dap::Message {
                                        id: 1,
                                        format: "error".into(),
                                        variables: None,
                                        send_telemetry: None,
                                        show_user: None,
                                        url: None,
                                        url_label: None,
                                    }),
                                })
                            })
                            .await;
                    }
                    dap::DebugRequestType::Launch => {
                        session
                            .client
                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
                            .await;
                    }
                    dap::DebugRequestType::Attach(_) if fail => {
                        session
                            .client
                            .on_request::<dap::requests::Attach, _>(move |_, _| {
                                Err(dap::ErrorResponse {
                                    error: Some(dap::Message {
                                        id: 1,
                                        format: "error".into(),
                                        variables: None,
                                        send_telemetry: None,
                                        show_user: None,
                                        url: None,
                                        url_label: None,
                                    }),
                                })
                            })
                            .await;
                    }
                    dap::DebugRequestType::Attach(attach_config) => {
                        session
                            .client
                            .on_request::<dap::requests::Attach, _>(move |_, args| {
                                // The attach arguments must carry exactly the configured process id.
                                assert_eq!(
                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
                                    args.raw
                                );

                                Ok(())
                            })
                            .await;
                    }
                }

                session.client.on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(())).await;
                session.client.fake_event(Events::Initialized(None)).await;
            }

            let capabilities = session
                .request(Initialize { adapter_id }, cx.background_executor().clone())
                .await?;

            Ok((session, capabilities))
        })
    }
 322
 323    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 324        let tasks: Vec<_> = paths
 325            .into_iter()
 326            .map(|path| {
 327                self.request(
 328                    dap_command::SetBreakpoints {
 329                        source: client_source(path),
 330                        source_modified: None,
 331                        breakpoints: vec![],
 332                    },
 333                    cx.background_executor().clone(),
 334                )
 335            })
 336            .collect();
 337
 338        cx.background_spawn(async move {
 339            futures::future::join_all(tasks)
 340                .await
 341                .iter()
 342                .for_each(|res| match res {
 343                    Ok(_) => {}
 344                    Err(err) => {
 345                        log::warn!("Set breakpoints request failed: {}", err);
 346                    }
 347                });
 348        })
 349    }
 350
 351    fn send_breakpoints_from_path(
 352        &self,
 353        abs_path: Arc<Path>,
 354        reason: BreakpointUpdatedReason,
 355        cx: &mut App,
 356    ) -> Task<()> {
 357        let breakpoints = self
 358            .breakpoint_store
 359            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 360            .into_iter()
 361            .map(Into::into)
 362            .collect();
 363
 364        let task = self.request(
 365            dap_command::SetBreakpoints {
 366                source: client_source(&abs_path),
 367                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 368                breakpoints,
 369            },
 370            cx.background_executor().clone(),
 371        );
 372
 373        cx.background_spawn(async move {
 374            match task.await {
 375                Ok(_) => {}
 376                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 377            }
 378        })
 379    }
 380
 381    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 382        let mut breakpoint_tasks = Vec::new();
 383        let breakpoints = self
 384            .breakpoint_store
 385            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 386
 387        for (path, breakpoints) in breakpoints {
 388            let breakpoints = if ignore_breakpoints {
 389                vec![]
 390            } else {
 391                breakpoints.into_iter().map(Into::into).collect()
 392            };
 393
 394            breakpoint_tasks.push(self.request(
 395                dap_command::SetBreakpoints {
 396                    source: client_source(&path),
 397                    source_modified: Some(false),
 398                    breakpoints,
 399                },
 400                cx.background_executor().clone(),
 401            ));
 402        }
 403
 404        cx.background_spawn(async move {
 405            futures::future::join_all(breakpoint_tasks)
 406                .await
 407                .iter()
 408                .for_each(|res| match res {
 409                    Ok(_) => {}
 410                    Err(err) => {
 411                        log::warn!("Set breakpoints request failed: {}", err);
 412                    }
 413                });
 414        })
 415    }
 416
 417    async fn get_adapter_binary(
 418        config: &DebugAdapterConfig,
 419        delegate: &DapAdapterDelegate,
 420        cx: &mut AsyncApp,
 421    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 422        let adapter = build_adapter(&config.kind).await?;
 423
 424        let binary = cx.update(|cx| {
 425            ProjectSettings::get_global(cx)
 426                .dap
 427                .get(&adapter.name())
 428                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 429        })?;
 430
 431        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 432            Err(error) => {
 433                delegate.update_status(
 434                    adapter.name(),
 435                    DapStatus::Failed {
 436                        error: error.to_string(),
 437                    },
 438                );
 439
 440                return Err(error);
 441            }
 442            Ok(mut binary) => {
 443                delegate.update_status(adapter.name(), DapStatus::None);
 444
 445                let shell_env = delegate.shell_env().await;
 446                let mut envs = binary.envs.unwrap_or_default();
 447                envs.extend(shell_env);
 448                binary.envs = Some(envs);
 449
 450                binary
 451            }
 452        };
 453
 454        Ok((adapter, binary))
 455    }
 456
 457    pub fn initialize_sequence(
 458        &self,
 459        capabilities: &Capabilities,
 460        initialized_rx: oneshot::Receiver<()>,
 461        cx: &App,
 462    ) -> Task<Result<()>> {
 463        let mut raw = self.adapter.request_args(&self.config);
 464        merge_json_value_into(
 465            self.config.initialize_args.clone().unwrap_or(json!({})),
 466            &mut raw,
 467        );
 468
 469        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 470        let launch = match &self.config.request {
 471            dap::DebugRequestType::Launch => {
 472                self.request(Launch { raw }, cx.background_executor().clone())
 473            }
 474            dap::DebugRequestType::Attach(_) => {
 475                self.request(Attach { raw }, cx.background_executor().clone())
 476            }
 477        };
 478
 479        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 480
 481        let configuration_sequence = cx.spawn({
 482            let this = self.clone();
 483            async move |cx| {
 484                initialized_rx.await?;
 485                // todo(debugger) figure out if we want to handle a breakpoint response error
 486                // This will probably consist of letting a user know that breakpoints failed to be set
 487                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 488
 489                if configuration_done_supported {
 490                    this.request(ConfigurationDone, cx.background_executor().clone())
 491                } else {
 492                    Task::ready(Ok(()))
 493                }
 494                .await
 495            }
 496        });
 497
 498        cx.background_spawn(async move {
 499            futures::future::try_join(launch, configuration_sequence).await?;
 500            Ok(())
 501        })
 502    }
 503
 504    fn request<R: LocalDapCommand>(
 505        &self,
 506        request: R,
 507        executor: BackgroundExecutor,
 508    ) -> Task<Result<R::Response>>
 509    where
 510        <R::DapRequest as dap::requests::Request>::Response: 'static,
 511        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 512    {
 513        let request = Arc::new(request);
 514
 515        let request_clone = request.clone();
 516        let connection = self.client.clone();
 517        let request_task = executor.spawn(async move {
 518            let args = request_clone.to_dap();
 519            connection.request::<R::DapRequest>(args).await
 520        });
 521
 522        executor.spawn(async move {
 523            let response = request.response_from_dap(request_task.await?);
 524            response
 525        })
 526    }
 527}
 528impl From<RemoteConnection> for Mode {
 529    fn from(value: RemoteConnection) -> Self {
 530        Self::Remote(value)
 531    }
 532}
 533
 534impl Mode {
 535    fn request_dap<R: DapCommand>(
 536        &self,
 537        session_id: SessionId,
 538        request: R,
 539        cx: &mut Context<Session>,
 540    ) -> Task<Result<R::Response>>
 541    where
 542        <R::DapRequest as dap::requests::Request>::Response: 'static,
 543        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 544    {
 545        match self {
 546            Mode::Local(debug_adapter_client) => {
 547                debug_adapter_client.request(request, cx.background_executor().clone())
 548            }
 549            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 550        }
 551    }
 552}
 553
 554#[derive(Default)]
 555struct ThreadStates {
 556    global_state: Option<ThreadStatus>,
 557    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 558}
 559
 560impl ThreadStates {
 561    fn stop_all_threads(&mut self) {
 562        self.global_state = Some(ThreadStatus::Stopped);
 563        self.known_thread_states.clear();
 564    }
 565
 566    fn exit_all_threads(&mut self) {
 567        self.global_state = Some(ThreadStatus::Exited);
 568        self.known_thread_states.clear();
 569    }
 570
 571    fn continue_all_threads(&mut self) {
 572        self.global_state = Some(ThreadStatus::Running);
 573        self.known_thread_states.clear();
 574    }
 575
 576    fn stop_thread(&mut self, thread_id: ThreadId) {
 577        self.known_thread_states
 578            .insert(thread_id, ThreadStatus::Stopped);
 579    }
 580
 581    fn continue_thread(&mut self, thread_id: ThreadId) {
 582        self.known_thread_states
 583            .insert(thread_id, ThreadStatus::Running);
 584    }
 585
 586    fn process_step(&mut self, thread_id: ThreadId) {
 587        self.known_thread_states
 588            .insert(thread_id, ThreadStatus::Stepping);
 589    }
 590
 591    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 592        self.thread_state(thread_id)
 593            .unwrap_or(ThreadStatus::Running)
 594    }
 595
 596    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 597        self.known_thread_states
 598            .get(&thread_id)
 599            .copied()
 600            .or(self.global_state)
 601    }
 602
 603    fn exit_thread(&mut self, thread_id: ThreadId) {
 604        self.known_thread_states
 605            .insert(thread_id, ThreadStatus::Exited);
 606    }
 607
 608    fn any_stopped_thread(&self) -> bool {
 609        self.global_state
 610            .is_some_and(|state| state == ThreadStatus::Stopped)
 611            || self
 612                .known_thread_states
 613                .values()
 614                .any(|status| *status == ThreadStatus::Stopped)
 615    }
 616}
/// Capacity of the circular buffer that holds a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 618
/// Ordered `usize` token associated with a session's output; sessions start at `OutputToken(0)`.
// NOTE(review): presumably identifies a position in the output stream — confirm against its users.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Local adapter client or remote connection used to issue requests.
    mode: Mode,
    /// Adapter capabilities; defaulted for remote sessions.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    /// Id of the parent session, if this session was created with one.
    parent_id: Option<SessionId>,
    /// While set, breakpoint store updates are not forwarded to the adapter.
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Output events, bounded to `MAX_TRACKED_OUTPUT_EVENTS` entries.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Shared request tasks, keyed first by command type and then by the command value.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Tasks owned by the session; a local session stores its adapter message loop here.
    _background_tasks: Vec<Task<()>>,
}
 642
/// Object-safe view of a command that supports equality and hashing through `dyn`, so
/// commands of different concrete types can sit behind one `RequestSlot` key type.
trait CacheableCommand: 'static + Send + Sync {
    /// Upcasts to `Any` so the concrete type can be recovered.
    fn as_any(&self) -> &dyn Any;
    /// Dynamic equality against another command.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds this command into a type-erased hasher.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Upcasts a shared command to `Arc<dyn Any + Send + Sync>`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 649
 650impl<T> CacheableCommand for T
 651where
 652    T: DapCommand + PartialEq + Eq + Hash,
 653{
 654    fn as_any(&self) -> &dyn Any {
 655        self
 656    }
 657
 658    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 659        rhs.as_any()
 660            .downcast_ref::<Self>()
 661            .map_or(false, |rhs| self == rhs)
 662    }
 663
 664    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 665        T::hash(self, &mut hasher);
 666    }
 667
 668    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 669        self
 670    }
 671}
 672
 673pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 674
 675impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 676    fn from(request: T) -> Self {
 677        Self(Arc::new(request))
 678    }
 679}
 680
 681impl PartialEq for RequestSlot {
 682    fn eq(&self, other: &Self) -> bool {
 683        self.0.dyn_eq(other.0.as_ref())
 684    }
 685}
 686
 687impl Eq for RequestSlot {}
 688
 689impl Hash for RequestSlot {
 690    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 691        self.0.dyn_hash(state);
 692        self.0.as_any().type_id().hash(state)
 693    }
 694}
 695
 696#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 697pub struct CompletionsQuery {
 698    pub query: String,
 699    pub column: u64,
 700    pub line: Option<u64>,
 701    pub frame_id: Option<u64>,
 702}
 703
 704impl CompletionsQuery {
 705    pub fn new(
 706        buffer: &language::Buffer,
 707        cursor_position: language::Anchor,
 708        frame_id: Option<u64>,
 709    ) -> Self {
 710        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 711        Self {
 712            query: buffer.text(),
 713            column: column as u64,
 714            frame_id,
 715            line: Some(row as u64),
 716        }
 717    }
 718}
 719
/// Events a `Session` emits to its observers.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

// Makes `Session` an emitter of `SessionEvent`.
impl EventEmitter<SessionEvent> for Session {}
 730
// A local session sends breakpoint updates to DAP for all new breakpoints.
// The remote side only sends a breakpoint update when the breakpoint was created by that peer.
// The BreakpointStore notifies the session of breakpoint changes.
 734impl Session {
    /// Creates a session backed by a locally started (or reconnected) debug adapter.
    ///
    /// After `LocalMode::new` succeeds, the session entity is created with:
    /// - a background task that drains adapter messages: the first `Initialized` event
    ///   fires `initialized_tx`, other events go to `handle_dap_event`, and non-event
    ///   messages are forwarded with the session id to `start_debugging_requests_tx`;
    /// - a subscription to `breakpoint_store` that forwards breakpoint changes to the
    ///   adapter unless `ignore_breakpoints` is set.
    ///
    /// The returned task resolves to the new entity, or to the error from
    /// `LocalMode::new` or entity creation.
    pub(crate) fn local(
        breakpoint_store: Entity<BreakpointStore>,
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        delegate: DapAdapterDelegate,
        config: DebugAdapterConfig,
        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
        initialized_tx: oneshot::Sender<()>,
        cx: &mut App,
    ) -> Task<Result<Entity<Self>>> {
        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();

        cx.spawn(async move |cx| {
            let (mode, capabilities) = LocalMode::new(
                session_id,
                parent_session.clone(),
                breakpoint_store.clone(),
                config.clone(),
                delegate,
                message_tx,
                cx.clone(),
            )
            .await?;

            cx.new(|cx| {
                let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Self>, cx| {
                    // The sender is taken on first use, so only the first `Initialized`
                    // event signals the receiver.
                    let mut initialized_tx = Some(initialized_tx);
                    while let Some(message) = message_rx.next().await {
                        if let Message::Event(event) = message {
                            if let Events::Initialized(_) = *event {
                                if let Some(tx) = initialized_tx.take() {
                                    tx.send(()).ok();
                                }
                            } else {
                                // Stop the loop once the session entity can no longer be updated.
                                let Ok(_) = this.update(cx, |session, cx| {
                                    session.handle_dap_event(event, cx);
                                }) else {
                                    break;
                                };
                            }
                        } else {
                            // Non-event messages are handed to the receiver of
                            // `start_debugging_requests_tx`; stop if it is gone.
                            let Ok(_) =
                                start_debugging_requests_tx.unbounded_send((session_id, message))
                            else {
                                break;
                            };
                        }
                    }
                })];

                cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
                    BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
                        // Forward only when breakpoints are not ignored and the session is local.
                        if let Some(local) = (!this.ignore_breakpoints)
                            .then(|| this.as_local_mut())
                            .flatten()
                        {
                            local
                                .send_breakpoints_from_path(path.clone(), *reason, cx)
                                .detach();
                        };
                    }
                    BreakpointStoreEvent::BreakpointsCleared(paths) => {
                        if let Some(local) = (!this.ignore_breakpoints)
                            .then(|| this.as_local_mut())
                            .flatten()
                        {
                            local.unset_breakpoints_from_paths(paths, cx).detach();
                        }
                    }
                    BreakpointStoreEvent::ActiveDebugLineChanged => {}
                })
                .detach();

                Self {
                    mode: Mode::Local(mode),
                    id: session_id,
                    child_session_ids: HashSet::default(),
                    parent_id: parent_session.map(|session| session.read(cx).id),
                    variables: Default::default(),
                    capabilities,
                    thread_states: ThreadStates::default(),
                    output_token: OutputToken(0),
                    ignore_breakpoints: false,
                    output: circular_buffer::CircularBuffer::boxed(),
                    requests: HashMap::default(),
                    modules: Vec::default(),
                    loaded_sources: Vec::default(),
                    threads: IndexMap::default(),
                    stack_frames: IndexMap::default(),
                    locations: Default::default(),
                    _background_tasks,
                    is_session_terminated: false,
                }
            })
        })
    }
 831
 832    pub(crate) fn remote(
 833        session_id: SessionId,
 834        client: AnyProtoClient,
 835        upstream_project_id: u64,
 836        ignore_breakpoints: bool,
 837    ) -> Self {
 838        Self {
 839            mode: Mode::Remote(RemoteConnection {
 840                _client: client,
 841                _upstream_project_id: upstream_project_id,
 842            }),
 843            id: session_id,
 844            child_session_ids: HashSet::default(),
 845            parent_id: None,
 846            capabilities: Capabilities::default(),
 847            ignore_breakpoints,
 848            variables: Default::default(),
 849            stack_frames: Default::default(),
 850            thread_states: ThreadStates::default(),
 851            output_token: OutputToken(0),
 852            output: circular_buffer::CircularBuffer::boxed(),
 853            requests: HashMap::default(),
 854            modules: Vec::default(),
 855            loaded_sources: Vec::default(),
 856            threads: IndexMap::default(),
 857            _background_tasks: Vec::default(),
 858            locations: Default::default(),
 859            is_session_terminated: false,
 860        }
 861    }
 862
    /// Returns the id of this session.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 866
    /// Returns a copy of the set of child session ids recorded on this session.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 870
    /// Records `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 874
    /// Removes `session_id` from the set of child sessions, if it is present.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 878
    /// Returns the id of the parent session, or `None` when no parent was recorded.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
 882
    /// Returns the capabilities currently stored for this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 886
 887    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 888        if let Mode::Local(local_mode) = &self.mode {
 889            Some(local_mode.config.clone())
 890        } else {
 891            None
 892        }
 893    }
 894
    /// Returns whether this session has been flagged as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 898
 899    pub fn is_local(&self) -> bool {
 900        matches!(self.mode, Mode::Local(_))
 901    }
 902
 903    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 904        match &mut self.mode {
 905            Mode::Local(local_mode) => Some(local_mode),
 906            Mode::Remote(_) => None,
 907        }
 908    }
 909
 910    pub fn as_local(&self) -> Option<&LocalMode> {
 911        match &self.mode {
 912            Mode::Local(local_mode) => Some(local_mode),
 913            Mode::Remote(_) => None,
 914        }
 915    }
 916
 917    pub(super) fn initialize_sequence(
 918        &mut self,
 919        initialize_rx: oneshot::Receiver<()>,
 920        cx: &mut Context<Self>,
 921    ) -> Task<Result<()>> {
 922        match &self.mode {
 923            Mode::Local(local_mode) => {
 924                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
 925            }
 926            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
 927        }
 928    }
 929
 930    pub fn output(
 931        &self,
 932        since: OutputToken,
 933    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 934        if self.output_token.0 == 0 {
 935            return (self.output.range(0..0), OutputToken(0));
 936        };
 937
 938        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 939
 940        let clamped_events_since = events_since.clamp(0, self.output.len());
 941        (
 942            self.output
 943                .range(self.output.len() - clamped_events_since..),
 944            self.output_token,
 945        )
 946    }
 947
 948    pub fn respond_to_client(
 949        &self,
 950        request_seq: u64,
 951        success: bool,
 952        command: String,
 953        body: Option<serde_json::Value>,
 954        cx: &mut Context<Self>,
 955    ) -> Task<Result<()>> {
 956        let Some(local_session) = self.as_local().cloned() else {
 957            unreachable!("Cannot respond to remote client");
 958        };
 959
 960        cx.background_spawn(async move {
 961            local_session
 962                .client
 963                .send_message(Message::Response(Response {
 964                    body,
 965                    success,
 966                    command,
 967                    seq: request_seq + 1,
 968                    request_seq,
 969                    message: None,
 970                }))
 971                .await
 972        })
 973    }
 974
    /// Updates session state in response to a `stopped` event.
    ///
    /// Marks the affected thread(s) as stopped, drops the cached requests
    /// that a stop makes stale, clears the cached threads and variables, and
    /// emits [`SessionEvent::Stopped`].
    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
        // Without a thread id, or when the adapter reports that everything
        // stopped, every thread is marked stopped and all cached stack traces are dropped.
        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
            self.thread_states.stop_all_threads();

            self.invalidate_command_type::<StackTraceCommand>();
        }

        // Even if we stopped all threads we still need to insert the thread_id
        // to our own data
        if let Some(thread_id) = event.thread_id {
            self.thread_states.stop_thread(ThreadId(thread_id));

            self.invalidate_state(
                &StackTraceCommand {
                    thread_id,
                    start_frame: None,
                    levels: None,
                }
                .into(),
            );
        }

        self.invalidate_generic();
        self.threads.clear();
        self.variables.clear();
        // The thread id is withheld from the event when the adapter set `preserve_focus_hint`.
        cx.emit(SessionEvent::Stopped(
            event
                .thread_id
                .map(Into::into)
                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
        ));
        cx.notify();
    }
1008
    /// Applies a debug adapter event to the session state.
    ///
    /// Each arm updates the cached state the event affects (thread states,
    /// output buffer, modules, capabilities, cached requests). Events with an
    /// empty arm are currently ignored.
    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
        match *event {
            Events::Initialized(_) => {
                debug_assert!(
                    false,
                    "Initialized event should have been handled in LocalMode"
                );
            }
            Events::Stopped(event) => self.handle_stopped_event(event, cx),
            Events::Continued(event) => {
                if event.all_threads_continued.unwrap_or_default() {
                    self.thread_states.continue_all_threads();
                } else {
                    self.thread_states
                        .continue_thread(ThreadId(event.thread_id));
                }
                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
                self.invalidate_generic();
            }
            Events::Exited(_event) => {
                self.clear_active_debug_line(cx);
            }
            Events::Terminated(_) => {
                self.is_session_terminated = true;
                self.clear_active_debug_line(cx);
            }
            Events::Thread(event) => {
                let thread_id = ThreadId(event.thread_id);

                match event.reason {
                    dap::ThreadEventReason::Started => {
                        self.thread_states.continue_thread(thread_id);
                    }
                    dap::ThreadEventReason::Exited => {
                        self.thread_states.exit_thread(thread_id);
                    }
                    reason => {
                        log::error!("Unhandled thread event reason {:?}", reason);
                    }
                }
                // The cached thread list is dropped for every thread event, including unhandled reasons.
                self.invalidate_state(&ThreadsCommand.into());
                cx.notify();
            }
            Events::Output(event) => {
                // Telemetry output is not stored in the output buffer.
                if event
                    .category
                    .as_ref()
                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
                {
                    return;
                }

                self.output.push_back(event);
                self.output_token.0 += 1;
                cx.notify();
            }
            Events::Breakpoint(_) => {}
            Events::Module(event) => {
                // The cached module list is patched in place rather than refetched.
                match event.reason {
                    dap::ModuleEventReason::New => {
                        self.modules.push(event.module);
                    }
                    dap::ModuleEventReason::Changed => {
                        if let Some(module) = self
                            .modules
                            .iter_mut()
                            .find(|other| event.module.id == other.id)
                        {
                            *module = event.module;
                        }
                    }
                    dap::ModuleEventReason::Removed => {
                        self.modules.retain(|other| event.module.id != other.id);
                    }
                }

                // todo(debugger): We should only send the invalidate command to downstream clients.
                // self.invalidate_state(&ModulesCommand.into());
            }
            Events::LoadedSource(_) => {
                self.invalidate_state(&LoadedSourcesCommand.into());
            }
            Events::Capabilities(event) => {
                self.capabilities = self.capabilities.merge(event.capabilities);
                cx.notify();
            }
            Events::Memory(_) => {}
            Events::Process(_) => {}
            Events::ProgressEnd(_) => {}
            Events::ProgressStart(_) => {}
            Events::ProgressUpdate(_) => {}
            Events::Invalidated(_) => {}
            Events::Other(_) => {}
        }
    }
1104
    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
    ///
    /// Requests are tracked in `self.requests`, keyed first by the command's
    /// `TypeId` and then by the request itself. A request is sent only when
    /// its slot is vacant; `process_result` is invoked with the outcome of
    /// that request.
    ///
    /// Nothing is sent when the session is terminated, or when no thread is
    /// stopped and the request is not a [`ThreadsCommand`].
    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
        &mut self,
        request: T,
        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
            + 'static,
        cx: &mut Context<Self>,
    ) {
        // Compile-time guard: only commands marked `CACHEABLE` may go through `fetch`.
        const {
            assert!(
                T::CACHEABLE,
                "Only requests marked as cacheable should invoke `fetch`"
            );
        }

        // `&&` binds tighter than `||`: bail out when (no thread is stopped and
        // this isn't a threads request) or when the session is terminated.
        if !self.thread_states.any_stopped_thread()
            && request.type_id() != TypeId::of::<ThreadsCommand>()
            || self.is_session_terminated
        {
            return;
        }

        let request_map = self
            .requests
            .entry(std::any::TypeId::of::<T>())
            .or_default();

        // An occupied slot means this exact request was already sent; do nothing in that case.
        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
            // Recover the typed command from the slot key so it can be sent.
            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();

            let task = Self::request_inner::<Arc<T>>(
                &self.capabilities,
                self.id,
                &self.mode,
                command,
                process_result,
                cx,
            );
            // The response itself is discarded; the stored shared task only
            // records whether the request produced a value.
            let task = cx
                .background_executor()
                .spawn(async move {
                    let _ = task.await?;
                    Some(())
                })
                .shared();

            vacant.insert(task);
            cx.notify();
        }
    }
1155
1156    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1157        capabilities: &Capabilities,
1158        session_id: SessionId,
1159        mode: &Mode,
1160        request: T,
1161        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1162            + 'static,
1163        cx: &mut Context<Self>,
1164    ) -> Task<Option<T::Response>> {
1165        if !T::is_supported(&capabilities) {
1166            log::warn!(
1167                "Attempted to send a DAP request that isn't supported: {:?}",
1168                request
1169            );
1170            let error = Err(anyhow::Error::msg(
1171                "Couldn't complete request because it's not supported",
1172            ));
1173            return cx.spawn(async move |this, cx| {
1174                this.update(cx, |this, cx| process_result(this, error, cx))
1175                    .log_err()
1176                    .flatten()
1177            });
1178        }
1179
1180        let request = mode.request_dap(session_id, request, cx);
1181        cx.spawn(async move |this, cx| {
1182            let result = request.await;
1183            this.update(cx, |this, cx| process_result(this, result, cx))
1184                .log_err()
1185                .flatten()
1186        })
1187    }
1188
    /// Sends `request` using this session's capabilities, id and mode, and
    /// passes the outcome to `process_result`.
    ///
    /// This forwards directly to `request_inner`; it neither consults nor
    /// records anything in the `requests` map.
    fn request<T: DapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
            + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(
            &self.capabilities,
            self.id,
            &self.mode,
            request,
            process_result,
            cx,
        )
    }
1205
    /// Drops every tracked request of type `Command` from the `requests` map.
    fn invalidate_command_type<Command: DapCommand>(&mut self) {
        self.requests.remove(&std::any::TypeId::of::<Command>());
    }
1209
    /// Drops the tracked modules, loaded-sources and threads requests.
    fn invalidate_generic(&mut self) {
        self.invalidate_command_type::<ModulesCommand>();
        self.invalidate_command_type::<LoadedSourcesCommand>();
        self.invalidate_command_type::<ThreadsCommand>();
    }
1215
1216    fn invalidate_state(&mut self, key: &RequestSlot) {
1217        self.requests
1218            .entry(key.0.as_any().type_id())
1219            .and_modify(|request_map| {
1220                request_map.remove(&key);
1221            });
1222    }
1223
    /// Returns the status recorded for `thread_id` in the session's thread states.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1227
1228    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1229        self.fetch(
1230            dap_command::ThreadsCommand,
1231            |this, result, cx| {
1232                let result = result.log_err()?;
1233
1234                this.threads = result
1235                    .iter()
1236                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1237                    .collect();
1238
1239                this.invalidate_command_type::<StackTraceCommand>();
1240                cx.emit(SessionEvent::Threads);
1241                cx.notify();
1242
1243                Some(result)
1244            },
1245            cx,
1246        );
1247
1248        self.threads
1249            .values()
1250            .map(|thread| {
1251                (
1252                    thread.dap.clone(),
1253                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1254                )
1255            })
1256            .collect()
1257    }
1258
1259    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1260        self.fetch(
1261            dap_command::ModulesCommand,
1262            |this, result, cx| {
1263                let result = result.log_err()?;
1264
1265                this.modules = result.iter().cloned().collect();
1266                cx.emit(SessionEvent::Modules);
1267                cx.notify();
1268
1269                Some(result)
1270            },
1271            cx,
1272        );
1273
1274        &self.modules
1275    }
1276
    /// Returns the current value of the session's ignore-breakpoints flag.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1280
    /// Flips the ignore-breakpoints flag by delegating to
    /// `set_ignore_breakpoints` with the negated current value.
    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
    }
1284
1285    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1286        if self.ignore_breakpoints == ignore {
1287            return Task::ready(());
1288        }
1289
1290        self.ignore_breakpoints = ignore;
1291
1292        if let Some(local) = self.as_local() {
1293            local.send_all_breakpoints(ignore, cx)
1294        } else {
1295            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1296            unimplemented!()
1297        }
1298    }
1299
1300    pub fn breakpoints_enabled(&self) -> bool {
1301        self.ignore_breakpoints
1302    }
1303
1304    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1305        self.fetch(
1306            dap_command::LoadedSourcesCommand,
1307            |this, result, cx| {
1308                let result = result.log_err()?;
1309                this.loaded_sources = result.iter().cloned().collect();
1310                cx.emit(SessionEvent::LoadedSources);
1311                cx.notify();
1312                Some(result)
1313            },
1314            cx,
1315        );
1316
1317        &self.loaded_sources
1318    }
1319
1320    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1321        res.log_err()?;
1322        Some(())
1323    }
1324
1325    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1326        thread_id: ThreadId,
1327    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1328    {
1329        move |this, response, cx| match response.log_err() {
1330            Some(response) => Some(response),
1331            None => {
1332                this.thread_states.stop_thread(thread_id);
1333                cx.notify();
1334                None
1335            }
1336        }
1337    }
1338
1339    fn clear_active_debug_line_response(
1340        &mut self,
1341        response: Result<()>,
1342        cx: &mut Context<'_, Session>,
1343    ) -> Option<()> {
1344        response.log_err()?;
1345        self.clear_active_debug_line(cx);
1346        Some(())
1347    }
1348
1349    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1350        self.as_local()
1351            .expect("Message handler will only run in local mode")
1352            .breakpoint_store
1353            .update(cx, |store, cx| {
1354                store.remove_active_position(Some(self.id), cx)
1355            });
1356    }
1357
1358    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1359        self.request(
1360            PauseCommand {
1361                thread_id: thread_id.0,
1362            },
1363            Self::empty_response,
1364            cx,
1365        )
1366        .detach();
1367    }
1368
1369    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1370        self.request(
1371            RestartStackFrameCommand { stack_frame_id },
1372            Self::empty_response,
1373            cx,
1374        )
1375        .detach();
1376    }
1377
1378    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1379        if self.capabilities.supports_restart_request.unwrap_or(false) {
1380            self.request(
1381                RestartCommand {
1382                    raw: args.unwrap_or(Value::Null),
1383                },
1384                Self::empty_response,
1385                cx,
1386            )
1387            .detach();
1388        } else {
1389            self.request(
1390                DisconnectCommand {
1391                    restart: Some(false),
1392                    terminate_debuggee: Some(true),
1393                    suspend_debuggee: Some(false),
1394                },
1395                Self::empty_response,
1396                cx,
1397            )
1398            .detach();
1399        }
1400    }
1401
1402    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1403        self.is_session_terminated = true;
1404        self.thread_states.exit_all_threads();
1405        cx.notify();
1406
1407        let task = if self
1408            .capabilities
1409            .supports_terminate_request
1410            .unwrap_or_default()
1411        {
1412            self.request(
1413                TerminateCommand {
1414                    restart: Some(false),
1415                },
1416                Self::clear_active_debug_line_response,
1417                cx,
1418            )
1419        } else {
1420            self.request(
1421                DisconnectCommand {
1422                    restart: Some(false),
1423                    terminate_debuggee: Some(true),
1424                    suspend_debuggee: Some(false),
1425                },
1426                Self::clear_active_debug_line_response,
1427                cx,
1428            )
1429        };
1430
1431        cx.background_spawn(async move {
1432            let _ = task.await;
1433        })
1434    }
1435
1436    pub fn completions(
1437        &mut self,
1438        query: CompletionsQuery,
1439        cx: &mut Context<Self>,
1440    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1441        let task = self.request(query, |_, result, _| result.log_err(), cx);
1442
1443        cx.background_executor().spawn(async move {
1444            anyhow::Ok(
1445                task.await
1446                    .map(|response| response.targets)
1447                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1448            )
1449        })
1450    }
1451
1452    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1453        self.thread_states.continue_thread(thread_id);
1454        self.request(
1455            ContinueCommand {
1456                args: ContinueArguments {
1457                    thread_id: thread_id.0,
1458                    single_thread: Some(true),
1459                },
1460            },
1461            Self::on_step_response::<ContinueCommand>(thread_id),
1462            cx,
1463        )
1464        .detach();
1465    }
1466
1467    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1468        match self.mode {
1469            Mode::Local(ref local) => Some(local.client.clone()),
1470            Mode::Remote(_) => None,
1471        }
1472    }
1473
1474    pub fn step_over(
1475        &mut self,
1476        thread_id: ThreadId,
1477        granularity: SteppingGranularity,
1478        cx: &mut Context<Self>,
1479    ) {
1480        let supports_single_thread_execution_requests =
1481            self.capabilities.supports_single_thread_execution_requests;
1482        let supports_stepping_granularity = self
1483            .capabilities
1484            .supports_stepping_granularity
1485            .unwrap_or_default();
1486
1487        let command = NextCommand {
1488            inner: StepCommand {
1489                thread_id: thread_id.0,
1490                granularity: supports_stepping_granularity.then(|| granularity),
1491                single_thread: supports_single_thread_execution_requests,
1492            },
1493        };
1494
1495        self.thread_states.process_step(thread_id);
1496        self.request(
1497            command,
1498            Self::on_step_response::<NextCommand>(thread_id),
1499            cx,
1500        )
1501        .detach();
1502    }
1503
1504    pub fn step_in(
1505        &mut self,
1506        thread_id: ThreadId,
1507        granularity: SteppingGranularity,
1508        cx: &mut Context<Self>,
1509    ) {
1510        let supports_single_thread_execution_requests =
1511            self.capabilities.supports_single_thread_execution_requests;
1512        let supports_stepping_granularity = self
1513            .capabilities
1514            .supports_stepping_granularity
1515            .unwrap_or_default();
1516
1517        let command = StepInCommand {
1518            inner: StepCommand {
1519                thread_id: thread_id.0,
1520                granularity: supports_stepping_granularity.then(|| granularity),
1521                single_thread: supports_single_thread_execution_requests,
1522            },
1523        };
1524
1525        self.thread_states.process_step(thread_id);
1526        self.request(
1527            command,
1528            Self::on_step_response::<StepInCommand>(thread_id),
1529            cx,
1530        )
1531        .detach();
1532    }
1533
1534    pub fn step_out(
1535        &mut self,
1536        thread_id: ThreadId,
1537        granularity: SteppingGranularity,
1538        cx: &mut Context<Self>,
1539    ) {
1540        let supports_single_thread_execution_requests =
1541            self.capabilities.supports_single_thread_execution_requests;
1542        let supports_stepping_granularity = self
1543            .capabilities
1544            .supports_stepping_granularity
1545            .unwrap_or_default();
1546
1547        let command = StepOutCommand {
1548            inner: StepCommand {
1549                thread_id: thread_id.0,
1550                granularity: supports_stepping_granularity.then(|| granularity),
1551                single_thread: supports_single_thread_execution_requests,
1552            },
1553        };
1554
1555        self.thread_states.process_step(thread_id);
1556        self.request(
1557            command,
1558            Self::on_step_response::<StepOutCommand>(thread_id),
1559            cx,
1560        )
1561        .detach();
1562    }
1563
1564    pub fn step_back(
1565        &mut self,
1566        thread_id: ThreadId,
1567        granularity: SteppingGranularity,
1568        cx: &mut Context<Self>,
1569    ) {
1570        let supports_single_thread_execution_requests =
1571            self.capabilities.supports_single_thread_execution_requests;
1572        let supports_stepping_granularity = self
1573            .capabilities
1574            .supports_stepping_granularity
1575            .unwrap_or_default();
1576
1577        let command = StepBackCommand {
1578            inner: StepCommand {
1579                thread_id: thread_id.0,
1580                granularity: supports_stepping_granularity.then(|| granularity),
1581                single_thread: supports_single_thread_execution_requests,
1582            },
1583        };
1584
1585        self.thread_states.process_step(thread_id);
1586
1587        self.request(
1588            command,
1589            Self::on_step_response::<StepBackCommand>(thread_id),
1590            cx,
1591        )
1592        .detach();
1593    }
1594
    /// Returns the cached stack frames of `thread_id`, requesting a stack
    /// trace first when the thread is stopped and known to the session.
    ///
    /// When the response arrives, the thread's frame ids and the frame cache
    /// are updated, cached scopes and variables requests are dropped, and
    /// [`SessionEvent::StackTrace`] is emitted. An unknown thread yields an
    /// empty list.
    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
        // Only fetch when the thread is stopped, a threads request is tracked,
        // and the thread is present in the thread cache.
        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
            && self.requests.contains_key(&ThreadsCommand.type_id())
            && self.threads.contains_key(&thread_id)
        // ^ todo(debugger): We need a better way to check that we're not querying stale data
        // We could still be using an old thread id and have sent a new thread's request
        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
        // But it very well could cause a minor bug in the future that is hard to track down
        {
            self.fetch(
                super::dap_command::StackTraceCommand {
                    thread_id: thread_id.0,
                    start_frame: None,
                    levels: None,
                },
                move |this, stack_frames, cx| {
                    let stack_frames = stack_frames.log_err()?;

                    // Record which frames belong to the thread; a missing
                    // thread entry is left untouched.
                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
                        thread.stack_frame_ids =
                            stack_frames.iter().map(|frame| frame.id).collect();
                    });
                    debug_assert!(
                        matches!(entry, indexmap::map::Entry::Occupied(_)),
                        "Sent request for thread_id that doesn't exist"
                    );

                    this.stack_frames.extend(
                        stack_frames
                            .iter()
                            .cloned()
                            .map(|frame| (frame.id, StackFrame::from(frame))),
                    );

                    // New frames make previously fetched scopes and variables stale.
                    this.invalidate_command_type::<ScopesCommand>();
                    this.invalidate_command_type::<VariablesCommand>();

                    cx.emit(SessionEvent::StackTrace);
                    cx.notify();
                    Some(stack_frames)
                },
                cx,
            );
        }

        // Frame ids without a cached frame are skipped.
        self.threads
            .get(&thread_id)
            .map(|thread| {
                thread
                    .stack_frame_ids
                    .iter()
                    .filter_map(|id| self.stack_frames.get(id))
                    .cloned()
                    .collect()
            })
            .unwrap_or_default()
    }
1652
1653    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1654        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1655            && self
1656                .requests
1657                .contains_key(&TypeId::of::<StackTraceCommand>())
1658        {
1659            self.fetch(
1660                ScopesCommand { stack_frame_id },
1661                move |this, scopes, cx| {
1662                    let scopes = scopes.log_err()?;
1663
1664                    for scope in scopes .iter(){
1665                        this.variables(scope.variables_reference, cx);
1666                    }
1667
1668                    let entry = this
1669                        .stack_frames
1670                        .entry(stack_frame_id)
1671                        .and_modify(|stack_frame| {
1672                            stack_frame.scopes = scopes.clone();
1673                        });
1674
1675                    cx.emit(SessionEvent::Variables);
1676
1677                    debug_assert!(
1678                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1679                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1680                    );
1681
1682                    Some(scopes)
1683                },
1684                cx,
1685            );
1686        }
1687
1688        self.stack_frames
1689            .get(&stack_frame_id)
1690            .map(|frame| frame.scopes.as_slice())
1691            .unwrap_or_default()
1692    }
1693
1694    pub fn variables(
1695        &mut self,
1696        variables_reference: VariableReference,
1697        cx: &mut Context<Self>,
1698    ) -> Vec<dap::Variable> {
1699        let command = VariablesCommand {
1700            variables_reference,
1701            filter: None,
1702            start: None,
1703            count: None,
1704            format: None,
1705        };
1706
1707        self.fetch(
1708            command,
1709            move |this, variables, cx| {
1710                let variables = variables.log_err()?;
1711                this.variables
1712                    .insert(variables_reference, variables.clone());
1713
1714                cx.emit(SessionEvent::Variables);
1715                Some(variables)
1716            },
1717            cx,
1718        );
1719
1720        self.variables
1721            .get(&variables_reference)
1722            .cloned()
1723            .unwrap_or_default()
1724    }
1725
1726    pub fn set_variable_value(
1727        &mut self,
1728        variables_reference: u64,
1729        name: String,
1730        value: String,
1731        cx: &mut Context<Self>,
1732    ) {
1733        if self.capabilities.supports_set_variable.unwrap_or_default() {
1734            self.request(
1735                SetVariableValueCommand {
1736                    name,
1737                    value,
1738                    variables_reference,
1739                },
1740                move |this, response, cx| {
1741                    let response = response.log_err()?;
1742                    this.invalidate_command_type::<VariablesCommand>();
1743                    cx.notify();
1744                    Some(response)
1745                },
1746                cx,
1747            )
1748            .detach()
1749        }
1750    }
1751
1752    pub fn evaluate(
1753        &mut self,
1754        expression: String,
1755        context: Option<EvaluateArgumentsContext>,
1756        frame_id: Option<u64>,
1757        source: Option<Source>,
1758        cx: &mut Context<Self>,
1759    ) {
1760        self.request(
1761            EvaluateCommand {
1762                expression,
1763                context,
1764                frame_id,
1765                source,
1766            },
1767            |this, response, cx| {
1768                let response = response.log_err()?;
1769                this.output_token.0 += 1;
1770                this.output.push_back(dap::OutputEvent {
1771                    category: None,
1772                    output: response.result.clone(),
1773                    group: None,
1774                    variables_reference: Some(response.variables_reference),
1775                    source: None,
1776                    line: None,
1777                    column: None,
1778                    data: None,
1779                    location_reference: None,
1780                });
1781
1782                this.invalidate_command_type::<ScopesCommand>();
1783                cx.notify();
1784                Some(response)
1785            },
1786            cx,
1787        )
1788        .detach();
1789    }
1790
1791    pub fn location(
1792        &mut self,
1793        reference: u64,
1794        cx: &mut Context<Self>,
1795    ) -> Option<dap::LocationsResponse> {
1796        self.fetch(
1797            LocationsCommand { reference },
1798            move |this, response, _| {
1799                let response = response.log_err()?;
1800                this.locations.insert(reference, response.clone());
1801                Some(response)
1802            },
1803            cx,
1804        );
1805        self.locations.get(&reference).cloned()
1806    }
1807    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1808        let command = DisconnectCommand {
1809            restart: Some(false),
1810            terminate_debuggee: Some(true),
1811            suspend_debuggee: Some(false),
1812        };
1813
1814        self.request(command, Self::empty_response, cx).detach()
1815    }
1816
1817    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1818        if self
1819            .capabilities
1820            .supports_terminate_threads_request
1821            .unwrap_or_default()
1822        {
1823            self.request(
1824                TerminateThreadsCommand {
1825                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1826                },
1827                Self::clear_active_debug_line_response,
1828                cx,
1829            )
1830            .detach();
1831        } else {
1832            self.shutdown(cx).detach();
1833        }
1834    }
1835}