session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    adapters::{DapDelegate, DapStatus},
  21    client::{DebugAdapterClient, SessionId},
  22    messages::{Events, Message},
  23};
  24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
  25use futures::channel::oneshot;
  26use futures::{FutureExt, future::Shared};
  27use gpui::{
  28    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  29};
  30use rpc::AnyProtoClient;
  31use serde_json::{Value, json};
  32use settings::Settings;
  33use smol::stream::StreamExt;
  34use std::any::TypeId;
  35use std::path::PathBuf;
  36use std::u64;
  37use std::{
  38    any::Any,
  39    collections::hash_map::Entry,
  40    hash::{Hash, Hasher},
  41    path::Path,
  42    sync::Arc,
  43};
  44use task::{DebugAdapterConfig, DebugTaskDefinition};
  45use text::{PointUtf16, ToPointUtf16};
  46use util::{ResultExt, merge_json_value_into};
  47
  48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  49#[repr(transparent)]
  50pub struct ThreadId(pub u64);
  51
  52impl ThreadId {
  53    pub const MIN: ThreadId = ThreadId(u64::MIN);
  54    pub const MAX: ThreadId = ThreadId(u64::MAX);
  55}
  56
  57impl From<u64> for ThreadId {
  58    fn from(id: u64) -> Self {
  59        Self(id)
  60    }
  61}
  62
  63#[derive(Clone, Debug)]
  64pub struct StackFrame {
  65    pub dap: dap::StackFrame,
  66    pub scopes: Vec<dap::Scope>,
  67}
  68
  69impl From<dap::StackFrame> for StackFrame {
  70    fn from(stack_frame: dap::StackFrame) -> Self {
  71        Self {
  72            scopes: vec![],
  73            dap: stack_frame,
  74        }
  75    }
  76}
  77
  78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  79pub enum ThreadStatus {
  80    #[default]
  81    Running,
  82    Stopped,
  83    Stepping,
  84    Exited,
  85    Ended,
  86}
  87
  88impl ThreadStatus {
  89    pub fn label(&self) -> &'static str {
  90        match self {
  91            ThreadStatus::Running => "Running",
  92            ThreadStatus::Stopped => "Stopped",
  93            ThreadStatus::Stepping => "Stepping",
  94            ThreadStatus::Exited => "Exited",
  95            ThreadStatus::Ended => "Ended",
  96        }
  97    }
  98}
  99
 100#[derive(Debug)]
 101pub struct Thread {
 102    dap: dap::Thread,
 103    stack_frame_ids: IndexSet<StackFrameId>,
 104    _has_stopped: bool,
 105}
 106
 107impl From<dap::Thread> for Thread {
 108    fn from(dap: dap::Thread) -> Self {
 109        Self {
 110            dap,
 111            stack_frame_ids: Default::default(),
 112            _has_stopped: false,
 113        }
 114    }
 115}
 116
 117type UpstreamProjectId = u64;
 118
 119struct RemoteConnection {
 120    _client: AnyProtoClient,
 121    _upstream_project_id: UpstreamProjectId,
 122}
 123
 124impl RemoteConnection {
 125    fn send_proto_client_request<R: DapCommand>(
 126        &self,
 127        _request: R,
 128        _session_id: SessionId,
 129        cx: &mut App,
 130    ) -> Task<Result<R::Response>> {
 131        // let message = request.to_proto(session_id, self.upstream_project_id);
 132        // let upstream_client = self.client.clone();
 133        cx.background_executor().spawn(async move {
 134            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 135            // let response = upstream_client.request(message).await?;
 136            // request.response_from_proto(response)
 137            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 138        })
 139    }
 140
 141    fn request<R: DapCommand>(
 142        &self,
 143        request: R,
 144        session_id: SessionId,
 145        cx: &mut App,
 146    ) -> Task<Result<R::Response>>
 147    where
 148        <R::DapRequest as dap::requests::Request>::Response: 'static,
 149        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 150    {
 151        return self.send_proto_client_request::<R>(request, session_id, cx);
 152    }
 153}
 154
 155enum Mode {
 156    Local(LocalMode),
 157    Remote(RemoteConnection),
 158}
 159
 160#[derive(Clone)]
 161pub struct LocalMode {
 162    client: Arc<DebugAdapterClient>,
 163    config: DebugAdapterConfig,
 164    adapter: Arc<dyn DebugAdapter>,
 165    breakpoint_store: Entity<BreakpointStore>,
 166}
 167
 168fn client_source(abs_path: &Path) -> dap::Source {
 169    dap::Source {
 170        name: abs_path
 171            .file_name()
 172            .map(|filename| filename.to_string_lossy().to_string()),
 173        path: Some(abs_path.to_string_lossy().to_string()),
 174        source_reference: None,
 175        presentation_hint: None,
 176        origin: None,
 177        sources: None,
 178        adapter_data: None,
 179        checksums: None,
 180    }
 181}
 182
 183impl LocalMode {
 184    fn new(
 185        debug_adapters: Arc<DapRegistry>,
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        breakpoint_store: Entity<BreakpointStore>,
 189        config: DebugAdapterConfig,
 190        delegate: DapAdapterDelegate,
 191        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 192        cx: AsyncApp,
 193    ) -> Task<Result<Self>> {
 194        Self::new_inner(
 195            debug_adapters,
 196            session_id,
 197            parent_session,
 198            breakpoint_store,
 199            config,
 200            delegate,
 201            messages_tx,
 202            async |_, _| {},
 203            cx,
 204        )
 205    }
 206    #[cfg(any(test, feature = "test-support"))]
 207    fn new_fake(
 208        session_id: SessionId,
 209        parent_session: Option<Entity<Session>>,
 210        breakpoint_store: Entity<BreakpointStore>,
 211        config: DebugAdapterConfig,
 212        delegate: DapAdapterDelegate,
 213        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 214        caps: Capabilities,
 215        fail: bool,
 216        cx: AsyncApp,
 217    ) -> Task<Result<Self>> {
 218        use task::DebugRequestDisposition;
 219
 220        let request = match config.request.clone() {
 221            DebugRequestDisposition::UserConfigured(request) => request,
 222            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
 223                match reverse_request_args.request {
 224                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
 225                        DebugRequestType::Launch(task::LaunchConfig {
 226                            program: "".to_owned(),
 227                            cwd: None,
 228                            args: Default::default(),
 229                        })
 230                    }
 231                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
 232                        DebugRequestType::Attach(task::AttachConfig {
 233                            process_id: Some(0),
 234                        })
 235                    }
 236                }
 237            }
 238        };
 239
 240        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
 241            session
 242                .client
 243                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 244                .await;
 245
 246            let paths = cx
 247                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
 248                .expect("Breakpoint store should exist in all tests that start debuggers");
 249
 250            session
 251                .client
 252                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
 253                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
 254                    if !paths.contains(&p) {
 255                        panic!("Sent breakpoints for path without any")
 256                    }
 257
 258                    Ok(dap::SetBreakpointsResponse {
 259                        breakpoints: Vec::default(),
 260                    })
 261                })
 262                .await;
 263
 264            match request {
 265                dap::DebugRequestType::Launch(_) => {
 266                    if fail {
 267                        session
 268                            .client
 269                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 270                                Err(dap::ErrorResponse {
 271                                    error: Some(dap::Message {
 272                                        id: 1,
 273                                        format: "error".into(),
 274                                        variables: None,
 275                                        send_telemetry: None,
 276                                        show_user: None,
 277                                        url: None,
 278                                        url_label: None,
 279                                    }),
 280                                })
 281                            })
 282                            .await;
 283                    } else {
 284                        session
 285                            .client
 286                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 287                            .await;
 288                    }
 289                }
 290                dap::DebugRequestType::Attach(attach_config) => {
 291                    if fail {
 292                        session
 293                            .client
 294                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 295                                Err(dap::ErrorResponse {
 296                                    error: Some(dap::Message {
 297                                        id: 1,
 298                                        format: "error".into(),
 299                                        variables: None,
 300                                        send_telemetry: None,
 301                                        show_user: None,
 302                                        url: None,
 303                                        url_label: None,
 304                                    }),
 305                                })
 306                            })
 307                            .await;
 308                    } else {
 309                        session
 310                            .client
 311                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 312                                assert_eq!(
 313                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 314                                    args.raw
 315                                );
 316
 317                                Ok(())
 318                            })
 319                            .await;
 320                    }
 321                }
 322            }
 323
 324            session
 325                .client
 326                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
 327                .await;
 328            session.client.fake_event(Events::Initialized(None)).await;
 329        };
 330        Self::new_inner(
 331            DapRegistry::fake().into(),
 332            session_id,
 333            parent_session,
 334            breakpoint_store,
 335            config,
 336            delegate,
 337            messages_tx,
 338            callback,
 339            cx,
 340        )
 341    }
 342    fn new_inner(
 343        registry: Arc<DapRegistry>,
 344        session_id: SessionId,
 345        parent_session: Option<Entity<Session>>,
 346        breakpoint_store: Entity<BreakpointStore>,
 347        config: DebugAdapterConfig,
 348        delegate: DapAdapterDelegate,
 349        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 350        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 351        cx: AsyncApp,
 352    ) -> Task<Result<Self>> {
 353        cx.spawn(async move |cx| {
 354            let (adapter, binary) =
 355                Self::get_adapter_binary(&registry, &config, &delegate, cx).await?;
 356
 357            let message_handler = Box::new(move |message| {
 358                messages_tx.unbounded_send(message).ok();
 359            });
 360
 361            let client = Arc::new(
 362                if let Some(client) = parent_session
 363                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 364                    .flatten()
 365                {
 366                    client
 367                        .reconnect(session_id, binary, message_handler, cx.clone())
 368                        .await?
 369                } else {
 370                    DebugAdapterClient::start(
 371                        session_id,
 372                        adapter.name(),
 373                        binary,
 374                        message_handler,
 375                        cx.clone(),
 376                    )
 377                    .await
 378                    .with_context(|| "Failed to start communication with debug adapter")?
 379                },
 380            );
 381
 382            let mut session = Self {
 383                client,
 384                adapter,
 385                breakpoint_store,
 386                config: config.clone(),
 387            };
 388
 389            on_initialized(&mut session, cx.clone()).await;
 390
 391            Ok(session)
 392        })
 393    }
 394
 395    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 396        let tasks: Vec<_> = paths
 397            .into_iter()
 398            .map(|path| {
 399                self.request(
 400                    dap_command::SetBreakpoints {
 401                        source: client_source(path),
 402                        source_modified: None,
 403                        breakpoints: vec![],
 404                    },
 405                    cx.background_executor().clone(),
 406                )
 407            })
 408            .collect();
 409
 410        cx.background_spawn(async move {
 411            futures::future::join_all(tasks)
 412                .await
 413                .iter()
 414                .for_each(|res| match res {
 415                    Ok(_) => {}
 416                    Err(err) => {
 417                        log::warn!("Set breakpoints request failed: {}", err);
 418                    }
 419                });
 420        })
 421    }
 422
 423    fn send_breakpoints_from_path(
 424        &self,
 425        abs_path: Arc<Path>,
 426        reason: BreakpointUpdatedReason,
 427        cx: &mut App,
 428    ) -> Task<()> {
 429        let breakpoints = self
 430            .breakpoint_store
 431            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 432            .into_iter()
 433            .filter(|bp| bp.state.is_enabled())
 434            .map(Into::into)
 435            .collect();
 436
 437        let task = self.request(
 438            dap_command::SetBreakpoints {
 439                source: client_source(&abs_path),
 440                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 441                breakpoints,
 442            },
 443            cx.background_executor().clone(),
 444        );
 445
 446        cx.background_spawn(async move {
 447            match task.await {
 448                Ok(_) => {}
 449                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 450            }
 451        })
 452    }
 453
 454    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 455        let mut breakpoint_tasks = Vec::new();
 456        let breakpoints = self
 457            .breakpoint_store
 458            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 459
 460        for (path, breakpoints) in breakpoints {
 461            let breakpoints = if ignore_breakpoints {
 462                vec![]
 463            } else {
 464                breakpoints
 465                    .into_iter()
 466                    .filter(|bp| bp.state.is_enabled())
 467                    .map(Into::into)
 468                    .collect()
 469            };
 470
 471            breakpoint_tasks.push(self.request(
 472                dap_command::SetBreakpoints {
 473                    source: client_source(&path),
 474                    source_modified: Some(false),
 475                    breakpoints,
 476                },
 477                cx.background_executor().clone(),
 478            ));
 479        }
 480
 481        cx.background_spawn(async move {
 482            futures::future::join_all(breakpoint_tasks)
 483                .await
 484                .iter()
 485                .for_each(|res| match res {
 486                    Ok(_) => {}
 487                    Err(err) => {
 488                        log::warn!("Set breakpoints request failed: {}", err);
 489                    }
 490                });
 491        })
 492    }
 493
 494    async fn get_adapter_binary(
 495        registry: &Arc<DapRegistry>,
 496        config: &DebugAdapterConfig,
 497        delegate: &DapAdapterDelegate,
 498        cx: &mut AsyncApp,
 499    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 500        let adapter = registry
 501            .adapter(&config.adapter)
 502            .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
 503
 504        let binary = cx.update(|cx| {
 505            ProjectSettings::get_global(cx)
 506                .dap
 507                .get(&adapter.name())
 508                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 509        })?;
 510
 511        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 512            Err(error) => {
 513                delegate.update_status(
 514                    adapter.name(),
 515                    DapStatus::Failed {
 516                        error: error.to_string(),
 517                    },
 518                );
 519
 520                return Err(error);
 521            }
 522            Ok(mut binary) => {
 523                delegate.update_status(adapter.name(), DapStatus::None);
 524
 525                let shell_env = delegate.shell_env().await;
 526                let mut envs = binary.envs.unwrap_or_default();
 527                envs.extend(shell_env);
 528                binary.envs = Some(envs);
 529
 530                binary
 531            }
 532        };
 533
 534        Ok((adapter, binary))
 535    }
 536
 537    pub fn label(&self) -> String {
 538        self.config.label.clone()
 539    }
 540
 541    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
 542        let adapter_id = self.adapter.name().to_string();
 543
 544        self.request(Initialize { adapter_id }, cx.background_executor().clone())
 545    }
 546
 547    fn initialize_sequence(
 548        &self,
 549        capabilities: &Capabilities,
 550        initialized_rx: oneshot::Receiver<()>,
 551        cx: &App,
 552    ) -> Task<Result<()>> {
 553        let (mut raw, is_launch) = match &self.config.request {
 554            task::DebugRequestDisposition::UserConfigured(_) => {
 555                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
 556                    debug_assert!(false, "This part of code should be unreachable in practice");
 557                    return Task::ready(Err(anyhow!(
 558                        "Expected debug config conversion to succeed"
 559                    )));
 560                };
 561                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
 562                let raw = self.adapter.request_args(&raw);
 563                (raw, is_launch)
 564            }
 565            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
 566                start_debugging_request_arguments.configuration.clone(),
 567                matches!(
 568                    start_debugging_request_arguments.request,
 569                    dap::StartDebuggingRequestArgumentsRequest::Launch
 570                ),
 571            ),
 572        };
 573
 574        merge_json_value_into(
 575            self.config.initialize_args.clone().unwrap_or(json!({})),
 576            &mut raw,
 577        );
 578        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 579        let launch = if is_launch {
 580            self.request(Launch { raw }, cx.background_executor().clone())
 581        } else {
 582            self.request(Attach { raw }, cx.background_executor().clone())
 583        };
 584
 585        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 586
 587        let configuration_sequence = cx.spawn({
 588            let this = self.clone();
 589            async move |cx| {
 590                initialized_rx.await?;
 591                // todo(debugger) figure out if we want to handle a breakpoint response error
 592                // This will probably consist of letting a user know that breakpoints failed to be set
 593                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 594
 595                if configuration_done_supported {
 596                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 597                } else {
 598                    Task::ready(Ok(()))
 599                }
 600                .await
 601            }
 602        });
 603
 604        cx.background_spawn(async move {
 605            futures::future::try_join(launch, configuration_sequence).await?;
 606            Ok(())
 607        })
 608    }
 609
 610    fn request<R: LocalDapCommand>(
 611        &self,
 612        request: R,
 613        executor: BackgroundExecutor,
 614    ) -> Task<Result<R::Response>>
 615    where
 616        <R::DapRequest as dap::requests::Request>::Response: 'static,
 617        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 618    {
 619        let request = Arc::new(request);
 620
 621        let request_clone = request.clone();
 622        let connection = self.client.clone();
 623        let request_task = executor.spawn(async move {
 624            let args = request_clone.to_dap();
 625            connection.request::<R::DapRequest>(args).await
 626        });
 627
 628        executor.spawn(async move {
 629            let response = request.response_from_dap(request_task.await?);
 630            response
 631        })
 632    }
 633}
 634impl From<RemoteConnection> for Mode {
 635    fn from(value: RemoteConnection) -> Self {
 636        Self::Remote(value)
 637    }
 638}
 639
 640impl Mode {
 641    fn request_dap<R: DapCommand>(
 642        &self,
 643        session_id: SessionId,
 644        request: R,
 645        cx: &mut Context<Session>,
 646    ) -> Task<Result<R::Response>>
 647    where
 648        <R::DapRequest as dap::requests::Request>::Response: 'static,
 649        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 650    {
 651        match self {
 652            Mode::Local(debug_adapter_client) => {
 653                debug_adapter_client.request(request, cx.background_executor().clone())
 654            }
 655            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 656        }
 657    }
 658}
 659
 660#[derive(Default)]
 661struct ThreadStates {
 662    global_state: Option<ThreadStatus>,
 663    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 664}
 665
 666impl ThreadStates {
 667    fn stop_all_threads(&mut self) {
 668        self.global_state = Some(ThreadStatus::Stopped);
 669        self.known_thread_states.clear();
 670    }
 671
 672    fn exit_all_threads(&mut self) {
 673        self.global_state = Some(ThreadStatus::Exited);
 674        self.known_thread_states.clear();
 675    }
 676
 677    fn continue_all_threads(&mut self) {
 678        self.global_state = Some(ThreadStatus::Running);
 679        self.known_thread_states.clear();
 680    }
 681
 682    fn stop_thread(&mut self, thread_id: ThreadId) {
 683        self.known_thread_states
 684            .insert(thread_id, ThreadStatus::Stopped);
 685    }
 686
 687    fn continue_thread(&mut self, thread_id: ThreadId) {
 688        self.known_thread_states
 689            .insert(thread_id, ThreadStatus::Running);
 690    }
 691
 692    fn process_step(&mut self, thread_id: ThreadId) {
 693        self.known_thread_states
 694            .insert(thread_id, ThreadStatus::Stepping);
 695    }
 696
 697    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 698        self.thread_state(thread_id)
 699            .unwrap_or(ThreadStatus::Running)
 700    }
 701
 702    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 703        self.known_thread_states
 704            .get(&thread_id)
 705            .copied()
 706            .or(self.global_state)
 707    }
 708
 709    fn exit_thread(&mut self, thread_id: ThreadId) {
 710        self.known_thread_states
 711            .insert(thread_id, ThreadStatus::Exited);
 712    }
 713
 714    fn any_stopped_thread(&self) -> bool {
 715        self.global_state
 716            .is_some_and(|state| state == ThreadStatus::Stopped)
 717            || self
 718                .known_thread_states
 719                .values()
 720                .any(|status| *status == ThreadStatus::Stopped)
 721    }
 722}
 723const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 724
 725#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 726pub struct OutputToken(pub usize);
 727/// Represents a current state of a single debug adapter and provides ways to mutate it.
 728pub struct Session {
 729    mode: Mode,
 730    pub(super) capabilities: Capabilities,
 731    id: SessionId,
 732    child_session_ids: HashSet<SessionId>,
 733    parent_id: Option<SessionId>,
 734    ignore_breakpoints: bool,
 735    modules: Vec<dap::Module>,
 736    loaded_sources: Vec<dap::Source>,
 737    output_token: OutputToken,
 738    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 739    threads: IndexMap<ThreadId, Thread>,
 740    thread_states: ThreadStates,
 741    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 742    stack_frames: IndexMap<StackFrameId, StackFrame>,
 743    locations: HashMap<u64, dap::LocationsResponse>,
 744    is_session_terminated: bool,
 745    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 746    _background_tasks: Vec<Task<()>>,
 747}
 748
 749trait CacheableCommand: Any + Send + Sync {
 750    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 751    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 752    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 753}
 754
 755impl<T> CacheableCommand for T
 756where
 757    T: DapCommand + PartialEq + Eq + Hash,
 758{
 759    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 760        (rhs as &dyn Any)
 761            .downcast_ref::<Self>()
 762            .map_or(false, |rhs| self == rhs)
 763    }
 764
 765    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 766        T::hash(self, &mut hasher);
 767    }
 768
 769    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 770        self
 771    }
 772}
 773
 774pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 775
 776impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 777    fn from(request: T) -> Self {
 778        Self(Arc::new(request))
 779    }
 780}
 781
 782impl PartialEq for RequestSlot {
 783    fn eq(&self, other: &Self) -> bool {
 784        self.0.dyn_eq(other.0.as_ref())
 785    }
 786}
 787
 788impl Eq for RequestSlot {}
 789
 790impl Hash for RequestSlot {
 791    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 792        self.0.dyn_hash(state);
 793        (&*self.0 as &dyn Any).type_id().hash(state)
 794    }
 795}
 796
 797#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 798pub struct CompletionsQuery {
 799    pub query: String,
 800    pub column: u64,
 801    pub line: Option<u64>,
 802    pub frame_id: Option<u64>,
 803}
 804
 805impl CompletionsQuery {
 806    pub fn new(
 807        buffer: &language::Buffer,
 808        cursor_position: language::Anchor,
 809        frame_id: Option<u64>,
 810    ) -> Self {
 811        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 812        Self {
 813            query: buffer.text(),
 814            column: column as u64,
 815            frame_id,
 816            line: Some(row as u64),
 817        }
 818    }
 819}
 820
 821pub enum SessionEvent {
 822    Modules,
 823    LoadedSources,
 824    Stopped(Option<ThreadId>),
 825    StackTrace,
 826    Variables,
 827    Threads,
 828}
 829
 830pub(crate) enum SessionStateEvent {
 831    Shutdown,
 832}
 833
 834impl EventEmitter<SessionEvent> for Session {}
 835impl EventEmitter<SessionStateEvent> for Session {}
 836
 837// local session will send breakpoint updates to DAP for all new breakpoints
 838// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 839// BreakpointStore notifies session on breakpoint changes
 840impl Session {
 841    pub(crate) fn local(
 842        breakpoint_store: Entity<BreakpointStore>,
 843        session_id: SessionId,
 844        parent_session: Option<Entity<Session>>,
 845        delegate: DapAdapterDelegate,
 846        config: DebugAdapterConfig,
 847        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 848        initialized_tx: oneshot::Sender<()>,
 849        debug_adapters: Arc<DapRegistry>,
 850        cx: &mut App,
 851    ) -> Task<Result<Entity<Self>>> {
 852        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 853
 854        cx.spawn(async move |cx| {
 855            let mode = LocalMode::new(
 856                debug_adapters,
 857                session_id,
 858                parent_session.clone(),
 859                breakpoint_store.clone(),
 860                config.clone(),
 861                delegate,
 862                message_tx,
 863                cx.clone(),
 864            )
 865            .await?;
 866
 867            cx.new(|cx| {
 868                create_local_session(
 869                    breakpoint_store,
 870                    session_id,
 871                    parent_session,
 872                    start_debugging_requests_tx,
 873                    initialized_tx,
 874                    message_rx,
 875                    mode,
 876                    cx,
 877                )
 878            })
 879        })
 880    }
 881
 882    #[cfg(any(test, feature = "test-support"))]
 883    pub(crate) fn fake(
 884        breakpoint_store: Entity<BreakpointStore>,
 885        session_id: SessionId,
 886        parent_session: Option<Entity<Session>>,
 887        delegate: DapAdapterDelegate,
 888        config: DebugAdapterConfig,
 889        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 890        initialized_tx: oneshot::Sender<()>,
 891        caps: Capabilities,
 892        fails: bool,
 893        cx: &mut App,
 894    ) -> Task<Result<Entity<Session>>> {
 895        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 896
 897        cx.spawn(async move |cx| {
 898            let mode = LocalMode::new_fake(
 899                session_id,
 900                parent_session.clone(),
 901                breakpoint_store.clone(),
 902                config.clone(),
 903                delegate,
 904                message_tx,
 905                caps,
 906                fails,
 907                cx.clone(),
 908            )
 909            .await?;
 910
 911            cx.new(|cx| {
 912                create_local_session(
 913                    breakpoint_store,
 914                    session_id,
 915                    parent_session,
 916                    start_debugging_requests_tx,
 917                    initialized_tx,
 918                    message_rx,
 919                    mode,
 920                    cx,
 921                )
 922            })
 923        })
 924    }
 925
 926    pub(crate) fn remote(
 927        session_id: SessionId,
 928        client: AnyProtoClient,
 929        upstream_project_id: u64,
 930        ignore_breakpoints: bool,
 931    ) -> Self {
 932        Self {
 933            mode: Mode::Remote(RemoteConnection {
 934                _client: client,
 935                _upstream_project_id: upstream_project_id,
 936            }),
 937            id: session_id,
 938            child_session_ids: HashSet::default(),
 939            parent_id: None,
 940            capabilities: Capabilities::default(),
 941            ignore_breakpoints,
 942            variables: Default::default(),
 943            stack_frames: Default::default(),
 944            thread_states: ThreadStates::default(),
 945            output_token: OutputToken(0),
 946            output: circular_buffer::CircularBuffer::boxed(),
 947            requests: HashMap::default(),
 948            modules: Vec::default(),
 949            loaded_sources: Vec::default(),
 950            threads: IndexMap::default(),
 951            _background_tasks: Vec::default(),
 952            locations: Default::default(),
 953            is_session_terminated: false,
 954        }
 955    }
 956
 957    pub fn session_id(&self) -> SessionId {
 958        self.id
 959    }
 960
 961    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 962        self.child_session_ids.clone()
 963    }
 964
 965    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 966        self.child_session_ids.insert(session_id);
 967    }
 968
 969    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 970        self.child_session_ids.remove(&session_id);
 971    }
 972
 973    pub fn parent_id(&self) -> Option<SessionId> {
 974        self.parent_id
 975    }
 976
 977    pub fn capabilities(&self) -> &Capabilities {
 978        &self.capabilities
 979    }
 980
 981    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 982        if let Mode::Local(local_mode) = &self.mode {
 983            Some(local_mode.config.clone())
 984        } else {
 985            None
 986        }
 987    }
 988
 989    pub fn is_terminated(&self) -> bool {
 990        self.is_session_terminated
 991    }
 992
 993    pub fn is_local(&self) -> bool {
 994        matches!(self.mode, Mode::Local(_))
 995    }
 996
 997    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 998        match &mut self.mode {
 999            Mode::Local(local_mode) => Some(local_mode),
1000            Mode::Remote(_) => None,
1001        }
1002    }
1003
1004    pub fn as_local(&self) -> Option<&LocalMode> {
1005        match &self.mode {
1006            Mode::Local(local_mode) => Some(local_mode),
1007            Mode::Remote(_) => None,
1008        }
1009    }
1010
1011    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1012        match &self.mode {
1013            Mode::Local(local_mode) => {
1014                let capabilities = local_mode.clone().request_initialization(cx);
1015
1016                cx.spawn(async move |this, cx| {
1017                    let capabilities = capabilities.await?;
1018                    this.update(cx, |session, _| {
1019                        session.capabilities = capabilities;
1020                    })?;
1021                    Ok(())
1022                })
1023            }
1024            Mode::Remote(_) => Task::ready(Err(anyhow!(
1025                "Cannot send initialize request from remote session"
1026            ))),
1027        }
1028    }
1029
1030    pub(super) fn initialize_sequence(
1031        &mut self,
1032        initialize_rx: oneshot::Receiver<()>,
1033        cx: &mut Context<Self>,
1034    ) -> Task<Result<()>> {
1035        match &self.mode {
1036            Mode::Local(local_mode) => {
1037                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1038            }
1039            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1040        }
1041    }
1042
1043    pub fn output(
1044        &self,
1045        since: OutputToken,
1046    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1047        if self.output_token.0 == 0 {
1048            return (self.output.range(0..0), OutputToken(0));
1049        };
1050
1051        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1052
1053        let clamped_events_since = events_since.clamp(0, self.output.len());
1054        (
1055            self.output
1056                .range(self.output.len() - clamped_events_since..),
1057            self.output_token,
1058        )
1059    }
1060
1061    pub fn respond_to_client(
1062        &self,
1063        request_seq: u64,
1064        success: bool,
1065        command: String,
1066        body: Option<serde_json::Value>,
1067        cx: &mut Context<Self>,
1068    ) -> Task<Result<()>> {
1069        let Some(local_session) = self.as_local().cloned() else {
1070            unreachable!("Cannot respond to remote client");
1071        };
1072
1073        cx.background_spawn(async move {
1074            local_session
1075                .client
1076                .send_message(Message::Response(Response {
1077                    body,
1078                    success,
1079                    command,
1080                    seq: request_seq + 1,
1081                    request_seq,
1082                    message: None,
1083                }))
1084                .await
1085        })
1086    }
1087
1088    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1089        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1090            self.thread_states.stop_all_threads();
1091
1092            self.invalidate_command_type::<StackTraceCommand>();
1093        }
1094
1095        // Event if we stopped all threads we still need to insert the thread_id
1096        // to our own data
1097        if let Some(thread_id) = event.thread_id {
1098            self.thread_states.stop_thread(ThreadId(thread_id));
1099
1100            self.invalidate_state(
1101                &StackTraceCommand {
1102                    thread_id,
1103                    start_frame: None,
1104                    levels: None,
1105                }
1106                .into(),
1107            );
1108        }
1109
1110        self.invalidate_generic();
1111        self.threads.clear();
1112        self.variables.clear();
1113        cx.emit(SessionEvent::Stopped(
1114            event
1115                .thread_id
1116                .map(Into::into)
1117                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1118        ));
1119        cx.notify();
1120    }
1121
1122    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1123        match *event {
1124            Events::Initialized(_) => {
1125                debug_assert!(
1126                    false,
1127                    "Initialized event should have been handled in LocalMode"
1128                );
1129            }
1130            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1131            Events::Continued(event) => {
1132                if event.all_threads_continued.unwrap_or_default() {
1133                    self.thread_states.continue_all_threads();
1134                } else {
1135                    self.thread_states
1136                        .continue_thread(ThreadId(event.thread_id));
1137                }
1138                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1139                self.invalidate_generic();
1140            }
1141            Events::Exited(_event) => {
1142                self.clear_active_debug_line(cx);
1143            }
1144            Events::Terminated(_) => {
1145                self.is_session_terminated = true;
1146                self.clear_active_debug_line(cx);
1147            }
1148            Events::Thread(event) => {
1149                let thread_id = ThreadId(event.thread_id);
1150
1151                match event.reason {
1152                    dap::ThreadEventReason::Started => {
1153                        self.thread_states.continue_thread(thread_id);
1154                    }
1155                    dap::ThreadEventReason::Exited => {
1156                        self.thread_states.exit_thread(thread_id);
1157                    }
1158                    reason => {
1159                        log::error!("Unhandled thread event reason {:?}", reason);
1160                    }
1161                }
1162                self.invalidate_state(&ThreadsCommand.into());
1163                cx.notify();
1164            }
1165            Events::Output(event) => {
1166                if event
1167                    .category
1168                    .as_ref()
1169                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1170                {
1171                    return;
1172                }
1173
1174                self.output.push_back(event);
1175                self.output_token.0 += 1;
1176                cx.notify();
1177            }
1178            Events::Breakpoint(_) => {}
1179            Events::Module(event) => {
1180                match event.reason {
1181                    dap::ModuleEventReason::New => {
1182                        self.modules.push(event.module);
1183                    }
1184                    dap::ModuleEventReason::Changed => {
1185                        if let Some(module) = self
1186                            .modules
1187                            .iter_mut()
1188                            .find(|other| event.module.id == other.id)
1189                        {
1190                            *module = event.module;
1191                        }
1192                    }
1193                    dap::ModuleEventReason::Removed => {
1194                        self.modules.retain(|other| event.module.id != other.id);
1195                    }
1196                }
1197
1198                // todo(debugger): We should only send the invalidate command to downstream clients.
1199                // self.invalidate_state(&ModulesCommand.into());
1200            }
1201            Events::LoadedSource(_) => {
1202                self.invalidate_state(&LoadedSourcesCommand.into());
1203            }
1204            Events::Capabilities(event) => {
1205                self.capabilities = self.capabilities.merge(event.capabilities);
1206                cx.notify();
1207            }
1208            Events::Memory(_) => {}
1209            Events::Process(_) => {}
1210            Events::ProgressEnd(_) => {}
1211            Events::ProgressStart(_) => {}
1212            Events::ProgressUpdate(_) => {}
1213            Events::Invalidated(_) => {}
1214            Events::Other(_) => {}
1215        }
1216    }
1217
1218    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1219    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1220        &mut self,
1221        request: T,
1222        process_result: impl FnOnce(
1223            &mut Self,
1224            Result<T::Response>,
1225            &mut Context<Self>,
1226        ) -> Option<T::Response>
1227        + 'static,
1228        cx: &mut Context<Self>,
1229    ) {
1230        const {
1231            assert!(
1232                T::CACHEABLE,
1233                "Only requests marked as cacheable should invoke `fetch`"
1234            );
1235        }
1236
1237        if !self.thread_states.any_stopped_thread()
1238            && request.type_id() != TypeId::of::<ThreadsCommand>()
1239            || self.is_session_terminated
1240        {
1241            return;
1242        }
1243
1244        let request_map = self
1245            .requests
1246            .entry(std::any::TypeId::of::<T>())
1247            .or_default();
1248
1249        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1250            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1251
1252            let task = Self::request_inner::<Arc<T>>(
1253                &self.capabilities,
1254                self.id,
1255                &self.mode,
1256                command,
1257                process_result,
1258                cx,
1259            );
1260            let task = cx
1261                .background_executor()
1262                .spawn(async move {
1263                    let _ = task.await?;
1264                    Some(())
1265                })
1266                .shared();
1267
1268            vacant.insert(task);
1269            cx.notify();
1270        }
1271    }
1272
1273    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1274        capabilities: &Capabilities,
1275        session_id: SessionId,
1276        mode: &Mode,
1277        request: T,
1278        process_result: impl FnOnce(
1279            &mut Self,
1280            Result<T::Response>,
1281            &mut Context<Self>,
1282        ) -> Option<T::Response>
1283        + 'static,
1284        cx: &mut Context<Self>,
1285    ) -> Task<Option<T::Response>> {
1286        if !T::is_supported(&capabilities) {
1287            log::warn!(
1288                "Attempted to send a DAP request that isn't supported: {:?}",
1289                request
1290            );
1291            let error = Err(anyhow::Error::msg(
1292                "Couldn't complete request because it's not supported",
1293            ));
1294            return cx.spawn(async move |this, cx| {
1295                this.update(cx, |this, cx| process_result(this, error, cx))
1296                    .log_err()
1297                    .flatten()
1298            });
1299        }
1300
1301        let request = mode.request_dap(session_id, request, cx);
1302        cx.spawn(async move |this, cx| {
1303            let result = request.await;
1304            this.update(cx, |this, cx| process_result(this, result, cx))
1305                .log_err()
1306                .flatten()
1307        })
1308    }
1309
1310    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1311        &self,
1312        request: T,
1313        process_result: impl FnOnce(
1314            &mut Self,
1315            Result<T::Response>,
1316            &mut Context<Self>,
1317        ) -> Option<T::Response>
1318        + 'static,
1319        cx: &mut Context<Self>,
1320    ) -> Task<Option<T::Response>> {
1321        Self::request_inner(
1322            &self.capabilities,
1323            self.id,
1324            &self.mode,
1325            request,
1326            process_result,
1327            cx,
1328        )
1329    }
1330
1331    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1332        self.requests.remove(&std::any::TypeId::of::<Command>());
1333    }
1334
1335    fn invalidate_generic(&mut self) {
1336        self.invalidate_command_type::<ModulesCommand>();
1337        self.invalidate_command_type::<LoadedSourcesCommand>();
1338        self.invalidate_command_type::<ThreadsCommand>();
1339    }
1340
1341    fn invalidate_state(&mut self, key: &RequestSlot) {
1342        self.requests
1343            .entry((&*key.0 as &dyn Any).type_id())
1344            .and_modify(|request_map| {
1345                request_map.remove(&key);
1346            });
1347    }
1348
1349    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1350        self.thread_states.thread_status(thread_id)
1351    }
1352
1353    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1354        self.fetch(
1355            dap_command::ThreadsCommand,
1356            |this, result, cx| {
1357                let result = result.log_err()?;
1358
1359                this.threads = result
1360                    .iter()
1361                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1362                    .collect();
1363
1364                this.invalidate_command_type::<StackTraceCommand>();
1365                cx.emit(SessionEvent::Threads);
1366                cx.notify();
1367
1368                Some(result)
1369            },
1370            cx,
1371        );
1372
1373        self.threads
1374            .values()
1375            .map(|thread| {
1376                (
1377                    thread.dap.clone(),
1378                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1379                )
1380            })
1381            .collect()
1382    }
1383
1384    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1385        self.fetch(
1386            dap_command::ModulesCommand,
1387            |this, result, cx| {
1388                let result = result.log_err()?;
1389
1390                this.modules = result.iter().cloned().collect();
1391                cx.emit(SessionEvent::Modules);
1392                cx.notify();
1393
1394                Some(result)
1395            },
1396            cx,
1397        );
1398
1399        &self.modules
1400    }
1401
1402    pub fn ignore_breakpoints(&self) -> bool {
1403        self.ignore_breakpoints
1404    }
1405
1406    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1407        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1408    }
1409
1410    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1411        if self.ignore_breakpoints == ignore {
1412            return Task::ready(());
1413        }
1414
1415        self.ignore_breakpoints = ignore;
1416
1417        if let Some(local) = self.as_local() {
1418            local.send_all_breakpoints(ignore, cx)
1419        } else {
1420            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1421            unimplemented!()
1422        }
1423    }
1424
1425    pub fn breakpoints_enabled(&self) -> bool {
1426        self.ignore_breakpoints
1427    }
1428
1429    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1430        self.fetch(
1431            dap_command::LoadedSourcesCommand,
1432            |this, result, cx| {
1433                let result = result.log_err()?;
1434                this.loaded_sources = result.iter().cloned().collect();
1435                cx.emit(SessionEvent::LoadedSources);
1436                cx.notify();
1437                Some(result)
1438            },
1439            cx,
1440        );
1441
1442        &self.loaded_sources
1443    }
1444
1445    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1446        res.log_err()?;
1447        Some(())
1448    }
1449
1450    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1451        thread_id: ThreadId,
1452    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1453    {
1454        move |this, response, cx| match response.log_err() {
1455            Some(response) => Some(response),
1456            None => {
1457                this.thread_states.stop_thread(thread_id);
1458                cx.notify();
1459                None
1460            }
1461        }
1462    }
1463
1464    fn clear_active_debug_line_response(
1465        &mut self,
1466        response: Result<()>,
1467        cx: &mut Context<Session>,
1468    ) -> Option<()> {
1469        response.log_err()?;
1470        self.clear_active_debug_line(cx);
1471        Some(())
1472    }
1473
1474    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1475        self.as_local()
1476            .expect("Message handler will only run in local mode")
1477            .breakpoint_store
1478            .update(cx, |store, cx| {
1479                store.remove_active_position(Some(self.id), cx)
1480            });
1481    }
1482
1483    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1484        self.request(
1485            PauseCommand {
1486                thread_id: thread_id.0,
1487            },
1488            Self::empty_response,
1489            cx,
1490        )
1491        .detach();
1492    }
1493
1494    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1495        self.request(
1496            RestartStackFrameCommand { stack_frame_id },
1497            Self::empty_response,
1498            cx,
1499        )
1500        .detach();
1501    }
1502
1503    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1504        if self.capabilities.supports_restart_request.unwrap_or(false) {
1505            self.request(
1506                RestartCommand {
1507                    raw: args.unwrap_or(Value::Null),
1508                },
1509                Self::empty_response,
1510                cx,
1511            )
1512            .detach();
1513        } else {
1514            self.request(
1515                DisconnectCommand {
1516                    restart: Some(false),
1517                    terminate_debuggee: Some(true),
1518                    suspend_debuggee: Some(false),
1519                },
1520                Self::empty_response,
1521                cx,
1522            )
1523            .detach();
1524        }
1525    }
1526
1527    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1528        self.is_session_terminated = true;
1529        self.thread_states.exit_all_threads();
1530        cx.notify();
1531
1532        let task = if self
1533            .capabilities
1534            .supports_terminate_request
1535            .unwrap_or_default()
1536        {
1537            self.request(
1538                TerminateCommand {
1539                    restart: Some(false),
1540                },
1541                Self::clear_active_debug_line_response,
1542                cx,
1543            )
1544        } else {
1545            self.request(
1546                DisconnectCommand {
1547                    restart: Some(false),
1548                    terminate_debuggee: Some(true),
1549                    suspend_debuggee: Some(false),
1550                },
1551                Self::clear_active_debug_line_response,
1552                cx,
1553            )
1554        };
1555
1556        cx.emit(SessionStateEvent::Shutdown);
1557
1558        cx.background_spawn(async move {
1559            let _ = task.await;
1560        })
1561    }
1562
1563    pub fn completions(
1564        &mut self,
1565        query: CompletionsQuery,
1566        cx: &mut Context<Self>,
1567    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1568        let task = self.request(query, |_, result, _| result.log_err(), cx);
1569
1570        cx.background_executor().spawn(async move {
1571            anyhow::Ok(
1572                task.await
1573                    .map(|response| response.targets)
1574                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1575            )
1576        })
1577    }
1578
1579    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1580        self.thread_states.continue_thread(thread_id);
1581        self.request(
1582            ContinueCommand {
1583                args: ContinueArguments {
1584                    thread_id: thread_id.0,
1585                    single_thread: Some(true),
1586                },
1587            },
1588            Self::on_step_response::<ContinueCommand>(thread_id),
1589            cx,
1590        )
1591        .detach();
1592    }
1593
1594    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1595        match self.mode {
1596            Mode::Local(ref local) => Some(local.client.clone()),
1597            Mode::Remote(_) => None,
1598        }
1599    }
1600
1601    pub fn step_over(
1602        &mut self,
1603        thread_id: ThreadId,
1604        granularity: SteppingGranularity,
1605        cx: &mut Context<Self>,
1606    ) {
1607        let supports_single_thread_execution_requests =
1608            self.capabilities.supports_single_thread_execution_requests;
1609        let supports_stepping_granularity = self
1610            .capabilities
1611            .supports_stepping_granularity
1612            .unwrap_or_default();
1613
1614        let command = NextCommand {
1615            inner: StepCommand {
1616                thread_id: thread_id.0,
1617                granularity: supports_stepping_granularity.then(|| granularity),
1618                single_thread: supports_single_thread_execution_requests,
1619            },
1620        };
1621
1622        self.thread_states.process_step(thread_id);
1623        self.request(
1624            command,
1625            Self::on_step_response::<NextCommand>(thread_id),
1626            cx,
1627        )
1628        .detach();
1629    }
1630
1631    pub fn step_in(
1632        &mut self,
1633        thread_id: ThreadId,
1634        granularity: SteppingGranularity,
1635        cx: &mut Context<Self>,
1636    ) {
1637        let supports_single_thread_execution_requests =
1638            self.capabilities.supports_single_thread_execution_requests;
1639        let supports_stepping_granularity = self
1640            .capabilities
1641            .supports_stepping_granularity
1642            .unwrap_or_default();
1643
1644        let command = StepInCommand {
1645            inner: StepCommand {
1646                thread_id: thread_id.0,
1647                granularity: supports_stepping_granularity.then(|| granularity),
1648                single_thread: supports_single_thread_execution_requests,
1649            },
1650        };
1651
1652        self.thread_states.process_step(thread_id);
1653        self.request(
1654            command,
1655            Self::on_step_response::<StepInCommand>(thread_id),
1656            cx,
1657        )
1658        .detach();
1659    }
1660
1661    pub fn step_out(
1662        &mut self,
1663        thread_id: ThreadId,
1664        granularity: SteppingGranularity,
1665        cx: &mut Context<Self>,
1666    ) {
1667        let supports_single_thread_execution_requests =
1668            self.capabilities.supports_single_thread_execution_requests;
1669        let supports_stepping_granularity = self
1670            .capabilities
1671            .supports_stepping_granularity
1672            .unwrap_or_default();
1673
1674        let command = StepOutCommand {
1675            inner: StepCommand {
1676                thread_id: thread_id.0,
1677                granularity: supports_stepping_granularity.then(|| granularity),
1678                single_thread: supports_single_thread_execution_requests,
1679            },
1680        };
1681
1682        self.thread_states.process_step(thread_id);
1683        self.request(
1684            command,
1685            Self::on_step_response::<StepOutCommand>(thread_id),
1686            cx,
1687        )
1688        .detach();
1689    }
1690
1691    pub fn step_back(
1692        &mut self,
1693        thread_id: ThreadId,
1694        granularity: SteppingGranularity,
1695        cx: &mut Context<Self>,
1696    ) {
1697        let supports_single_thread_execution_requests =
1698            self.capabilities.supports_single_thread_execution_requests;
1699        let supports_stepping_granularity = self
1700            .capabilities
1701            .supports_stepping_granularity
1702            .unwrap_or_default();
1703
1704        let command = StepBackCommand {
1705            inner: StepCommand {
1706                thread_id: thread_id.0,
1707                granularity: supports_stepping_granularity.then(|| granularity),
1708                single_thread: supports_single_thread_execution_requests,
1709            },
1710        };
1711
1712        self.thread_states.process_step(thread_id);
1713
1714        self.request(
1715            command,
1716            Self::on_step_response::<StepBackCommand>(thread_id),
1717            cx,
1718        )
1719        .detach();
1720    }
1721
1722    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1723        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1724            && self.requests.contains_key(&ThreadsCommand.type_id())
1725            && self.threads.contains_key(&thread_id)
1726        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1727        // We could still be using an old thread id and have sent a new thread's request
1728        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1729        // But it very well could cause a minor bug in the future that is hard to track down
1730        {
1731            self.fetch(
1732                super::dap_command::StackTraceCommand {
1733                    thread_id: thread_id.0,
1734                    start_frame: None,
1735                    levels: None,
1736                },
1737                move |this, stack_frames, cx| {
1738                    let stack_frames = stack_frames.log_err()?;
1739
1740                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1741                        thread.stack_frame_ids =
1742                            stack_frames.iter().map(|frame| frame.id).collect();
1743                    });
1744                    debug_assert!(
1745                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1746                        "Sent request for thread_id that doesn't exist"
1747                    );
1748
1749                    this.stack_frames.extend(
1750                        stack_frames
1751                            .iter()
1752                            .cloned()
1753                            .map(|frame| (frame.id, StackFrame::from(frame))),
1754                    );
1755
1756                    this.invalidate_command_type::<ScopesCommand>();
1757                    this.invalidate_command_type::<VariablesCommand>();
1758
1759                    cx.emit(SessionEvent::StackTrace);
1760                    cx.notify();
1761                    Some(stack_frames)
1762                },
1763                cx,
1764            );
1765        }
1766
1767        self.threads
1768            .get(&thread_id)
1769            .map(|thread| {
1770                thread
1771                    .stack_frame_ids
1772                    .iter()
1773                    .filter_map(|id| self.stack_frames.get(id))
1774                    .cloned()
1775                    .collect()
1776            })
1777            .unwrap_or_default()
1778    }
1779
1780    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1781        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1782            && self
1783                .requests
1784                .contains_key(&TypeId::of::<StackTraceCommand>())
1785        {
1786            self.fetch(
1787                ScopesCommand { stack_frame_id },
1788                move |this, scopes, cx| {
1789                    let scopes = scopes.log_err()?;
1790
1791                    for scope in scopes .iter(){
1792                        this.variables(scope.variables_reference, cx);
1793                    }
1794
1795                    let entry = this
1796                        .stack_frames
1797                        .entry(stack_frame_id)
1798                        .and_modify(|stack_frame| {
1799                            stack_frame.scopes = scopes.clone();
1800                        });
1801
1802                    cx.emit(SessionEvent::Variables);
1803
1804                    debug_assert!(
1805                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1806                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1807                    );
1808
1809                    Some(scopes)
1810                },
1811                cx,
1812            );
1813        }
1814
1815        self.stack_frames
1816            .get(&stack_frame_id)
1817            .map(|frame| frame.scopes.as_slice())
1818            .unwrap_or_default()
1819    }
1820
1821    pub fn variables(
1822        &mut self,
1823        variables_reference: VariableReference,
1824        cx: &mut Context<Self>,
1825    ) -> Vec<dap::Variable> {
1826        let command = VariablesCommand {
1827            variables_reference,
1828            filter: None,
1829            start: None,
1830            count: None,
1831            format: None,
1832        };
1833
1834        self.fetch(
1835            command,
1836            move |this, variables, cx| {
1837                let variables = variables.log_err()?;
1838                this.variables
1839                    .insert(variables_reference, variables.clone());
1840
1841                cx.emit(SessionEvent::Variables);
1842                Some(variables)
1843            },
1844            cx,
1845        );
1846
1847        self.variables
1848            .get(&variables_reference)
1849            .cloned()
1850            .unwrap_or_default()
1851    }
1852
1853    pub fn set_variable_value(
1854        &mut self,
1855        variables_reference: u64,
1856        name: String,
1857        value: String,
1858        cx: &mut Context<Self>,
1859    ) {
1860        if self.capabilities.supports_set_variable.unwrap_or_default() {
1861            self.request(
1862                SetVariableValueCommand {
1863                    name,
1864                    value,
1865                    variables_reference,
1866                },
1867                move |this, response, cx| {
1868                    let response = response.log_err()?;
1869                    this.invalidate_command_type::<VariablesCommand>();
1870                    cx.notify();
1871                    Some(response)
1872                },
1873                cx,
1874            )
1875            .detach()
1876        }
1877    }
1878
1879    pub fn evaluate(
1880        &mut self,
1881        expression: String,
1882        context: Option<EvaluateArgumentsContext>,
1883        frame_id: Option<u64>,
1884        source: Option<Source>,
1885        cx: &mut Context<Self>,
1886    ) {
1887        self.request(
1888            EvaluateCommand {
1889                expression,
1890                context,
1891                frame_id,
1892                source,
1893            },
1894            |this, response, cx| {
1895                let response = response.log_err()?;
1896                this.output_token.0 += 1;
1897                this.output.push_back(dap::OutputEvent {
1898                    category: None,
1899                    output: response.result.clone(),
1900                    group: None,
1901                    variables_reference: Some(response.variables_reference),
1902                    source: None,
1903                    line: None,
1904                    column: None,
1905                    data: None,
1906                    location_reference: None,
1907                });
1908
1909                this.invalidate_command_type::<ScopesCommand>();
1910                cx.notify();
1911                Some(response)
1912            },
1913            cx,
1914        )
1915        .detach();
1916    }
1917
1918    pub fn location(
1919        &mut self,
1920        reference: u64,
1921        cx: &mut Context<Self>,
1922    ) -> Option<dap::LocationsResponse> {
1923        self.fetch(
1924            LocationsCommand { reference },
1925            move |this, response, _| {
1926                let response = response.log_err()?;
1927                this.locations.insert(reference, response.clone());
1928                Some(response)
1929            },
1930            cx,
1931        );
1932        self.locations.get(&reference).cloned()
1933    }
1934    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1935        let command = DisconnectCommand {
1936            restart: Some(false),
1937            terminate_debuggee: Some(true),
1938            suspend_debuggee: Some(false),
1939        };
1940
1941        self.request(command, Self::empty_response, cx).detach()
1942    }
1943
1944    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1945        if self
1946            .capabilities
1947            .supports_terminate_threads_request
1948            .unwrap_or_default()
1949        {
1950            self.request(
1951                TerminateThreadsCommand {
1952                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1953                },
1954                Self::clear_active_debug_line_response,
1955                cx,
1956            )
1957            .detach();
1958        } else {
1959            self.shutdown(cx).detach();
1960        }
1961    }
1962}
1963
1964fn create_local_session(
1965    breakpoint_store: Entity<BreakpointStore>,
1966    session_id: SessionId,
1967    parent_session: Option<Entity<Session>>,
1968    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1969    initialized_tx: oneshot::Sender<()>,
1970    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1971    mode: LocalMode,
1972    cx: &mut Context<Session>,
1973) -> Session {
1974    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1975        let mut initialized_tx = Some(initialized_tx);
1976        while let Some(message) = message_rx.next().await {
1977            if let Message::Event(event) = message {
1978                if let Events::Initialized(_) = *event {
1979                    if let Some(tx) = initialized_tx.take() {
1980                        tx.send(()).ok();
1981                    }
1982                } else {
1983                    let Ok(_) = this.update(cx, |session, cx| {
1984                        session.handle_dap_event(event, cx);
1985                    }) else {
1986                        break;
1987                    };
1988                }
1989            } else {
1990                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1991                else {
1992                    break;
1993                };
1994            }
1995        }
1996    })];
1997
1998    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1999        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2000            if let Some(local) = (!this.ignore_breakpoints)
2001                .then(|| this.as_local_mut())
2002                .flatten()
2003            {
2004                local
2005                    .send_breakpoints_from_path(path.clone(), *reason, cx)
2006                    .detach();
2007            };
2008        }
2009        BreakpointStoreEvent::BreakpointsCleared(paths) => {
2010            if let Some(local) = (!this.ignore_breakpoints)
2011                .then(|| this.as_local_mut())
2012                .flatten()
2013            {
2014                local.unset_breakpoints_from_paths(paths, cx).detach();
2015            }
2016        }
2017        BreakpointStoreEvent::ActiveDebugLineChanged => {}
2018    })
2019    .detach();
2020
2021    Session {
2022        mode: Mode::Local(mode),
2023        id: session_id,
2024        child_session_ids: HashSet::default(),
2025        parent_id: parent_session.map(|session| session.read(cx).id),
2026        variables: Default::default(),
2027        capabilities: Capabilities::default(),
2028        thread_states: ThreadStates::default(),
2029        output_token: OutputToken(0),
2030        ignore_breakpoints: false,
2031        output: circular_buffer::CircularBuffer::boxed(),
2032        requests: HashMap::default(),
2033        modules: Vec::default(),
2034        loaded_sources: Vec::default(),
2035        threads: IndexMap::default(),
2036        stack_frames: IndexMap::default(),
2037        locations: Default::default(),
2038        _background_tasks,
2039        is_session_terminated: false,
2040    }
2041}