session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    adapters::{DapDelegate, DapStatus},
  21    client::{DebugAdapterClient, SessionId},
  22    messages::{Events, Message},
  23};
  24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
  25use futures::channel::oneshot;
  26use futures::{FutureExt, future::Shared};
  27use gpui::{
  28    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  29};
  30use rpc::AnyProtoClient;
  31use serde_json::{Value, json};
  32use settings::Settings;
  33use smol::stream::StreamExt;
  34use std::any::TypeId;
  35use std::path::PathBuf;
  36use std::u64;
  37use std::{
  38    any::Any,
  39    collections::hash_map::Entry,
  40    hash::{Hash, Hasher},
  41    path::Path,
  42    sync::Arc,
  43};
  44use task::{DebugAdapterConfig, DebugTaskDefinition};
  45use text::{PointUtf16, ToPointUtf16};
  46use util::{ResultExt, merge_json_value_into};
  47
  48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  49#[repr(transparent)]
  50pub struct ThreadId(pub u64);
  51
  52impl ThreadId {
  53    pub const MIN: ThreadId = ThreadId(u64::MIN);
  54    pub const MAX: ThreadId = ThreadId(u64::MAX);
  55}
  56
  57impl From<u64> for ThreadId {
  58    fn from(id: u64) -> Self {
  59        Self(id)
  60    }
  61}
  62
  63#[derive(Clone, Debug)]
  64pub struct StackFrame {
  65    pub dap: dap::StackFrame,
  66    pub scopes: Vec<dap::Scope>,
  67}
  68
  69impl From<dap::StackFrame> for StackFrame {
  70    fn from(stack_frame: dap::StackFrame) -> Self {
  71        Self {
  72            scopes: vec![],
  73            dap: stack_frame,
  74        }
  75    }
  76}
  77
  78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  79pub enum ThreadStatus {
  80    #[default]
  81    Running,
  82    Stopped,
  83    Stepping,
  84    Exited,
  85    Ended,
  86}
  87
  88impl ThreadStatus {
  89    pub fn label(&self) -> &'static str {
  90        match self {
  91            ThreadStatus::Running => "Running",
  92            ThreadStatus::Stopped => "Stopped",
  93            ThreadStatus::Stepping => "Stepping",
  94            ThreadStatus::Exited => "Exited",
  95            ThreadStatus::Ended => "Ended",
  96        }
  97    }
  98}
  99
 100#[derive(Debug)]
 101pub struct Thread {
 102    dap: dap::Thread,
 103    stack_frame_ids: IndexSet<StackFrameId>,
 104    _has_stopped: bool,
 105}
 106
 107impl From<dap::Thread> for Thread {
 108    fn from(dap: dap::Thread) -> Self {
 109        Self {
 110            dap,
 111            stack_frame_ids: Default::default(),
 112            _has_stopped: false,
 113        }
 114    }
 115}
 116
 117type UpstreamProjectId = u64;
 118
 119struct RemoteConnection {
 120    _client: AnyProtoClient,
 121    _upstream_project_id: UpstreamProjectId,
 122}
 123
 124impl RemoteConnection {
 125    fn send_proto_client_request<R: DapCommand>(
 126        &self,
 127        _request: R,
 128        _session_id: SessionId,
 129        cx: &mut App,
 130    ) -> Task<Result<R::Response>> {
 131        // let message = request.to_proto(session_id, self.upstream_project_id);
 132        // let upstream_client = self.client.clone();
 133        cx.background_executor().spawn(async move {
 134            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 135            // let response = upstream_client.request(message).await?;
 136            // request.response_from_proto(response)
 137            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 138        })
 139    }
 140
 141    fn request<R: DapCommand>(
 142        &self,
 143        request: R,
 144        session_id: SessionId,
 145        cx: &mut App,
 146    ) -> Task<Result<R::Response>>
 147    where
 148        <R::DapRequest as dap::requests::Request>::Response: 'static,
 149        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 150    {
 151        return self.send_proto_client_request::<R>(request, session_id, cx);
 152    }
 153}
 154
 155enum Mode {
 156    Local(LocalMode),
 157    Remote(RemoteConnection),
 158}
 159
 160#[derive(Clone)]
 161pub struct LocalMode {
 162    client: Arc<DebugAdapterClient>,
 163    config: DebugAdapterConfig,
 164    adapter: Arc<dyn DebugAdapter>,
 165    breakpoint_store: Entity<BreakpointStore>,
 166}
 167
 168fn client_source(abs_path: &Path) -> dap::Source {
 169    dap::Source {
 170        name: abs_path
 171            .file_name()
 172            .map(|filename| filename.to_string_lossy().to_string()),
 173        path: Some(abs_path.to_string_lossy().to_string()),
 174        source_reference: None,
 175        presentation_hint: None,
 176        origin: None,
 177        sources: None,
 178        adapter_data: None,
 179        checksums: None,
 180    }
 181}
 182
 183impl LocalMode {
 184    fn new(
 185        debug_adapters: Arc<DapRegistry>,
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        breakpoint_store: Entity<BreakpointStore>,
 189        config: DebugAdapterConfig,
 190        delegate: DapAdapterDelegate,
 191        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 192        cx: AsyncApp,
 193    ) -> Task<Result<Self>> {
 194        Self::new_inner(
 195            debug_adapters,
 196            session_id,
 197            parent_session,
 198            breakpoint_store,
 199            config,
 200            delegate,
 201            messages_tx,
 202            async |_, _| {},
 203            cx,
 204        )
 205    }
 206    #[cfg(any(test, feature = "test-support"))]
 207    fn new_fake(
 208        session_id: SessionId,
 209        parent_session: Option<Entity<Session>>,
 210        breakpoint_store: Entity<BreakpointStore>,
 211        config: DebugAdapterConfig,
 212        delegate: DapAdapterDelegate,
 213        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 214        caps: Capabilities,
 215        fail: bool,
 216        cx: AsyncApp,
 217    ) -> Task<Result<Self>> {
 218        use task::DebugRequestDisposition;
 219
 220        let request = match config.request.clone() {
 221            DebugRequestDisposition::UserConfigured(request) => request,
 222            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
 223                match reverse_request_args.request {
 224                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
 225                        DebugRequestType::Launch(task::LaunchConfig {
 226                            program: "".to_owned(),
 227                            cwd: None,
 228                            args: Default::default(),
 229                        })
 230                    }
 231                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
 232                        DebugRequestType::Attach(task::AttachConfig {
 233                            process_id: Some(0),
 234                        })
 235                    }
 236                }
 237            }
 238        };
 239
 240        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
 241            session
 242                .client
 243                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 244                .await;
 245
 246            let paths = cx
 247                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
 248                .expect("Breakpoint store should exist in all tests that start debuggers");
 249
 250            session
 251                .client
 252                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
 253                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
 254                    if !paths.contains(&p) {
 255                        panic!("Sent breakpoints for path without any")
 256                    }
 257
 258                    Ok(dap::SetBreakpointsResponse {
 259                        breakpoints: Vec::default(),
 260                    })
 261                })
 262                .await;
 263
 264            match request {
 265                dap::DebugRequestType::Launch(_) => {
 266                    if fail {
 267                        session
 268                            .client
 269                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 270                                Err(dap::ErrorResponse {
 271                                    error: Some(dap::Message {
 272                                        id: 1,
 273                                        format: "error".into(),
 274                                        variables: None,
 275                                        send_telemetry: None,
 276                                        show_user: None,
 277                                        url: None,
 278                                        url_label: None,
 279                                    }),
 280                                })
 281                            })
 282                            .await;
 283                    } else {
 284                        session
 285                            .client
 286                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 287                            .await;
 288                    }
 289                }
 290                dap::DebugRequestType::Attach(attach_config) => {
 291                    if fail {
 292                        session
 293                            .client
 294                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 295                                Err(dap::ErrorResponse {
 296                                    error: Some(dap::Message {
 297                                        id: 1,
 298                                        format: "error".into(),
 299                                        variables: None,
 300                                        send_telemetry: None,
 301                                        show_user: None,
 302                                        url: None,
 303                                        url_label: None,
 304                                    }),
 305                                })
 306                            })
 307                            .await;
 308                    } else {
 309                        session
 310                            .client
 311                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 312                                assert_eq!(
 313                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 314                                    args.raw
 315                                );
 316
 317                                Ok(())
 318                            })
 319                            .await;
 320                    }
 321                }
 322            }
 323
 324            session
 325                .client
 326                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
 327                .await;
 328            session.client.fake_event(Events::Initialized(None)).await;
 329        };
 330        Self::new_inner(
 331            DapRegistry::fake().into(),
 332            session_id,
 333            parent_session,
 334            breakpoint_store,
 335            config,
 336            delegate,
 337            messages_tx,
 338            callback,
 339            cx,
 340        )
 341    }
 342    fn new_inner(
 343        registry: Arc<DapRegistry>,
 344        session_id: SessionId,
 345        parent_session: Option<Entity<Session>>,
 346        breakpoint_store: Entity<BreakpointStore>,
 347        config: DebugAdapterConfig,
 348        delegate: DapAdapterDelegate,
 349        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 350        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 351        cx: AsyncApp,
 352    ) -> Task<Result<Self>> {
 353        cx.spawn(async move |cx| {
 354            let (adapter, binary) =
 355                Self::get_adapter_binary(&registry, &config, &delegate, cx).await?;
 356
 357            let message_handler = Box::new(move |message| {
 358                messages_tx.unbounded_send(message).ok();
 359            });
 360
 361            let client = Arc::new(
 362                if let Some(client) = parent_session
 363                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 364                    .flatten()
 365                {
 366                    client
 367                        .reconnect(session_id, binary, message_handler, cx.clone())
 368                        .await?
 369                } else {
 370                    DebugAdapterClient::start(
 371                        session_id,
 372                        adapter.name(),
 373                        binary,
 374                        message_handler,
 375                        cx.clone(),
 376                    )
 377                    .await
 378                    .with_context(|| "Failed to start communication with debug adapter")?
 379                },
 380            );
 381
 382            let mut session = Self {
 383                client,
 384                adapter,
 385                breakpoint_store,
 386                config: config.clone(),
 387            };
 388
 389            on_initialized(&mut session, cx.clone()).await;
 390
 391            Ok(session)
 392        })
 393    }
 394
 395    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 396        let tasks: Vec<_> = paths
 397            .into_iter()
 398            .map(|path| {
 399                self.request(
 400                    dap_command::SetBreakpoints {
 401                        source: client_source(path),
 402                        source_modified: None,
 403                        breakpoints: vec![],
 404                    },
 405                    cx.background_executor().clone(),
 406                )
 407            })
 408            .collect();
 409
 410        cx.background_spawn(async move {
 411            futures::future::join_all(tasks)
 412                .await
 413                .iter()
 414                .for_each(|res| match res {
 415                    Ok(_) => {}
 416                    Err(err) => {
 417                        log::warn!("Set breakpoints request failed: {}", err);
 418                    }
 419                });
 420        })
 421    }
 422
 423    fn send_breakpoints_from_path(
 424        &self,
 425        abs_path: Arc<Path>,
 426        reason: BreakpointUpdatedReason,
 427        cx: &mut App,
 428    ) -> Task<()> {
 429        let breakpoints = self
 430            .breakpoint_store
 431            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 432            .into_iter()
 433            .filter(|bp| bp.state.is_enabled())
 434            .map(Into::into)
 435            .collect();
 436
 437        let task = self.request(
 438            dap_command::SetBreakpoints {
 439                source: client_source(&abs_path),
 440                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 441                breakpoints,
 442            },
 443            cx.background_executor().clone(),
 444        );
 445
 446        cx.background_spawn(async move {
 447            match task.await {
 448                Ok(_) => {}
 449                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 450            }
 451        })
 452    }
 453
 454    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 455        let mut breakpoint_tasks = Vec::new();
 456        let breakpoints = self
 457            .breakpoint_store
 458            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 459
 460        for (path, breakpoints) in breakpoints {
 461            let breakpoints = if ignore_breakpoints {
 462                vec![]
 463            } else {
 464                breakpoints
 465                    .into_iter()
 466                    .filter(|bp| bp.state.is_enabled())
 467                    .map(Into::into)
 468                    .collect()
 469            };
 470
 471            breakpoint_tasks.push(self.request(
 472                dap_command::SetBreakpoints {
 473                    source: client_source(&path),
 474                    source_modified: Some(false),
 475                    breakpoints,
 476                },
 477                cx.background_executor().clone(),
 478            ));
 479        }
 480
 481        cx.background_spawn(async move {
 482            futures::future::join_all(breakpoint_tasks)
 483                .await
 484                .iter()
 485                .for_each(|res| match res {
 486                    Ok(_) => {}
 487                    Err(err) => {
 488                        log::warn!("Set breakpoints request failed: {}", err);
 489                    }
 490                });
 491        })
 492    }
 493
 494    async fn get_adapter_binary(
 495        registry: &Arc<DapRegistry>,
 496        config: &DebugAdapterConfig,
 497        delegate: &DapAdapterDelegate,
 498        cx: &mut AsyncApp,
 499    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 500        let adapter = registry
 501            .adapter(&config.adapter)
 502            .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
 503
 504        let binary = cx.update(|cx| {
 505            ProjectSettings::get_global(cx)
 506                .dap
 507                .get(&adapter.name())
 508                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 509        })?;
 510
 511        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 512            Err(error) => {
 513                delegate.update_status(
 514                    adapter.name(),
 515                    DapStatus::Failed {
 516                        error: error.to_string(),
 517                    },
 518                );
 519
 520                return Err(error);
 521            }
 522            Ok(mut binary) => {
 523                delegate.update_status(adapter.name(), DapStatus::None);
 524
 525                let shell_env = delegate.shell_env().await;
 526                let mut envs = binary.envs.unwrap_or_default();
 527                envs.extend(shell_env);
 528                binary.envs = Some(envs);
 529
 530                binary
 531            }
 532        };
 533
 534        Ok((adapter, binary))
 535    }
 536
 537    pub fn label(&self) -> String {
 538        self.config.label.clone()
 539    }
 540
 541    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
 542        let adapter_id = self.adapter.name().to_string();
 543
 544        self.request(Initialize { adapter_id }, cx.background_executor().clone())
 545    }
 546
 547    fn initialize_sequence(
 548        &self,
 549        capabilities: &Capabilities,
 550        initialized_rx: oneshot::Receiver<()>,
 551        cx: &App,
 552    ) -> Task<Result<()>> {
 553        let (mut raw, is_launch) = match &self.config.request {
 554            task::DebugRequestDisposition::UserConfigured(_) => {
 555                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
 556                    debug_assert!(false, "This part of code should be unreachable in practice");
 557                    return Task::ready(Err(anyhow!(
 558                        "Expected debug config conversion to succeed"
 559                    )));
 560                };
 561                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
 562                let raw = self.adapter.request_args(&raw);
 563                (raw, is_launch)
 564            }
 565            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
 566                start_debugging_request_arguments.configuration.clone(),
 567                matches!(
 568                    start_debugging_request_arguments.request,
 569                    dap::StartDebuggingRequestArgumentsRequest::Launch
 570                ),
 571            ),
 572        };
 573
 574        merge_json_value_into(
 575            self.config.initialize_args.clone().unwrap_or(json!({})),
 576            &mut raw,
 577        );
 578        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 579        let launch = if is_launch {
 580            self.request(Launch { raw }, cx.background_executor().clone())
 581        } else {
 582            self.request(Attach { raw }, cx.background_executor().clone())
 583        };
 584
 585        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 586
 587        let configuration_sequence = cx.spawn({
 588            let this = self.clone();
 589            async move |cx| {
 590                initialized_rx.await?;
 591                // todo(debugger) figure out if we want to handle a breakpoint response error
 592                // This will probably consist of letting a user know that breakpoints failed to be set
 593                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 594
 595                if configuration_done_supported {
 596                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 597                } else {
 598                    Task::ready(Ok(()))
 599                }
 600                .await
 601            }
 602        });
 603
 604        cx.background_spawn(async move {
 605            futures::future::try_join(launch, configuration_sequence).await?;
 606            Ok(())
 607        })
 608    }
 609
 610    fn request<R: LocalDapCommand>(
 611        &self,
 612        request: R,
 613        executor: BackgroundExecutor,
 614    ) -> Task<Result<R::Response>>
 615    where
 616        <R::DapRequest as dap::requests::Request>::Response: 'static,
 617        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 618    {
 619        let request = Arc::new(request);
 620
 621        let request_clone = request.clone();
 622        let connection = self.client.clone();
 623        let request_task = executor.spawn(async move {
 624            let args = request_clone.to_dap();
 625            connection.request::<R::DapRequest>(args).await
 626        });
 627
 628        executor.spawn(async move {
 629            let response = request.response_from_dap(request_task.await?);
 630            response
 631        })
 632    }
 633}
 634impl From<RemoteConnection> for Mode {
 635    fn from(value: RemoteConnection) -> Self {
 636        Self::Remote(value)
 637    }
 638}
 639
 640impl Mode {
 641    fn request_dap<R: DapCommand>(
 642        &self,
 643        session_id: SessionId,
 644        request: R,
 645        cx: &mut Context<Session>,
 646    ) -> Task<Result<R::Response>>
 647    where
 648        <R::DapRequest as dap::requests::Request>::Response: 'static,
 649        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 650    {
 651        match self {
 652            Mode::Local(debug_adapter_client) => {
 653                debug_adapter_client.request(request, cx.background_executor().clone())
 654            }
 655            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 656        }
 657    }
 658}
 659
 660#[derive(Default)]
 661struct ThreadStates {
 662    global_state: Option<ThreadStatus>,
 663    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 664}
 665
 666impl ThreadStates {
 667    fn stop_all_threads(&mut self) {
 668        self.global_state = Some(ThreadStatus::Stopped);
 669        self.known_thread_states.clear();
 670    }
 671
 672    fn exit_all_threads(&mut self) {
 673        self.global_state = Some(ThreadStatus::Exited);
 674        self.known_thread_states.clear();
 675    }
 676
 677    fn continue_all_threads(&mut self) {
 678        self.global_state = Some(ThreadStatus::Running);
 679        self.known_thread_states.clear();
 680    }
 681
 682    fn stop_thread(&mut self, thread_id: ThreadId) {
 683        self.known_thread_states
 684            .insert(thread_id, ThreadStatus::Stopped);
 685    }
 686
 687    fn continue_thread(&mut self, thread_id: ThreadId) {
 688        self.known_thread_states
 689            .insert(thread_id, ThreadStatus::Running);
 690    }
 691
 692    fn process_step(&mut self, thread_id: ThreadId) {
 693        self.known_thread_states
 694            .insert(thread_id, ThreadStatus::Stepping);
 695    }
 696
 697    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 698        self.thread_state(thread_id)
 699            .unwrap_or(ThreadStatus::Running)
 700    }
 701
 702    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 703        self.known_thread_states
 704            .get(&thread_id)
 705            .copied()
 706            .or(self.global_state)
 707    }
 708
 709    fn exit_thread(&mut self, thread_id: ThreadId) {
 710        self.known_thread_states
 711            .insert(thread_id, ThreadStatus::Exited);
 712    }
 713
 714    fn any_stopped_thread(&self) -> bool {
 715        self.global_state
 716            .is_some_and(|state| state == ThreadStatus::Stopped)
 717            || self
 718                .known_thread_states
 719                .values()
 720                .any(|status| *status == ThreadStatus::Stopped)
 721    }
 722}
 723const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 724
 725#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 726pub struct OutputToken(pub usize);
 727/// Represents a current state of a single debug adapter and provides ways to mutate it.
 728pub struct Session {
 729    mode: Mode,
 730    pub(super) capabilities: Capabilities,
 731    id: SessionId,
 732    child_session_ids: HashSet<SessionId>,
 733    parent_id: Option<SessionId>,
 734    ignore_breakpoints: bool,
 735    modules: Vec<dap::Module>,
 736    loaded_sources: Vec<dap::Source>,
 737    output_token: OutputToken,
 738    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 739    threads: IndexMap<ThreadId, Thread>,
 740    thread_states: ThreadStates,
 741    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 742    stack_frames: IndexMap<StackFrameId, StackFrame>,
 743    locations: HashMap<u64, dap::LocationsResponse>,
 744    is_session_terminated: bool,
 745    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 746    _background_tasks: Vec<Task<()>>,
 747}
 748
 749trait CacheableCommand: 'static + Send + Sync {
 750    fn as_any(&self) -> &dyn Any;
 751    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 752    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 753    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 754}
 755
 756impl<T> CacheableCommand for T
 757where
 758    T: DapCommand + PartialEq + Eq + Hash,
 759{
 760    fn as_any(&self) -> &dyn Any {
 761        self
 762    }
 763
 764    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 765        rhs.as_any()
 766            .downcast_ref::<Self>()
 767            .map_or(false, |rhs| self == rhs)
 768    }
 769
 770    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 771        T::hash(self, &mut hasher);
 772    }
 773
 774    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 775        self
 776    }
 777}
 778
 779pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 780
 781impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 782    fn from(request: T) -> Self {
 783        Self(Arc::new(request))
 784    }
 785}
 786
 787impl PartialEq for RequestSlot {
 788    fn eq(&self, other: &Self) -> bool {
 789        self.0.dyn_eq(other.0.as_ref())
 790    }
 791}
 792
 793impl Eq for RequestSlot {}
 794
 795impl Hash for RequestSlot {
 796    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 797        self.0.dyn_hash(state);
 798        self.0.as_any().type_id().hash(state)
 799    }
 800}
 801
 802#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 803pub struct CompletionsQuery {
 804    pub query: String,
 805    pub column: u64,
 806    pub line: Option<u64>,
 807    pub frame_id: Option<u64>,
 808}
 809
 810impl CompletionsQuery {
 811    pub fn new(
 812        buffer: &language::Buffer,
 813        cursor_position: language::Anchor,
 814        frame_id: Option<u64>,
 815    ) -> Self {
 816        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 817        Self {
 818            query: buffer.text(),
 819            column: column as u64,
 820            frame_id,
 821            line: Some(row as u64),
 822        }
 823    }
 824}
 825
 826pub enum SessionEvent {
 827    Modules,
 828    LoadedSources,
 829    Stopped(Option<ThreadId>),
 830    StackTrace,
 831    Variables,
 832    Threads,
 833}
 834
 835pub(crate) enum SessionStateEvent {
 836    Shutdown,
 837}
 838
 839impl EventEmitter<SessionEvent> for Session {}
 840impl EventEmitter<SessionStateEvent> for Session {}
 841
 842// local session will send breakpoint updates to DAP for all new breakpoints
 843// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 844// BreakpointStore notifies session on breakpoint changes
 845impl Session {
 846    pub(crate) fn local(
 847        breakpoint_store: Entity<BreakpointStore>,
 848        session_id: SessionId,
 849        parent_session: Option<Entity<Session>>,
 850        delegate: DapAdapterDelegate,
 851        config: DebugAdapterConfig,
 852        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 853        initialized_tx: oneshot::Sender<()>,
 854        debug_adapters: Arc<DapRegistry>,
 855        cx: &mut App,
 856    ) -> Task<Result<Entity<Self>>> {
 857        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 858
 859        cx.spawn(async move |cx| {
 860            let mode = LocalMode::new(
 861                debug_adapters,
 862                session_id,
 863                parent_session.clone(),
 864                breakpoint_store.clone(),
 865                config.clone(),
 866                delegate,
 867                message_tx,
 868                cx.clone(),
 869            )
 870            .await?;
 871
 872            cx.new(|cx| {
 873                create_local_session(
 874                    breakpoint_store,
 875                    session_id,
 876                    parent_session,
 877                    start_debugging_requests_tx,
 878                    initialized_tx,
 879                    message_rx,
 880                    mode,
 881                    cx,
 882                )
 883            })
 884        })
 885    }
 886
 887    #[cfg(any(test, feature = "test-support"))]
 888    pub(crate) fn fake(
 889        breakpoint_store: Entity<BreakpointStore>,
 890        session_id: SessionId,
 891        parent_session: Option<Entity<Session>>,
 892        delegate: DapAdapterDelegate,
 893        config: DebugAdapterConfig,
 894        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 895        initialized_tx: oneshot::Sender<()>,
 896        caps: Capabilities,
 897        fails: bool,
 898        cx: &mut App,
 899    ) -> Task<Result<Entity<Session>>> {
 900        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 901
 902        cx.spawn(async move |cx| {
 903            let mode = LocalMode::new_fake(
 904                session_id,
 905                parent_session.clone(),
 906                breakpoint_store.clone(),
 907                config.clone(),
 908                delegate,
 909                message_tx,
 910                caps,
 911                fails,
 912                cx.clone(),
 913            )
 914            .await?;
 915
 916            cx.new(|cx| {
 917                create_local_session(
 918                    breakpoint_store,
 919                    session_id,
 920                    parent_session,
 921                    start_debugging_requests_tx,
 922                    initialized_tx,
 923                    message_rx,
 924                    mode,
 925                    cx,
 926                )
 927            })
 928        })
 929    }
 930
 931    pub(crate) fn remote(
 932        session_id: SessionId,
 933        client: AnyProtoClient,
 934        upstream_project_id: u64,
 935        ignore_breakpoints: bool,
 936    ) -> Self {
 937        Self {
 938            mode: Mode::Remote(RemoteConnection {
 939                _client: client,
 940                _upstream_project_id: upstream_project_id,
 941            }),
 942            id: session_id,
 943            child_session_ids: HashSet::default(),
 944            parent_id: None,
 945            capabilities: Capabilities::default(),
 946            ignore_breakpoints,
 947            variables: Default::default(),
 948            stack_frames: Default::default(),
 949            thread_states: ThreadStates::default(),
 950            output_token: OutputToken(0),
 951            output: circular_buffer::CircularBuffer::boxed(),
 952            requests: HashMap::default(),
 953            modules: Vec::default(),
 954            loaded_sources: Vec::default(),
 955            threads: IndexMap::default(),
 956            _background_tasks: Vec::default(),
 957            locations: Default::default(),
 958            is_session_terminated: false,
 959        }
 960    }
 961
 962    pub fn session_id(&self) -> SessionId {
 963        self.id
 964    }
 965
 966    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 967        self.child_session_ids.clone()
 968    }
 969
 970    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 971        self.child_session_ids.insert(session_id);
 972    }
 973
 974    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 975        self.child_session_ids.remove(&session_id);
 976    }
 977
 978    pub fn parent_id(&self) -> Option<SessionId> {
 979        self.parent_id
 980    }
 981
 982    pub fn capabilities(&self) -> &Capabilities {
 983        &self.capabilities
 984    }
 985
 986    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 987        if let Mode::Local(local_mode) = &self.mode {
 988            Some(local_mode.config.clone())
 989        } else {
 990            None
 991        }
 992    }
 993
 994    pub fn is_terminated(&self) -> bool {
 995        self.is_session_terminated
 996    }
 997
 998    pub fn is_local(&self) -> bool {
 999        matches!(self.mode, Mode::Local(_))
1000    }
1001
1002    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
1003        match &mut self.mode {
1004            Mode::Local(local_mode) => Some(local_mode),
1005            Mode::Remote(_) => None,
1006        }
1007    }
1008
1009    pub fn as_local(&self) -> Option<&LocalMode> {
1010        match &self.mode {
1011            Mode::Local(local_mode) => Some(local_mode),
1012            Mode::Remote(_) => None,
1013        }
1014    }
1015
1016    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1017        match &self.mode {
1018            Mode::Local(local_mode) => {
1019                let capabilities = local_mode.clone().request_initialization(cx);
1020
1021                cx.spawn(async move |this, cx| {
1022                    let capabilities = capabilities.await?;
1023                    this.update(cx, |session, _| {
1024                        session.capabilities = capabilities;
1025                    })?;
1026                    Ok(())
1027                })
1028            }
1029            Mode::Remote(_) => Task::ready(Err(anyhow!(
1030                "Cannot send initialize request from remote session"
1031            ))),
1032        }
1033    }
1034
1035    pub(super) fn initialize_sequence(
1036        &mut self,
1037        initialize_rx: oneshot::Receiver<()>,
1038        cx: &mut Context<Self>,
1039    ) -> Task<Result<()>> {
1040        match &self.mode {
1041            Mode::Local(local_mode) => {
1042                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1043            }
1044            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1045        }
1046    }
1047
1048    pub fn output(
1049        &self,
1050        since: OutputToken,
1051    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1052        if self.output_token.0 == 0 {
1053            return (self.output.range(0..0), OutputToken(0));
1054        };
1055
1056        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1057
1058        let clamped_events_since = events_since.clamp(0, self.output.len());
1059        (
1060            self.output
1061                .range(self.output.len() - clamped_events_since..),
1062            self.output_token,
1063        )
1064    }
1065
1066    pub fn respond_to_client(
1067        &self,
1068        request_seq: u64,
1069        success: bool,
1070        command: String,
1071        body: Option<serde_json::Value>,
1072        cx: &mut Context<Self>,
1073    ) -> Task<Result<()>> {
1074        let Some(local_session) = self.as_local().cloned() else {
1075            unreachable!("Cannot respond to remote client");
1076        };
1077
1078        cx.background_spawn(async move {
1079            local_session
1080                .client
1081                .send_message(Message::Response(Response {
1082                    body,
1083                    success,
1084                    command,
1085                    seq: request_seq + 1,
1086                    request_seq,
1087                    message: None,
1088                }))
1089                .await
1090        })
1091    }
1092
1093    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1094        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1095            self.thread_states.stop_all_threads();
1096
1097            self.invalidate_command_type::<StackTraceCommand>();
1098        }
1099
1100        // Event if we stopped all threads we still need to insert the thread_id
1101        // to our own data
1102        if let Some(thread_id) = event.thread_id {
1103            self.thread_states.stop_thread(ThreadId(thread_id));
1104
1105            self.invalidate_state(
1106                &StackTraceCommand {
1107                    thread_id,
1108                    start_frame: None,
1109                    levels: None,
1110                }
1111                .into(),
1112            );
1113        }
1114
1115        self.invalidate_generic();
1116        self.threads.clear();
1117        self.variables.clear();
1118        cx.emit(SessionEvent::Stopped(
1119            event
1120                .thread_id
1121                .map(Into::into)
1122                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1123        ));
1124        cx.notify();
1125    }
1126
1127    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1128        match *event {
1129            Events::Initialized(_) => {
1130                debug_assert!(
1131                    false,
1132                    "Initialized event should have been handled in LocalMode"
1133                );
1134            }
1135            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1136            Events::Continued(event) => {
1137                if event.all_threads_continued.unwrap_or_default() {
1138                    self.thread_states.continue_all_threads();
1139                } else {
1140                    self.thread_states
1141                        .continue_thread(ThreadId(event.thread_id));
1142                }
1143                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1144                self.invalidate_generic();
1145            }
1146            Events::Exited(_event) => {
1147                self.clear_active_debug_line(cx);
1148            }
1149            Events::Terminated(_) => {
1150                self.is_session_terminated = true;
1151                self.clear_active_debug_line(cx);
1152            }
1153            Events::Thread(event) => {
1154                let thread_id = ThreadId(event.thread_id);
1155
1156                match event.reason {
1157                    dap::ThreadEventReason::Started => {
1158                        self.thread_states.continue_thread(thread_id);
1159                    }
1160                    dap::ThreadEventReason::Exited => {
1161                        self.thread_states.exit_thread(thread_id);
1162                    }
1163                    reason => {
1164                        log::error!("Unhandled thread event reason {:?}", reason);
1165                    }
1166                }
1167                self.invalidate_state(&ThreadsCommand.into());
1168                cx.notify();
1169            }
1170            Events::Output(event) => {
1171                if event
1172                    .category
1173                    .as_ref()
1174                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1175                {
1176                    return;
1177                }
1178
1179                self.output.push_back(event);
1180                self.output_token.0 += 1;
1181                cx.notify();
1182            }
1183            Events::Breakpoint(_) => {}
1184            Events::Module(event) => {
1185                match event.reason {
1186                    dap::ModuleEventReason::New => {
1187                        self.modules.push(event.module);
1188                    }
1189                    dap::ModuleEventReason::Changed => {
1190                        if let Some(module) = self
1191                            .modules
1192                            .iter_mut()
1193                            .find(|other| event.module.id == other.id)
1194                        {
1195                            *module = event.module;
1196                        }
1197                    }
1198                    dap::ModuleEventReason::Removed => {
1199                        self.modules.retain(|other| event.module.id != other.id);
1200                    }
1201                }
1202
1203                // todo(debugger): We should only send the invalidate command to downstream clients.
1204                // self.invalidate_state(&ModulesCommand.into());
1205            }
1206            Events::LoadedSource(_) => {
1207                self.invalidate_state(&LoadedSourcesCommand.into());
1208            }
1209            Events::Capabilities(event) => {
1210                self.capabilities = self.capabilities.merge(event.capabilities);
1211                cx.notify();
1212            }
1213            Events::Memory(_) => {}
1214            Events::Process(_) => {}
1215            Events::ProgressEnd(_) => {}
1216            Events::ProgressStart(_) => {}
1217            Events::ProgressUpdate(_) => {}
1218            Events::Invalidated(_) => {}
1219            Events::Other(_) => {}
1220        }
1221    }
1222
1223    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1224    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1225        &mut self,
1226        request: T,
1227        process_result: impl FnOnce(
1228            &mut Self,
1229            Result<T::Response>,
1230            &mut Context<Self>,
1231        ) -> Option<T::Response>
1232        + 'static,
1233        cx: &mut Context<Self>,
1234    ) {
1235        const {
1236            assert!(
1237                T::CACHEABLE,
1238                "Only requests marked as cacheable should invoke `fetch`"
1239            );
1240        }
1241
1242        if !self.thread_states.any_stopped_thread()
1243            && request.type_id() != TypeId::of::<ThreadsCommand>()
1244            || self.is_session_terminated
1245        {
1246            return;
1247        }
1248
1249        let request_map = self
1250            .requests
1251            .entry(std::any::TypeId::of::<T>())
1252            .or_default();
1253
1254        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1255            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1256
1257            let task = Self::request_inner::<Arc<T>>(
1258                &self.capabilities,
1259                self.id,
1260                &self.mode,
1261                command,
1262                process_result,
1263                cx,
1264            );
1265            let task = cx
1266                .background_executor()
1267                .spawn(async move {
1268                    let _ = task.await?;
1269                    Some(())
1270                })
1271                .shared();
1272
1273            vacant.insert(task);
1274            cx.notify();
1275        }
1276    }
1277
1278    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1279        capabilities: &Capabilities,
1280        session_id: SessionId,
1281        mode: &Mode,
1282        request: T,
1283        process_result: impl FnOnce(
1284            &mut Self,
1285            Result<T::Response>,
1286            &mut Context<Self>,
1287        ) -> Option<T::Response>
1288        + 'static,
1289        cx: &mut Context<Self>,
1290    ) -> Task<Option<T::Response>> {
1291        if !T::is_supported(&capabilities) {
1292            log::warn!(
1293                "Attempted to send a DAP request that isn't supported: {:?}",
1294                request
1295            );
1296            let error = Err(anyhow::Error::msg(
1297                "Couldn't complete request because it's not supported",
1298            ));
1299            return cx.spawn(async move |this, cx| {
1300                this.update(cx, |this, cx| process_result(this, error, cx))
1301                    .log_err()
1302                    .flatten()
1303            });
1304        }
1305
1306        let request = mode.request_dap(session_id, request, cx);
1307        cx.spawn(async move |this, cx| {
1308            let result = request.await;
1309            this.update(cx, |this, cx| process_result(this, result, cx))
1310                .log_err()
1311                .flatten()
1312        })
1313    }
1314
1315    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1316        &self,
1317        request: T,
1318        process_result: impl FnOnce(
1319            &mut Self,
1320            Result<T::Response>,
1321            &mut Context<Self>,
1322        ) -> Option<T::Response>
1323        + 'static,
1324        cx: &mut Context<Self>,
1325    ) -> Task<Option<T::Response>> {
1326        Self::request_inner(
1327            &self.capabilities,
1328            self.id,
1329            &self.mode,
1330            request,
1331            process_result,
1332            cx,
1333        )
1334    }
1335
1336    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1337        self.requests.remove(&std::any::TypeId::of::<Command>());
1338    }
1339
1340    fn invalidate_generic(&mut self) {
1341        self.invalidate_command_type::<ModulesCommand>();
1342        self.invalidate_command_type::<LoadedSourcesCommand>();
1343        self.invalidate_command_type::<ThreadsCommand>();
1344    }
1345
1346    fn invalidate_state(&mut self, key: &RequestSlot) {
1347        self.requests
1348            .entry(key.0.as_any().type_id())
1349            .and_modify(|request_map| {
1350                request_map.remove(&key);
1351            });
1352    }
1353
1354    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1355        self.thread_states.thread_status(thread_id)
1356    }
1357
1358    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1359        self.fetch(
1360            dap_command::ThreadsCommand,
1361            |this, result, cx| {
1362                let result = result.log_err()?;
1363
1364                this.threads = result
1365                    .iter()
1366                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1367                    .collect();
1368
1369                this.invalidate_command_type::<StackTraceCommand>();
1370                cx.emit(SessionEvent::Threads);
1371                cx.notify();
1372
1373                Some(result)
1374            },
1375            cx,
1376        );
1377
1378        self.threads
1379            .values()
1380            .map(|thread| {
1381                (
1382                    thread.dap.clone(),
1383                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1384                )
1385            })
1386            .collect()
1387    }
1388
1389    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1390        self.fetch(
1391            dap_command::ModulesCommand,
1392            |this, result, cx| {
1393                let result = result.log_err()?;
1394
1395                this.modules = result.iter().cloned().collect();
1396                cx.emit(SessionEvent::Modules);
1397                cx.notify();
1398
1399                Some(result)
1400            },
1401            cx,
1402        );
1403
1404        &self.modules
1405    }
1406
1407    pub fn ignore_breakpoints(&self) -> bool {
1408        self.ignore_breakpoints
1409    }
1410
1411    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1412        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1413    }
1414
1415    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1416        if self.ignore_breakpoints == ignore {
1417            return Task::ready(());
1418        }
1419
1420        self.ignore_breakpoints = ignore;
1421
1422        if let Some(local) = self.as_local() {
1423            local.send_all_breakpoints(ignore, cx)
1424        } else {
1425            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1426            unimplemented!()
1427        }
1428    }
1429
1430    pub fn breakpoints_enabled(&self) -> bool {
1431        self.ignore_breakpoints
1432    }
1433
1434    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1435        self.fetch(
1436            dap_command::LoadedSourcesCommand,
1437            |this, result, cx| {
1438                let result = result.log_err()?;
1439                this.loaded_sources = result.iter().cloned().collect();
1440                cx.emit(SessionEvent::LoadedSources);
1441                cx.notify();
1442                Some(result)
1443            },
1444            cx,
1445        );
1446
1447        &self.loaded_sources
1448    }
1449
1450    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1451        res.log_err()?;
1452        Some(())
1453    }
1454
1455    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1456        thread_id: ThreadId,
1457    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1458    {
1459        move |this, response, cx| match response.log_err() {
1460            Some(response) => Some(response),
1461            None => {
1462                this.thread_states.stop_thread(thread_id);
1463                cx.notify();
1464                None
1465            }
1466        }
1467    }
1468
1469    fn clear_active_debug_line_response(
1470        &mut self,
1471        response: Result<()>,
1472        cx: &mut Context<Session>,
1473    ) -> Option<()> {
1474        response.log_err()?;
1475        self.clear_active_debug_line(cx);
1476        Some(())
1477    }
1478
1479    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1480        self.as_local()
1481            .expect("Message handler will only run in local mode")
1482            .breakpoint_store
1483            .update(cx, |store, cx| {
1484                store.remove_active_position(Some(self.id), cx)
1485            });
1486    }
1487
1488    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1489        self.request(
1490            PauseCommand {
1491                thread_id: thread_id.0,
1492            },
1493            Self::empty_response,
1494            cx,
1495        )
1496        .detach();
1497    }
1498
1499    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1500        self.request(
1501            RestartStackFrameCommand { stack_frame_id },
1502            Self::empty_response,
1503            cx,
1504        )
1505        .detach();
1506    }
1507
1508    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1509        if self.capabilities.supports_restart_request.unwrap_or(false) {
1510            self.request(
1511                RestartCommand {
1512                    raw: args.unwrap_or(Value::Null),
1513                },
1514                Self::empty_response,
1515                cx,
1516            )
1517            .detach();
1518        } else {
1519            self.request(
1520                DisconnectCommand {
1521                    restart: Some(false),
1522                    terminate_debuggee: Some(true),
1523                    suspend_debuggee: Some(false),
1524                },
1525                Self::empty_response,
1526                cx,
1527            )
1528            .detach();
1529        }
1530    }
1531
1532    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1533        self.is_session_terminated = true;
1534        self.thread_states.exit_all_threads();
1535        cx.notify();
1536
1537        let task = if self
1538            .capabilities
1539            .supports_terminate_request
1540            .unwrap_or_default()
1541        {
1542            self.request(
1543                TerminateCommand {
1544                    restart: Some(false),
1545                },
1546                Self::clear_active_debug_line_response,
1547                cx,
1548            )
1549        } else {
1550            self.request(
1551                DisconnectCommand {
1552                    restart: Some(false),
1553                    terminate_debuggee: Some(true),
1554                    suspend_debuggee: Some(false),
1555                },
1556                Self::clear_active_debug_line_response,
1557                cx,
1558            )
1559        };
1560
1561        cx.emit(SessionStateEvent::Shutdown);
1562
1563        cx.background_spawn(async move {
1564            let _ = task.await;
1565        })
1566    }
1567
1568    pub fn completions(
1569        &mut self,
1570        query: CompletionsQuery,
1571        cx: &mut Context<Self>,
1572    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1573        let task = self.request(query, |_, result, _| result.log_err(), cx);
1574
1575        cx.background_executor().spawn(async move {
1576            anyhow::Ok(
1577                task.await
1578                    .map(|response| response.targets)
1579                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1580            )
1581        })
1582    }
1583
1584    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1585        self.thread_states.continue_thread(thread_id);
1586        self.request(
1587            ContinueCommand {
1588                args: ContinueArguments {
1589                    thread_id: thread_id.0,
1590                    single_thread: Some(true),
1591                },
1592            },
1593            Self::on_step_response::<ContinueCommand>(thread_id),
1594            cx,
1595        )
1596        .detach();
1597    }
1598
1599    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1600        match self.mode {
1601            Mode::Local(ref local) => Some(local.client.clone()),
1602            Mode::Remote(_) => None,
1603        }
1604    }
1605
1606    pub fn step_over(
1607        &mut self,
1608        thread_id: ThreadId,
1609        granularity: SteppingGranularity,
1610        cx: &mut Context<Self>,
1611    ) {
1612        let supports_single_thread_execution_requests =
1613            self.capabilities.supports_single_thread_execution_requests;
1614        let supports_stepping_granularity = self
1615            .capabilities
1616            .supports_stepping_granularity
1617            .unwrap_or_default();
1618
1619        let command = NextCommand {
1620            inner: StepCommand {
1621                thread_id: thread_id.0,
1622                granularity: supports_stepping_granularity.then(|| granularity),
1623                single_thread: supports_single_thread_execution_requests,
1624            },
1625        };
1626
1627        self.thread_states.process_step(thread_id);
1628        self.request(
1629            command,
1630            Self::on_step_response::<NextCommand>(thread_id),
1631            cx,
1632        )
1633        .detach();
1634    }
1635
1636    pub fn step_in(
1637        &mut self,
1638        thread_id: ThreadId,
1639        granularity: SteppingGranularity,
1640        cx: &mut Context<Self>,
1641    ) {
1642        let supports_single_thread_execution_requests =
1643            self.capabilities.supports_single_thread_execution_requests;
1644        let supports_stepping_granularity = self
1645            .capabilities
1646            .supports_stepping_granularity
1647            .unwrap_or_default();
1648
1649        let command = StepInCommand {
1650            inner: StepCommand {
1651                thread_id: thread_id.0,
1652                granularity: supports_stepping_granularity.then(|| granularity),
1653                single_thread: supports_single_thread_execution_requests,
1654            },
1655        };
1656
1657        self.thread_states.process_step(thread_id);
1658        self.request(
1659            command,
1660            Self::on_step_response::<StepInCommand>(thread_id),
1661            cx,
1662        )
1663        .detach();
1664    }
1665
1666    pub fn step_out(
1667        &mut self,
1668        thread_id: ThreadId,
1669        granularity: SteppingGranularity,
1670        cx: &mut Context<Self>,
1671    ) {
1672        let supports_single_thread_execution_requests =
1673            self.capabilities.supports_single_thread_execution_requests;
1674        let supports_stepping_granularity = self
1675            .capabilities
1676            .supports_stepping_granularity
1677            .unwrap_or_default();
1678
1679        let command = StepOutCommand {
1680            inner: StepCommand {
1681                thread_id: thread_id.0,
1682                granularity: supports_stepping_granularity.then(|| granularity),
1683                single_thread: supports_single_thread_execution_requests,
1684            },
1685        };
1686
1687        self.thread_states.process_step(thread_id);
1688        self.request(
1689            command,
1690            Self::on_step_response::<StepOutCommand>(thread_id),
1691            cx,
1692        )
1693        .detach();
1694    }
1695
1696    pub fn step_back(
1697        &mut self,
1698        thread_id: ThreadId,
1699        granularity: SteppingGranularity,
1700        cx: &mut Context<Self>,
1701    ) {
1702        let supports_single_thread_execution_requests =
1703            self.capabilities.supports_single_thread_execution_requests;
1704        let supports_stepping_granularity = self
1705            .capabilities
1706            .supports_stepping_granularity
1707            .unwrap_or_default();
1708
1709        let command = StepBackCommand {
1710            inner: StepCommand {
1711                thread_id: thread_id.0,
1712                granularity: supports_stepping_granularity.then(|| granularity),
1713                single_thread: supports_single_thread_execution_requests,
1714            },
1715        };
1716
1717        self.thread_states.process_step(thread_id);
1718
1719        self.request(
1720            command,
1721            Self::on_step_response::<StepBackCommand>(thread_id),
1722            cx,
1723        )
1724        .detach();
1725    }
1726
1727    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1728        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1729            && self.requests.contains_key(&ThreadsCommand.type_id())
1730            && self.threads.contains_key(&thread_id)
1731        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1732        // We could still be using an old thread id and have sent a new thread's request
1733        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1734        // But it very well could cause a minor bug in the future that is hard to track down
1735        {
1736            self.fetch(
1737                super::dap_command::StackTraceCommand {
1738                    thread_id: thread_id.0,
1739                    start_frame: None,
1740                    levels: None,
1741                },
1742                move |this, stack_frames, cx| {
1743                    let stack_frames = stack_frames.log_err()?;
1744
1745                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1746                        thread.stack_frame_ids =
1747                            stack_frames.iter().map(|frame| frame.id).collect();
1748                    });
1749                    debug_assert!(
1750                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1751                        "Sent request for thread_id that doesn't exist"
1752                    );
1753
1754                    this.stack_frames.extend(
1755                        stack_frames
1756                            .iter()
1757                            .cloned()
1758                            .map(|frame| (frame.id, StackFrame::from(frame))),
1759                    );
1760
1761                    this.invalidate_command_type::<ScopesCommand>();
1762                    this.invalidate_command_type::<VariablesCommand>();
1763
1764                    cx.emit(SessionEvent::StackTrace);
1765                    cx.notify();
1766                    Some(stack_frames)
1767                },
1768                cx,
1769            );
1770        }
1771
1772        self.threads
1773            .get(&thread_id)
1774            .map(|thread| {
1775                thread
1776                    .stack_frame_ids
1777                    .iter()
1778                    .filter_map(|id| self.stack_frames.get(id))
1779                    .cloned()
1780                    .collect()
1781            })
1782            .unwrap_or_default()
1783    }
1784
1785    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1786        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1787            && self
1788                .requests
1789                .contains_key(&TypeId::of::<StackTraceCommand>())
1790        {
1791            self.fetch(
1792                ScopesCommand { stack_frame_id },
1793                move |this, scopes, cx| {
1794                    let scopes = scopes.log_err()?;
1795
1796                    for scope in scopes .iter(){
1797                        this.variables(scope.variables_reference, cx);
1798                    }
1799
1800                    let entry = this
1801                        .stack_frames
1802                        .entry(stack_frame_id)
1803                        .and_modify(|stack_frame| {
1804                            stack_frame.scopes = scopes.clone();
1805                        });
1806
1807                    cx.emit(SessionEvent::Variables);
1808
1809                    debug_assert!(
1810                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1811                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1812                    );
1813
1814                    Some(scopes)
1815                },
1816                cx,
1817            );
1818        }
1819
1820        self.stack_frames
1821            .get(&stack_frame_id)
1822            .map(|frame| frame.scopes.as_slice())
1823            .unwrap_or_default()
1824    }
1825
1826    pub fn variables(
1827        &mut self,
1828        variables_reference: VariableReference,
1829        cx: &mut Context<Self>,
1830    ) -> Vec<dap::Variable> {
1831        let command = VariablesCommand {
1832            variables_reference,
1833            filter: None,
1834            start: None,
1835            count: None,
1836            format: None,
1837        };
1838
1839        self.fetch(
1840            command,
1841            move |this, variables, cx| {
1842                let variables = variables.log_err()?;
1843                this.variables
1844                    .insert(variables_reference, variables.clone());
1845
1846                cx.emit(SessionEvent::Variables);
1847                Some(variables)
1848            },
1849            cx,
1850        );
1851
1852        self.variables
1853            .get(&variables_reference)
1854            .cloned()
1855            .unwrap_or_default()
1856    }
1857
1858    pub fn set_variable_value(
1859        &mut self,
1860        variables_reference: u64,
1861        name: String,
1862        value: String,
1863        cx: &mut Context<Self>,
1864    ) {
1865        if self.capabilities.supports_set_variable.unwrap_or_default() {
1866            self.request(
1867                SetVariableValueCommand {
1868                    name,
1869                    value,
1870                    variables_reference,
1871                },
1872                move |this, response, cx| {
1873                    let response = response.log_err()?;
1874                    this.invalidate_command_type::<VariablesCommand>();
1875                    cx.notify();
1876                    Some(response)
1877                },
1878                cx,
1879            )
1880            .detach()
1881        }
1882    }
1883
1884    pub fn evaluate(
1885        &mut self,
1886        expression: String,
1887        context: Option<EvaluateArgumentsContext>,
1888        frame_id: Option<u64>,
1889        source: Option<Source>,
1890        cx: &mut Context<Self>,
1891    ) {
1892        self.request(
1893            EvaluateCommand {
1894                expression,
1895                context,
1896                frame_id,
1897                source,
1898            },
1899            |this, response, cx| {
1900                let response = response.log_err()?;
1901                this.output_token.0 += 1;
1902                this.output.push_back(dap::OutputEvent {
1903                    category: None,
1904                    output: response.result.clone(),
1905                    group: None,
1906                    variables_reference: Some(response.variables_reference),
1907                    source: None,
1908                    line: None,
1909                    column: None,
1910                    data: None,
1911                    location_reference: None,
1912                });
1913
1914                this.invalidate_command_type::<ScopesCommand>();
1915                cx.notify();
1916                Some(response)
1917            },
1918            cx,
1919        )
1920        .detach();
1921    }
1922
1923    pub fn location(
1924        &mut self,
1925        reference: u64,
1926        cx: &mut Context<Self>,
1927    ) -> Option<dap::LocationsResponse> {
1928        self.fetch(
1929            LocationsCommand { reference },
1930            move |this, response, _| {
1931                let response = response.log_err()?;
1932                this.locations.insert(reference, response.clone());
1933                Some(response)
1934            },
1935            cx,
1936        );
1937        self.locations.get(&reference).cloned()
1938    }
1939    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1940        let command = DisconnectCommand {
1941            restart: Some(false),
1942            terminate_debuggee: Some(true),
1943            suspend_debuggee: Some(false),
1944        };
1945
1946        self.request(command, Self::empty_response, cx).detach()
1947    }
1948
1949    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1950        if self
1951            .capabilities
1952            .supports_terminate_threads_request
1953            .unwrap_or_default()
1954        {
1955            self.request(
1956                TerminateThreadsCommand {
1957                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1958                },
1959                Self::clear_active_debug_line_response,
1960                cx,
1961            )
1962            .detach();
1963        } else {
1964            self.shutdown(cx).detach();
1965        }
1966    }
1967}
1968
1969fn create_local_session(
1970    breakpoint_store: Entity<BreakpointStore>,
1971    session_id: SessionId,
1972    parent_session: Option<Entity<Session>>,
1973    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1974    initialized_tx: oneshot::Sender<()>,
1975    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1976    mode: LocalMode,
1977    cx: &mut Context<Session>,
1978) -> Session {
1979    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1980        let mut initialized_tx = Some(initialized_tx);
1981        while let Some(message) = message_rx.next().await {
1982            if let Message::Event(event) = message {
1983                if let Events::Initialized(_) = *event {
1984                    if let Some(tx) = initialized_tx.take() {
1985                        tx.send(()).ok();
1986                    }
1987                } else {
1988                    let Ok(_) = this.update(cx, |session, cx| {
1989                        session.handle_dap_event(event, cx);
1990                    }) else {
1991                        break;
1992                    };
1993                }
1994            } else {
1995                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1996                else {
1997                    break;
1998                };
1999            }
2000        }
2001    })];
2002
2003    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
2004        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2005            if let Some(local) = (!this.ignore_breakpoints)
2006                .then(|| this.as_local_mut())
2007                .flatten()
2008            {
2009                local
2010                    .send_breakpoints_from_path(path.clone(), *reason, cx)
2011                    .detach();
2012            };
2013        }
2014        BreakpointStoreEvent::BreakpointsCleared(paths) => {
2015            if let Some(local) = (!this.ignore_breakpoints)
2016                .then(|| this.as_local_mut())
2017                .flatten()
2018            {
2019                local.unset_breakpoints_from_paths(paths, cx).detach();
2020            }
2021        }
2022        BreakpointStoreEvent::ActiveDebugLineChanged => {}
2023    })
2024    .detach();
2025
2026    Session {
2027        mode: Mode::Local(mode),
2028        id: session_id,
2029        child_session_ids: HashSet::default(),
2030        parent_id: parent_session.map(|session| session.read(cx).id),
2031        variables: Default::default(),
2032        capabilities: Capabilities::default(),
2033        thread_states: ThreadStates::default(),
2034        output_token: OutputToken(0),
2035        ignore_breakpoints: false,
2036        output: circular_buffer::CircularBuffer::boxed(),
2037        requests: HashMap::default(),
2038        modules: Vec::default(),
2039        loaded_sources: Vec::default(),
2040        threads: IndexMap::default(),
2041        stack_frames: IndexMap::default(),
2042        locations: Default::default(),
2043        _background_tasks,
2044        is_session_terminated: false,
2045    }
2046}