session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    adapters::{DapDelegate, DapStatus},
  21    client::{DebugAdapterClient, SessionId},
  22    messages::{Events, Message},
  23};
  24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
  25use futures::channel::oneshot;
  26use futures::{FutureExt, future::Shared};
  27use gpui::{
  28    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  29};
  30use rpc::AnyProtoClient;
  31use serde_json::{Value, json};
  32use settings::Settings;
  33use smol::stream::StreamExt;
  34use std::any::TypeId;
  35use std::path::PathBuf;
  36use std::u64;
  37use std::{
  38    any::Any,
  39    collections::hash_map::Entry,
  40    hash::{Hash, Hasher},
  41    path::Path,
  42    sync::Arc,
  43};
  44use task::{DebugAdapterConfig, DebugTaskDefinition};
  45use text::{PointUtf16, ToPointUtf16};
  46use util::{ResultExt, merge_json_value_into};
  47
  48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  49#[repr(transparent)]
  50pub struct ThreadId(pub u64);
  51
  52impl ThreadId {
  53    pub const MIN: ThreadId = ThreadId(u64::MIN);
  54    pub const MAX: ThreadId = ThreadId(u64::MAX);
  55}
  56
  57impl From<u64> for ThreadId {
  58    fn from(id: u64) -> Self {
  59        Self(id)
  60    }
  61}
  62
  63#[derive(Clone, Debug)]
  64pub struct StackFrame {
  65    pub dap: dap::StackFrame,
  66    pub scopes: Vec<dap::Scope>,
  67}
  68
  69impl From<dap::StackFrame> for StackFrame {
  70    fn from(stack_frame: dap::StackFrame) -> Self {
  71        Self {
  72            scopes: vec![],
  73            dap: stack_frame,
  74        }
  75    }
  76}
  77
  78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  79pub enum ThreadStatus {
  80    #[default]
  81    Running,
  82    Stopped,
  83    Stepping,
  84    Exited,
  85    Ended,
  86}
  87
  88impl ThreadStatus {
  89    pub fn label(&self) -> &'static str {
  90        match self {
  91            ThreadStatus::Running => "Running",
  92            ThreadStatus::Stopped => "Stopped",
  93            ThreadStatus::Stepping => "Stepping",
  94            ThreadStatus::Exited => "Exited",
  95            ThreadStatus::Ended => "Ended",
  96        }
  97    }
  98}
  99
 100#[derive(Debug)]
 101pub struct Thread {
 102    dap: dap::Thread,
 103    stack_frame_ids: IndexSet<StackFrameId>,
 104    _has_stopped: bool,
 105}
 106
 107impl From<dap::Thread> for Thread {
 108    fn from(dap: dap::Thread) -> Self {
 109        Self {
 110            dap,
 111            stack_frame_ids: Default::default(),
 112            _has_stopped: false,
 113        }
 114    }
 115}
 116
 117type UpstreamProjectId = u64;
 118
 119struct RemoteConnection {
 120    _client: AnyProtoClient,
 121    _upstream_project_id: UpstreamProjectId,
 122}
 123
 124impl RemoteConnection {
 125    fn send_proto_client_request<R: DapCommand>(
 126        &self,
 127        _request: R,
 128        _session_id: SessionId,
 129        cx: &mut App,
 130    ) -> Task<Result<R::Response>> {
 131        // let message = request.to_proto(session_id, self.upstream_project_id);
 132        // let upstream_client = self.client.clone();
 133        cx.background_executor().spawn(async move {
 134            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 135            // let response = upstream_client.request(message).await?;
 136            // request.response_from_proto(response)
 137            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 138        })
 139    }
 140
 141    fn request<R: DapCommand>(
 142        &self,
 143        request: R,
 144        session_id: SessionId,
 145        cx: &mut App,
 146    ) -> Task<Result<R::Response>>
 147    where
 148        <R::DapRequest as dap::requests::Request>::Response: 'static,
 149        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 150    {
 151        return self.send_proto_client_request::<R>(request, session_id, cx);
 152    }
 153}
 154
 155enum Mode {
 156    Local(LocalMode),
 157    Remote(RemoteConnection),
 158}
 159
 160#[derive(Clone)]
 161pub struct LocalMode {
 162    client: Arc<DebugAdapterClient>,
 163    config: DebugAdapterConfig,
 164    adapter: Arc<dyn DebugAdapter>,
 165    breakpoint_store: Entity<BreakpointStore>,
 166}
 167
 168fn client_source(abs_path: &Path) -> dap::Source {
 169    dap::Source {
 170        name: abs_path
 171            .file_name()
 172            .map(|filename| filename.to_string_lossy().to_string()),
 173        path: Some(abs_path.to_string_lossy().to_string()),
 174        source_reference: None,
 175        presentation_hint: None,
 176        origin: None,
 177        sources: None,
 178        adapter_data: None,
 179        checksums: None,
 180    }
 181}
 182
 183impl LocalMode {
 184    fn new(
 185        debug_adapters: Arc<DapRegistry>,
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        breakpoint_store: Entity<BreakpointStore>,
 189        config: DebugAdapterConfig,
 190        delegate: DapAdapterDelegate,
 191        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 192        cx: AsyncApp,
 193    ) -> Task<Result<(Self, Capabilities)>> {
 194        Self::new_inner(
 195            debug_adapters,
 196            session_id,
 197            parent_session,
 198            breakpoint_store,
 199            config,
 200            delegate,
 201            messages_tx,
 202            async |_, _| {},
 203            cx,
 204        )
 205    }
 206    #[cfg(any(test, feature = "test-support"))]
 207    fn new_fake(
 208        session_id: SessionId,
 209        parent_session: Option<Entity<Session>>,
 210        breakpoint_store: Entity<BreakpointStore>,
 211        config: DebugAdapterConfig,
 212        delegate: DapAdapterDelegate,
 213        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 214        caps: Capabilities,
 215        fail: bool,
 216        cx: AsyncApp,
 217    ) -> Task<Result<(Self, Capabilities)>> {
 218        use task::DebugRequestDisposition;
 219
 220        let request = match config.request.clone() {
 221            DebugRequestDisposition::UserConfigured(request) => request,
 222            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
 223                match reverse_request_args.request {
 224                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
 225                        DebugRequestType::Launch(task::LaunchConfig {
 226                            program: "".to_owned(),
 227                            cwd: None,
 228                        })
 229                    }
 230                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
 231                        DebugRequestType::Attach(task::AttachConfig {
 232                            process_id: Some(0),
 233                        })
 234                    }
 235                }
 236            }
 237        };
 238
 239        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
 240            session
 241                .client
 242                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 243                .await;
 244
 245            let paths = cx
 246                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
 247                .expect("Breakpoint store should exist in all tests that start debuggers");
 248
 249            session
 250                .client
 251                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
 252                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
 253                    if !paths.contains(&p) {
 254                        panic!("Sent breakpoints for path without any")
 255                    }
 256
 257                    Ok(dap::SetBreakpointsResponse {
 258                        breakpoints: Vec::default(),
 259                    })
 260                })
 261                .await;
 262
 263            match request {
 264                dap::DebugRequestType::Launch(_) => {
 265                    if fail {
 266                        session
 267                            .client
 268                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 269                                Err(dap::ErrorResponse {
 270                                    error: Some(dap::Message {
 271                                        id: 1,
 272                                        format: "error".into(),
 273                                        variables: None,
 274                                        send_telemetry: None,
 275                                        show_user: None,
 276                                        url: None,
 277                                        url_label: None,
 278                                    }),
 279                                })
 280                            })
 281                            .await;
 282                    } else {
 283                        session
 284                            .client
 285                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 286                            .await;
 287                    }
 288                }
 289                dap::DebugRequestType::Attach(attach_config) => {
 290                    if fail {
 291                        session
 292                            .client
 293                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 294                                Err(dap::ErrorResponse {
 295                                    error: Some(dap::Message {
 296                                        id: 1,
 297                                        format: "error".into(),
 298                                        variables: None,
 299                                        send_telemetry: None,
 300                                        show_user: None,
 301                                        url: None,
 302                                        url_label: None,
 303                                    }),
 304                                })
 305                            })
 306                            .await;
 307                    } else {
 308                        session
 309                            .client
 310                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 311                                assert_eq!(
 312                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 313                                    args.raw
 314                                );
 315
 316                                Ok(())
 317                            })
 318                            .await;
 319                    }
 320                }
 321            }
 322
 323            session
 324                .client
 325                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
 326                .await;
 327            session.client.fake_event(Events::Initialized(None)).await;
 328        };
 329        Self::new_inner(
 330            DapRegistry::fake().into(),
 331            session_id,
 332            parent_session,
 333            breakpoint_store,
 334            config,
 335            delegate,
 336            messages_tx,
 337            callback,
 338            cx,
 339        )
 340    }
 341    fn new_inner(
 342        registry: Arc<DapRegistry>,
 343        session_id: SessionId,
 344        parent_session: Option<Entity<Session>>,
 345        breakpoint_store: Entity<BreakpointStore>,
 346        config: DebugAdapterConfig,
 347        delegate: DapAdapterDelegate,
 348        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 349        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 350        cx: AsyncApp,
 351    ) -> Task<Result<(Self, Capabilities)>> {
 352        cx.spawn(async move |cx| {
 353            let (adapter, binary) =
 354                Self::get_adapter_binary(&registry, &config, &delegate, cx).await?;
 355
 356            let message_handler = Box::new(move |message| {
 357                messages_tx.unbounded_send(message).ok();
 358            });
 359
 360            let client = Arc::new(
 361                if let Some(client) = parent_session
 362                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 363                    .flatten()
 364                {
 365                    client
 366                        .reconnect(session_id, binary, message_handler, cx.clone())
 367                        .await?
 368                } else {
 369                    DebugAdapterClient::start(
 370                        session_id,
 371                        adapter.name(),
 372                        binary,
 373                        message_handler,
 374                        cx.clone(),
 375                    )
 376                    .await?
 377                },
 378            );
 379
 380            let adapter_id = adapter.name().to_string().to_owned();
 381            let mut session = Self {
 382                client,
 383                adapter,
 384                breakpoint_store,
 385                config: config.clone(),
 386            };
 387
 388            on_initialized(&mut session, cx.clone()).await;
 389            let capabilities = session
 390                .request(Initialize { adapter_id }, cx.background_executor().clone())
 391                .await?;
 392
 393            Ok((session, capabilities))
 394        })
 395    }
 396
 397    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 398        let tasks: Vec<_> = paths
 399            .into_iter()
 400            .map(|path| {
 401                self.request(
 402                    dap_command::SetBreakpoints {
 403                        source: client_source(path),
 404                        source_modified: None,
 405                        breakpoints: vec![],
 406                    },
 407                    cx.background_executor().clone(),
 408                )
 409            })
 410            .collect();
 411
 412        cx.background_spawn(async move {
 413            futures::future::join_all(tasks)
 414                .await
 415                .iter()
 416                .for_each(|res| match res {
 417                    Ok(_) => {}
 418                    Err(err) => {
 419                        log::warn!("Set breakpoints request failed: {}", err);
 420                    }
 421                });
 422        })
 423    }
 424
 425    fn send_breakpoints_from_path(
 426        &self,
 427        abs_path: Arc<Path>,
 428        reason: BreakpointUpdatedReason,
 429        cx: &mut App,
 430    ) -> Task<()> {
 431        let breakpoints = self
 432            .breakpoint_store
 433            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 434            .into_iter()
 435            .filter(|bp| bp.state.is_enabled())
 436            .map(Into::into)
 437            .collect();
 438
 439        let task = self.request(
 440            dap_command::SetBreakpoints {
 441                source: client_source(&abs_path),
 442                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 443                breakpoints,
 444            },
 445            cx.background_executor().clone(),
 446        );
 447
 448        cx.background_spawn(async move {
 449            match task.await {
 450                Ok(_) => {}
 451                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 452            }
 453        })
 454    }
 455
 456    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 457        let mut breakpoint_tasks = Vec::new();
 458        let breakpoints = self
 459            .breakpoint_store
 460            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 461
 462        for (path, breakpoints) in breakpoints {
 463            let breakpoints = if ignore_breakpoints {
 464                vec![]
 465            } else {
 466                breakpoints
 467                    .into_iter()
 468                    .filter(|bp| bp.state.is_enabled())
 469                    .map(Into::into)
 470                    .collect()
 471            };
 472
 473            breakpoint_tasks.push(self.request(
 474                dap_command::SetBreakpoints {
 475                    source: client_source(&path),
 476                    source_modified: Some(false),
 477                    breakpoints,
 478                },
 479                cx.background_executor().clone(),
 480            ));
 481        }
 482
 483        cx.background_spawn(async move {
 484            futures::future::join_all(breakpoint_tasks)
 485                .await
 486                .iter()
 487                .for_each(|res| match res {
 488                    Ok(_) => {}
 489                    Err(err) => {
 490                        log::warn!("Set breakpoints request failed: {}", err);
 491                    }
 492                });
 493        })
 494    }
 495
 496    async fn get_adapter_binary(
 497        registry: &Arc<DapRegistry>,
 498        config: &DebugAdapterConfig,
 499        delegate: &DapAdapterDelegate,
 500        cx: &mut AsyncApp,
 501    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 502        let adapter = registry
 503            .adapter(&config.adapter)
 504            .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
 505
 506        let binary = cx.update(|cx| {
 507            ProjectSettings::get_global(cx)
 508                .dap
 509                .get(&adapter.name())
 510                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 511        })?;
 512
 513        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 514            Err(error) => {
 515                delegate.update_status(
 516                    adapter.name(),
 517                    DapStatus::Failed {
 518                        error: error.to_string(),
 519                    },
 520                );
 521
 522                return Err(error);
 523            }
 524            Ok(mut binary) => {
 525                delegate.update_status(adapter.name(), DapStatus::None);
 526
 527                let shell_env = delegate.shell_env().await;
 528                let mut envs = binary.envs.unwrap_or_default();
 529                envs.extend(shell_env);
 530                binary.envs = Some(envs);
 531
 532                binary
 533            }
 534        };
 535
 536        Ok((adapter, binary))
 537    }
 538
 539    pub fn initialize_sequence(
 540        &self,
 541        capabilities: &Capabilities,
 542        initialized_rx: oneshot::Receiver<()>,
 543        cx: &App,
 544    ) -> Task<Result<()>> {
 545        let (mut raw, is_launch) = match &self.config.request {
 546            task::DebugRequestDisposition::UserConfigured(_) => {
 547                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
 548                    debug_assert!(false, "This part of code should be unreachable in practice");
 549                    return Task::ready(Err(anyhow!(
 550                        "Expected debug config conversion to succeed"
 551                    )));
 552                };
 553                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
 554                let raw = self.adapter.request_args(&raw);
 555                (raw, is_launch)
 556            }
 557            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
 558                start_debugging_request_arguments.configuration.clone(),
 559                matches!(
 560                    start_debugging_request_arguments.request,
 561                    dap::StartDebuggingRequestArgumentsRequest::Launch
 562                ),
 563            ),
 564        };
 565
 566        merge_json_value_into(
 567            self.config.initialize_args.clone().unwrap_or(json!({})),
 568            &mut raw,
 569        );
 570        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 571        let launch = if is_launch {
 572            self.request(Launch { raw }, cx.background_executor().clone())
 573        } else {
 574            self.request(Attach { raw }, cx.background_executor().clone())
 575        };
 576
 577        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 578
 579        let configuration_sequence = cx.spawn({
 580            let this = self.clone();
 581            async move |cx| {
 582                initialized_rx.await?;
 583                // todo(debugger) figure out if we want to handle a breakpoint response error
 584                // This will probably consist of letting a user know that breakpoints failed to be set
 585                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 586
 587                if configuration_done_supported {
 588                    this.request(ConfigurationDone, cx.background_executor().clone())
 589                } else {
 590                    Task::ready(Ok(()))
 591                }
 592                .await
 593            }
 594        });
 595
 596        cx.background_spawn(async move {
 597            futures::future::try_join(launch, configuration_sequence).await?;
 598            Ok(())
 599        })
 600    }
 601
 602    fn request<R: LocalDapCommand>(
 603        &self,
 604        request: R,
 605        executor: BackgroundExecutor,
 606    ) -> Task<Result<R::Response>>
 607    where
 608        <R::DapRequest as dap::requests::Request>::Response: 'static,
 609        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 610    {
 611        let request = Arc::new(request);
 612
 613        let request_clone = request.clone();
 614        let connection = self.client.clone();
 615        let request_task = executor.spawn(async move {
 616            let args = request_clone.to_dap();
 617            connection.request::<R::DapRequest>(args).await
 618        });
 619
 620        executor.spawn(async move {
 621            let response = request.response_from_dap(request_task.await?);
 622            response
 623        })
 624    }
 625}
 626impl From<RemoteConnection> for Mode {
 627    fn from(value: RemoteConnection) -> Self {
 628        Self::Remote(value)
 629    }
 630}
 631
 632impl Mode {
 633    fn request_dap<R: DapCommand>(
 634        &self,
 635        session_id: SessionId,
 636        request: R,
 637        cx: &mut Context<Session>,
 638    ) -> Task<Result<R::Response>>
 639    where
 640        <R::DapRequest as dap::requests::Request>::Response: 'static,
 641        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 642    {
 643        match self {
 644            Mode::Local(debug_adapter_client) => {
 645                debug_adapter_client.request(request, cx.background_executor().clone())
 646            }
 647            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 648        }
 649    }
 650}
 651
 652#[derive(Default)]
 653struct ThreadStates {
 654    global_state: Option<ThreadStatus>,
 655    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 656}
 657
 658impl ThreadStates {
 659    fn stop_all_threads(&mut self) {
 660        self.global_state = Some(ThreadStatus::Stopped);
 661        self.known_thread_states.clear();
 662    }
 663
 664    fn exit_all_threads(&mut self) {
 665        self.global_state = Some(ThreadStatus::Exited);
 666        self.known_thread_states.clear();
 667    }
 668
 669    fn continue_all_threads(&mut self) {
 670        self.global_state = Some(ThreadStatus::Running);
 671        self.known_thread_states.clear();
 672    }
 673
 674    fn stop_thread(&mut self, thread_id: ThreadId) {
 675        self.known_thread_states
 676            .insert(thread_id, ThreadStatus::Stopped);
 677    }
 678
 679    fn continue_thread(&mut self, thread_id: ThreadId) {
 680        self.known_thread_states
 681            .insert(thread_id, ThreadStatus::Running);
 682    }
 683
 684    fn process_step(&mut self, thread_id: ThreadId) {
 685        self.known_thread_states
 686            .insert(thread_id, ThreadStatus::Stepping);
 687    }
 688
 689    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 690        self.thread_state(thread_id)
 691            .unwrap_or(ThreadStatus::Running)
 692    }
 693
 694    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 695        self.known_thread_states
 696            .get(&thread_id)
 697            .copied()
 698            .or(self.global_state)
 699    }
 700
 701    fn exit_thread(&mut self, thread_id: ThreadId) {
 702        self.known_thread_states
 703            .insert(thread_id, ThreadStatus::Exited);
 704    }
 705
 706    fn any_stopped_thread(&self) -> bool {
 707        self.global_state
 708            .is_some_and(|state| state == ThreadStatus::Stopped)
 709            || self
 710                .known_thread_states
 711                .values()
 712                .any(|status| *status == ThreadStatus::Stopped)
 713    }
 714}
 715const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 716
 717#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 718pub struct OutputToken(pub usize);
 719/// Represents a current state of a single debug adapter and provides ways to mutate it.
 720pub struct Session {
 721    mode: Mode,
 722    pub(super) capabilities: Capabilities,
 723    id: SessionId,
 724    child_session_ids: HashSet<SessionId>,
 725    parent_id: Option<SessionId>,
 726    ignore_breakpoints: bool,
 727    modules: Vec<dap::Module>,
 728    loaded_sources: Vec<dap::Source>,
 729    output_token: OutputToken,
 730    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 731    threads: IndexMap<ThreadId, Thread>,
 732    thread_states: ThreadStates,
 733    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 734    stack_frames: IndexMap<StackFrameId, StackFrame>,
 735    locations: HashMap<u64, dap::LocationsResponse>,
 736    is_session_terminated: bool,
 737    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 738    _background_tasks: Vec<Task<()>>,
 739}
 740
 741trait CacheableCommand: 'static + Send + Sync {
 742    fn as_any(&self) -> &dyn Any;
 743    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 744    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 745    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 746}
 747
 748impl<T> CacheableCommand for T
 749where
 750    T: DapCommand + PartialEq + Eq + Hash,
 751{
 752    fn as_any(&self) -> &dyn Any {
 753        self
 754    }
 755
 756    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 757        rhs.as_any()
 758            .downcast_ref::<Self>()
 759            .map_or(false, |rhs| self == rhs)
 760    }
 761
 762    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 763        T::hash(self, &mut hasher);
 764    }
 765
 766    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 767        self
 768    }
 769}
 770
 771pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 772
 773impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 774    fn from(request: T) -> Self {
 775        Self(Arc::new(request))
 776    }
 777}
 778
 779impl PartialEq for RequestSlot {
 780    fn eq(&self, other: &Self) -> bool {
 781        self.0.dyn_eq(other.0.as_ref())
 782    }
 783}
 784
 785impl Eq for RequestSlot {}
 786
 787impl Hash for RequestSlot {
 788    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 789        self.0.dyn_hash(state);
 790        self.0.as_any().type_id().hash(state)
 791    }
 792}
 793
 794#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 795pub struct CompletionsQuery {
 796    pub query: String,
 797    pub column: u64,
 798    pub line: Option<u64>,
 799    pub frame_id: Option<u64>,
 800}
 801
 802impl CompletionsQuery {
 803    pub fn new(
 804        buffer: &language::Buffer,
 805        cursor_position: language::Anchor,
 806        frame_id: Option<u64>,
 807    ) -> Self {
 808        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 809        Self {
 810            query: buffer.text(),
 811            column: column as u64,
 812            frame_id,
 813            line: Some(row as u64),
 814        }
 815    }
 816}
 817
 818pub enum SessionEvent {
 819    Modules,
 820    LoadedSources,
 821    Stopped(Option<ThreadId>),
 822    StackTrace,
 823    Variables,
 824    Threads,
 825}
 826
 827impl EventEmitter<SessionEvent> for Session {}
 828
