session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    adapters::{DapDelegate, DapStatus},
  21    client::{DebugAdapterClient, SessionId},
  22    messages::{Events, Message},
  23};
  24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
  25use futures::channel::oneshot;
  26use futures::{FutureExt, future::Shared};
  27use gpui::{
  28    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  29};
  30use rpc::AnyProtoClient;
  31use serde_json::{Value, json};
  32use settings::Settings;
  33use smol::stream::StreamExt;
  34use std::any::TypeId;
  35use std::path::PathBuf;
  36use std::u64;
  37use std::{
  38    any::Any,
  39    collections::hash_map::Entry,
  40    hash::{Hash, Hasher},
  41    path::Path,
  42    sync::Arc,
  43};
  44use task::{DebugAdapterConfig, DebugTaskDefinition};
  45use text::{PointUtf16, ToPointUtf16};
  46use util::{ResultExt, merge_json_value_into};
  47
  48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  49#[repr(transparent)]
  50pub struct ThreadId(pub u64);
  51
  52impl ThreadId {
  53    pub const MIN: ThreadId = ThreadId(u64::MIN);
  54    pub const MAX: ThreadId = ThreadId(u64::MAX);
  55}
  56
  57impl From<u64> for ThreadId {
  58    fn from(id: u64) -> Self {
  59        Self(id)
  60    }
  61}
  62
  63#[derive(Clone, Debug)]
  64pub struct StackFrame {
  65    pub dap: dap::StackFrame,
  66    pub scopes: Vec<dap::Scope>,
  67}
  68
  69impl From<dap::StackFrame> for StackFrame {
  70    fn from(stack_frame: dap::StackFrame) -> Self {
  71        Self {
  72            scopes: vec![],
  73            dap: stack_frame,
  74        }
  75    }
  76}
  77
  78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  79pub enum ThreadStatus {
  80    #[default]
  81    Running,
  82    Stopped,
  83    Stepping,
  84    Exited,
  85    Ended,
  86}
  87
  88impl ThreadStatus {
  89    pub fn label(&self) -> &'static str {
  90        match self {
  91            ThreadStatus::Running => "Running",
  92            ThreadStatus::Stopped => "Stopped",
  93            ThreadStatus::Stepping => "Stepping",
  94            ThreadStatus::Exited => "Exited",
  95            ThreadStatus::Ended => "Ended",
  96        }
  97    }
  98}
  99
 100#[derive(Debug)]
 101pub struct Thread {
 102    dap: dap::Thread,
 103    stack_frame_ids: IndexSet<StackFrameId>,
 104    _has_stopped: bool,
 105}
 106
 107impl From<dap::Thread> for Thread {
 108    fn from(dap: dap::Thread) -> Self {
 109        Self {
 110            dap,
 111            stack_frame_ids: Default::default(),
 112            _has_stopped: false,
 113        }
 114    }
 115}
 116
/// Numeric id of an upstream project (alias for `u64`).
type UpstreamProjectId = u64;

/// Data held by a session running in [`Mode::Remote`].
///
/// Both fields are underscore-prefixed and not read by the code visible here:
/// sending DAP commands over RPC is not implemented yet.
struct RemoteConnection {
    _client: AnyProtoClient,
    _upstream_project_id: UpstreamProjectId,
}
 123
 124impl RemoteConnection {
 125    fn send_proto_client_request<R: DapCommand>(
 126        &self,
 127        _request: R,
 128        _session_id: SessionId,
 129        cx: &mut App,
 130    ) -> Task<Result<R::Response>> {
 131        // let message = request.to_proto(session_id, self.upstream_project_id);
 132        // let upstream_client = self.client.clone();
 133        cx.background_executor().spawn(async move {
 134            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 135            // let response = upstream_client.request(message).await?;
 136            // request.response_from_proto(response)
 137            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 138        })
 139    }
 140
 141    fn request<R: DapCommand>(
 142        &self,
 143        request: R,
 144        session_id: SessionId,
 145        cx: &mut App,
 146    ) -> Task<Result<R::Response>>
 147    where
 148        <R::DapRequest as dap::requests::Request>::Response: 'static,
 149        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 150    {
 151        return self.send_proto_client_request::<R>(request, session_id, cx);
 152    }
 153}
 154
/// How a session issues DAP requests: through a local adapter client or a remote connection.
enum Mode {
    /// Requests go through the [`LocalMode`]'s debug adapter client.
    Local(LocalMode),
    /// Requests go through a [`RemoteConnection`].
    Remote(RemoteConnection),
}
 159
