session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{anyhow, Result};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::{
  18    adapters::{DapDelegate, DapStatus},
  19    client::{DebugAdapterClient, SessionId},
  20    messages::{Events, Message},
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23};
  24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
  25use futures::channel::oneshot;
  26use futures::{future::Shared, FutureExt};
  27use gpui::{
  28    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  29};
  30use rpc::AnyProtoClient;
  31use serde_json::{json, Value};
  32use settings::Settings;
  33use smol::stream::StreamExt;
  34use std::any::TypeId;
  35use std::path::PathBuf;
  36use std::u64;
  37use std::{
  38    any::Any,
  39    collections::hash_map::Entry,
  40    hash::{Hash, Hasher},
  41    path::Path,
  42    sync::Arc,
  43};
  44use task::{DebugAdapterConfig, DebugTaskDefinition};
  45use text::{PointUtf16, ToPointUtf16};
  46use util::{merge_json_value_into, ResultExt};
  47
  48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  49#[repr(transparent)]
  50pub struct ThreadId(pub u64);
  51
  52impl ThreadId {
  53    pub const MIN: ThreadId = ThreadId(u64::MIN);
  54    pub const MAX: ThreadId = ThreadId(u64::MAX);
  55}
  56
  57impl From<u64> for ThreadId {
  58    fn from(id: u64) -> Self {
  59        Self(id)
  60    }
  61}
  62
  63#[derive(Clone, Debug)]
  64pub struct StackFrame {
  65    pub dap: dap::StackFrame,
  66    pub scopes: Vec<dap::Scope>,
  67}
  68
  69impl From<dap::StackFrame> for StackFrame {
  70    fn from(stack_frame: dap::StackFrame) -> Self {
  71        Self {
  72            scopes: vec![],
  73            dap: stack_frame,
  74        }
  75    }
  76}
  77
  78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  79pub enum ThreadStatus {
  80    #[default]
  81    Running,
  82    Stopped,
  83    Stepping,
  84    Exited,
  85    Ended,
  86}
  87
  88impl ThreadStatus {
  89    pub fn label(&self) -> &'static str {
  90        match self {
  91            ThreadStatus::Running => "Running",
  92            ThreadStatus::Stopped => "Stopped",
  93            ThreadStatus::Stepping => "Stepping",
  94            ThreadStatus::Exited => "Exited",
  95            ThreadStatus::Ended => "Ended",
  96        }
  97    }
  98}
  99
 100#[derive(Debug)]
 101pub struct Thread {
 102    dap: dap::Thread,
 103    stack_frame_ids: IndexSet<StackFrameId>,
 104    _has_stopped: bool,
 105}
 106
 107impl From<dap::Thread> for Thread {
 108    fn from(dap: dap::Thread) -> Self {
 109        Self {
 110            dap,
 111            stack_frame_ids: Default::default(),
 112            _has_stopped: false,
 113        }
 114    }
 115}
 116
 117type UpstreamProjectId = u64;
 118
 119struct RemoteConnection {
 120    _client: AnyProtoClient,
 121    _upstream_project_id: UpstreamProjectId,
 122}
 123
 124impl RemoteConnection {
 125    fn send_proto_client_request<R: DapCommand>(
 126        &self,
 127        _request: R,
 128        _session_id: SessionId,
 129        cx: &mut App,
 130    ) -> Task<Result<R::Response>> {
 131        // let message = request.to_proto(session_id, self.upstream_project_id);
 132        // let upstream_client = self.client.clone();
 133        cx.background_executor().spawn(async move {
 134            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 135            // let response = upstream_client.request(message).await?;
 136            // request.response_from_proto(response)
 137            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 138        })
 139    }
 140
 141    fn request<R: DapCommand>(
 142        &self,
 143        request: R,
 144        session_id: SessionId,
 145        cx: &mut App,
 146    ) -> Task<Result<R::Response>>
 147    where
 148        <R::DapRequest as dap::requests::Request>::Response: 'static,
 149        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 150    {
 151        return self.send_proto_client_request::<R>(request, session_id, cx);
 152    }
 153}
 154
 155enum Mode {
 156    Local(LocalMode),
 157    Remote(RemoteConnection),
 158}
 159
 160#[derive(Clone)]
 161pub struct LocalMode {
 162    client: Arc<DebugAdapterClient>,
 163    config: DebugAdapterConfig,
 164    adapter: Arc<dyn DebugAdapter>,
 165    breakpoint_store: Entity<BreakpointStore>,
 166}
 167
 168fn client_source(abs_path: &Path) -> dap::Source {
 169    dap::Source {
 170        name: abs_path
 171            .file_name()
 172            .map(|filename| filename.to_string_lossy().to_string()),
 173        path: Some(abs_path.to_string_lossy().to_string()),
 174        source_reference: None,
 175        presentation_hint: None,
 176        origin: None,
 177        sources: None,
 178        adapter_data: None,
 179        checksums: None,
 180    }
 181}
 182
 183impl LocalMode {
 184    fn new(
 185        debug_adapters: Arc<DapRegistry>,
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        breakpoint_store: Entity<BreakpointStore>,
 189        config: DebugAdapterConfig,
 190        delegate: DapAdapterDelegate,
 191        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 192        cx: AsyncApp,
 193    ) -> Task<Result<(Self, Capabilities)>> {
 194        Self::new_inner(
 195            debug_adapters,
 196            session_id,
 197            parent_session,
 198            breakpoint_store,
 199            config,
 200            delegate,
 201            messages_tx,
 202            async |_, _| {},
 203            cx,
 204        )
 205    }
 206    #[cfg(any(test, feature = "test-support"))]
 207    fn new_fake(
 208        session_id: SessionId,
 209        parent_session: Option<Entity<Session>>,
 210        breakpoint_store: Entity<BreakpointStore>,
 211        config: DebugAdapterConfig,
 212        delegate: DapAdapterDelegate,
 213        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 214        caps: Capabilities,
 215        fail: bool,
 216        cx: AsyncApp,
 217    ) -> Task<Result<(Self, Capabilities)>> {
 218        use task::DebugRequestDisposition;
 219
 220        let request = match config.request.clone() {
 221            DebugRequestDisposition::UserConfigured(request) => request,
 222            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
 223                match reverse_request_args.request {
 224                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
 225                        DebugRequestType::Launch(task::LaunchConfig {
 226                            program: "".to_owned(),
 227                            cwd: None,
 228                        })
 229                    }
 230                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
 231                        DebugRequestType::Attach(task::AttachConfig {
 232                            process_id: Some(0),
 233                        })
 234                    }
 235                }
 236            }
 237        };
 238
 239        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
 240            session
 241                .client
 242                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 243                .await;
 244
 245            let paths = cx
 246                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
 247                .expect("Breakpoint store should exist in all tests that start debuggers");
 248
 249            session
 250                .client
 251                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
 252                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
 253                    if !paths.contains(&p) {
 254                        panic!("Sent breakpoints for path without any")
 255                    }
 256
 257                    Ok(dap::SetBreakpointsResponse {
 258                        breakpoints: Vec::default(),
 259                    })
 260                })
 261                .await;
 262
 263            match request {
 264                dap::DebugRequestType::Launch(_) => {
 265                    if fail {
 266                        session
 267                            .client
 268                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 269                                Err(dap::ErrorResponse {
 270                                    error: Some(dap::Message {
 271                                        id: 1,
 272                                        format: "error".into(),
 273                                        variables: None,
 274                                        send_telemetry: None,
 275                                        show_user: None,
 276                                        url: None,
 277                                        url_label: None,
 278                                    }),
 279                                })
 280                            })
 281                            .await;
 282                    } else {
 283                        session
 284                            .client
 285                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 286                            .await;
 287                    }
 288                }
 289                dap::DebugRequestType::Attach(attach_config) => {
 290                    if fail {
 291                        session
 292                            .client
 293                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 294                                Err(dap::ErrorResponse {
 295                                    error: Some(dap::Message {
 296                                        id: 1,
 297                                        format: "error".into(),
 298                                        variables: None,
 299                                        send_telemetry: None,
 300                                        show_user: None,
 301                                        url: None,
 302                                        url_label: None,
 303                                    }),
 304                                })
 305                            })
 306                            .await;
 307                    } else {
 308                        session
 309                            .client
 310                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 311                                assert_eq!(
 312                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 313                                    args.raw
 314                                );
 315
 316                                Ok(())
 317                            })
 318                            .await;
 319                    }
 320                }
 321            }
 322
 323            session
 324                .client
 325                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
 326                .await;
 327            session.client.fake_event(Events::Initialized(None)).await;
 328        };
 329        Self::new_inner(
 330            DapRegistry::fake().into(),
 331            session_id,
 332            parent_session,
 333            breakpoint_store,
 334            config,
 335            delegate,
 336            messages_tx,
 337            callback,
 338            cx,
 339        )
 340    }
 341    fn new_inner(
 342        registry: Arc<DapRegistry>,
 343        session_id: SessionId,
 344        parent_session: Option<Entity<Session>>,
 345        breakpoint_store: Entity<BreakpointStore>,
 346        config: DebugAdapterConfig,
 347        delegate: DapAdapterDelegate,
 348        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 349        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 350        cx: AsyncApp,
 351    ) -> Task<Result<(Self, Capabilities)>> {
 352        cx.spawn(async move |cx| {
 353            let (adapter, binary) =
 354                Self::get_adapter_binary(&registry, &config, &delegate, cx).await?;
 355
 356            let message_handler = Box::new(move |message| {
 357                messages_tx.unbounded_send(message).ok();
 358            });
 359
 360            let client = Arc::new(
 361                if let Some(client) = parent_session
 362                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 363                    .flatten()
 364                {
 365                    client
 366                        .reconnect(session_id, binary, message_handler, cx.clone())
 367                        .await?
 368                } else {
 369                    DebugAdapterClient::start(
 370                        session_id,
 371                        adapter.name(),
 372                        binary,
 373                        message_handler,
 374                        cx.clone(),
 375                    )
 376                    .await?
 377                },
 378            );
 379
 380            let adapter_id = adapter.name().to_string().to_owned();
 381            let mut session = Self {
 382                client,
 383                adapter,
 384                breakpoint_store,
 385                config: config.clone(),
 386            };
 387
 388            on_initialized(&mut session, cx.clone()).await;
 389            let capabilities = session
 390                .request(Initialize { adapter_id }, cx.background_executor().clone())
 391                .await?;
 392
 393            Ok((session, capabilities))
 394        })
 395    }
 396
 397    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 398        let tasks: Vec<_> = paths
 399            .into_iter()
 400            .map(|path| {
 401                self.request(
 402                    dap_command::SetBreakpoints {
 403                        source: client_source(path),
 404                        source_modified: None,
 405                        breakpoints: vec![],
 406                    },
 407                    cx.background_executor().clone(),
 408                )
 409            })
 410            .collect();
 411
 412        cx.background_spawn(async move {
 413            futures::future::join_all(tasks)
 414                .await
 415                .iter()
 416                .for_each(|res| match res {
 417                    Ok(_) => {}
 418                    Err(err) => {
 419                        log::warn!("Set breakpoints request failed: {}", err);
 420                    }
 421                });
 422        })
 423    }
 424
 425    fn send_breakpoints_from_path(
 426        &self,
 427        abs_path: Arc<Path>,
 428        reason: BreakpointUpdatedReason,
 429        cx: &mut App,
 430    ) -> Task<()> {
 431        let breakpoints = self
 432            .breakpoint_store
 433            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 434            .into_iter()
 435            .filter(|bp| bp.state.is_enabled())
 436            .map(Into::into)
 437            .collect();
 438
 439        let task = self.request(
 440            dap_command::SetBreakpoints {
 441                source: client_source(&abs_path),
 442                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 443                breakpoints,
 444            },
 445            cx.background_executor().clone(),
 446        );
 447
 448        cx.background_spawn(async move {
 449            match task.await {
 450                Ok(_) => {}
 451                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 452            }
 453        })
 454    }
 455
 456    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 457        let mut breakpoint_tasks = Vec::new();
 458        let breakpoints = self
 459            .breakpoint_store
 460            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 461
 462        for (path, breakpoints) in breakpoints {
 463            let breakpoints = if ignore_breakpoints {
 464                vec![]
 465            } else {
 466                breakpoints
 467                    .into_iter()
 468                    .filter(|bp| bp.state.is_enabled())
 469                    .map(Into::into)
 470                    .collect()
 471            };
 472
 473            breakpoint_tasks.push(self.request(
 474                dap_command::SetBreakpoints {
 475                    source: client_source(&path),
 476                    source_modified: Some(false),
 477                    breakpoints,
 478                },
 479                cx.background_executor().clone(),
 480            ));
 481        }
 482
 483        cx.background_spawn(async move {
 484            futures::future::join_all(breakpoint_tasks)
 485                .await
 486                .iter()
 487                .for_each(|res| match res {
 488                    Ok(_) => {}
 489                    Err(err) => {
 490                        log::warn!("Set breakpoints request failed: {}", err);
 491                    }
 492                });
 493        })
 494    }
 495
 496    async fn get_adapter_binary(
 497        registry: &Arc<DapRegistry>,
 498        config: &DebugAdapterConfig,
 499        delegate: &DapAdapterDelegate,
 500        cx: &mut AsyncApp,
 501    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 502        let adapter = registry
 503            .adapter(&config.adapter)
 504            .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
 505
 506        let binary = cx.update(|cx| {
 507            ProjectSettings::get_global(cx)
 508                .dap
 509                .get(&adapter.name())
 510                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 511        })?;
 512
 513        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 514            Err(error) => {
 515                delegate.update_status(
 516                    adapter.name(),
 517                    DapStatus::Failed {
 518                        error: error.to_string(),
 519                    },
 520                );
 521
 522                return Err(error);
 523            }
 524            Ok(mut binary) => {
 525                delegate.update_status(adapter.name(), DapStatus::None);
 526
 527                let shell_env = delegate.shell_env().await;
 528                let mut envs = binary.envs.unwrap_or_default();
 529                envs.extend(shell_env);
 530                binary.envs = Some(envs);
 531
 532                binary
 533            }
 534        };
 535
 536        Ok((adapter, binary))
 537    }
 538
 539    pub fn initialize_sequence(
 540        &self,
 541        capabilities: &Capabilities,
 542        initialized_rx: oneshot::Receiver<()>,
 543        cx: &App,
 544    ) -> Task<Result<()>> {
 545        let (mut raw, is_launch) = match &self.config.request {
 546            task::DebugRequestDisposition::UserConfigured(_) => {
 547                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
 548                    debug_assert!(false, "This part of code should be unreachable in practice");
 549                    return Task::ready(Err(anyhow!(
 550                        "Expected debug config conversion to succeed"
 551                    )));
 552                };
 553                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
 554                let raw = self.adapter.request_args(&raw);
 555                (raw, is_launch)
 556            }
 557            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
 558                start_debugging_request_arguments.configuration.clone(),
 559                matches!(
 560                    start_debugging_request_arguments.request,
 561                    dap::StartDebuggingRequestArgumentsRequest::Launch
 562                ),
 563            ),
 564        };
 565
 566        merge_json_value_into(
 567            self.config.initialize_args.clone().unwrap_or(json!({})),
 568            &mut raw,
 569        );
 570        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 571        let launch = if is_launch {
 572            self.request(Launch { raw }, cx.background_executor().clone())
 573        } else {
 574            self.request(Attach { raw }, cx.background_executor().clone())
 575        };
 576
 577        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 578
 579        let configuration_sequence = cx.spawn({
 580            let this = self.clone();
 581            async move |cx| {
 582                initialized_rx.await?;
 583                // todo(debugger) figure out if we want to handle a breakpoint response error
 584                // This will probably consist of letting a user know that breakpoints failed to be set
 585                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 586
 587                if configuration_done_supported {
 588                    this.request(ConfigurationDone, cx.background_executor().clone())
 589                } else {
 590                    Task::ready(Ok(()))
 591                }
 592                .await
 593            }
 594        });
 595
 596        cx.background_spawn(async move {
 597            futures::future::try_join(launch, configuration_sequence).await?;
 598            Ok(())
 599        })
 600    }
 601
 602    fn request<R: LocalDapCommand>(
 603        &self,
 604        request: R,
 605        executor: BackgroundExecutor,
 606    ) -> Task<Result<R::Response>>
 607    where
 608        <R::DapRequest as dap::requests::Request>::Response: 'static,
 609        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 610    {
 611        let request = Arc::new(request);
 612
 613        let request_clone = request.clone();
 614        let connection = self.client.clone();
 615        let request_task = executor.spawn(async move {
 616            let args = request_clone.to_dap();
 617            connection.request::<R::DapRequest>(args).await
 618        });
 619
 620        executor.spawn(async move {
 621            let response = request.response_from_dap(request_task.await?);
 622            response
 623        })
 624    }
 625}
 626impl From<RemoteConnection> for Mode {
 627    fn from(value: RemoteConnection) -> Self {
 628        Self::Remote(value)
 629    }
 630}
 631
 632impl Mode {
 633    fn request_dap<R: DapCommand>(
 634        &self,
 635        session_id: SessionId,
 636        request: R,
 637        cx: &mut Context<Session>,
 638    ) -> Task<Result<R::Response>>
 639    where
 640        <R::DapRequest as dap::requests::Request>::Response: 'static,
 641        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 642    {
 643        match self {
 644            Mode::Local(debug_adapter_client) => {
 645                debug_adapter_client.request(request, cx.background_executor().clone())
 646            }
 647            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 648        }
 649    }
 650}
 651
 652#[derive(Default)]
 653struct ThreadStates {
 654    global_state: Option<ThreadStatus>,
 655    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 656}
 657
 658impl ThreadStates {
 659    fn stop_all_threads(&mut self) {
 660        self.global_state = Some(ThreadStatus::Stopped);
 661        self.known_thread_states.clear();
 662    }
 663
 664    fn exit_all_threads(&mut self) {
 665        self.global_state = Some(ThreadStatus::Exited);
 666        self.known_thread_states.clear();
 667    }
 668
 669    fn continue_all_threads(&mut self) {
 670        self.global_state = Some(ThreadStatus::Running);
 671        self.known_thread_states.clear();
 672    }
 673
 674    fn stop_thread(&mut self, thread_id: ThreadId) {
 675        self.known_thread_states
 676            .insert(thread_id, ThreadStatus::Stopped);
 677    }
 678
 679    fn continue_thread(&mut self, thread_id: ThreadId) {
 680        self.known_thread_states
 681            .insert(thread_id, ThreadStatus::Running);
 682    }
 683
 684    fn process_step(&mut self, thread_id: ThreadId) {
 685        self.known_thread_states
 686            .insert(thread_id, ThreadStatus::Stepping);
 687    }
 688
 689    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 690        self.thread_state(thread_id)
 691            .unwrap_or(ThreadStatus::Running)
 692    }
 693
 694    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 695        self.known_thread_states
 696            .get(&thread_id)
 697            .copied()
 698            .or(self.global_state)
 699    }
 700
 701    fn exit_thread(&mut self, thread_id: ThreadId) {
 702        self.known_thread_states
 703            .insert(thread_id, ThreadStatus::Exited);
 704    }
 705
 706    fn any_stopped_thread(&self) -> bool {
 707        self.global_state
 708            .is_some_and(|state| state == ThreadStatus::Stopped)
 709            || self
 710                .known_thread_states
 711                .values()
 712                .any(|status| *status == ThreadStatus::Stopped)
 713    }
 714}
 715const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 716
 717#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 718pub struct OutputToken(pub usize);
 719/// Represents a current state of a single debug adapter and provides ways to mutate it.
 720pub struct Session {
 721    mode: Mode,
 722    pub(super) capabilities: Capabilities,
 723    id: SessionId,
 724    child_session_ids: HashSet<SessionId>,
 725    parent_id: Option<SessionId>,
 726    ignore_breakpoints: bool,
 727    modules: Vec<dap::Module>,
 728    loaded_sources: Vec<dap::Source>,
 729    output_token: OutputToken,
 730    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 731    threads: IndexMap<ThreadId, Thread>,
 732    thread_states: ThreadStates,
 733    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 734    stack_frames: IndexMap<StackFrameId, StackFrame>,
 735    locations: HashMap<u64, dap::LocationsResponse>,
 736    is_session_terminated: bool,
 737    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 738    _background_tasks: Vec<Task<()>>,
 739}
 740
 741trait CacheableCommand: 'static + Send + Sync {
 742    fn as_any(&self) -> &dyn Any;
 743    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 744    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 745    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 746}
 747
 748impl<T> CacheableCommand for T
 749where
 750    T: DapCommand + PartialEq + Eq + Hash,
 751{
 752    fn as_any(&self) -> &dyn Any {
 753        self
 754    }
 755
 756    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 757        rhs.as_any()
 758            .downcast_ref::<Self>()
 759            .map_or(false, |rhs| self == rhs)
 760    }
 761
 762    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 763        T::hash(self, &mut hasher);
 764    }
 765
 766    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 767        self
 768    }
 769}
 770
 771pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 772
 773impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 774    fn from(request: T) -> Self {
 775        Self(Arc::new(request))
 776    }
 777}
 778
 779impl PartialEq for RequestSlot {
 780    fn eq(&self, other: &Self) -> bool {
 781        self.0.dyn_eq(other.0.as_ref())
 782    }
 783}
 784
 785impl Eq for RequestSlot {}
 786
 787impl Hash for RequestSlot {
 788    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 789        self.0.dyn_hash(state);
 790        self.0.as_any().type_id().hash(state)
 791    }
 792}
 793
 794#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 795pub struct CompletionsQuery {
 796    pub query: String,
 797    pub column: u64,
 798    pub line: Option<u64>,
 799    pub frame_id: Option<u64>,
 800}
 801
 802impl CompletionsQuery {
 803    pub fn new(
 804        buffer: &language::Buffer,
 805        cursor_position: language::Anchor,
 806        frame_id: Option<u64>,
 807    ) -> Self {
 808        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 809        Self {
 810            query: buffer.text(),
 811            column: column as u64,
 812            frame_id,
 813            line: Some(row as u64),
 814        }
 815    }
 816}
 817
 818pub enum SessionEvent {
 819    Modules,
 820    LoadedSources,
 821    Stopped(Option<ThreadId>),
 822    StackTrace,
 823    Variables,
 824    Threads,
 825}
 826
 827impl EventEmitter<SessionEvent> for Session {}
 828
 829// local session will send breakpoint updates to DAP for all new breakpoints
 830// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 831// BreakpointStore notifies session on breakpoint changes
 832impl Session {
 833    pub(crate) fn local(
 834        breakpoint_store: Entity<BreakpointStore>,
 835        session_id: SessionId,
 836        parent_session: Option<Entity<Session>>,
 837        delegate: DapAdapterDelegate,
 838        config: DebugAdapterConfig,
 839        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 840        initialized_tx: oneshot::Sender<()>,
 841        debug_adapters: Arc<DapRegistry>,
 842        cx: &mut App,
 843    ) -> Task<Result<Entity<Self>>> {
 844        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 845
 846        cx.spawn(async move |cx| {
 847            let (mode, capabilities) = LocalMode::new(
 848                debug_adapters,
 849                session_id,
 850                parent_session.clone(),
 851                breakpoint_store.clone(),
 852                config.clone(),
 853                delegate,
 854                message_tx,
 855                cx.clone(),
 856            )
 857            .await?;
 858
 859            cx.new(|cx| {
 860                create_local_session(
 861                    breakpoint_store,
 862                    session_id,
 863                    parent_session,
 864                    start_debugging_requests_tx,
 865                    initialized_tx,
 866                    message_rx,
 867                    mode,
 868                    capabilities,
 869                    cx,
 870                )
 871            })
 872        })
 873    }
 874
 875    #[cfg(any(test, feature = "test-support"))]
 876    pub(crate) fn fake(
 877        breakpoint_store: Entity<BreakpointStore>,
 878        session_id: SessionId,
 879        parent_session: Option<Entity<Session>>,
 880        delegate: DapAdapterDelegate,
 881        config: DebugAdapterConfig,
 882        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 883        initialized_tx: oneshot::Sender<()>,
 884        caps: Capabilities,
 885        fails: bool,
 886        cx: &mut App,
 887    ) -> Task<Result<Entity<Session>>> {
 888        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 889
 890        cx.spawn(async move |cx| {
 891            let (mode, capabilities) = LocalMode::new_fake(
 892                session_id,
 893                parent_session.clone(),
 894                breakpoint_store.clone(),
 895                config.clone(),
 896                delegate,
 897                message_tx,
 898                caps,
 899                fails,
 900                cx.clone(),
 901            )
 902            .await?;
 903
 904            cx.new(|cx| {
 905                create_local_session(
 906                    breakpoint_store,
 907                    session_id,
 908                    parent_session,
 909                    start_debugging_requests_tx,
 910                    initialized_tx,
 911                    message_rx,
 912                    mode,
 913                    capabilities,
 914                    cx,
 915                )
 916            })
 917        })
 918    }
 919
 920    pub(crate) fn remote(
 921        session_id: SessionId,
 922        client: AnyProtoClient,
 923        upstream_project_id: u64,
 924        ignore_breakpoints: bool,
 925    ) -> Self {
 926        Self {
 927            mode: Mode::Remote(RemoteConnection {
 928                _client: client,
 929                _upstream_project_id: upstream_project_id,
 930            }),
 931            id: session_id,
 932            child_session_ids: HashSet::default(),
 933            parent_id: None,
 934            capabilities: Capabilities::default(),
 935            ignore_breakpoints,
 936            variables: Default::default(),
 937            stack_frames: Default::default(),
 938            thread_states: ThreadStates::default(),
 939            output_token: OutputToken(0),
 940            output: circular_buffer::CircularBuffer::boxed(),
 941            requests: HashMap::default(),
 942            modules: Vec::default(),
 943            loaded_sources: Vec::default(),
 944            threads: IndexMap::default(),
 945            _background_tasks: Vec::default(),
 946            locations: Default::default(),
 947            is_session_terminated: false,
 948        }
 949    }
 950
 951    pub fn session_id(&self) -> SessionId {
 952        self.id
 953    }
 954
 955    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 956        self.child_session_ids.clone()
 957    }
 958
 959    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 960        self.child_session_ids.insert(session_id);
 961    }
 962
 963    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 964        self.child_session_ids.remove(&session_id);
 965    }
 966
 967    pub fn parent_id(&self) -> Option<SessionId> {
 968        self.parent_id
 969    }
 970
 971    pub fn capabilities(&self) -> &Capabilities {
 972        &self.capabilities
 973    }
 974
 975    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 976        if let Mode::Local(local_mode) = &self.mode {
 977            Some(local_mode.config.clone())
 978        } else {
 979            None
 980        }
 981    }
 982
 983    pub fn is_terminated(&self) -> bool {
 984        self.is_session_terminated
 985    }
 986
 987    pub fn is_local(&self) -> bool {
 988        matches!(self.mode, Mode::Local(_))
 989    }
 990
 991    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 992        match &mut self.mode {
 993            Mode::Local(local_mode) => Some(local_mode),
 994            Mode::Remote(_) => None,
 995        }
 996    }
 997
 998    pub fn as_local(&self) -> Option<&LocalMode> {
 999        match &self.mode {
1000            Mode::Local(local_mode) => Some(local_mode),
1001            Mode::Remote(_) => None,
1002        }
1003    }
1004
1005    pub(super) fn initialize_sequence(
1006        &mut self,
1007        initialize_rx: oneshot::Receiver<()>,
1008        cx: &mut Context<Self>,
1009    ) -> Task<Result<()>> {
1010        match &self.mode {
1011            Mode::Local(local_mode) => {
1012                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1013            }
1014            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1015        }
1016    }
1017
1018    pub fn output(
1019        &self,
1020        since: OutputToken,
1021    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1022        if self.output_token.0 == 0 {
1023            return (self.output.range(0..0), OutputToken(0));
1024        };
1025
1026        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1027
1028        let clamped_events_since = events_since.clamp(0, self.output.len());
1029        (
1030            self.output
1031                .range(self.output.len() - clamped_events_since..),
1032            self.output_token,
1033        )
1034    }
1035
1036    pub fn respond_to_client(
1037        &self,
1038        request_seq: u64,
1039        success: bool,
1040        command: String,
1041        body: Option<serde_json::Value>,
1042        cx: &mut Context<Self>,
1043    ) -> Task<Result<()>> {
1044        let Some(local_session) = self.as_local().cloned() else {
1045            unreachable!("Cannot respond to remote client");
1046        };
1047
1048        cx.background_spawn(async move {
1049            local_session
1050                .client
1051                .send_message(Message::Response(Response {
1052                    body,
1053                    success,
1054                    command,
1055                    seq: request_seq + 1,
1056                    request_seq,
1057                    message: None,
1058                }))
1059                .await
1060        })
1061    }
1062
1063    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1064        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1065            self.thread_states.stop_all_threads();
1066
1067            self.invalidate_command_type::<StackTraceCommand>();
1068        }
1069
1070        // Event if we stopped all threads we still need to insert the thread_id
1071        // to our own data
1072        if let Some(thread_id) = event.thread_id {
1073            self.thread_states.stop_thread(ThreadId(thread_id));
1074
1075            self.invalidate_state(
1076                &StackTraceCommand {
1077                    thread_id,
1078                    start_frame: None,
1079                    levels: None,
1080                }
1081                .into(),
1082            );
1083        }
1084
1085        self.invalidate_generic();
1086        self.threads.clear();
1087        self.variables.clear();
1088        cx.emit(SessionEvent::Stopped(
1089            event
1090                .thread_id
1091                .map(Into::into)
1092                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1093        ));
1094        cx.notify();
1095    }
1096
1097    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1098        match *event {
1099            Events::Initialized(_) => {
1100                debug_assert!(
1101                    false,
1102                    "Initialized event should have been handled in LocalMode"
1103                );
1104            }
1105            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1106            Events::Continued(event) => {
1107                if event.all_threads_continued.unwrap_or_default() {
1108                    self.thread_states.continue_all_threads();
1109                } else {
1110                    self.thread_states
1111                        .continue_thread(ThreadId(event.thread_id));
1112                }
1113                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1114                self.invalidate_generic();
1115            }
1116            Events::Exited(_event) => {
1117                self.clear_active_debug_line(cx);
1118            }
1119            Events::Terminated(_) => {
1120                self.is_session_terminated = true;
1121                self.clear_active_debug_line(cx);
1122            }
1123            Events::Thread(event) => {
1124                let thread_id = ThreadId(event.thread_id);
1125
1126                match event.reason {
1127                    dap::ThreadEventReason::Started => {
1128                        self.thread_states.continue_thread(thread_id);
1129                    }
1130                    dap::ThreadEventReason::Exited => {
1131                        self.thread_states.exit_thread(thread_id);
1132                    }
1133                    reason => {
1134                        log::error!("Unhandled thread event reason {:?}", reason);
1135                    }
1136                }
1137                self.invalidate_state(&ThreadsCommand.into());
1138                cx.notify();
1139            }
1140            Events::Output(event) => {
1141                if event
1142                    .category
1143                    .as_ref()
1144                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1145                {
1146                    return;
1147                }
1148
1149                self.output.push_back(event);
1150                self.output_token.0 += 1;
1151                cx.notify();
1152            }
1153            Events::Breakpoint(_) => {}
1154            Events::Module(event) => {
1155                match event.reason {
1156                    dap::ModuleEventReason::New => {
1157                        self.modules.push(event.module);
1158                    }
1159                    dap::ModuleEventReason::Changed => {
1160                        if let Some(module) = self
1161                            .modules
1162                            .iter_mut()
1163                            .find(|other| event.module.id == other.id)
1164                        {
1165                            *module = event.module;
1166                        }
1167                    }
1168                    dap::ModuleEventReason::Removed => {
1169                        self.modules.retain(|other| event.module.id != other.id);
1170                    }
1171                }
1172
1173                // todo(debugger): We should only send the invalidate command to downstream clients.
1174                // self.invalidate_state(&ModulesCommand.into());
1175            }
1176            Events::LoadedSource(_) => {
1177                self.invalidate_state(&LoadedSourcesCommand.into());
1178            }
1179            Events::Capabilities(event) => {
1180                self.capabilities = self.capabilities.merge(event.capabilities);
1181                cx.notify();
1182            }
1183            Events::Memory(_) => {}
1184            Events::Process(_) => {}
1185            Events::ProgressEnd(_) => {}
1186            Events::ProgressStart(_) => {}
1187            Events::ProgressUpdate(_) => {}
1188            Events::Invalidated(_) => {}
1189            Events::Other(_) => {}
1190        }
1191    }
1192
1193    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1194    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1195        &mut self,
1196        request: T,
1197        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1198            + 'static,
1199        cx: &mut Context<Self>,
1200    ) {
1201        const {
1202            assert!(
1203                T::CACHEABLE,
1204                "Only requests marked as cacheable should invoke `fetch`"
1205            );
1206        }
1207
1208        if !self.thread_states.any_stopped_thread()
1209            && request.type_id() != TypeId::of::<ThreadsCommand>()
1210            || self.is_session_terminated
1211        {
1212            return;
1213        }
1214
1215        let request_map = self
1216            .requests
1217            .entry(std::any::TypeId::of::<T>())
1218            .or_default();
1219
1220        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1221            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1222
1223            let task = Self::request_inner::<Arc<T>>(
1224                &self.capabilities,
1225                self.id,
1226                &self.mode,
1227                command,
1228                process_result,
1229                cx,
1230            );
1231            let task = cx
1232                .background_executor()
1233                .spawn(async move {
1234                    let _ = task.await?;
1235                    Some(())
1236                })
1237                .shared();
1238
1239            vacant.insert(task);
1240            cx.notify();
1241        }
1242    }
1243
1244    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1245        capabilities: &Capabilities,
1246        session_id: SessionId,
1247        mode: &Mode,
1248        request: T,
1249        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1250            + 'static,
1251        cx: &mut Context<Self>,
1252    ) -> Task<Option<T::Response>> {
1253        if !T::is_supported(&capabilities) {
1254            log::warn!(
1255                "Attempted to send a DAP request that isn't supported: {:?}",
1256                request
1257            );
1258            let error = Err(anyhow::Error::msg(
1259                "Couldn't complete request because it's not supported",
1260            ));
1261            return cx.spawn(async move |this, cx| {
1262                this.update(cx, |this, cx| process_result(this, error, cx))
1263                    .log_err()
1264                    .flatten()
1265            });
1266        }
1267
1268        let request = mode.request_dap(session_id, request, cx);
1269        cx.spawn(async move |this, cx| {
1270            let result = request.await;
1271            this.update(cx, |this, cx| process_result(this, result, cx))
1272                .log_err()
1273                .flatten()
1274        })
1275    }
1276
1277    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1278        &self,
1279        request: T,
1280        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1281            + 'static,
1282        cx: &mut Context<Self>,
1283    ) -> Task<Option<T::Response>> {
1284        Self::request_inner(
1285            &self.capabilities,
1286            self.id,
1287            &self.mode,
1288            request,
1289            process_result,
1290            cx,
1291        )
1292    }
1293
1294    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1295        self.requests.remove(&std::any::TypeId::of::<Command>());
1296    }
1297
1298    fn invalidate_generic(&mut self) {
1299        self.invalidate_command_type::<ModulesCommand>();
1300        self.invalidate_command_type::<LoadedSourcesCommand>();
1301        self.invalidate_command_type::<ThreadsCommand>();
1302    }
1303
1304    fn invalidate_state(&mut self, key: &RequestSlot) {
1305        self.requests
1306            .entry(key.0.as_any().type_id())
1307            .and_modify(|request_map| {
1308                request_map.remove(&key);
1309            });
1310    }
1311
1312    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1313        self.thread_states.thread_status(thread_id)
1314    }
1315
1316    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1317        self.fetch(
1318            dap_command::ThreadsCommand,
1319            |this, result, cx| {
1320                let result = result.log_err()?;
1321
1322                this.threads = result
1323                    .iter()
1324                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1325                    .collect();
1326
1327                this.invalidate_command_type::<StackTraceCommand>();
1328                cx.emit(SessionEvent::Threads);
1329                cx.notify();
1330
1331                Some(result)
1332            },
1333            cx,
1334        );
1335
1336        self.threads
1337            .values()
1338            .map(|thread| {
1339                (
1340                    thread.dap.clone(),
1341                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1342                )
1343            })
1344            .collect()
1345    }
1346
1347    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1348        self.fetch(
1349            dap_command::ModulesCommand,
1350            |this, result, cx| {
1351                let result = result.log_err()?;
1352
1353                this.modules = result.iter().cloned().collect();
1354                cx.emit(SessionEvent::Modules);
1355                cx.notify();
1356
1357                Some(result)
1358            },
1359            cx,
1360        );
1361
1362        &self.modules
1363    }
1364
1365    pub fn ignore_breakpoints(&self) -> bool {
1366        self.ignore_breakpoints
1367    }
1368
1369    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1370        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1371    }
1372
1373    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1374        if self.ignore_breakpoints == ignore {
1375            return Task::ready(());
1376        }
1377
1378        self.ignore_breakpoints = ignore;
1379
1380        if let Some(local) = self.as_local() {
1381            local.send_all_breakpoints(ignore, cx)
1382        } else {
1383            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1384            unimplemented!()
1385        }
1386    }
1387
1388    pub fn breakpoints_enabled(&self) -> bool {
1389        self.ignore_breakpoints
1390    }
1391
1392    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1393        self.fetch(
1394            dap_command::LoadedSourcesCommand,
1395            |this, result, cx| {
1396                let result = result.log_err()?;
1397                this.loaded_sources = result.iter().cloned().collect();
1398                cx.emit(SessionEvent::LoadedSources);
1399                cx.notify();
1400                Some(result)
1401            },
1402            cx,
1403        );
1404
1405        &self.loaded_sources
1406    }
1407
1408    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1409        res.log_err()?;
1410        Some(())
1411    }
1412
1413    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1414        thread_id: ThreadId,
1415    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1416    {
1417        move |this, response, cx| match response.log_err() {
1418            Some(response) => Some(response),
1419            None => {
1420                this.thread_states.stop_thread(thread_id);
1421                cx.notify();
1422                None
1423            }
1424        }
1425    }
1426
1427    fn clear_active_debug_line_response(
1428        &mut self,
1429        response: Result<()>,
1430        cx: &mut Context<Session>,
1431    ) -> Option<()> {
1432        response.log_err()?;
1433        self.clear_active_debug_line(cx);
1434        Some(())
1435    }
1436
1437    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1438        self.as_local()
1439            .expect("Message handler will only run in local mode")
1440            .breakpoint_store
1441            .update(cx, |store, cx| {
1442                store.remove_active_position(Some(self.id), cx)
1443            });
1444    }
1445
1446    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1447        self.request(
1448            PauseCommand {
1449                thread_id: thread_id.0,
1450            },
1451            Self::empty_response,
1452            cx,
1453        )
1454        .detach();
1455    }
1456
1457    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1458        self.request(
1459            RestartStackFrameCommand { stack_frame_id },
1460            Self::empty_response,
1461            cx,
1462        )
1463        .detach();
1464    }
1465
1466    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1467        if self.capabilities.supports_restart_request.unwrap_or(false) {
1468            self.request(
1469                RestartCommand {
1470                    raw: args.unwrap_or(Value::Null),
1471                },
1472                Self::empty_response,
1473                cx,
1474            )
1475            .detach();
1476        } else {
1477            self.request(
1478                DisconnectCommand {
1479                    restart: Some(false),
1480                    terminate_debuggee: Some(true),
1481                    suspend_debuggee: Some(false),
1482                },
1483                Self::empty_response,
1484                cx,
1485            )
1486            .detach();
1487        }
1488    }
1489
1490    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1491        self.is_session_terminated = true;
1492        self.thread_states.exit_all_threads();
1493        cx.notify();
1494
1495        let task = if self
1496            .capabilities
1497            .supports_terminate_request
1498            .unwrap_or_default()
1499        {
1500            self.request(
1501                TerminateCommand {
1502                    restart: Some(false),
1503                },
1504                Self::clear_active_debug_line_response,
1505                cx,
1506            )
1507        } else {
1508            self.request(
1509                DisconnectCommand {
1510                    restart: Some(false),
1511                    terminate_debuggee: Some(true),
1512                    suspend_debuggee: Some(false),
1513                },
1514                Self::clear_active_debug_line_response,
1515                cx,
1516            )
1517        };
1518
1519        cx.background_spawn(async move {
1520            let _ = task.await;
1521        })
1522    }
1523
1524    pub fn completions(
1525        &mut self,
1526        query: CompletionsQuery,
1527        cx: &mut Context<Self>,
1528    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1529        let task = self.request(query, |_, result, _| result.log_err(), cx);
1530
1531        cx.background_executor().spawn(async move {
1532            anyhow::Ok(
1533                task.await
1534                    .map(|response| response.targets)
1535                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1536            )
1537        })
1538    }
1539
1540    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1541        self.thread_states.continue_thread(thread_id);
1542        self.request(
1543            ContinueCommand {
1544                args: ContinueArguments {
1545                    thread_id: thread_id.0,
1546                    single_thread: Some(true),
1547                },
1548            },
1549            Self::on_step_response::<ContinueCommand>(thread_id),
1550            cx,
1551        )
1552        .detach();
1553    }
1554
1555    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1556        match self.mode {
1557            Mode::Local(ref local) => Some(local.client.clone()),
1558            Mode::Remote(_) => None,
1559        }
1560    }
1561
1562    pub fn step_over(
1563        &mut self,
1564        thread_id: ThreadId,
1565        granularity: SteppingGranularity,
1566        cx: &mut Context<Self>,
1567    ) {
1568        let supports_single_thread_execution_requests =
1569            self.capabilities.supports_single_thread_execution_requests;
1570        let supports_stepping_granularity = self
1571            .capabilities
1572            .supports_stepping_granularity
1573            .unwrap_or_default();
1574
1575        let command = NextCommand {
1576            inner: StepCommand {
1577                thread_id: thread_id.0,
1578                granularity: supports_stepping_granularity.then(|| granularity),
1579                single_thread: supports_single_thread_execution_requests,
1580            },
1581        };
1582
1583        self.thread_states.process_step(thread_id);
1584        self.request(
1585            command,
1586            Self::on_step_response::<NextCommand>(thread_id),
1587            cx,
1588        )
1589        .detach();
1590    }
1591
1592    pub fn step_in(
1593        &mut self,
1594        thread_id: ThreadId,
1595        granularity: SteppingGranularity,
1596        cx: &mut Context<Self>,
1597    ) {
1598        let supports_single_thread_execution_requests =
1599            self.capabilities.supports_single_thread_execution_requests;
1600        let supports_stepping_granularity = self
1601            .capabilities
1602            .supports_stepping_granularity
1603            .unwrap_or_default();
1604
1605        let command = StepInCommand {
1606            inner: StepCommand {
1607                thread_id: thread_id.0,
1608                granularity: supports_stepping_granularity.then(|| granularity),
1609                single_thread: supports_single_thread_execution_requests,
1610            },
1611        };
1612
1613        self.thread_states.process_step(thread_id);
1614        self.request(
1615            command,
1616            Self::on_step_response::<StepInCommand>(thread_id),
1617            cx,
1618        )
1619        .detach();
1620    }
1621
1622    pub fn step_out(
1623        &mut self,
1624        thread_id: ThreadId,
1625        granularity: SteppingGranularity,
1626        cx: &mut Context<Self>,
1627    ) {
1628        let supports_single_thread_execution_requests =
1629            self.capabilities.supports_single_thread_execution_requests;
1630        let supports_stepping_granularity = self
1631            .capabilities
1632            .supports_stepping_granularity
1633            .unwrap_or_default();
1634
1635        let command = StepOutCommand {
1636            inner: StepCommand {
1637                thread_id: thread_id.0,
1638                granularity: supports_stepping_granularity.then(|| granularity),
1639                single_thread: supports_single_thread_execution_requests,
1640            },
1641        };
1642
1643        self.thread_states.process_step(thread_id);
1644        self.request(
1645            command,
1646            Self::on_step_response::<StepOutCommand>(thread_id),
1647            cx,
1648        )
1649        .detach();
1650    }
1651
1652    pub fn step_back(
1653        &mut self,
1654        thread_id: ThreadId,
1655        granularity: SteppingGranularity,
1656        cx: &mut Context<Self>,
1657    ) {
1658        let supports_single_thread_execution_requests =
1659            self.capabilities.supports_single_thread_execution_requests;
1660        let supports_stepping_granularity = self
1661            .capabilities
1662            .supports_stepping_granularity
1663            .unwrap_or_default();
1664
1665        let command = StepBackCommand {
1666            inner: StepCommand {
1667                thread_id: thread_id.0,
1668                granularity: supports_stepping_granularity.then(|| granularity),
1669                single_thread: supports_single_thread_execution_requests,
1670            },
1671        };
1672
1673        self.thread_states.process_step(thread_id);
1674
1675        self.request(
1676            command,
1677            Self::on_step_response::<StepBackCommand>(thread_id),
1678            cx,
1679        )
1680        .detach();
1681    }
1682
1683    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1684        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1685            && self.requests.contains_key(&ThreadsCommand.type_id())
1686            && self.threads.contains_key(&thread_id)
1687        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1688        // We could still be using an old thread id and have sent a new thread's request
1689        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1690        // But it very well could cause a minor bug in the future that is hard to track down
1691        {
1692            self.fetch(
1693                super::dap_command::StackTraceCommand {
1694                    thread_id: thread_id.0,
1695                    start_frame: None,
1696                    levels: None,
1697                },
1698                move |this, stack_frames, cx| {
1699                    let stack_frames = stack_frames.log_err()?;
1700
1701                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1702                        thread.stack_frame_ids =
1703                            stack_frames.iter().map(|frame| frame.id).collect();
1704                    });
1705                    debug_assert!(
1706                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1707                        "Sent request for thread_id that doesn't exist"
1708                    );
1709
1710                    this.stack_frames.extend(
1711                        stack_frames
1712                            .iter()
1713                            .cloned()
1714                            .map(|frame| (frame.id, StackFrame::from(frame))),
1715                    );
1716
1717                    this.invalidate_command_type::<ScopesCommand>();
1718                    this.invalidate_command_type::<VariablesCommand>();
1719
1720                    cx.emit(SessionEvent::StackTrace);
1721                    cx.notify();
1722                    Some(stack_frames)
1723                },
1724                cx,
1725            );
1726        }
1727
1728        self.threads
1729            .get(&thread_id)
1730            .map(|thread| {
1731                thread
1732                    .stack_frame_ids
1733                    .iter()
1734                    .filter_map(|id| self.stack_frames.get(id))
1735                    .cloned()
1736                    .collect()
1737            })
1738            .unwrap_or_default()
1739    }
1740
1741    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1742        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1743            && self
1744                .requests
1745                .contains_key(&TypeId::of::<StackTraceCommand>())
1746        {
1747            self.fetch(
1748                ScopesCommand { stack_frame_id },
1749                move |this, scopes, cx| {
1750                    let scopes = scopes.log_err()?;
1751
1752                    for scope in scopes .iter(){
1753                        this.variables(scope.variables_reference, cx);
1754                    }
1755
1756                    let entry = this
1757                        .stack_frames
1758                        .entry(stack_frame_id)
1759                        .and_modify(|stack_frame| {
1760                            stack_frame.scopes = scopes.clone();
1761                        });
1762
1763                    cx.emit(SessionEvent::Variables);
1764
1765                    debug_assert!(
1766                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1767                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1768                    );
1769
1770                    Some(scopes)
1771                },
1772                cx,
1773            );
1774        }
1775
1776        self.stack_frames
1777            .get(&stack_frame_id)
1778            .map(|frame| frame.scopes.as_slice())
1779            .unwrap_or_default()
1780    }
1781
1782    pub fn variables(
1783        &mut self,
1784        variables_reference: VariableReference,
1785        cx: &mut Context<Self>,
1786    ) -> Vec<dap::Variable> {
1787        let command = VariablesCommand {
1788            variables_reference,
1789            filter: None,
1790            start: None,
1791            count: None,
1792            format: None,
1793        };
1794
1795        self.fetch(
1796            command,
1797            move |this, variables, cx| {
1798                let variables = variables.log_err()?;
1799                this.variables
1800                    .insert(variables_reference, variables.clone());
1801
1802                cx.emit(SessionEvent::Variables);
1803                Some(variables)
1804            },
1805            cx,
1806        );
1807
1808        self.variables
1809            .get(&variables_reference)
1810            .cloned()
1811            .unwrap_or_default()
1812    }
1813
1814    pub fn set_variable_value(
1815        &mut self,
1816        variables_reference: u64,
1817        name: String,
1818        value: String,
1819        cx: &mut Context<Self>,
1820    ) {
1821        if self.capabilities.supports_set_variable.unwrap_or_default() {
1822            self.request(
1823                SetVariableValueCommand {
1824                    name,
1825                    value,
1826                    variables_reference,
1827                },
1828                move |this, response, cx| {
1829                    let response = response.log_err()?;
1830                    this.invalidate_command_type::<VariablesCommand>();
1831                    cx.notify();
1832                    Some(response)
1833                },
1834                cx,
1835            )
1836            .detach()
1837        }
1838    }
1839
1840    pub fn evaluate(
1841        &mut self,
1842        expression: String,
1843        context: Option<EvaluateArgumentsContext>,
1844        frame_id: Option<u64>,
1845        source: Option<Source>,
1846        cx: &mut Context<Self>,
1847    ) {
1848        self.request(
1849            EvaluateCommand {
1850                expression,
1851                context,
1852                frame_id,
1853                source,
1854            },
1855            |this, response, cx| {
1856                let response = response.log_err()?;
1857                this.output_token.0 += 1;
1858                this.output.push_back(dap::OutputEvent {
1859                    category: None,
1860                    output: response.result.clone(),
1861                    group: None,
1862                    variables_reference: Some(response.variables_reference),
1863                    source: None,
1864                    line: None,
1865                    column: None,
1866                    data: None,
1867                    location_reference: None,
1868                });
1869
1870                this.invalidate_command_type::<ScopesCommand>();
1871                cx.notify();
1872                Some(response)
1873            },
1874            cx,
1875        )
1876        .detach();
1877    }
1878
1879    pub fn location(
1880        &mut self,
1881        reference: u64,
1882        cx: &mut Context<Self>,
1883    ) -> Option<dap::LocationsResponse> {
1884        self.fetch(
1885            LocationsCommand { reference },
1886            move |this, response, _| {
1887                let response = response.log_err()?;
1888                this.locations.insert(reference, response.clone());
1889                Some(response)
1890            },
1891            cx,
1892        );
1893        self.locations.get(&reference).cloned()
1894    }
1895    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1896        let command = DisconnectCommand {
1897            restart: Some(false),
1898            terminate_debuggee: Some(true),
1899            suspend_debuggee: Some(false),
1900        };
1901
1902        self.request(command, Self::empty_response, cx).detach()
1903    }
1904
1905    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1906        if self
1907            .capabilities
1908            .supports_terminate_threads_request
1909            .unwrap_or_default()
1910        {
1911            self.request(
1912                TerminateThreadsCommand {
1913                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1914                },
1915                Self::clear_active_debug_line_response,
1916                cx,
1917            )
1918            .detach();
1919        } else {
1920            self.shutdown(cx).detach();
1921        }
1922    }
1923}
1924
1925fn create_local_session(
1926    breakpoint_store: Entity<BreakpointStore>,
1927    session_id: SessionId,
1928    parent_session: Option<Entity<Session>>,
1929    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1930    initialized_tx: oneshot::Sender<()>,
1931    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1932    mode: LocalMode,
1933    capabilities: Capabilities,
1934    cx: &mut Context<Session>,
1935) -> Session {
1936    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1937        let mut initialized_tx = Some(initialized_tx);
1938        while let Some(message) = message_rx.next().await {
1939            if let Message::Event(event) = message {
1940                if let Events::Initialized(_) = *event {
1941                    if let Some(tx) = initialized_tx.take() {
1942                        tx.send(()).ok();
1943                    }
1944                } else {
1945                    let Ok(_) = this.update(cx, |session, cx| {
1946                        session.handle_dap_event(event, cx);
1947                    }) else {
1948                        break;
1949                    };
1950                }
1951            } else {
1952                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1953                else {
1954                    break;
1955                };
1956            }
1957        }
1958    })];
1959
1960    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1961        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1962            if let Some(local) = (!this.ignore_breakpoints)
1963                .then(|| this.as_local_mut())
1964                .flatten()
1965            {
1966                local
1967                    .send_breakpoints_from_path(path.clone(), *reason, cx)
1968                    .detach();
1969            };
1970        }
1971        BreakpointStoreEvent::BreakpointsCleared(paths) => {
1972            if let Some(local) = (!this.ignore_breakpoints)
1973                .then(|| this.as_local_mut())
1974                .flatten()
1975            {
1976                local.unset_breakpoints_from_paths(paths, cx).detach();
1977            }
1978        }
1979        BreakpointStoreEvent::ActiveDebugLineChanged => {}
1980    })
1981    .detach();
1982
1983    Session {
1984        mode: Mode::Local(mode),
1985        id: session_id,
1986        child_session_ids: HashSet::default(),
1987        parent_id: parent_session.map(|session| session.read(cx).id),
1988        variables: Default::default(),
1989        capabilities,
1990        thread_states: ThreadStates::default(),
1991        output_token: OutputToken(0),
1992        ignore_breakpoints: false,
1993        output: circular_buffer::CircularBuffer::boxed(),
1994        requests: HashMap::default(),
1995        modules: Vec::default(),
1996        loaded_sources: Vec::default(),
1997        threads: IndexMap::default(),
1998        stack_frames: IndexMap::default(),
1999        locations: Default::default(),
2000        _background_tasks,
2001        is_session_terminated: false,
2002    }
2003}