// A local session sends breakpoint updates to DAP for all new breakpoints.
// The remote side only sends a breakpoint update when the breakpoint was created by that peer.
// The BreakpointStore notifies the session on breakpoint changes.
 832impl Session {
 833    pub(crate) fn local(
 834        breakpoint_store: Entity<BreakpointStore>,
 835        session_id: SessionId,
 836        parent_session: Option<Entity<Session>>,
 837        delegate: DapAdapterDelegate,
 838        config: DebugAdapterConfig,
 839        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 840        initialized_tx: oneshot::Sender<()>,
 841        debug_adapters: Arc<DapRegistry>,
 842        cx: &mut App,
 843    ) -> Task<Result<Entity<Self>>> {
 844        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 845
 846        cx.spawn(async move |cx| {
 847            let (mode, capabilities) = LocalMode::new(
 848                debug_adapters,
 849                session_id,
 850                parent_session.clone(),
 851                breakpoint_store.clone(),
 852                config.clone(),
 853                delegate,
 854                message_tx,
 855                cx.clone(),
 856            )
 857            .await?;
 858
 859            cx.new(|cx| {
 860                create_local_session(
 861                    breakpoint_store,
 862                    session_id,
 863                    parent_session,
 864                    start_debugging_requests_tx,
 865                    initialized_tx,
 866                    message_rx,
 867                    mode,
 868                    capabilities,
 869                    cx,
 870                )
 871            })
 872        })
 873    }
 874
 875    #[cfg(any(test, feature = "test-support"))]
 876    pub(crate) fn fake(
 877        breakpoint_store: Entity<BreakpointStore>,
 878        session_id: SessionId,
 879        parent_session: Option<Entity<Session>>,
 880        delegate: DapAdapterDelegate,
 881        config: DebugAdapterConfig,
 882        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 883        initialized_tx: oneshot::Sender<()>,
 884        caps: Capabilities,
 885        fails: bool,
 886        cx: &mut App,
 887    ) -> Task<Result<Entity<Session>>> {
 888        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 889
 890        cx.spawn(async move |cx| {
 891            let (mode, capabilities) = LocalMode::new_fake(
 892                session_id,
 893                parent_session.clone(),
 894                breakpoint_store.clone(),
 895                config.clone(),
 896                delegate,
 897                message_tx,
 898                caps,
 899                fails,
 900                cx.clone(),
 901            )
 902            .await?;
 903
 904            cx.new(|cx| {
 905                create_local_session(
 906                    breakpoint_store,
 907                    session_id,
 908                    parent_session,
 909                    start_debugging_requests_tx,
 910                    initialized_tx,
 911                    message_rx,
 912                    mode,
 913                    capabilities,
 914                    cx,
 915                )
 916            })
 917        })
 918    }
 919
 920    pub(crate) fn remote(
 921        session_id: SessionId,
 922        client: AnyProtoClient,
 923        upstream_project_id: u64,
 924        ignore_breakpoints: bool,
 925    ) -> Self {
 926        Self {
 927            mode: Mode::Remote(RemoteConnection {
 928                _client: client,
 929                _upstream_project_id: upstream_project_id,
 930            }),
 931            id: session_id,
 932            child_session_ids: HashSet::default(),
 933            parent_id: None,
 934            capabilities: Capabilities::default(),
 935            ignore_breakpoints,
 936            variables: Default::default(),
 937            stack_frames: Default::default(),
 938            thread_states: ThreadStates::default(),
 939            output_token: OutputToken(0),
 940            output: circular_buffer::CircularBuffer::boxed(),
 941            requests: HashMap::default(),
 942            modules: Vec::default(),
 943            loaded_sources: Vec::default(),
 944            threads: IndexMap::default(),
 945            _background_tasks: Vec::default(),
 946            locations: Default::default(),
 947            is_session_terminated: false,
 948        }
 949    }
 950
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 954
    /// Returns a copy of the set of child session ids recorded on this session.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 958
    /// Records `session_id` as a child of this session. Adding an id twice has no further effect.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 962
    /// Removes `session_id` from this session's children; does nothing if it was not recorded.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 966
    /// Returns the id of the parent session, if one is recorded.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
 970
    /// Returns the capabilities currently stored for this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 974
 975    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 976        if let Mode::Local(local_mode) = &self.mode {
 977            Some(local_mode.config.clone())
 978        } else {
 979            None
 980        }
 981    }
 982
    /// Returns whether the session has been marked as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 986
 987    pub fn is_local(&self) -> bool {
 988        matches!(self.mode, Mode::Local(_))
 989    }
 990
 991    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 992        match &mut self.mode {
 993            Mode::Local(local_mode) => Some(local_mode),
 994            Mode::Remote(_) => None,
 995        }
 996    }
 997
 998    pub fn as_local(&self) -> Option<&LocalMode> {
 999        match &self.mode {
1000            Mode::Local(local_mode) => Some(local_mode),
1001            Mode::Remote(_) => None,
1002        }
1003    }
1004
1005    pub(super) fn initialize_sequence(
1006        &mut self,
1007        initialize_rx: oneshot::Receiver<()>,
1008        cx: &mut Context<Self>,
1009    ) -> Task<Result<()>> {
1010        match &self.mode {
1011            Mode::Local(local_mode) => {
1012                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1013            }
1014            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1015        }
1016    }
1017
1018    pub fn output(
1019        &self,
1020        since: OutputToken,
1021    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1022        if self.output_token.0 == 0 {
1023            return (self.output.range(0..0), OutputToken(0));
1024        };
1025
1026        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1027
1028        let clamped_events_since = events_since.clamp(0, self.output.len());
1029        (
1030            self.output
1031                .range(self.output.len() - clamped_events_since..),
1032            self.output_token,
1033        )
1034    }
1035
1036    pub fn respond_to_client(
1037        &self,
1038        request_seq: u64,
1039        success: bool,
1040        command: String,
1041        body: Option<serde_json::Value>,
1042        cx: &mut Context<Self>,
1043    ) -> Task<Result<()>> {
1044        let Some(local_session) = self.as_local().cloned() else {
1045            unreachable!("Cannot respond to remote client");
1046        };
1047
1048        cx.background_spawn(async move {
1049            local_session
1050                .client
1051                .send_message(Message::Response(Response {
1052                    body,
1053                    success,
1054                    command,
1055                    seq: request_seq + 1,
1056                    request_seq,
1057                    message: None,
1058                }))
1059                .await
1060        })
1061    }
1062
1063    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1064        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1065            self.thread_states.stop_all_threads();
1066
1067            self.invalidate_command_type::<StackTraceCommand>();
1068        }
1069
1070        // Event if we stopped all threads we still need to insert the thread_id
1071        // to our own data
1072        if let Some(thread_id) = event.thread_id {
1073            self.thread_states.stop_thread(ThreadId(thread_id));
1074
1075            self.invalidate_state(
1076                &StackTraceCommand {
1077                    thread_id,
1078                    start_frame: None,
1079                    levels: None,
1080                }
1081                .into(),
1082            );
1083        }
1084
1085        self.invalidate_generic();
1086        self.threads.clear();
1087        self.variables.clear();
1088        cx.emit(SessionEvent::Stopped(
1089            event
1090                .thread_id
1091                .map(Into::into)
1092                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1093        ));
1094        cx.notify();
1095    }
1096
1097    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1098        match *event {
1099            Events::Initialized(_) => {
1100                debug_assert!(
1101                    false,
1102                    "Initialized event should have been handled in LocalMode"
1103                );
1104            }
1105            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1106            Events::Continued(event) => {
1107                if event.all_threads_continued.unwrap_or_default() {
1108                    self.thread_states.continue_all_threads();
1109                } else {
1110                    self.thread_states
1111                        .continue_thread(ThreadId(event.thread_id));
1112                }
1113                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1114                self.invalidate_generic();
1115            }
1116            Events::Exited(_event) => {
1117                self.clear_active_debug_line(cx);
1118            }
1119            Events::Terminated(_) => {
1120                self.is_session_terminated = true;
1121                self.clear_active_debug_line(cx);
1122            }
1123            Events::Thread(event) => {
1124                let thread_id = ThreadId(event.thread_id);
1125
1126                match event.reason {
1127                    dap::ThreadEventReason::Started => {
1128                        self.thread_states.continue_thread(thread_id);
1129                    }
1130                    dap::ThreadEventReason::Exited => {
1131                        self.thread_states.exit_thread(thread_id);
1132                    }
1133                    reason => {
1134                        log::error!("Unhandled thread event reason {:?}", reason);
1135                    }
1136                }
1137                self.invalidate_state(&ThreadsCommand.into());
1138                cx.notify();
1139            }
1140            Events::Output(event) => {
1141                if event
1142                    .category
1143                    .as_ref()
1144                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1145                {
1146                    return;
1147                }
1148
1149                self.output.push_back(event);
1150                self.output_token.0 += 1;
1151                cx.notify();
1152            }
1153            Events::Breakpoint(_) => {}
1154            Events::Module(event) => {
1155                match event.reason {
1156                    dap::ModuleEventReason::New => {
1157                        self.modules.push(event.module);
1158                    }
1159                    dap::ModuleEventReason::Changed => {
1160                        if let Some(module) = self
1161                            .modules
1162                            .iter_mut()
1163                            .find(|other| event.module.id == other.id)
1164                        {
1165                            *module = event.module;
1166                        }
1167                    }
1168                    dap::ModuleEventReason::Removed => {
1169                        self.modules.retain(|other| event.module.id != other.id);
1170                    }
1171                }
1172
1173                // todo(debugger): We should only send the invalidate command to downstream clients.
1174                // self.invalidate_state(&ModulesCommand.into());
1175            }
1176            Events::LoadedSource(_) => {
1177                self.invalidate_state(&LoadedSourcesCommand.into());
1178            }
1179            Events::Capabilities(event) => {
1180                self.capabilities = self.capabilities.merge(event.capabilities);
1181                cx.notify();
1182            }
1183            Events::Memory(_) => {}
1184            Events::Process(_) => {}
1185            Events::ProgressEnd(_) => {}
1186            Events::ProgressStart(_) => {}
1187            Events::ProgressUpdate(_) => {}
1188            Events::Invalidated(_) => {}
1189            Events::Other(_) => {}
1190        }
1191    }
1192
1193    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1194    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1195        &mut self,
1196        request: T,
1197        process_result: impl FnOnce(
1198            &mut Self,
1199            Result<T::Response>,
1200            &mut Context<Self>,
1201        ) -> Option<T::Response>
1202        + 'static,
1203        cx: &mut Context<Self>,
1204    ) {
1205        const {
1206            assert!(
1207                T::CACHEABLE,
1208                "Only requests marked as cacheable should invoke `fetch`"
1209            );
1210        }
1211
1212        if !self.thread_states.any_stopped_thread()
1213            && request.type_id() != TypeId::of::<ThreadsCommand>()
1214            || self.is_session_terminated
1215        {
1216            return;
1217        }
1218
1219        let request_map = self
1220            .requests
1221            .entry(std::any::TypeId::of::<T>())
1222            .or_default();
1223
1224        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1225            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1226
1227            let task = Self::request_inner::<Arc<T>>(
1228                &self.capabilities,
1229                self.id,
1230                &self.mode,
1231                command,
1232                process_result,
1233                cx,
1234            );
1235            let task = cx
1236                .background_executor()
1237                .spawn(async move {
1238                    let _ = task.await?;
1239                    Some(())
1240                })
1241                .shared();
1242
1243            vacant.insert(task);
1244            cx.notify();
1245        }
1246    }
1247
1248    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1249        capabilities: &Capabilities,
1250        session_id: SessionId,
1251        mode: &Mode,
1252        request: T,
1253        process_result: impl FnOnce(
1254            &mut Self,
1255            Result<T::Response>,
1256            &mut Context<Self>,
1257        ) -> Option<T::Response>
1258        + 'static,
1259        cx: &mut Context<Self>,
1260    ) -> Task<Option<T::Response>> {
1261        if !T::is_supported(&capabilities) {
1262            log::warn!(
1263                "Attempted to send a DAP request that isn't supported: {:?}",
1264                request
1265            );
1266            let error = Err(anyhow::Error::msg(
1267                "Couldn't complete request because it's not supported",
1268            ));
1269            return cx.spawn(async move |this, cx| {
1270                this.update(cx, |this, cx| process_result(this, error, cx))
1271                    .log_err()
1272                    .flatten()
1273            });
1274        }
1275
1276        let request = mode.request_dap(session_id, request, cx);
1277        cx.spawn(async move |this, cx| {
1278            let result = request.await;
1279            this.update(cx, |this, cx| process_result(this, result, cx))
1280                .log_err()
1281                .flatten()
1282        })
1283    }
1284
    /// Sends `request` for this session without caching it, by forwarding the session's
    /// capabilities, id and mode to `request_inner`.
    ///
    /// `process_result` is invoked with the outcome; the returned task yields its result.
    fn request<T: DapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(
            &mut Self,
            Result<T::Response>,
            &mut Context<Self>,
        ) -> Option<T::Response>
        + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(
            &self.capabilities,
            self.id,
            &self.mode,
            request,
            process_result,
            cx,
        )
    }