/// State for a session whose debug adapter is driven from this process.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests to the debug adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration this session was started with.
    config: DebugAdapterConfig,
    /// Adapter implementation; used to build the launch/attach request arguments.
    adapter: Arc<dyn DebugAdapter>,
    /// Store queried for the breakpoints that are sent to the adapter.
    breakpoint_store: Entity<BreakpointStore>,
}
 167
 168fn client_source(abs_path: &Path) -> dap::Source {
 169    dap::Source {
 170        name: abs_path
 171            .file_name()
 172            .map(|filename| filename.to_string_lossy().to_string()),
 173        path: Some(abs_path.to_string_lossy().to_string()),
 174        source_reference: None,
 175        presentation_hint: None,
 176        origin: None,
 177        sources: None,
 178        adapter_data: None,
 179        checksums: None,
 180    }
 181}
 182
 183impl LocalMode {
    /// Creates a `LocalMode` using adapters from `debug_adapters`.
    ///
    /// Thin wrapper around [`Self::new_inner`] that passes a no-op
    /// `on_initialized` hook. Resolves to the new mode together with the
    /// capabilities returned by the adapter's `Initialize` response.
    fn new(
        debug_adapters: Arc<DapRegistry>,
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        breakpoint_store: Entity<BreakpointStore>,
        config: DebugAdapterConfig,
        delegate: DapAdapterDelegate,
        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
        cx: AsyncApp,
    ) -> Task<Result<(Self, Capabilities)>> {
        Self::new_inner(
            debug_adapters,
            session_id,
            parent_session,
            breakpoint_store,
            config,
            delegate,
            messages_tx,
            // No extra setup is needed before the `Initialize` request.
            async |_, _| {},
            cx,
        )
    }
 206    #[cfg(any(test, feature = "test-support"))]
 207    fn new_fake(
 208        session_id: SessionId,
 209        parent_session: Option<Entity<Session>>,
 210        breakpoint_store: Entity<BreakpointStore>,
 211        config: DebugAdapterConfig,
 212        delegate: DapAdapterDelegate,
 213        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 214        caps: Capabilities,
 215        fail: bool,
 216        cx: AsyncApp,
 217    ) -> Task<Result<(Self, Capabilities)>> {
 218        use task::DebugRequestDisposition;
 219
 220        let request = match config.request.clone() {
 221            DebugRequestDisposition::UserConfigured(request) => request,
 222            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
 223                match reverse_request_args.request {
 224                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
 225                        DebugRequestType::Launch(task::LaunchConfig {
 226                            program: "".to_owned(),
 227                            cwd: None,
 228                            args: Default::default(),
 229                        })
 230                    }
 231                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
 232                        DebugRequestType::Attach(task::AttachConfig {
 233                            process_id: Some(0),
 234                        })
 235                    }
 236                }
 237            }
 238        };
 239
 240        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
 241            session
 242                .client
 243                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 244                .await;
 245
 246            let paths = cx
 247                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
 248                .expect("Breakpoint store should exist in all tests that start debuggers");
 249
 250            session
 251                .client
 252                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
 253                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
 254                    if !paths.contains(&p) {
 255                        panic!("Sent breakpoints for path without any")
 256                    }
 257
 258                    Ok(dap::SetBreakpointsResponse {
 259                        breakpoints: Vec::default(),
 260                    })
 261                })
 262                .await;
 263
 264            match request {
 265                dap::DebugRequestType::Launch(_) => {
 266                    if fail {
 267                        session
 268                            .client
 269                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 270                                Err(dap::ErrorResponse {
 271                                    error: Some(dap::Message {
 272                                        id: 1,
 273                                        format: "error".into(),
 274                                        variables: None,
 275                                        send_telemetry: None,
 276                                        show_user: None,
 277                                        url: None,
 278                                        url_label: None,
 279                                    }),
 280                                })
 281                            })
 282                            .await;
 283                    } else {
 284                        session
 285                            .client
 286                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 287                            .await;
 288                    }
 289                }
 290                dap::DebugRequestType::Attach(attach_config) => {
 291                    if fail {
 292                        session
 293                            .client
 294                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 295                                Err(dap::ErrorResponse {
 296                                    error: Some(dap::Message {
 297                                        id: 1,
 298                                        format: "error".into(),
 299                                        variables: None,
 300                                        send_telemetry: None,
 301                                        show_user: None,
 302                                        url: None,
 303                                        url_label: None,
 304                                    }),
 305                                })
 306                            })
 307                            .await;
 308                    } else {
 309                        session
 310                            .client
 311                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 312                                assert_eq!(
 313                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 314                                    args.raw
 315                                );
 316
 317                                Ok(())
 318                            })
 319                            .await;
 320                    }
 321                }
 322            }
 323
 324            session
 325                .client
 326                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
 327                .await;
 328            session.client.fake_event(Events::Initialized(None)).await;
 329        };
 330        Self::new_inner(
 331            DapRegistry::fake().into(),
 332            session_id,
 333            parent_session,
 334            breakpoint_store,
 335            config,
 336            delegate,
 337            messages_tx,
 338            callback,
 339            cx,
 340        )
 341    }
 342    fn new_inner(
 343        registry: Arc<DapRegistry>,
 344        session_id: SessionId,
 345        parent_session: Option<Entity<Session>>,
 346        breakpoint_store: Entity<BreakpointStore>,
 347        config: DebugAdapterConfig,
 348        delegate: DapAdapterDelegate,
 349        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 350        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 351        cx: AsyncApp,
 352    ) -> Task<Result<(Self, Capabilities)>> {
 353        cx.spawn(async move |cx| {
 354            let (adapter, binary) =
 355                Self::get_adapter_binary(&registry, &config, &delegate, cx).await?;
 356
 357            let message_handler = Box::new(move |message| {
 358                messages_tx.unbounded_send(message).ok();
 359            });
 360
 361            let client = Arc::new(
 362                if let Some(client) = parent_session
 363                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 364                    .flatten()
 365                {
 366                    client
 367                        .reconnect(session_id, binary, message_handler, cx.clone())
 368                        .await?
 369                } else {
 370                    DebugAdapterClient::start(
 371                        session_id,
 372                        adapter.name(),
 373                        binary,
 374                        message_handler,
 375                        cx.clone(),
 376                    )
 377                    .await?
 378                },
 379            );
 380
 381            let adapter_id = adapter.name().to_string().to_owned();
 382            let mut session = Self {
 383                client,
 384                adapter,
 385                breakpoint_store,
 386                config: config.clone(),
 387            };
 388
 389            on_initialized(&mut session, cx.clone()).await;
 390            let capabilities = session
 391                .request(Initialize { adapter_id }, cx.background_executor().clone())
 392                .await?;
 393
 394            Ok((session, capabilities))
 395        })
 396    }
 397
 398    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 399        let tasks: Vec<_> = paths
 400            .into_iter()
 401            .map(|path| {
 402                self.request(
 403                    dap_command::SetBreakpoints {
 404                        source: client_source(path),
 405                        source_modified: None,
 406                        breakpoints: vec![],
 407                    },
 408                    cx.background_executor().clone(),
 409                )
 410            })
 411            .collect();
 412
 413        cx.background_spawn(async move {
 414            futures::future::join_all(tasks)
 415                .await
 416                .iter()
 417                .for_each(|res| match res {
 418                    Ok(_) => {}
 419                    Err(err) => {
 420                        log::warn!("Set breakpoints request failed: {}", err);
 421                    }
 422                });
 423        })
 424    }
 425
 426    fn send_breakpoints_from_path(
 427        &self,
 428        abs_path: Arc<Path>,
 429        reason: BreakpointUpdatedReason,
 430        cx: &mut App,
 431    ) -> Task<()> {
 432        let breakpoints = self
 433            .breakpoint_store
 434            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 435            .into_iter()
 436            .filter(|bp| bp.state.is_enabled())
 437            .map(Into::into)
 438            .collect();
 439
 440        let task = self.request(
 441            dap_command::SetBreakpoints {
 442                source: client_source(&abs_path),
 443                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 444                breakpoints,
 445            },
 446            cx.background_executor().clone(),
 447        );
 448
 449        cx.background_spawn(async move {
 450            match task.await {
 451                Ok(_) => {}
 452                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 453            }
 454        })
 455    }
 456
 457    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 458        let mut breakpoint_tasks = Vec::new();
 459        let breakpoints = self
 460            .breakpoint_store
 461            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 462
 463        for (path, breakpoints) in breakpoints {
 464            let breakpoints = if ignore_breakpoints {
 465                vec![]
 466            } else {
 467                breakpoints
 468                    .into_iter()
 469                    .filter(|bp| bp.state.is_enabled())
 470                    .map(Into::into)
 471                    .collect()
 472            };
 473
 474            breakpoint_tasks.push(self.request(
 475                dap_command::SetBreakpoints {
 476                    source: client_source(&path),
 477                    source_modified: Some(false),
 478                    breakpoints,
 479                },
 480                cx.background_executor().clone(),
 481            ));
 482        }
 483
 484        cx.background_spawn(async move {
 485            futures::future::join_all(breakpoint_tasks)
 486                .await
 487                .iter()
 488                .for_each(|res| match res {
 489                    Ok(_) => {}
 490                    Err(err) => {
 491                        log::warn!("Set breakpoints request failed: {}", err);
 492                    }
 493                });
 494        })
 495    }
 496
 497    async fn get_adapter_binary(
 498        registry: &Arc<DapRegistry>,
 499        config: &DebugAdapterConfig,
 500        delegate: &DapAdapterDelegate,
 501        cx: &mut AsyncApp,
 502    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 503        let adapter = registry
 504            .adapter(&config.adapter)
 505            .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
 506
 507        let binary = cx.update(|cx| {
 508            ProjectSettings::get_global(cx)
 509                .dap
 510                .get(&adapter.name())
 511                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 512        })?;
 513
 514        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 515            Err(error) => {
 516                delegate.update_status(
 517                    adapter.name(),
 518                    DapStatus::Failed {
 519                        error: error.to_string(),
 520                    },
 521                );
 522
 523                return Err(error);
 524            }
 525            Ok(mut binary) => {
 526                delegate.update_status(adapter.name(), DapStatus::None);
 527
 528                let shell_env = delegate.shell_env().await;
 529                let mut envs = binary.envs.unwrap_or_default();
 530                envs.extend(shell_env);
 531                binary.envs = Some(envs);
 532
 533                binary
 534            }
 535        };
 536
 537        Ok((adapter, binary))
 538    }
 539
 540    pub fn label(&self) -> String {
 541        self.config.label.clone()
 542    }
 543
    /// Issues the launch/attach request and the configuration handshake.
    ///
    /// Two things run concurrently and are joined in the returned task:
    /// - the `Launch` or `Attach` request, built from the session config
    ///   merged with `initialize_args`;
    /// - the configuration sequence, which waits for `initialized_rx`, sends
    ///   all breakpoints, and then sends `ConfigurationDone` if `capabilities`
    ///   says the adapter supports it.
    ///
    /// The task fails if either half fails, including when `initialized_rx`
    /// is cancelled.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        cx: &App,
    ) -> Task<Result<()>> {
        // Derive the raw request arguments and whether this is a launch or an attach.
        let (mut raw, is_launch) = match &self.config.request {
            task::DebugRequestDisposition::UserConfigured(_) => {
                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
                    debug_assert!(false, "This part of code should be unreachable in practice");
                    return Task::ready(Err(anyhow!(
                        "Expected debug config conversion to succeed"
                    )));
                };
                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
                let raw = self.adapter.request_args(&raw);
                (raw, is_launch)
            }
            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
                start_debugging_request_arguments.configuration.clone(),
                matches!(
                    start_debugging_request_arguments.request,
                    dap::StartDebuggingRequestArgumentsRequest::Launch
                ),
            ),
        };

        // Layer the configured `initialize_args` (or an empty object) onto the request arguments.
        merge_json_value_into(
            self.config.initialize_args.clone().unwrap_or(json!({})),
            &mut raw,
        );
        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = if is_launch {
            self.request(Launch { raw }, cx.background_executor().clone())
        } else {
            self.request(Attach { raw }, cx.background_executor().clone())
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);

        let configuration_sequence = cx.spawn({
            let this = self.clone();
            async move |cx| {
                // Wait until the `initialized` signal arrives before configuring the adapter.
                initialized_rx.await?;
                // todo(debugger) figure out if we want to handle a breakpoint response error
                // This will probably consist of letting a user know that breakpoints failed to be set
                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;

                if configuration_done_supported {
                    this.request(ConfigurationDone, cx.background_executor().clone())
                } else {
                    Task::ready(Ok(()))
                }
                .await
            }
        });

        // Both halves must succeed; the first error from either is returned.
        cx.background_spawn(async move {
            futures::future::try_join(launch, configuration_sequence).await?;
            Ok(())
        })
    }
 606
 607    fn request<R: LocalDapCommand>(
 608        &self,
 609        request: R,
 610        executor: BackgroundExecutor,
 611    ) -> Task<Result<R::Response>>
 612    where
 613        <R::DapRequest as dap::requests::Request>::Response: 'static,
 614        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 615    {
 616        let request = Arc::new(request);
 617
 618        let request_clone = request.clone();
 619        let connection = self.client.clone();
 620        let request_task = executor.spawn(async move {
 621            let args = request_clone.to_dap();
 622            connection.request::<R::DapRequest>(args).await
 623        });
 624
 625        executor.spawn(async move {
 626            let response = request.response_from_dap(request_task.await?);
 627            response
 628        })
 629    }
 630}
 631impl From<RemoteConnection> for Mode {
 632    fn from(value: RemoteConnection) -> Self {
 633        Self::Remote(value)
 634    }
 635}
 636
 637impl Mode {
 638    fn request_dap<R: DapCommand>(
 639        &self,
 640        session_id: SessionId,
 641        request: R,
 642        cx: &mut Context<Session>,
 643    ) -> Task<Result<R::Response>>
 644    where
 645        <R::DapRequest as dap::requests::Request>::Response: 'static,
 646        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 647    {
 648        match self {
 649            Mode::Local(debug_adapter_client) => {
 650                debug_adapter_client.request(request, cx.background_executor().clone())
 651            }
 652            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 653        }
 654    }
 655}
 656
 657#[derive(Default)]
 658struct ThreadStates {
 659    global_state: Option<ThreadStatus>,
 660    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 661}
 662
 663impl ThreadStates {
 664    fn stop_all_threads(&mut self) {
 665        self.global_state = Some(ThreadStatus::Stopped);
 666        self.known_thread_states.clear();
 667    }
 668
 669    fn exit_all_threads(&mut self) {
 670        self.global_state = Some(ThreadStatus::Exited);
 671        self.known_thread_states.clear();
 672    }
 673
 674    fn continue_all_threads(&mut self) {
 675        self.global_state = Some(ThreadStatus::Running);
 676        self.known_thread_states.clear();
 677    }
 678
 679    fn stop_thread(&mut self, thread_id: ThreadId) {
 680        self.known_thread_states
 681            .insert(thread_id, ThreadStatus::Stopped);
 682    }
 683
 684    fn continue_thread(&mut self, thread_id: ThreadId) {
 685        self.known_thread_states
 686            .insert(thread_id, ThreadStatus::Running);
 687    }
 688
 689    fn process_step(&mut self, thread_id: ThreadId) {
 690        self.known_thread_states
 691            .insert(thread_id, ThreadStatus::Stepping);
 692    }
 693
 694    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 695        self.thread_state(thread_id)
 696            .unwrap_or(ThreadStatus::Running)
 697    }
 698
 699    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 700        self.known_thread_states
 701            .get(&thread_id)
 702            .copied()
 703            .or(self.global_state)
 704    }
 705
 706    fn exit_thread(&mut self, thread_id: ThreadId) {
 707        self.known_thread_states
 708            .insert(thread_id, ThreadStatus::Exited);
 709    }
 710
 711    fn any_stopped_thread(&self) -> bool {
 712        self.global_state
 713            .is_some_and(|state| state == ThreadStatus::Stopped)
 714            || self
 715                .known_thread_states
 716                .values()
 717                .any(|status| *status == ThreadStatus::Stopped)
 718    }
 719}