1305
    /// Drops every cached request entry recorded for the command type `Command`.
    fn invalidate_command_type<Command: DapCommand>(&mut self) {
        self.requests.remove(&std::any::TypeId::of::<Command>());
    }
1309
    /// Drops the cached request entries for modules, loaded sources and threads.
    fn invalidate_generic(&mut self) {
        self.invalidate_command_type::<ModulesCommand>();
        self.invalidate_command_type::<LoadedSourcesCommand>();
        self.invalidate_command_type::<ThreadsCommand>();
    }
1315
1316    fn invalidate_state(&mut self, key: &RequestSlot) {
1317        self.requests
1318            .entry(key.0.as_any().type_id())
1319            .and_modify(|request_map| {
1320                request_map.remove(&key);
1321            });
1322    }
1323
    /// Returns the status tracked for `thread_id`.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1327
1328    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1329        self.fetch(
1330            dap_command::ThreadsCommand,
1331            |this, result, cx| {
1332                let result = result.log_err()?;
1333
1334                this.threads = result
1335                    .iter()
1336                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1337                    .collect();
1338
1339                this.invalidate_command_type::<StackTraceCommand>();
1340                cx.emit(SessionEvent::Threads);
1341                cx.notify();
1342
1343                Some(result)
1344            },
1345            cx,
1346        );
1347
1348        self.threads
1349            .values()
1350            .map(|thread| {
1351                (
1352                    thread.dap.clone(),
1353                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1354                )
1355            })
1356            .collect()
1357    }
1358
1359    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1360        self.fetch(
1361            dap_command::ModulesCommand,
1362            |this, result, cx| {
1363                let result = result.log_err()?;
1364
1365                this.modules = result.iter().cloned().collect();
1366                cx.emit(SessionEvent::Modules);
1367                cx.notify();
1368
1369                Some(result)
1370            },
1371            cx,
1372        );
1373
1374        &self.modules
1375    }
1376
    /// Returns whether this session is currently set to ignore breakpoints.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1380
    /// Flips the `ignore_breakpoints` flag by delegating to `set_ignore_breakpoints` with the
    /// negated current value, and returns that call's task.
    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
    }