/// Capacity of the circular buffer that holds a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 721
/// Ordered `usize` token stored on the session as `output_token`.
// NOTE(review): presumably a running position in the output stream — confirm against its users.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether requests go to a local adapter client or over a remote connection.
    mode: Mode,
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Ring buffer holding at most `MAX_TRACKED_OUTPUT_EVENTS` output events.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    /// Variables grouped by the variable reference they were fetched for.
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Shared request tasks, keyed first by command type and then by the command value.
    // NOTE(review): presumably a cache of in-flight/completed requests — confirm against users.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    _background_tasks: Vec<Task<()>>,
}
 745
/// Object-safe view of a command that supports equality and hashing behind `dyn`.
///
/// This lets commands of different concrete types be stored in a [`RequestSlot`].
trait CacheableCommand: 'static + Send + Sync {
    /// Borrows the command as `dyn Any` so it can be downcast.
    fn as_any(&self) -> &dyn Any;
    /// Equality against another type-erased command.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds the command's hash into a type-erased hasher.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts a shared command into a shared `dyn Any`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 752
 753impl<T> CacheableCommand for T
 754where
 755    T: DapCommand + PartialEq + Eq + Hash,
 756{
 757    fn as_any(&self) -> &dyn Any {
 758        self
 759    }
 760
 761    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 762        rhs.as_any()
 763            .downcast_ref::<Self>()
 764            .map_or(false, |rhs| self == rhs)
 765    }
 766
 767    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 768        T::hash(self, &mut hasher);
 769    }
 770
 771    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 772        self
 773    }
 774}
 775
 776pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 777
 778impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 779    fn from(request: T) -> Self {
 780        Self(Arc::new(request))
 781    }
 782}
 783
 784impl PartialEq for RequestSlot {
 785    fn eq(&self, other: &Self) -> bool {
 786        self.0.dyn_eq(other.0.as_ref())
 787    }
 788}
 789
 790impl Eq for RequestSlot {}
 791
 792impl Hash for RequestSlot {
 793    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 794        self.0.dyn_hash(state);
 795        self.0.as_any().type_id().hash(state)
 796    }
 797}
 798
 799#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 800pub struct CompletionsQuery {
 801    pub query: String,
 802    pub column: u64,
 803    pub line: Option<u64>,
 804    pub frame_id: Option<u64>,
 805}
 806
 807impl CompletionsQuery {
 808    pub fn new(
 809        buffer: &language::Buffer,
 810        cursor_position: language::Anchor,
 811        frame_id: Option<u64>,
 812    ) -> Self {
 813        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 814        Self {
 815            query: buffer.text(),
 816            column: column as u64,
 817            frame_id,
 818            line: Some(row as u64),
 819        }
 820    }
 821}
 822
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id.
    // NOTE(review): presumably the thread that stopped, `None` when unknown — confirm at emit sites.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

impl EventEmitter<SessionEvent> for Session {}
 833
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// A remote session only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session whenever breakpoints change.
 837impl Session {
 838    pub(crate) fn local(
 839        breakpoint_store: Entity<BreakpointStore>,
 840        session_id: SessionId,
 841        parent_session: Option<Entity<Session>>,
 842        delegate: DapAdapterDelegate,
 843        config: DebugAdapterConfig,
 844        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 845        initialized_tx: oneshot::Sender<()>,
 846        debug_adapters: Arc<DapRegistry>,
 847        cx: &mut App,
 848    ) -> Task<Result<Entity<Self>>> {
 849        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 850
 851        cx.spawn(async move |cx| {
 852            let (mode, capabilities) = LocalMode::new(
 853                debug_adapters,
 854                session_id,
 855                parent_session.clone(),
 856                breakpoint_store.clone(),
 857                config.clone(),
 858                delegate,
 859                message_tx,
 860                cx.clone(),
 861            )
 862            .await?;
 863
 864            cx.new(|cx| {
 865                create_local_session(
 866                    breakpoint_store,
 867                    session_id,
 868                    parent_session,
 869                    start_debugging_requests_tx,
 870                    initialized_tx,
 871                    message_rx,
 872                    mode,
 873                    capabilities,
 874                    cx,
 875                )
 876            })
 877        })
 878    }
 879
 880    #[cfg(any(test, feature = "test-support"))]
 881    pub(crate) fn fake(
 882        breakpoint_store: Entity<BreakpointStore>,
 883        session_id: SessionId,
 884        parent_session: Option<Entity<Session>>,
 885        delegate: DapAdapterDelegate,
 886        config: DebugAdapterConfig,
 887        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 888        initialized_tx: oneshot::Sender<()>,
 889        caps: Capabilities,
 890        fails: bool,
 891        cx: &mut App,
 892    ) -> Task<Result<Entity<Session>>> {
 893        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 894
 895        cx.spawn(async move |cx| {
 896            let (mode, capabilities) = LocalMode::new_fake(
 897                session_id,
 898                parent_session.clone(),
 899                breakpoint_store.clone(),
 900                config.clone(),
 901                delegate,
 902                message_tx,
 903                caps,
 904                fails,
 905                cx.clone(),
 906            )
 907            .await?;
 908
 909            cx.new(|cx| {
 910                create_local_session(
 911                    breakpoint_store,
 912                    session_id,
 913                    parent_session,
 914                    start_debugging_requests_tx,
 915                    initialized_tx,
 916                    message_rx,
 917                    mode,
 918                    capabilities,
 919                    cx,
 920                )
 921            })
 922        })
 923    }
 924
 925    pub(crate) fn remote(
 926        session_id: SessionId,
 927        client: AnyProtoClient,
 928        upstream_project_id: u64,
 929        ignore_breakpoints: bool,
 930    ) -> Self {
 931        Self {
 932            mode: Mode::Remote(RemoteConnection {
 933                _client: client,
 934                _upstream_project_id: upstream_project_id,
 935            }),
 936            id: session_id,
 937            child_session_ids: HashSet::default(),
 938            parent_id: None,
 939            capabilities: Capabilities::default(),
 940            ignore_breakpoints,
 941            variables: Default::default(),
 942            stack_frames: Default::default(),
 943            thread_states: ThreadStates::default(),
 944            output_token: OutputToken(0),
 945            output: circular_buffer::CircularBuffer::boxed(),
 946            requests: HashMap::default(),
 947            modules: Vec::default(),
 948            loaded_sources: Vec::default(),
 949            threads: IndexMap::default(),
 950            _background_tasks: Vec::default(),
 951            locations: Default::default(),
 952            is_session_terminated: false,
 953        }
 954    }
 955
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 959
    /// Returns a copy of the set of child session ids registered on this session.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 963
    /// Registers `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 967
    /// Removes `session_id` from this session's children; a no-op if it is not registered.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 971
    /// Returns the id of the parent session, if this session has one.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
 975
    /// Returns the capabilities currently recorded for this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 979
 980    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 981        if let Mode::Local(local_mode) = &self.mode {
 982            Some(local_mode.config.clone())
 983        } else {
 984            None
 985        }
 986    }
 987
    /// Returns whether the session has been marked terminated (set on a
    /// terminated event and on shutdown).
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 991
 992    pub fn is_local(&self) -> bool {
 993        matches!(self.mode, Mode::Local(_))
 994    }
 995
 996    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 997        match &mut self.mode {
 998            Mode::Local(local_mode) => Some(local_mode),
 999            Mode::Remote(_) => None,
1000        }
1001    }
1002
1003    pub fn as_local(&self) -> Option<&LocalMode> {
1004        match &self.mode {
1005            Mode::Local(local_mode) => Some(local_mode),
1006            Mode::Remote(_) => None,
1007        }
1008    }
1009
1010    pub(super) fn initialize_sequence(
1011        &mut self,
1012        initialize_rx: oneshot::Receiver<()>,
1013        cx: &mut Context<Self>,
1014    ) -> Task<Result<()>> {
1015        match &self.mode {
1016            Mode::Local(local_mode) => {
1017                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1018            }
1019            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1020        }
1021    }
1022
1023    pub fn output(
1024        &self,
1025        since: OutputToken,
1026    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1027        if self.output_token.0 == 0 {
1028            return (self.output.range(0..0), OutputToken(0));
1029        };
1030
1031        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1032
1033        let clamped_events_since = events_since.clamp(0, self.output.len());
1034        (
1035            self.output
1036                .range(self.output.len() - clamped_events_since..),
1037            self.output_token,
1038        )
1039    }
1040
1041    pub fn respond_to_client(
1042        &self,
1043        request_seq: u64,
1044        success: bool,
1045        command: String,
1046        body: Option<serde_json::Value>,
1047        cx: &mut Context<Self>,
1048    ) -> Task<Result<()>> {
1049        let Some(local_session) = self.as_local().cloned() else {
1050            unreachable!("Cannot respond to remote client");
1051        };
1052
1053        cx.background_spawn(async move {
1054            local_session
1055                .client
1056                .send_message(Message::Response(Response {
1057                    body,
1058                    success,
1059                    command,
1060                    seq: request_seq + 1,
1061                    request_seq,
1062                    message: None,
1063                }))
1064                .await
1065        })
1066    }
1067
1068    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1069        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1070            self.thread_states.stop_all_threads();
1071
1072            self.invalidate_command_type::<StackTraceCommand>();
1073        }
1074
1075        // Event if we stopped all threads we still need to insert the thread_id
1076        // to our own data
1077        if let Some(thread_id) = event.thread_id {
1078            self.thread_states.stop_thread(ThreadId(thread_id));
1079
1080            self.invalidate_state(
1081                &StackTraceCommand {
1082                    thread_id,
1083                    start_frame: None,
1084                    levels: None,
1085                }
1086                .into(),
1087            );
1088        }
1089
1090        self.invalidate_generic();
1091        self.threads.clear();
1092        self.variables.clear();
1093        cx.emit(SessionEvent::Stopped(
1094            event
1095                .thread_id
1096                .map(Into::into)
1097                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1098        ));
1099        cx.notify();
1100    }
1101
1102    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1103        match *event {
1104            Events::Initialized(_) => {
1105                debug_assert!(
1106                    false,
1107                    "Initialized event should have been handled in LocalMode"
1108                );
1109            }
1110            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1111            Events::Continued(event) => {
1112                if event.all_threads_continued.unwrap_or_default() {
1113                    self.thread_states.continue_all_threads();
1114                } else {
1115                    self.thread_states
1116                        .continue_thread(ThreadId(event.thread_id));
1117                }
1118                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1119                self.invalidate_generic();
1120            }
1121            Events::Exited(_event) => {
1122                self.clear_active_debug_line(cx);
1123            }
1124            Events::Terminated(_) => {
1125                self.is_session_terminated = true;
1126                self.clear_active_debug_line(cx);
1127            }
1128            Events::Thread(event) => {
1129                let thread_id = ThreadId(event.thread_id);
1130
1131                match event.reason {
1132                    dap::ThreadEventReason::Started => {
1133                        self.thread_states.continue_thread(thread_id);
1134                    }
1135                    dap::ThreadEventReason::Exited => {
1136                        self.thread_states.exit_thread(thread_id);
1137                    }
1138                    reason => {
1139                        log::error!("Unhandled thread event reason {:?}", reason);
1140                    }
1141                }
1142                self.invalidate_state(&ThreadsCommand.into());
1143                cx.notify();
1144            }
1145            Events::Output(event) => {
1146                if event
1147                    .category
1148                    .as_ref()
1149                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1150                {
1151                    return;
1152                }
1153
1154                self.output.push_back(event);
1155                self.output_token.0 += 1;
1156                cx.notify();
1157            }
1158            Events::Breakpoint(_) => {}
1159            Events::Module(event) => {
1160                match event.reason {
1161                    dap::ModuleEventReason::New => {
1162                        self.modules.push(event.module);
1163                    }
1164                    dap::ModuleEventReason::Changed => {
1165                        if let Some(module) = self
1166                            .modules
1167                            .iter_mut()
1168                            .find(|other| event.module.id == other.id)
1169                        {
1170                            *module = event.module;
1171                        }
1172                    }
1173                    dap::ModuleEventReason::Removed => {
1174                        self.modules.retain(|other| event.module.id != other.id);
1175                    }
1176                }
1177
1178                // todo(debugger): We should only send the invalidate command to downstream clients.
1179                // self.invalidate_state(&ModulesCommand.into());
1180            }
1181            Events::LoadedSource(_) => {
1182                self.invalidate_state(&LoadedSourcesCommand.into());
1183            }
1184            Events::Capabilities(event) => {
1185                self.capabilities = self.capabilities.merge(event.capabilities);
1186                cx.notify();
1187            }
1188            Events::Memory(_) => {}
1189            Events::Process(_) => {}
1190            Events::ProgressEnd(_) => {}
1191            Events::ProgressStart(_) => {}
1192            Events::ProgressUpdate(_) => {}
1193            Events::Invalidated(_) => {}
1194            Events::Other(_) => {}
1195        }
1196    }
1197
1198    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1199    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1200        &mut self,
1201        request: T,
1202        process_result: impl FnOnce(
1203            &mut Self,
1204            Result<T::Response>,
1205            &mut Context<Self>,
1206        ) -> Option<T::Response>
1207        + 'static,
1208        cx: &mut Context<Self>,
1209    ) {
1210        const {
1211            assert!(
1212                T::CACHEABLE,
1213                "Only requests marked as cacheable should invoke `fetch`"
1214            );
1215        }
1216
1217        if !self.thread_states.any_stopped_thread()
1218            && request.type_id() != TypeId::of::<ThreadsCommand>()
1219            || self.is_session_terminated
1220        {
1221            return;
1222        }
1223
1224        let request_map = self
1225            .requests
1226            .entry(std::any::TypeId::of::<T>())
1227            .or_default();
1228
1229        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1230            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1231
1232            let task = Self::request_inner::<Arc<T>>(
1233                &self.capabilities,
1234                self.id,
1235                &self.mode,
1236                command,
1237                process_result,
1238                cx,
1239            );
1240            let task = cx
1241                .background_executor()
1242                .spawn(async move {
1243                    let _ = task.await?;
1244                    Some(())
1245                })
1246                .shared();
1247
1248            vacant.insert(task);
1249            cx.notify();
1250        }
1251    }
1252
1253    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1254        capabilities: &Capabilities,
1255        session_id: SessionId,
1256        mode: &Mode,
1257        request: T,
1258        process_result: impl FnOnce(
1259            &mut Self,
1260            Result<T::Response>,
1261            &mut Context<Self>,
1262        ) -> Option<T::Response>
1263        + 'static,
1264        cx: &mut Context<Self>,
1265    ) -> Task<Option<T::Response>> {
1266        if !T::is_supported(&capabilities) {
1267            log::warn!(
1268                "Attempted to send a DAP request that isn't supported: {:?}",
1269                request
1270            );
1271            let error = Err(anyhow::Error::msg(
1272                "Couldn't complete request because it's not supported",
1273            ));
1274            return cx.spawn(async move |this, cx| {
1275                this.update(cx, |this, cx| process_result(this, error, cx))
1276                    .log_err()
1277                    .flatten()
1278            });
1279        }
1280
1281        let request = mode.request_dap(session_id, request, cx);
1282        cx.spawn(async move |this, cx| {
1283            let result = request.await;
1284            this.update(cx, |this, cx| process_result(this, result, cx))
1285                .log_err()
1286                .flatten()
1287        })
1288    }
1289
1290    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1291        &self,
1292        request: T,
1293        process_result: impl FnOnce(
1294            &mut Self,
1295            Result<T::Response>,
1296            &mut Context<Self>,
1297        ) -> Option<T::Response>
1298        + 'static,
1299        cx: &mut Context<Self>,
1300    ) -> Task<Option<T::Response>> {
1301        Self::request_inner(
1302            &self.capabilities,
1303            self.id,
1304            &self.mode,
1305            request,
1306            process_result,
1307            cx,
1308        )
1309    }
1310
1311    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1312        self.requests.remove(&std::any::TypeId::of::<Command>());
1313    }
1314
1315    fn invalidate_generic(&mut self) {
1316        self.invalidate_command_type::<ModulesCommand>();
1317        self.invalidate_command_type::<LoadedSourcesCommand>();
1318        self.invalidate_command_type::<ThreadsCommand>();
1319    }
1320
1321    fn invalidate_state(&mut self, key: &RequestSlot) {
1322        self.requests
1323            .entry(key.0.as_any().type_id())
1324            .and_modify(|request_map| {
1325                request_map.remove(&key);
1326            });
1327    }
1328
    /// Returns the status recorded for `thread_id` in this session's thread states.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1332