1384
1385    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1386        if self.ignore_breakpoints == ignore {
1387            return Task::ready(());
1388        }
1389
1390        self.ignore_breakpoints = ignore;
1391
1392        if let Some(local) = self.as_local() {
1393            local.send_all_breakpoints(ignore, cx)
1394        } else {
1395            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1396            unimplemented!()
1397        }
1398    }
1399
    /// Returns the raw `ignore_breakpoints` flag, i.e. the same value as `ignore_breakpoints()`.
    ///
    /// NOTE(review): given the name, this looks inverted (one would expect
    /// `!self.ignore_breakpoints`). Confirm against the callers before changing it, since they
    /// may rely on the current value.
    pub fn breakpoints_enabled(&self) -> bool {
        self.ignore_breakpoints
    }
1403
1404    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1405        self.fetch(
1406            dap_command::LoadedSourcesCommand,
1407            |this, result, cx| {
1408                let result = result.log_err()?;
1409                this.loaded_sources = result.iter().cloned().collect();
1410                cx.emit(SessionEvent::LoadedSources);
1411                cx.notify();
1412                Some(result)
1413            },
1414            cx,
1415        );
1416
1417        &self.loaded_sources
1418    }
1419
1420    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1421        res.log_err()?;
1422        Some(())
1423    }
1424
1425    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1426        thread_id: ThreadId,
1427    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1428    {
1429        move |this, response, cx| match response.log_err() {
1430            Some(response) => Some(response),
1431            None => {
1432                this.thread_states.stop_thread(thread_id);
1433                cx.notify();
1434                None
1435            }
1436        }
1437    }
1438
1439    fn clear_active_debug_line_response(
1440        &mut self,
1441        response: Result<()>,
1442        cx: &mut Context<Session>,
1443    ) -> Option<()> {
1444        response.log_err()?;
1445        self.clear_active_debug_line(cx);
1446        Some(())
1447    }
1448
1449    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1450        self.as_local()
1451            .expect("Message handler will only run in local mode")
1452            .breakpoint_store
1453            .update(cx, |store, cx| {
1454                store.remove_active_position(Some(self.id), cx)
1455            });
1456    }
1457
1458    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1459        self.request(
1460            PauseCommand {
1461                thread_id: thread_id.0,
1462            },
1463            Self::empty_response,
1464            cx,
1465        )
1466        .detach();
1467    }
1468
1469    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1470        self.request(
1471            RestartStackFrameCommand { stack_frame_id },
1472            Self::empty_response,
1473            cx,
1474        )
1475        .detach();
1476    }
1477
1478    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1479        if self.capabilities.supports_restart_request.unwrap_or(false) {
1480            self.request(
1481                RestartCommand {
1482                    raw: args.unwrap_or(Value::Null),
1483                },
1484                Self::empty_response,
1485                cx,
1486            )
1487            .detach();
1488        } else {
1489            self.request(
1490                DisconnectCommand {
1491                    restart: Some(false),
1492                    terminate_debuggee: Some(true),
1493                    suspend_debuggee: Some(false),
1494                },
1495                Self::empty_response,
1496                cx,
1497            )
1498            .detach();
1499        }
1500    }
1501
1502    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1503        self.is_session_terminated = true;
1504        self.thread_states.exit_all_threads();
1505        cx.notify();
1506
1507        let task = if self
1508            .capabilities
1509            .supports_terminate_request
1510            .unwrap_or_default()
1511        {
1512            self.request(
1513                TerminateCommand {
1514                    restart: Some(false),
1515                },
1516                Self::clear_active_debug_line_response,
1517                cx,
1518            )
1519        } else {
1520            self.request(
1521                DisconnectCommand {
1522                    restart: Some(false),
1523                    terminate_debuggee: Some(true),
1524                    suspend_debuggee: Some(false),
1525                },
1526                Self::clear_active_debug_line_response,
1527                cx,
1528            )
1529        };
1530
1531        cx.background_spawn(async move {
1532            let _ = task.await;
1533        })
1534    }
1535
1536    pub fn completions(
1537        &mut self,
1538        query: CompletionsQuery,
1539        cx: &mut Context<Self>,
1540    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1541        let task = self.request(query, |_, result, _| result.log_err(), cx);
1542
1543        cx.background_executor().spawn(async move {
1544            anyhow::Ok(
1545                task.await
1546                    .map(|response| response.targets)
1547                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1548            )
1549        })
1550    }
1551
1552    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1553        self.thread_states.continue_thread(thread_id);
1554        self.request(
1555            ContinueCommand {
1556                args: ContinueArguments {
1557                    thread_id: thread_id.0,
1558                    single_thread: Some(true),
1559                },
1560            },
1561            Self::on_step_response::<ContinueCommand>(thread_id),
1562            cx,
1563        )
1564        .detach();
1565    }
1566
1567    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1568        match self.mode {
1569            Mode::Local(ref local) => Some(local.client.clone()),
1570            Mode::Remote(_) => None,
1571        }
1572    }
1573
1574    pub fn step_over(
1575        &mut self,
1576        thread_id: ThreadId,
1577        granularity: SteppingGranularity,
1578        cx: &mut Context<Self>,
1579    ) {
1580        let supports_single_thread_execution_requests =
1581            self.capabilities.supports_single_thread_execution_requests;
1582        let supports_stepping_granularity = self
1583            .capabilities
1584            .supports_stepping_granularity
1585            .unwrap_or_default();
1586
1587        let command = NextCommand {
1588            inner: StepCommand {
1589                thread_id: thread_id.0,
1590                granularity: supports_stepping_granularity.then(|| granularity),
1591                single_thread: supports_single_thread_execution_requests,
1592            },
1593        };
1594
1595        self.thread_states.process_step(thread_id);
1596        self.request(
1597            command,
1598            Self::on_step_response::<NextCommand>(thread_id),
1599            cx,
1600        )
1601        .detach();
1602    }
1603
1604    pub fn step_in(
1605        &mut self,
1606        thread_id: ThreadId,
1607        granularity: SteppingGranularity,
1608        cx: &mut Context<Self>,
1609    ) {
1610        let supports_single_thread_execution_requests =
1611            self.capabilities.supports_single_thread_execution_requests;
1612        let supports_stepping_granularity = self
1613            .capabilities
1614            .supports_stepping_granularity
1615            .unwrap_or_default();
1616
1617        let command = StepInCommand {
1618            inner: StepCommand {
1619                thread_id: thread_id.0,
1620                granularity: supports_stepping_granularity.then(|| granularity),
1621                single_thread: supports_single_thread_execution_requests,
1622            },
1623        };
1624
1625        self.thread_states.process_step(thread_id);
1626        self.request(
1627            command,
1628            Self::on_step_response::<StepInCommand>(thread_id),
1629            cx,
1630        )
1631        .detach();
1632    }
1633
1634    pub fn step_out(
1635        &mut self,
1636        thread_id: ThreadId,
1637        granularity: SteppingGranularity,
1638        cx: &mut Context<Self>,
1639    ) {
1640        let supports_single_thread_execution_requests =
1641            self.capabilities.supports_single_thread_execution_requests;
1642        let supports_stepping_granularity = self
1643            .capabilities
1644            .supports_stepping_granularity
1645            .unwrap_or_default();
1646
1647        let command = StepOutCommand {
1648            inner: StepCommand {
1649                thread_id: thread_id.0,
1650                granularity: supports_stepping_granularity.then(|| granularity),
1651                single_thread: supports_single_thread_execution_requests,
1652            },
1653        };
1654
1655        self.thread_states.process_step(thread_id);
1656        self.request(
1657            command,
1658            Self::on_step_response::<StepOutCommand>(thread_id),
1659            cx,
1660        )
1661        .detach();
1662    }
1663
1664    pub fn step_back(
1665        &mut self,
1666        thread_id: ThreadId,
1667        granularity: SteppingGranularity,
1668        cx: &mut Context<Self>,
1669    ) {
1670        let supports_single_thread_execution_requests =
1671            self.capabilities.supports_single_thread_execution_requests;
1672        let supports_stepping_granularity = self
1673            .capabilities
1674            .supports_stepping_granularity
1675            .unwrap_or_default();
1676
1677        let command = StepBackCommand {
1678            inner: StepCommand {
1679                thread_id: thread_id.0,
1680                granularity: supports_stepping_granularity.then(|| granularity),
1681                single_thread: supports_single_thread_execution_requests,
1682            },
1683        };
1684
1685        self.thread_states.process_step(thread_id);
1686
1687        self.request(
1688            command,
1689            Self::on_step_response::<StepBackCommand>(thread_id),
1690            cx,
1691        )
1692        .detach();
1693    }
1694
1695    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1696        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1697            && self.requests.contains_key(&ThreadsCommand.type_id())
1698            && self.threads.contains_key(&thread_id)
1699        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1700        // We could still be using an old thread id and have sent a new thread's request
1701        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1702        // But it very well could cause a minor bug in the future that is hard to track down
1703        {
1704            self.fetch(
1705                super::dap_command::StackTraceCommand {
1706                    thread_id: thread_id.0,
1707                    start_frame: None,
1708                    levels: None,
1709                },
1710                move |this, stack_frames, cx| {
1711                    let stack_frames = stack_frames.log_err()?;
1712
1713                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1714                        thread.stack_frame_ids =
1715                            stack_frames.iter().map(|frame| frame.id).collect();
1716                    });
1717                    debug_assert!(
1718                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1719                        "Sent request for thread_id that doesn't exist"
1720                    );
1721
1722                    this.stack_frames.extend(
1723                        stack_frames
1724                            .iter()
1725                            .cloned()
1726                            .map(|frame| (frame.id, StackFrame::from(frame))),
1727                    );
1728
1729                    this.invalidate_command_type::<ScopesCommand>();
1730                    this.invalidate_command_type::<VariablesCommand>();
1731
1732                    cx.emit(SessionEvent::StackTrace);
1733                    cx.notify();
1734                    Some(stack_frames)
1735                },
1736                cx,
1737            );
1738        }
1739
1740        self.threads
1741            .get(&thread_id)
1742            .map(|thread| {
1743                thread
1744                    .stack_frame_ids
1745                    .iter()
1746                    .filter_map(|id| self.stack_frames.get(id))
1747                    .cloned()
1748                    .collect()
1749            })
1750            .unwrap_or_default()
1751    }
1752
1753    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1754        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1755            && self
1756                .requests
1757                .contains_key(&TypeId::of::<StackTraceCommand>())
1758        {
1759            self.fetch(
1760                ScopesCommand { stack_frame_id },
1761                move |this, scopes, cx| {
1762                    let scopes = scopes.log_err()?;
1763
1764                    for scope in scopes .iter(){
1765                        this.variables(scope.variables_reference, cx);
1766                    }
1767
1768                    let entry = this
1769                        .stack_frames
1770                        .entry(stack_frame_id)
1771                        .and_modify(|stack_frame| {
1772                            stack_frame.scopes = scopes.clone();
1773                        });
1774
1775                    cx.emit(SessionEvent::Variables);
1776
1777                    debug_assert!(
1778                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1779                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1780                    );
1781
1782                    Some(scopes)
1783                },
1784                cx,
1785            );
1786        }
1787
1788        self.stack_frames
1789            .get(&stack_frame_id)
1790            .map(|frame| frame.scopes.as_slice())
1791            .unwrap_or_default()
1792    }
1793
1794    pub fn variables(
1795        &mut self,
1796        variables_reference: VariableReference,
1797        cx: &mut Context<Self>,
1798    ) -> Vec<dap::Variable> {
1799        let command = VariablesCommand {
1800            variables_reference,
1801            filter: None,
1802            start: None,
1803            count: None,
1804            format: None,
1805        };
1806
1807        self.fetch(
1808            command,
1809            move |this, variables, cx| {
1810                let variables = variables.log_err()?;
1811                this.variables
1812                    .insert(variables_reference, variables.clone());
1813
1814                cx.emit(SessionEvent::Variables);
1815                Some(variables)
1816            },
1817            cx,
1818        );
1819
1820        self.variables
1821            .get(&variables_reference)
1822            .cloned()
1823            .unwrap_or_default()
1824    }
1825
1826    pub fn set_variable_value(
1827        &mut self,
1828        variables_reference: u64,
1829        name: String,
1830        value: String,
1831        cx: &mut Context<Self>,
1832    ) {
1833        if self.capabilities.supports_set_variable.unwrap_or_default() {
1834            self.request(
1835                SetVariableValueCommand {
1836                    name,
1837                    value,
1838                    variables_reference,
1839                },
1840                move |this, response, cx| {
1841                    let response = response.log_err()?;
1842                    this.invalidate_command_type::<VariablesCommand>();
1843                    cx.notify();
1844                    Some(response)
1845                },
1846                cx,
1847            )
1848            .detach()
1849        }
1850    }
1851
1852    pub fn evaluate(
1853        &mut self,
1854        expression: String,
1855        context: Option<EvaluateArgumentsContext>,
1856        frame_id: Option<u64>,
1857        source: Option<Source>,
1858        cx: &mut Context<Self>,
1859    ) {
1860        self.request(
1861            EvaluateCommand {
1862                expression,
1863                context,
1864                frame_id,
1865                source,
1866            },
1867            |this, response, cx| {
1868                let response = response.log_err()?;
1869                this.output_token.0 += 1;
1870                this.output.push_back(dap::OutputEvent {
1871                    category: None,
1872                    output: response.result.clone(),
1873                    group: None,
1874                    variables_reference: Some(response.variables_reference),
1875                    source: None,
1876                    line: None,
1877                    column: None,
1878                    data: None,
1879                    location_reference: None,
1880                });
1881
1882                this.invalidate_command_type::<ScopesCommand>();
1883                cx.notify();
1884                Some(response)
1885            },
1886            cx,
1887        )
1888        .detach();
1889    }
1890
1891    pub fn location(
1892        &mut self,
1893        reference: u64,
1894        cx: &mut Context<Self>,
1895    ) -> Option<dap::LocationsResponse> {
1896        self.fetch(
1897            LocationsCommand { reference },
1898            move |this, response, _| {
1899                let response = response.log_err()?;
1900                this.locations.insert(reference, response.clone());
1901                Some(response)
1902            },
1903            cx,
1904        );
1905        self.locations.get(&reference).cloned()
1906    }
1907    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1908        let command = DisconnectCommand {
1909            restart: Some(false),
1910            terminate_debuggee: Some(true),
1911            suspend_debuggee: Some(false),
1912        };
1913
1914        self.request(command, Self::empty_response, cx).detach()
1915    }
1916
1917    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1918        if self
1919            .capabilities
1920            .supports_terminate_threads_request
1921            .unwrap_or_default()
1922        {
1923            self.request(
1924                TerminateThreadsCommand {
1925                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1926                },
1927                Self::clear_active_debug_line_response,
1928                cx,
1929            )
1930            .detach();
1931        } else {
1932            self.shutdown(cx).detach();
1933        }
1934    }
1935}
1936
1937fn create_local_session(
1938    breakpoint_store: Entity<BreakpointStore>,
1939    session_id: SessionId,
1940    parent_session: Option<Entity<Session>>,
1941    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1942    initialized_tx: oneshot::Sender<()>,
1943    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1944    mode: LocalMode,
1945    capabilities: Capabilities,
1946    cx: &mut Context<Session>,
1947) -> Session {
1948    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1949        let mut initialized_tx = Some(initialized_tx);
1950        while let Some(message) = message_rx.next().await {
1951            if let Message::Event(event) = message {
1952                if let Events::Initialized(_) = *event {
1953                    if let Some(tx) = initialized_tx.take() {
1954                        tx.send(()).ok();
1955                    }
1956                } else {
1957                    let Ok(_) = this.update(cx, |session, cx| {
1958                        session.handle_dap_event(event, cx);
1959                    }) else {
1960                        break;
1961                    };
1962                }
1963            } else {
1964                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1965                else {
1966                    break;
1967                };
1968            }
1969        }
1970    })];
1971
1972    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1973        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1974            if let Some(local) = (!this.ignore_breakpoints)
1975                .then(|| this.as_local_mut())
1976                .flatten()
1977            {
1978                local
1979                    .send_breakpoints_from_path(path.clone(), *reason, cx)
1980                    .detach();
1981            };
1982        }
1983        BreakpointStoreEvent::BreakpointsCleared(paths) => {
1984            if let Some(local) = (!this.ignore_breakpoints)
1985                .then(|| this.as_local_mut())
1986                .flatten()
1987            {
1988                local.unset_breakpoints_from_paths(paths, cx).detach();
1989            }
1990        }
1991        BreakpointStoreEvent::ActiveDebugLineChanged => {}
1992    })
1993    .detach();
1994
1995    Session {
1996        mode: Mode::Local(mode),
1997        id: session_id,
1998        child_session_ids: HashSet::default(),
1999        parent_id: parent_session.map(|session| session.read(cx).id),
2000        variables: Default::default(),
2001        capabilities,
2002        thread_states: ThreadStates::default(),
2003        output_token: OutputToken(0),
2004        ignore_breakpoints: false,
2005        output: circular_buffer::CircularBuffer::boxed(),
2006        requests: HashMap::default(),
2007        modules: Vec::default(),
2008        loaded_sources: Vec::default(),
2009        threads: IndexMap::default(),
2010        stack_frames: IndexMap::default(),
2011        locations: Default::default(),
2012        _background_tasks,
2013        is_session_terminated: false,
2014    }
2015}