1333    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1334        self.fetch(
1335            dap_command::ThreadsCommand,
1336            |this, result, cx| {
1337                let result = result.log_err()?;
1338
1339                this.threads = result
1340                    .iter()
1341                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1342                    .collect();
1343
1344                this.invalidate_command_type::<StackTraceCommand>();
1345                cx.emit(SessionEvent::Threads);
1346                cx.notify();
1347
1348                Some(result)
1349            },
1350            cx,
1351        );
1352
1353        self.threads
1354            .values()
1355            .map(|thread| {
1356                (
1357                    thread.dap.clone(),
1358                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1359                )
1360            })
1361            .collect()
1362    }
1363
1364    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1365        self.fetch(
1366            dap_command::ModulesCommand,
1367            |this, result, cx| {
1368                let result = result.log_err()?;
1369
1370                this.modules = result.iter().cloned().collect();
1371                cx.emit(SessionEvent::Modules);
1372                cx.notify();
1373
1374                Some(result)
1375            },
1376            cx,
1377        );
1378
1379        &self.modules
1380    }
1381
    /// Returns whether this session is currently set to ignore breakpoints.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1385
1386    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1387        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1388    }
1389
1390    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1391        if self.ignore_breakpoints == ignore {
1392            return Task::ready(());
1393        }
1394
1395        self.ignore_breakpoints = ignore;
1396
1397        if let Some(local) = self.as_local() {
1398            local.send_all_breakpoints(ignore, cx)
1399        } else {
1400            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1401            unimplemented!()
1402        }
1403    }
1404
1405    pub fn breakpoints_enabled(&self) -> bool {
1406        self.ignore_breakpoints
1407    }
1408
1409    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1410        self.fetch(
1411            dap_command::LoadedSourcesCommand,
1412            |this, result, cx| {
1413                let result = result.log_err()?;
1414                this.loaded_sources = result.iter().cloned().collect();
1415                cx.emit(SessionEvent::LoadedSources);
1416                cx.notify();
1417                Some(result)
1418            },
1419            cx,
1420        );
1421
1422        &self.loaded_sources
1423    }
1424
1425    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1426        res.log_err()?;
1427        Some(())
1428    }
1429
1430    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1431        thread_id: ThreadId,
1432    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1433    {
1434        move |this, response, cx| match response.log_err() {
1435            Some(response) => Some(response),
1436            None => {
1437                this.thread_states.stop_thread(thread_id);
1438                cx.notify();
1439                None
1440            }
1441        }
1442    }
1443
1444    fn clear_active_debug_line_response(
1445        &mut self,
1446        response: Result<()>,
1447        cx: &mut Context<Session>,
1448    ) -> Option<()> {
1449        response.log_err()?;
1450        self.clear_active_debug_line(cx);
1451        Some(())
1452    }
1453
1454    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1455        self.as_local()
1456            .expect("Message handler will only run in local mode")
1457            .breakpoint_store
1458            .update(cx, |store, cx| {
1459                store.remove_active_position(Some(self.id), cx)
1460            });
1461    }
1462
1463    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1464        self.request(
1465            PauseCommand {
1466                thread_id: thread_id.0,
1467            },
1468            Self::empty_response,
1469            cx,
1470        )
1471        .detach();
1472    }
1473
1474    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1475        self.request(
1476            RestartStackFrameCommand { stack_frame_id },
1477            Self::empty_response,
1478            cx,
1479        )
1480        .detach();
1481    }
1482
1483    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1484        if self.capabilities.supports_restart_request.unwrap_or(false) {
1485            self.request(
1486                RestartCommand {
1487                    raw: args.unwrap_or(Value::Null),
1488                },
1489                Self::empty_response,
1490                cx,
1491            )
1492            .detach();
1493        } else {
1494            self.request(
1495                DisconnectCommand {
1496                    restart: Some(false),
1497                    terminate_debuggee: Some(true),
1498                    suspend_debuggee: Some(false),
1499                },
1500                Self::empty_response,
1501                cx,
1502            )
1503            .detach();
1504        }
1505    }
1506
1507    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1508        self.is_session_terminated = true;
1509        self.thread_states.exit_all_threads();
1510        cx.notify();
1511
1512        let task = if self
1513            .capabilities
1514            .supports_terminate_request
1515            .unwrap_or_default()
1516        {
1517            self.request(
1518                TerminateCommand {
1519                    restart: Some(false),
1520                },
1521                Self::clear_active_debug_line_response,
1522                cx,
1523            )
1524        } else {
1525            self.request(
1526                DisconnectCommand {
1527                    restart: Some(false),
1528                    terminate_debuggee: Some(true),
1529                    suspend_debuggee: Some(false),
1530                },
1531                Self::clear_active_debug_line_response,
1532                cx,
1533            )
1534        };
1535
1536        cx.background_spawn(async move {
1537            let _ = task.await;
1538        })
1539    }
1540
1541    pub fn completions(
1542        &mut self,
1543        query: CompletionsQuery,
1544        cx: &mut Context<Self>,
1545    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1546        let task = self.request(query, |_, result, _| result.log_err(), cx);
1547
1548        cx.background_executor().spawn(async move {
1549            anyhow::Ok(
1550                task.await
1551                    .map(|response| response.targets)
1552                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1553            )
1554        })
1555    }
1556
1557    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1558        self.thread_states.continue_thread(thread_id);
1559        self.request(
1560            ContinueCommand {
1561                args: ContinueArguments {
1562                    thread_id: thread_id.0,
1563                    single_thread: Some(true),
1564                },
1565            },
1566            Self::on_step_response::<ContinueCommand>(thread_id),
1567            cx,
1568        )
1569        .detach();
1570    }
1571
1572    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1573        match self.mode {
1574            Mode::Local(ref local) => Some(local.client.clone()),
1575            Mode::Remote(_) => None,
1576        }
1577    }
1578
1579    pub fn step_over(
1580        &mut self,
1581        thread_id: ThreadId,
1582        granularity: SteppingGranularity,
1583        cx: &mut Context<Self>,
1584    ) {
1585        let supports_single_thread_execution_requests =
1586            self.capabilities.supports_single_thread_execution_requests;
1587        let supports_stepping_granularity = self
1588            .capabilities
1589            .supports_stepping_granularity
1590            .unwrap_or_default();
1591
1592        let command = NextCommand {
1593            inner: StepCommand {
1594                thread_id: thread_id.0,
1595                granularity: supports_stepping_granularity.then(|| granularity),
1596                single_thread: supports_single_thread_execution_requests,
1597            },
1598        };
1599
1600        self.thread_states.process_step(thread_id);
1601        self.request(
1602            command,
1603            Self::on_step_response::<NextCommand>(thread_id),
1604            cx,
1605        )
1606        .detach();
1607    }
1608
1609    pub fn step_in(
1610        &mut self,
1611        thread_id: ThreadId,
1612        granularity: SteppingGranularity,
1613        cx: &mut Context<Self>,
1614    ) {
1615        let supports_single_thread_execution_requests =
1616            self.capabilities.supports_single_thread_execution_requests;
1617        let supports_stepping_granularity = self
1618            .capabilities
1619            .supports_stepping_granularity
1620            .unwrap_or_default();
1621
1622        let command = StepInCommand {
1623            inner: StepCommand {
1624                thread_id: thread_id.0,
1625                granularity: supports_stepping_granularity.then(|| granularity),
1626                single_thread: supports_single_thread_execution_requests,
1627            },
1628        };
1629
1630        self.thread_states.process_step(thread_id);
1631        self.request(
1632            command,
1633            Self::on_step_response::<StepInCommand>(thread_id),
1634            cx,
1635        )
1636        .detach();
1637    }
1638
1639    pub fn step_out(
1640        &mut self,
1641        thread_id: ThreadId,
1642        granularity: SteppingGranularity,
1643        cx: &mut Context<Self>,
1644    ) {
1645        let supports_single_thread_execution_requests =
1646            self.capabilities.supports_single_thread_execution_requests;
1647        let supports_stepping_granularity = self
1648            .capabilities
1649            .supports_stepping_granularity
1650            .unwrap_or_default();
1651
1652        let command = StepOutCommand {
1653            inner: StepCommand {
1654                thread_id: thread_id.0,
1655                granularity: supports_stepping_granularity.then(|| granularity),
1656                single_thread: supports_single_thread_execution_requests,
1657            },
1658        };
1659
1660        self.thread_states.process_step(thread_id);
1661        self.request(
1662            command,
1663            Self::on_step_response::<StepOutCommand>(thread_id),
1664            cx,
1665        )
1666        .detach();
1667    }
1668
1669    pub fn step_back(
1670        &mut self,
1671        thread_id: ThreadId,
1672        granularity: SteppingGranularity,
1673        cx: &mut Context<Self>,
1674    ) {
1675        let supports_single_thread_execution_requests =
1676            self.capabilities.supports_single_thread_execution_requests;
1677        let supports_stepping_granularity = self
1678            .capabilities
1679            .supports_stepping_granularity
1680            .unwrap_or_default();
1681
1682        let command = StepBackCommand {
1683            inner: StepCommand {
1684                thread_id: thread_id.0,
1685                granularity: supports_stepping_granularity.then(|| granularity),
1686                single_thread: supports_single_thread_execution_requests,
1687            },
1688        };
1689
1690        self.thread_states.process_step(thread_id);
1691
1692        self.request(
1693            command,
1694            Self::on_step_response::<StepBackCommand>(thread_id),
1695            cx,
1696        )
1697        .detach();
1698    }
1699
1700    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1701        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1702            && self.requests.contains_key(&ThreadsCommand.type_id())
1703            && self.threads.contains_key(&thread_id)
1704        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1705        // We could still be using an old thread id and have sent a new thread's request
1706        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1707        // But it very well could cause a minor bug in the future that is hard to track down
1708        {
1709            self.fetch(
1710                super::dap_command::StackTraceCommand {
1711                    thread_id: thread_id.0,
1712                    start_frame: None,
1713                    levels: None,
1714                },
1715                move |this, stack_frames, cx| {
1716                    let stack_frames = stack_frames.log_err()?;
1717
1718                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1719                        thread.stack_frame_ids =
1720                            stack_frames.iter().map(|frame| frame.id).collect();
1721                    });
1722                    debug_assert!(
1723                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1724                        "Sent request for thread_id that doesn't exist"
1725                    );
1726
1727                    this.stack_frames.extend(
1728                        stack_frames
1729                            .iter()
1730                            .cloned()
1731                            .map(|frame| (frame.id, StackFrame::from(frame))),
1732                    );
1733
1734                    this.invalidate_command_type::<ScopesCommand>();
1735                    this.invalidate_command_type::<VariablesCommand>();
1736
1737                    cx.emit(SessionEvent::StackTrace);
1738                    cx.notify();
1739                    Some(stack_frames)
1740                },
1741                cx,
1742            );
1743        }
1744
1745        self.threads
1746            .get(&thread_id)
1747            .map(|thread| {
1748                thread
1749                    .stack_frame_ids
1750                    .iter()
1751                    .filter_map(|id| self.stack_frames.get(id))
1752                    .cloned()
1753                    .collect()
1754            })
1755            .unwrap_or_default()
1756    }
1757
1758    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1759        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1760            && self
1761                .requests
1762                .contains_key(&TypeId::of::<StackTraceCommand>())
1763        {
1764            self.fetch(
1765                ScopesCommand { stack_frame_id },
1766                move |this, scopes, cx| {
1767                    let scopes = scopes.log_err()?;
1768
1769                    for scope in scopes .iter(){
1770                        this.variables(scope.variables_reference, cx);
1771                    }
1772
1773                    let entry = this
1774                        .stack_frames
1775                        .entry(stack_frame_id)
1776                        .and_modify(|stack_frame| {
1777                            stack_frame.scopes = scopes.clone();
1778                        });
1779
1780                    cx.emit(SessionEvent::Variables);
1781
1782                    debug_assert!(
1783                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1784                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1785                    );
1786
1787                    Some(scopes)
1788                },
1789                cx,
1790            );
1791        }
1792
1793        self.stack_frames
1794            .get(&stack_frame_id)
1795            .map(|frame| frame.scopes.as_slice())
1796            .unwrap_or_default()
1797    }
1798
1799    pub fn variables(
1800        &mut self,
1801        variables_reference: VariableReference,
1802        cx: &mut Context<Self>,
1803    ) -> Vec<dap::Variable> {
1804        let command = VariablesCommand {
1805            variables_reference,
1806            filter: None,
1807            start: None,
1808            count: None,
1809            format: None,
1810        };
1811
1812        self.fetch(
1813            command,
1814            move |this, variables, cx| {
1815                let variables = variables.log_err()?;
1816                this.variables
1817                    .insert(variables_reference, variables.clone());
1818
1819                cx.emit(SessionEvent::Variables);
1820                Some(variables)
1821            },
1822            cx,
1823        );
1824
1825        self.variables
1826            .get(&variables_reference)
1827            .cloned()
1828            .unwrap_or_default()
1829    }
1830
1831    pub fn set_variable_value(
1832        &mut self,
1833        variables_reference: u64,
1834        name: String,
1835        value: String,
1836        cx: &mut Context<Self>,
1837    ) {
1838        if self.capabilities.supports_set_variable.unwrap_or_default() {
1839            self.request(
1840                SetVariableValueCommand {
1841                    name,
1842                    value,
1843                    variables_reference,
1844                },
1845                move |this, response, cx| {
1846                    let response = response.log_err()?;
1847                    this.invalidate_command_type::<VariablesCommand>();
1848                    cx.notify();
1849                    Some(response)
1850                },
1851                cx,
1852            )
1853            .detach()
1854        }
1855    }
1856
1857    pub fn evaluate(
1858        &mut self,
1859        expression: String,
1860        context: Option<EvaluateArgumentsContext>,
1861        frame_id: Option<u64>,
1862        source: Option<Source>,
1863        cx: &mut Context<Self>,
1864    ) {
1865        self.request(
1866            EvaluateCommand {
1867                expression,
1868                context,
1869                frame_id,
1870                source,
1871            },
1872            |this, response, cx| {
1873                let response = response.log_err()?;
1874                this.output_token.0 += 1;
1875                this.output.push_back(dap::OutputEvent {
1876                    category: None,
1877                    output: response.result.clone(),
1878                    group: None,
1879                    variables_reference: Some(response.variables_reference),
1880                    source: None,
1881                    line: None,
1882                    column: None,
1883                    data: None,
1884                    location_reference: None,
1885                });
1886
1887                this.invalidate_command_type::<ScopesCommand>();
1888                cx.notify();
1889                Some(response)
1890            },
1891            cx,
1892        )
1893        .detach();
1894    }
1895
1896    pub fn location(
1897        &mut self,
1898        reference: u64,
1899        cx: &mut Context<Self>,
1900    ) -> Option<dap::LocationsResponse> {
1901        self.fetch(
1902            LocationsCommand { reference },
1903            move |this, response, _| {
1904                let response = response.log_err()?;
1905                this.locations.insert(reference, response.clone());
1906                Some(response)
1907            },
1908            cx,
1909        );
1910        self.locations.get(&reference).cloned()
1911    }
1912    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1913        let command = DisconnectCommand {
1914            restart: Some(false),
1915            terminate_debuggee: Some(true),
1916            suspend_debuggee: Some(false),
1917        };
1918
1919        self.request(command, Self::empty_response, cx).detach()
1920    }
1921
1922    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1923        if self
1924            .capabilities
1925            .supports_terminate_threads_request
1926            .unwrap_or_default()
1927        {
1928            self.request(
1929                TerminateThreadsCommand {
1930                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1931                },
1932                Self::clear_active_debug_line_response,
1933                cx,
1934            )
1935            .detach();
1936        } else {
1937            self.shutdown(cx).detach();
1938        }
1939    }
1940}
1941
1942fn create_local_session(
1943    breakpoint_store: Entity<BreakpointStore>,
1944    session_id: SessionId,
1945    parent_session: Option<Entity<Session>>,
1946    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1947    initialized_tx: oneshot::Sender<()>,
1948    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1949    mode: LocalMode,
1950    capabilities: Capabilities,
1951    cx: &mut Context<Session>,
1952) -> Session {
1953    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1954        let mut initialized_tx = Some(initialized_tx);
1955        while let Some(message) = message_rx.next().await {
1956            if let Message::Event(event) = message {
1957                if let Events::Initialized(_) = *event {
1958                    if let Some(tx) = initialized_tx.take() {
1959                        tx.send(()).ok();
1960                    }
1961                } else {
1962                    let Ok(_) = this.update(cx, |session, cx| {
1963                        session.handle_dap_event(event, cx);
1964                    }) else {
1965                        break;
1966                    };
1967                }
1968            } else {
1969                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1970                else {
1971                    break;
1972                };
1973            }
1974        }
1975    })];
1976
1977    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1978        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1979            if let Some(local) = (!this.ignore_breakpoints)
1980                .then(|| this.as_local_mut())
1981                .flatten()
1982            {
1983                local
1984                    .send_breakpoints_from_path(path.clone(), *reason, cx)
1985                    .detach();
1986            };
1987        }
1988        BreakpointStoreEvent::BreakpointsCleared(paths) => {
1989            if let Some(local) = (!this.ignore_breakpoints)
1990                .then(|| this.as_local_mut())
1991                .flatten()
1992            {
1993                local.unset_breakpoints_from_paths(paths, cx).detach();
1994            }
1995        }
1996        BreakpointStoreEvent::ActiveDebugLineChanged => {}
1997    })
1998    .detach();
1999
2000    Session {
2001        mode: Mode::Local(mode),
2002        id: session_id,
2003        child_session_ids: HashSet::default(),
2004        parent_id: parent_session.map(|session| session.read(cx).id),
2005        variables: Default::default(),
2006        capabilities,
2007        thread_states: ThreadStates::default(),
2008        output_token: OutputToken(0),
2009        ignore_breakpoints: false,
2010        output: circular_buffer::CircularBuffer::boxed(),
2011        requests: HashMap::default(),
2012        modules: Vec::default(),
2013        loaded_sources: Vec::default(),
2014        threads: IndexMap::default(),
2015        stack_frames: IndexMap::default(),
2016        locations: Default::default(),
2017        _background_tasks,
2018        is_session_terminated: false,
2019    }
2020}