session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
   9    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  10    VariablesCommand,
  11};
  12use super::dap_store::DapAdapterDelegate;
  13use anyhow::{Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    adapters::{DapDelegate, DapStatus},
  21    client::{DebugAdapterClient, SessionId},
  22    messages::{Events, Message},
  23};
  24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
  25use futures::channel::oneshot;
  26use futures::{FutureExt, future::Shared};
  27use gpui::{
  28    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  29};
  30use rpc::AnyProtoClient;
  31use serde_json::{Value, json};
  32use settings::Settings;
  33use smol::stream::StreamExt;
  34use std::any::TypeId;
  35use std::path::PathBuf;
  36use std::u64;
  37use std::{
  38    any::Any,
  39    collections::hash_map::Entry,
  40    hash::{Hash, Hasher},
  41    path::Path,
  42    sync::Arc,
  43};
  44use task::{DebugAdapterConfig, DebugTaskDefinition};
  45use text::{PointUtf16, ToPointUtf16};
  46use util::{ResultExt, merge_json_value_into};
  47
  48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  49#[repr(transparent)]
  50pub struct ThreadId(pub u64);
  51
  52impl ThreadId {
  53    pub const MIN: ThreadId = ThreadId(u64::MIN);
  54    pub const MAX: ThreadId = ThreadId(u64::MAX);
  55}
  56
  57impl From<u64> for ThreadId {
  58    fn from(id: u64) -> Self {
  59        Self(id)
  60    }
  61}
  62
  63#[derive(Clone, Debug)]
  64pub struct StackFrame {
  65    pub dap: dap::StackFrame,
  66    pub scopes: Vec<dap::Scope>,
  67}
  68
  69impl From<dap::StackFrame> for StackFrame {
  70    fn from(stack_frame: dap::StackFrame) -> Self {
  71        Self {
  72            scopes: vec![],
  73            dap: stack_frame,
  74        }
  75    }
  76}
  77
  78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  79pub enum ThreadStatus {
  80    #[default]
  81    Running,
  82    Stopped,
  83    Stepping,
  84    Exited,
  85    Ended,
  86}
  87
  88impl ThreadStatus {
  89    pub fn label(&self) -> &'static str {
  90        match self {
  91            ThreadStatus::Running => "Running",
  92            ThreadStatus::Stopped => "Stopped",
  93            ThreadStatus::Stepping => "Stepping",
  94            ThreadStatus::Exited => "Exited",
  95            ThreadStatus::Ended => "Ended",
  96        }
  97    }
  98}
  99
 100#[derive(Debug)]
 101pub struct Thread {
 102    dap: dap::Thread,
 103    stack_frame_ids: IndexSet<StackFrameId>,
 104    _has_stopped: bool,
 105}
 106
 107impl From<dap::Thread> for Thread {
 108    fn from(dap: dap::Thread) -> Self {
 109        Self {
 110            dap,
 111            stack_frame_ids: Default::default(),
 112            _has_stopped: false,
 113        }
 114    }
 115}
 116
 117type UpstreamProjectId = u64;
 118
 119struct RemoteConnection {
 120    _client: AnyProtoClient,
 121    _upstream_project_id: UpstreamProjectId,
 122}
 123
 124impl RemoteConnection {
 125    fn send_proto_client_request<R: DapCommand>(
 126        &self,
 127        _request: R,
 128        _session_id: SessionId,
 129        cx: &mut App,
 130    ) -> Task<Result<R::Response>> {
 131        // let message = request.to_proto(session_id, self.upstream_project_id);
 132        // let upstream_client = self.client.clone();
 133        cx.background_executor().spawn(async move {
 134            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 135            // let response = upstream_client.request(message).await?;
 136            // request.response_from_proto(response)
 137            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 138        })
 139    }
 140
 141    fn request<R: DapCommand>(
 142        &self,
 143        request: R,
 144        session_id: SessionId,
 145        cx: &mut App,
 146    ) -> Task<Result<R::Response>>
 147    where
 148        <R::DapRequest as dap::requests::Request>::Response: 'static,
 149        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 150    {
 151        return self.send_proto_client_request::<R>(request, session_id, cx);
 152    }
 153}
 154
 155enum Mode {
 156    Local(LocalMode),
 157    Remote(RemoteConnection),
 158}
 159
 160#[derive(Clone)]
 161pub struct LocalMode {
 162    client: Arc<DebugAdapterClient>,
 163    config: DebugAdapterConfig,
 164    adapter: Arc<dyn DebugAdapter>,
 165    breakpoint_store: Entity<BreakpointStore>,
 166}
 167
 168fn client_source(abs_path: &Path) -> dap::Source {
 169    dap::Source {
 170        name: abs_path
 171            .file_name()
 172            .map(|filename| filename.to_string_lossy().to_string()),
 173        path: Some(abs_path.to_string_lossy().to_string()),
 174        source_reference: None,
 175        presentation_hint: None,
 176        origin: None,
 177        sources: None,
 178        adapter_data: None,
 179        checksums: None,
 180    }
 181}
 182
 183impl LocalMode {
 184    fn new(
 185        debug_adapters: Arc<DapRegistry>,
 186        session_id: SessionId,
 187        parent_session: Option<Entity<Session>>,
 188        breakpoint_store: Entity<BreakpointStore>,
 189        config: DebugAdapterConfig,
 190        delegate: DapAdapterDelegate,
 191        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 192        cx: AsyncApp,
 193    ) -> Task<Result<(Self, Capabilities)>> {
 194        Self::new_inner(
 195            debug_adapters,
 196            session_id,
 197            parent_session,
 198            breakpoint_store,
 199            config,
 200            delegate,
 201            messages_tx,
 202            async |_, _| {},
 203            cx,
 204        )
 205    }
 206    #[cfg(any(test, feature = "test-support"))]
 207    fn new_fake(
 208        session_id: SessionId,
 209        parent_session: Option<Entity<Session>>,
 210        breakpoint_store: Entity<BreakpointStore>,
 211        config: DebugAdapterConfig,
 212        delegate: DapAdapterDelegate,
 213        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 214        caps: Capabilities,
 215        fail: bool,
 216        cx: AsyncApp,
 217    ) -> Task<Result<(Self, Capabilities)>> {
 218        use task::DebugRequestDisposition;
 219
 220        let request = match config.request.clone() {
 221            DebugRequestDisposition::UserConfigured(request) => request,
 222            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
 223                match reverse_request_args.request {
 224                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
 225                        DebugRequestType::Launch(task::LaunchConfig {
 226                            program: "".to_owned(),
 227                            cwd: None,
 228                            args: Default::default(),
 229                        })
 230                    }
 231                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
 232                        DebugRequestType::Attach(task::AttachConfig {
 233                            process_id: Some(0),
 234                        })
 235                    }
 236                }
 237            }
 238        };
 239
 240        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
 241            session
 242                .client
 243                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 244                .await;
 245
 246            let paths = cx
 247                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
 248                .expect("Breakpoint store should exist in all tests that start debuggers");
 249
 250            session
 251                .client
 252                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
 253                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
 254                    if !paths.contains(&p) {
 255                        panic!("Sent breakpoints for path without any")
 256                    }
 257
 258                    Ok(dap::SetBreakpointsResponse {
 259                        breakpoints: Vec::default(),
 260                    })
 261                })
 262                .await;
 263
 264            match request {
 265                dap::DebugRequestType::Launch(_) => {
 266                    if fail {
 267                        session
 268                            .client
 269                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 270                                Err(dap::ErrorResponse {
 271                                    error: Some(dap::Message {
 272                                        id: 1,
 273                                        format: "error".into(),
 274                                        variables: None,
 275                                        send_telemetry: None,
 276                                        show_user: None,
 277                                        url: None,
 278                                        url_label: None,
 279                                    }),
 280                                })
 281                            })
 282                            .await;
 283                    } else {
 284                        session
 285                            .client
 286                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 287                            .await;
 288                    }
 289                }
 290                dap::DebugRequestType::Attach(attach_config) => {
 291                    if fail {
 292                        session
 293                            .client
 294                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 295                                Err(dap::ErrorResponse {
 296                                    error: Some(dap::Message {
 297                                        id: 1,
 298                                        format: "error".into(),
 299                                        variables: None,
 300                                        send_telemetry: None,
 301                                        show_user: None,
 302                                        url: None,
 303                                        url_label: None,
 304                                    }),
 305                                })
 306                            })
 307                            .await;
 308                    } else {
 309                        session
 310                            .client
 311                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 312                                assert_eq!(
 313                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 314                                    args.raw
 315                                );
 316
 317                                Ok(())
 318                            })
 319                            .await;
 320                    }
 321                }
 322            }
 323
 324            session
 325                .client
 326                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
 327                .await;
 328            session.client.fake_event(Events::Initialized(None)).await;
 329        };
 330        Self::new_inner(
 331            DapRegistry::fake().into(),
 332            session_id,
 333            parent_session,
 334            breakpoint_store,
 335            config,
 336            delegate,
 337            messages_tx,
 338            callback,
 339            cx,
 340        )
 341    }
 342    fn new_inner(
 343        registry: Arc<DapRegistry>,
 344        session_id: SessionId,
 345        parent_session: Option<Entity<Session>>,
 346        breakpoint_store: Entity<BreakpointStore>,
 347        config: DebugAdapterConfig,
 348        delegate: DapAdapterDelegate,
 349        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 350        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 351        cx: AsyncApp,
 352    ) -> Task<Result<(Self, Capabilities)>> {
 353        cx.spawn(async move |cx| {
 354            let (adapter, binary) =
 355                Self::get_adapter_binary(&registry, &config, &delegate, cx).await?;
 356
 357            let message_handler = Box::new(move |message| {
 358                messages_tx.unbounded_send(message).ok();
 359            });
 360
 361            let client = Arc::new(
 362                if let Some(client) = parent_session
 363                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 364                    .flatten()
 365                {
 366                    client
 367                        .reconnect(session_id, binary, message_handler, cx.clone())
 368                        .await?
 369                } else {
 370                    DebugAdapterClient::start(
 371                        session_id,
 372                        adapter.name(),
 373                        binary,
 374                        message_handler,
 375                        cx.clone(),
 376                    )
 377                    .await?
 378                },
 379            );
 380
 381            let adapter_id = adapter.name().to_string().to_owned();
 382            let mut session = Self {
 383                client,
 384                adapter,
 385                breakpoint_store,
 386                config: config.clone(),
 387            };
 388
 389            on_initialized(&mut session, cx.clone()).await;
 390            let capabilities = session
 391                .request(Initialize { adapter_id }, cx.background_executor().clone())
 392                .await?;
 393
 394            Ok((session, capabilities))
 395        })
 396    }
 397
 398    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 399        let tasks: Vec<_> = paths
 400            .into_iter()
 401            .map(|path| {
 402                self.request(
 403                    dap_command::SetBreakpoints {
 404                        source: client_source(path),
 405                        source_modified: None,
 406                        breakpoints: vec![],
 407                    },
 408                    cx.background_executor().clone(),
 409                )
 410            })
 411            .collect();
 412
 413        cx.background_spawn(async move {
 414            futures::future::join_all(tasks)
 415                .await
 416                .iter()
 417                .for_each(|res| match res {
 418                    Ok(_) => {}
 419                    Err(err) => {
 420                        log::warn!("Set breakpoints request failed: {}", err);
 421                    }
 422                });
 423        })
 424    }
 425
 426    fn send_breakpoints_from_path(
 427        &self,
 428        abs_path: Arc<Path>,
 429        reason: BreakpointUpdatedReason,
 430        cx: &mut App,
 431    ) -> Task<()> {
 432        let breakpoints = self
 433            .breakpoint_store
 434            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 435            .into_iter()
 436            .filter(|bp| bp.state.is_enabled())
 437            .map(Into::into)
 438            .collect();
 439
 440        let task = self.request(
 441            dap_command::SetBreakpoints {
 442                source: client_source(&abs_path),
 443                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 444                breakpoints,
 445            },
 446            cx.background_executor().clone(),
 447        );
 448
 449        cx.background_spawn(async move {
 450            match task.await {
 451                Ok(_) => {}
 452                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 453            }
 454        })
 455    }
 456
 457    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 458        let mut breakpoint_tasks = Vec::new();
 459        let breakpoints = self
 460            .breakpoint_store
 461            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 462
 463        for (path, breakpoints) in breakpoints {
 464            let breakpoints = if ignore_breakpoints {
 465                vec![]
 466            } else {
 467                breakpoints
 468                    .into_iter()
 469                    .filter(|bp| bp.state.is_enabled())
 470                    .map(Into::into)
 471                    .collect()
 472            };
 473
 474            breakpoint_tasks.push(self.request(
 475                dap_command::SetBreakpoints {
 476                    source: client_source(&path),
 477                    source_modified: Some(false),
 478                    breakpoints,
 479                },
 480                cx.background_executor().clone(),
 481            ));
 482        }
 483
 484        cx.background_spawn(async move {
 485            futures::future::join_all(breakpoint_tasks)
 486                .await
 487                .iter()
 488                .for_each(|res| match res {
 489                    Ok(_) => {}
 490                    Err(err) => {
 491                        log::warn!("Set breakpoints request failed: {}", err);
 492                    }
 493                });
 494        })
 495    }
 496
 497    async fn get_adapter_binary(
 498        registry: &Arc<DapRegistry>,
 499        config: &DebugAdapterConfig,
 500        delegate: &DapAdapterDelegate,
 501        cx: &mut AsyncApp,
 502    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 503        let adapter = registry
 504            .adapter(&config.adapter)
 505            .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
 506
 507        let binary = cx.update(|cx| {
 508            ProjectSettings::get_global(cx)
 509                .dap
 510                .get(&adapter.name())
 511                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 512        })?;
 513
 514        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 515            Err(error) => {
 516                delegate.update_status(
 517                    adapter.name(),
 518                    DapStatus::Failed {
 519                        error: error.to_string(),
 520                    },
 521                );
 522
 523                return Err(error);
 524            }
 525            Ok(mut binary) => {
 526                delegate.update_status(adapter.name(), DapStatus::None);
 527
 528                let shell_env = delegate.shell_env().await;
 529                let mut envs = binary.envs.unwrap_or_default();
 530                envs.extend(shell_env);
 531                binary.envs = Some(envs);
 532
 533                binary
 534            }
 535        };
 536
 537        Ok((adapter, binary))
 538    }
 539
 540    pub fn initialize_sequence(
 541        &self,
 542        capabilities: &Capabilities,
 543        initialized_rx: oneshot::Receiver<()>,
 544        cx: &App,
 545    ) -> Task<Result<()>> {
 546        let (mut raw, is_launch) = match &self.config.request {
 547            task::DebugRequestDisposition::UserConfigured(_) => {
 548                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
 549                    debug_assert!(false, "This part of code should be unreachable in practice");
 550                    return Task::ready(Err(anyhow!(
 551                        "Expected debug config conversion to succeed"
 552                    )));
 553                };
 554                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
 555                let raw = self.adapter.request_args(&raw);
 556                (raw, is_launch)
 557            }
 558            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
 559                start_debugging_request_arguments.configuration.clone(),
 560                matches!(
 561                    start_debugging_request_arguments.request,
 562                    dap::StartDebuggingRequestArgumentsRequest::Launch
 563                ),
 564            ),
 565        };
 566
 567        merge_json_value_into(
 568            self.config.initialize_args.clone().unwrap_or(json!({})),
 569            &mut raw,
 570        );
 571        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 572        let launch = if is_launch {
 573            self.request(Launch { raw }, cx.background_executor().clone())
 574        } else {
 575            self.request(Attach { raw }, cx.background_executor().clone())
 576        };
 577
 578        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 579
 580        let configuration_sequence = cx.spawn({
 581            let this = self.clone();
 582            async move |cx| {
 583                initialized_rx.await?;
 584                // todo(debugger) figure out if we want to handle a breakpoint response error
 585                // This will probably consist of letting a user know that breakpoints failed to be set
 586                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 587
 588                if configuration_done_supported {
 589                    this.request(ConfigurationDone, cx.background_executor().clone())
 590                } else {
 591                    Task::ready(Ok(()))
 592                }
 593                .await
 594            }
 595        });
 596
 597        cx.background_spawn(async move {
 598            futures::future::try_join(launch, configuration_sequence).await?;
 599            Ok(())
 600        })
 601    }
 602
 603    fn request<R: LocalDapCommand>(
 604        &self,
 605        request: R,
 606        executor: BackgroundExecutor,
 607    ) -> Task<Result<R::Response>>
 608    where
 609        <R::DapRequest as dap::requests::Request>::Response: 'static,
 610        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 611    {
 612        let request = Arc::new(request);
 613
 614        let request_clone = request.clone();
 615        let connection = self.client.clone();
 616        let request_task = executor.spawn(async move {
 617            let args = request_clone.to_dap();
 618            connection.request::<R::DapRequest>(args).await
 619        });
 620
 621        executor.spawn(async move {
 622            let response = request.response_from_dap(request_task.await?);
 623            response
 624        })
 625    }
 626}
 627impl From<RemoteConnection> for Mode {
 628    fn from(value: RemoteConnection) -> Self {
 629        Self::Remote(value)
 630    }
 631}
 632
 633impl Mode {
 634    fn request_dap<R: DapCommand>(
 635        &self,
 636        session_id: SessionId,
 637        request: R,
 638        cx: &mut Context<Session>,
 639    ) -> Task<Result<R::Response>>
 640    where
 641        <R::DapRequest as dap::requests::Request>::Response: 'static,
 642        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 643    {
 644        match self {
 645            Mode::Local(debug_adapter_client) => {
 646                debug_adapter_client.request(request, cx.background_executor().clone())
 647            }
 648            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 649        }
 650    }
 651}
 652
 653#[derive(Default)]
 654struct ThreadStates {
 655    global_state: Option<ThreadStatus>,
 656    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 657}
 658
 659impl ThreadStates {
 660    fn stop_all_threads(&mut self) {
 661        self.global_state = Some(ThreadStatus::Stopped);
 662        self.known_thread_states.clear();
 663    }
 664
 665    fn exit_all_threads(&mut self) {
 666        self.global_state = Some(ThreadStatus::Exited);
 667        self.known_thread_states.clear();
 668    }
 669
 670    fn continue_all_threads(&mut self) {
 671        self.global_state = Some(ThreadStatus::Running);
 672        self.known_thread_states.clear();
 673    }
 674
 675    fn stop_thread(&mut self, thread_id: ThreadId) {
 676        self.known_thread_states
 677            .insert(thread_id, ThreadStatus::Stopped);
 678    }
 679
 680    fn continue_thread(&mut self, thread_id: ThreadId) {
 681        self.known_thread_states
 682            .insert(thread_id, ThreadStatus::Running);
 683    }
 684
 685    fn process_step(&mut self, thread_id: ThreadId) {
 686        self.known_thread_states
 687            .insert(thread_id, ThreadStatus::Stepping);
 688    }
 689
 690    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 691        self.thread_state(thread_id)
 692            .unwrap_or(ThreadStatus::Running)
 693    }
 694
 695    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 696        self.known_thread_states
 697            .get(&thread_id)
 698            .copied()
 699            .or(self.global_state)
 700    }
 701
 702    fn exit_thread(&mut self, thread_id: ThreadId) {
 703        self.known_thread_states
 704            .insert(thread_id, ThreadStatus::Exited);
 705    }
 706
 707    fn any_stopped_thread(&self) -> bool {
 708        self.global_state
 709            .is_some_and(|state| state == ThreadStatus::Stopped)
 710            || self
 711                .known_thread_states
 712                .values()
 713                .any(|status| *status == ThreadStatus::Stopped)
 714    }
 715}
 716const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 717
 718#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 719pub struct OutputToken(pub usize);
 720/// Represents a current state of a single debug adapter and provides ways to mutate it.
 721pub struct Session {
 722    mode: Mode,
 723    pub(super) capabilities: Capabilities,
 724    id: SessionId,
 725    child_session_ids: HashSet<SessionId>,
 726    parent_id: Option<SessionId>,
 727    ignore_breakpoints: bool,
 728    modules: Vec<dap::Module>,
 729    loaded_sources: Vec<dap::Source>,
 730    output_token: OutputToken,
 731    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 732    threads: IndexMap<ThreadId, Thread>,
 733    thread_states: ThreadStates,
 734    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 735    stack_frames: IndexMap<StackFrameId, StackFrame>,
 736    locations: HashMap<u64, dap::LocationsResponse>,
 737    is_session_terminated: bool,
 738    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 739    _background_tasks: Vec<Task<()>>,
 740}
 741
 742trait CacheableCommand: 'static + Send + Sync {
 743    fn as_any(&self) -> &dyn Any;
 744    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 745    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 746    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 747}
 748
 749impl<T> CacheableCommand for T
 750where
 751    T: DapCommand + PartialEq + Eq + Hash,
 752{
 753    fn as_any(&self) -> &dyn Any {
 754        self
 755    }
 756
 757    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 758        rhs.as_any()
 759            .downcast_ref::<Self>()
 760            .map_or(false, |rhs| self == rhs)
 761    }
 762
 763    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 764        T::hash(self, &mut hasher);
 765    }
 766
 767    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 768        self
 769    }
 770}
 771
 772pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 773
 774impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 775    fn from(request: T) -> Self {
 776        Self(Arc::new(request))
 777    }
 778}
 779
 780impl PartialEq for RequestSlot {
 781    fn eq(&self, other: &Self) -> bool {
 782        self.0.dyn_eq(other.0.as_ref())
 783    }
 784}
 785
 786impl Eq for RequestSlot {}
 787
 788impl Hash for RequestSlot {
 789    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 790        self.0.dyn_hash(state);
 791        self.0.as_any().type_id().hash(state)
 792    }
 793}
 794
 795#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 796pub struct CompletionsQuery {
 797    pub query: String,
 798    pub column: u64,
 799    pub line: Option<u64>,
 800    pub frame_id: Option<u64>,
 801}
 802
 803impl CompletionsQuery {
 804    pub fn new(
 805        buffer: &language::Buffer,
 806        cursor_position: language::Anchor,
 807        frame_id: Option<u64>,
 808    ) -> Self {
 809        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 810        Self {
 811            query: buffer.text(),
 812            column: column as u64,
 813            frame_id,
 814            line: Some(row as u64),
 815        }
 816    }
 817}
 818
 819pub enum SessionEvent {
 820    Modules,
 821    LoadedSources,
 822    Stopped(Option<ThreadId>),
 823    StackTrace,
 824    Variables,
 825    Threads,
 826}
 827
 828impl EventEmitter<SessionEvent> for Session {}
 829
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session of breakpoint changes.
 833impl Session {
 834    pub(crate) fn local(
 835        breakpoint_store: Entity<BreakpointStore>,
 836        session_id: SessionId,
 837        parent_session: Option<Entity<Session>>,
 838        delegate: DapAdapterDelegate,
 839        config: DebugAdapterConfig,
 840        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 841        initialized_tx: oneshot::Sender<()>,
 842        debug_adapters: Arc<DapRegistry>,
 843        cx: &mut App,
 844    ) -> Task<Result<Entity<Self>>> {
 845        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 846
 847        cx.spawn(async move |cx| {
 848            let (mode, capabilities) = LocalMode::new(
 849                debug_adapters,
 850                session_id,
 851                parent_session.clone(),
 852                breakpoint_store.clone(),
 853                config.clone(),
 854                delegate,
 855                message_tx,
 856                cx.clone(),
 857            )
 858            .await?;
 859
 860            cx.new(|cx| {
 861                create_local_session(
 862                    breakpoint_store,
 863                    session_id,
 864                    parent_session,
 865                    start_debugging_requests_tx,
 866                    initialized_tx,
 867                    message_rx,
 868                    mode,
 869                    capabilities,
 870                    cx,
 871                )
 872            })
 873        })
 874    }
 875
 876    #[cfg(any(test, feature = "test-support"))]
 877    pub(crate) fn fake(
 878        breakpoint_store: Entity<BreakpointStore>,
 879        session_id: SessionId,
 880        parent_session: Option<Entity<Session>>,
 881        delegate: DapAdapterDelegate,
 882        config: DebugAdapterConfig,
 883        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 884        initialized_tx: oneshot::Sender<()>,
 885        caps: Capabilities,
 886        fails: bool,
 887        cx: &mut App,
 888    ) -> Task<Result<Entity<Session>>> {
 889        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 890
 891        cx.spawn(async move |cx| {
 892            let (mode, capabilities) = LocalMode::new_fake(
 893                session_id,
 894                parent_session.clone(),
 895                breakpoint_store.clone(),
 896                config.clone(),
 897                delegate,
 898                message_tx,
 899                caps,
 900                fails,
 901                cx.clone(),
 902            )
 903            .await?;
 904
 905            cx.new(|cx| {
 906                create_local_session(
 907                    breakpoint_store,
 908                    session_id,
 909                    parent_session,
 910                    start_debugging_requests_tx,
 911                    initialized_tx,
 912                    message_rx,
 913                    mode,
 914                    capabilities,
 915                    cx,
 916                )
 917            })
 918        })
 919    }
 920
 921    pub(crate) fn remote(
 922        session_id: SessionId,
 923        client: AnyProtoClient,
 924        upstream_project_id: u64,
 925        ignore_breakpoints: bool,
 926    ) -> Self {
 927        Self {
 928            mode: Mode::Remote(RemoteConnection {
 929                _client: client,
 930                _upstream_project_id: upstream_project_id,
 931            }),
 932            id: session_id,
 933            child_session_ids: HashSet::default(),
 934            parent_id: None,
 935            capabilities: Capabilities::default(),
 936            ignore_breakpoints,
 937            variables: Default::default(),
 938            stack_frames: Default::default(),
 939            thread_states: ThreadStates::default(),
 940            output_token: OutputToken(0),
 941            output: circular_buffer::CircularBuffer::boxed(),
 942            requests: HashMap::default(),
 943            modules: Vec::default(),
 944            loaded_sources: Vec::default(),
 945            threads: IndexMap::default(),
 946            _background_tasks: Vec::default(),
 947            locations: Default::default(),
 948            is_session_terminated: false,
 949        }
 950    }
 951
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 955
    /// Returns a copy of the set of child session ids registered on this session.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 959
    /// Registers `session_id` as a child of this session. Adding an id that is already
    /// present leaves the set unchanged.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 963
    /// Unregisters `session_id` as a child of this session. Removing an id that is not
    /// present is a no-op.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 967
    /// Returns the id of this session's parent, or `None` if it has no parent.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
 971
    /// Returns the capabilities currently recorded for this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 975
 976    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 977        if let Mode::Local(local_mode) = &self.mode {
 978            Some(local_mode.config.clone())
 979        } else {
 980            None
 981        }
 982    }
 983
    /// Returns whether this session has been marked as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 987
 988    pub fn is_local(&self) -> bool {
 989        matches!(self.mode, Mode::Local(_))
 990    }
 991
 992    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 993        match &mut self.mode {
 994            Mode::Local(local_mode) => Some(local_mode),
 995            Mode::Remote(_) => None,
 996        }
 997    }
 998
 999    pub fn as_local(&self) -> Option<&LocalMode> {
1000        match &self.mode {
1001            Mode::Local(local_mode) => Some(local_mode),
1002            Mode::Remote(_) => None,
1003        }
1004    }
1005
1006    pub(super) fn initialize_sequence(
1007        &mut self,
1008        initialize_rx: oneshot::Receiver<()>,
1009        cx: &mut Context<Self>,
1010    ) -> Task<Result<()>> {
1011        match &self.mode {
1012            Mode::Local(local_mode) => {
1013                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1014            }
1015            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1016        }
1017    }
1018
1019    pub fn output(
1020        &self,
1021        since: OutputToken,
1022    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1023        if self.output_token.0 == 0 {
1024            return (self.output.range(0..0), OutputToken(0));
1025        };
1026
1027        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1028
1029        let clamped_events_since = events_since.clamp(0, self.output.len());
1030        (
1031            self.output
1032                .range(self.output.len() - clamped_events_since..),
1033            self.output_token,
1034        )
1035    }
1036
1037    pub fn respond_to_client(
1038        &self,
1039        request_seq: u64,
1040        success: bool,
1041        command: String,
1042        body: Option<serde_json::Value>,
1043        cx: &mut Context<Self>,
1044    ) -> Task<Result<()>> {
1045        let Some(local_session) = self.as_local().cloned() else {
1046            unreachable!("Cannot respond to remote client");
1047        };
1048
1049        cx.background_spawn(async move {
1050            local_session
1051                .client
1052                .send_message(Message::Response(Response {
1053                    body,
1054                    success,
1055                    command,
1056                    seq: request_seq + 1,
1057                    request_seq,
1058                    message: None,
1059                }))
1060                .await
1061        })
1062    }
1063
1064    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1065        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1066            self.thread_states.stop_all_threads();
1067
1068            self.invalidate_command_type::<StackTraceCommand>();
1069        }
1070
1071        // Event if we stopped all threads we still need to insert the thread_id
1072        // to our own data
1073        if let Some(thread_id) = event.thread_id {
1074            self.thread_states.stop_thread(ThreadId(thread_id));
1075
1076            self.invalidate_state(
1077                &StackTraceCommand {
1078                    thread_id,
1079                    start_frame: None,
1080                    levels: None,
1081                }
1082                .into(),
1083            );
1084        }
1085
1086        self.invalidate_generic();
1087        self.threads.clear();
1088        self.variables.clear();
1089        cx.emit(SessionEvent::Stopped(
1090            event
1091                .thread_id
1092                .map(Into::into)
1093                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1094        ));
1095        cx.notify();
1096    }
1097
1098    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1099        match *event {
1100            Events::Initialized(_) => {
1101                debug_assert!(
1102                    false,
1103                    "Initialized event should have been handled in LocalMode"
1104                );
1105            }
1106            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1107            Events::Continued(event) => {
1108                if event.all_threads_continued.unwrap_or_default() {
1109                    self.thread_states.continue_all_threads();
1110                } else {
1111                    self.thread_states
1112                        .continue_thread(ThreadId(event.thread_id));
1113                }
1114                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1115                self.invalidate_generic();
1116            }
1117            Events::Exited(_event) => {
1118                self.clear_active_debug_line(cx);
1119            }
1120            Events::Terminated(_) => {
1121                self.is_session_terminated = true;
1122                self.clear_active_debug_line(cx);
1123            }
1124            Events::Thread(event) => {
1125                let thread_id = ThreadId(event.thread_id);
1126
1127                match event.reason {
1128                    dap::ThreadEventReason::Started => {
1129                        self.thread_states.continue_thread(thread_id);
1130                    }
1131                    dap::ThreadEventReason::Exited => {
1132                        self.thread_states.exit_thread(thread_id);
1133                    }
1134                    reason => {
1135                        log::error!("Unhandled thread event reason {:?}", reason);
1136                    }
1137                }
1138                self.invalidate_state(&ThreadsCommand.into());
1139                cx.notify();
1140            }
1141            Events::Output(event) => {
1142                if event
1143                    .category
1144                    .as_ref()
1145                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1146                {
1147                    return;
1148                }
1149
1150                self.output.push_back(event);
1151                self.output_token.0 += 1;
1152                cx.notify();
1153            }
1154            Events::Breakpoint(_) => {}
1155            Events::Module(event) => {
1156                match event.reason {
1157                    dap::ModuleEventReason::New => {
1158                        self.modules.push(event.module);
1159                    }
1160                    dap::ModuleEventReason::Changed => {
1161                        if let Some(module) = self
1162                            .modules
1163                            .iter_mut()
1164                            .find(|other| event.module.id == other.id)
1165                        {
1166                            *module = event.module;
1167                        }
1168                    }
1169                    dap::ModuleEventReason::Removed => {
1170                        self.modules.retain(|other| event.module.id != other.id);
1171                    }
1172                }
1173
1174                // todo(debugger): We should only send the invalidate command to downstream clients.
1175                // self.invalidate_state(&ModulesCommand.into());
1176            }
1177            Events::LoadedSource(_) => {
1178                self.invalidate_state(&LoadedSourcesCommand.into());
1179            }
1180            Events::Capabilities(event) => {
1181                self.capabilities = self.capabilities.merge(event.capabilities);
1182                cx.notify();
1183            }
1184            Events::Memory(_) => {}
1185            Events::Process(_) => {}
1186            Events::ProgressEnd(_) => {}
1187            Events::ProgressStart(_) => {}
1188            Events::ProgressUpdate(_) => {}
1189            Events::Invalidated(_) => {}
1190            Events::Other(_) => {}
1191        }
1192    }
1193
1194    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1195    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1196        &mut self,
1197        request: T,
1198        process_result: impl FnOnce(
1199            &mut Self,
1200            Result<T::Response>,
1201            &mut Context<Self>,
1202        ) -> Option<T::Response>
1203        + 'static,
1204        cx: &mut Context<Self>,
1205    ) {
1206        const {
1207            assert!(
1208                T::CACHEABLE,
1209                "Only requests marked as cacheable should invoke `fetch`"
1210            );
1211        }
1212
1213        if !self.thread_states.any_stopped_thread()
1214            && request.type_id() != TypeId::of::<ThreadsCommand>()
1215            || self.is_session_terminated
1216        {
1217            return;
1218        }
1219
1220        let request_map = self
1221            .requests
1222            .entry(std::any::TypeId::of::<T>())
1223            .or_default();
1224
1225        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1226            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1227
1228            let task = Self::request_inner::<Arc<T>>(
1229                &self.capabilities,
1230                self.id,
1231                &self.mode,
1232                command,
1233                process_result,
1234                cx,
1235            );
1236            let task = cx
1237                .background_executor()
1238                .spawn(async move {
1239                    let _ = task.await?;
1240                    Some(())
1241                })
1242                .shared();
1243
1244            vacant.insert(task);
1245            cx.notify();
1246        }
1247    }
1248
1249    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1250        capabilities: &Capabilities,
1251        session_id: SessionId,
1252        mode: &Mode,
1253        request: T,
1254        process_result: impl FnOnce(
1255            &mut Self,
1256            Result<T::Response>,
1257            &mut Context<Self>,
1258        ) -> Option<T::Response>
1259        + 'static,
1260        cx: &mut Context<Self>,
1261    ) -> Task<Option<T::Response>> {
1262        if !T::is_supported(&capabilities) {
1263            log::warn!(
1264                "Attempted to send a DAP request that isn't supported: {:?}",
1265                request
1266            );
1267            let error = Err(anyhow::Error::msg(
1268                "Couldn't complete request because it's not supported",
1269            ));
1270            return cx.spawn(async move |this, cx| {
1271                this.update(cx, |this, cx| process_result(this, error, cx))
1272                    .log_err()
1273                    .flatten()
1274            });
1275        }
1276
1277        let request = mode.request_dap(session_id, request, cx);
1278        cx.spawn(async move |this, cx| {
1279            let result = request.await;
1280            this.update(cx, |this, cx| process_result(this, result, cx))
1281                .log_err()
1282                .flatten()
1283        })
1284    }
1285
1286    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1287        &self,
1288        request: T,
1289        process_result: impl FnOnce(
1290            &mut Self,
1291            Result<T::Response>,
1292            &mut Context<Self>,
1293        ) -> Option<T::Response>
1294        + 'static,
1295        cx: &mut Context<Self>,
1296    ) -> Task<Option<T::Response>> {
1297        Self::request_inner(
1298            &self.capabilities,
1299            self.id,
1300            &self.mode,
1301            request,
1302            process_result,
1303            cx,
1304        )
1305    }
1306
1307    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1308        self.requests.remove(&std::any::TypeId::of::<Command>());
1309    }
1310
1311    fn invalidate_generic(&mut self) {
1312        self.invalidate_command_type::<ModulesCommand>();
1313        self.invalidate_command_type::<LoadedSourcesCommand>();
1314        self.invalidate_command_type::<ThreadsCommand>();
1315    }
1316
1317    fn invalidate_state(&mut self, key: &RequestSlot) {
1318        self.requests
1319            .entry(key.0.as_any().type_id())
1320            .and_modify(|request_map| {
1321                request_map.remove(&key);
1322            });
1323    }
1324
    /// Returns the status tracked for `thread_id` in this session's thread states.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1328
1329    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1330        self.fetch(
1331            dap_command::ThreadsCommand,
1332            |this, result, cx| {
1333                let result = result.log_err()?;
1334
1335                this.threads = result
1336                    .iter()
1337                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1338                    .collect();
1339
1340                this.invalidate_command_type::<StackTraceCommand>();
1341                cx.emit(SessionEvent::Threads);
1342                cx.notify();
1343
1344                Some(result)
1345            },
1346            cx,
1347        );
1348
1349        self.threads
1350            .values()
1351            .map(|thread| {
1352                (
1353                    thread.dap.clone(),
1354                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1355                )
1356            })
1357            .collect()
1358    }
1359
1360    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1361        self.fetch(
1362            dap_command::ModulesCommand,
1363            |this, result, cx| {
1364                let result = result.log_err()?;
1365
1366                this.modules = result.iter().cloned().collect();
1367                cx.emit(SessionEvent::Modules);
1368                cx.notify();
1369
1370                Some(result)
1371            },
1372            cx,
1373        );
1374
1375        &self.modules
1376    }
1377
    /// Returns whether this session is currently set to ignore breakpoints.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1381
1382    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1383        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1384    }
1385
1386    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1387        if self.ignore_breakpoints == ignore {
1388            return Task::ready(());
1389        }
1390
1391        self.ignore_breakpoints = ignore;
1392
1393        if let Some(local) = self.as_local() {
1394            local.send_all_breakpoints(ignore, cx)
1395        } else {
1396            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1397            unimplemented!()
1398        }
1399    }
1400
1401    pub fn breakpoints_enabled(&self) -> bool {
1402        self.ignore_breakpoints
1403    }
1404
1405    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1406        self.fetch(
1407            dap_command::LoadedSourcesCommand,
1408            |this, result, cx| {
1409                let result = result.log_err()?;
1410                this.loaded_sources = result.iter().cloned().collect();
1411                cx.emit(SessionEvent::LoadedSources);
1412                cx.notify();
1413                Some(result)
1414            },
1415            cx,
1416        );
1417
1418        &self.loaded_sources
1419    }
1420
1421    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1422        res.log_err()?;
1423        Some(())
1424    }
1425
1426    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1427        thread_id: ThreadId,
1428    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1429    {
1430        move |this, response, cx| match response.log_err() {
1431            Some(response) => Some(response),
1432            None => {
1433                this.thread_states.stop_thread(thread_id);
1434                cx.notify();
1435                None
1436            }
1437        }
1438    }
1439
1440    fn clear_active_debug_line_response(
1441        &mut self,
1442        response: Result<()>,
1443        cx: &mut Context<Session>,
1444    ) -> Option<()> {
1445        response.log_err()?;
1446        self.clear_active_debug_line(cx);
1447        Some(())
1448    }
1449
1450    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1451        self.as_local()
1452            .expect("Message handler will only run in local mode")
1453            .breakpoint_store
1454            .update(cx, |store, cx| {
1455                store.remove_active_position(Some(self.id), cx)
1456            });
1457    }
1458
1459    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1460        self.request(
1461            PauseCommand {
1462                thread_id: thread_id.0,
1463            },
1464            Self::empty_response,
1465            cx,
1466        )
1467        .detach();
1468    }
1469
1470    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1471        self.request(
1472            RestartStackFrameCommand { stack_frame_id },
1473            Self::empty_response,
1474            cx,
1475        )
1476        .detach();
1477    }
1478
1479    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1480        if self.capabilities.supports_restart_request.unwrap_or(false) {
1481            self.request(
1482                RestartCommand {
1483                    raw: args.unwrap_or(Value::Null),
1484                },
1485                Self::empty_response,
1486                cx,
1487            )
1488            .detach();
1489        } else {
1490            self.request(
1491                DisconnectCommand {
1492                    restart: Some(false),
1493                    terminate_debuggee: Some(true),
1494                    suspend_debuggee: Some(false),
1495                },
1496                Self::empty_response,
1497                cx,
1498            )
1499            .detach();
1500        }
1501    }
1502
1503    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1504        self.is_session_terminated = true;
1505        self.thread_states.exit_all_threads();
1506        cx.notify();
1507
1508        let task = if self
1509            .capabilities
1510            .supports_terminate_request
1511            .unwrap_or_default()
1512        {
1513            self.request(
1514                TerminateCommand {
1515                    restart: Some(false),
1516                },
1517                Self::clear_active_debug_line_response,
1518                cx,
1519            )
1520        } else {
1521            self.request(
1522                DisconnectCommand {
1523                    restart: Some(false),
1524                    terminate_debuggee: Some(true),
1525                    suspend_debuggee: Some(false),
1526                },
1527                Self::clear_active_debug_line_response,
1528                cx,
1529            )
1530        };
1531
1532        cx.background_spawn(async move {
1533            let _ = task.await;
1534        })
1535    }
1536
1537    pub fn completions(
1538        &mut self,
1539        query: CompletionsQuery,
1540        cx: &mut Context<Self>,
1541    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1542        let task = self.request(query, |_, result, _| result.log_err(), cx);
1543
1544        cx.background_executor().spawn(async move {
1545            anyhow::Ok(
1546                task.await
1547                    .map(|response| response.targets)
1548                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1549            )
1550        })
1551    }
1552
1553    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1554        self.thread_states.continue_thread(thread_id);
1555        self.request(
1556            ContinueCommand {
1557                args: ContinueArguments {
1558                    thread_id: thread_id.0,
1559                    single_thread: Some(true),
1560                },
1561            },
1562            Self::on_step_response::<ContinueCommand>(thread_id),
1563            cx,
1564        )
1565        .detach();
1566    }
1567
1568    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1569        match self.mode {
1570            Mode::Local(ref local) => Some(local.client.clone()),
1571            Mode::Remote(_) => None,
1572        }
1573    }
1574
1575    pub fn step_over(
1576        &mut self,
1577        thread_id: ThreadId,
1578        granularity: SteppingGranularity,
1579        cx: &mut Context<Self>,
1580    ) {
1581        let supports_single_thread_execution_requests =
1582            self.capabilities.supports_single_thread_execution_requests;
1583        let supports_stepping_granularity = self
1584            .capabilities
1585            .supports_stepping_granularity
1586            .unwrap_or_default();
1587
1588        let command = NextCommand {
1589            inner: StepCommand {
1590                thread_id: thread_id.0,
1591                granularity: supports_stepping_granularity.then(|| granularity),
1592                single_thread: supports_single_thread_execution_requests,
1593            },
1594        };
1595
1596        self.thread_states.process_step(thread_id);
1597        self.request(
1598            command,
1599            Self::on_step_response::<NextCommand>(thread_id),
1600            cx,
1601        )
1602        .detach();
1603    }
1604
1605    pub fn step_in(
1606        &mut self,
1607        thread_id: ThreadId,
1608        granularity: SteppingGranularity,
1609        cx: &mut Context<Self>,
1610    ) {
1611        let supports_single_thread_execution_requests =
1612            self.capabilities.supports_single_thread_execution_requests;
1613        let supports_stepping_granularity = self
1614            .capabilities
1615            .supports_stepping_granularity
1616            .unwrap_or_default();
1617
1618        let command = StepInCommand {
1619            inner: StepCommand {
1620                thread_id: thread_id.0,
1621                granularity: supports_stepping_granularity.then(|| granularity),
1622                single_thread: supports_single_thread_execution_requests,
1623            },
1624        };
1625
1626        self.thread_states.process_step(thread_id);
1627        self.request(
1628            command,
1629            Self::on_step_response::<StepInCommand>(thread_id),
1630            cx,
1631        )
1632        .detach();
1633    }
1634
1635    pub fn step_out(
1636        &mut self,
1637        thread_id: ThreadId,
1638        granularity: SteppingGranularity,
1639        cx: &mut Context<Self>,
1640    ) {
1641        let supports_single_thread_execution_requests =
1642            self.capabilities.supports_single_thread_execution_requests;
1643        let supports_stepping_granularity = self
1644            .capabilities
1645            .supports_stepping_granularity
1646            .unwrap_or_default();
1647
1648        let command = StepOutCommand {
1649            inner: StepCommand {
1650                thread_id: thread_id.0,
1651                granularity: supports_stepping_granularity.then(|| granularity),
1652                single_thread: supports_single_thread_execution_requests,
1653            },
1654        };
1655
1656        self.thread_states.process_step(thread_id);
1657        self.request(
1658            command,
1659            Self::on_step_response::<StepOutCommand>(thread_id),
1660            cx,
1661        )
1662        .detach();
1663    }
1664
1665    pub fn step_back(
1666        &mut self,
1667        thread_id: ThreadId,
1668        granularity: SteppingGranularity,
1669        cx: &mut Context<Self>,
1670    ) {
1671        let supports_single_thread_execution_requests =
1672            self.capabilities.supports_single_thread_execution_requests;
1673        let supports_stepping_granularity = self
1674            .capabilities
1675            .supports_stepping_granularity
1676            .unwrap_or_default();
1677
1678        let command = StepBackCommand {
1679            inner: StepCommand {
1680                thread_id: thread_id.0,
1681                granularity: supports_stepping_granularity.then(|| granularity),
1682                single_thread: supports_single_thread_execution_requests,
1683            },
1684        };
1685
1686        self.thread_states.process_step(thread_id);
1687
1688        self.request(
1689            command,
1690            Self::on_step_response::<StepBackCommand>(thread_id),
1691            cx,
1692        )
1693        .detach();
1694    }
1695
1696    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1697        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1698            && self.requests.contains_key(&ThreadsCommand.type_id())
1699            && self.threads.contains_key(&thread_id)
1700        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1701        // We could still be using an old thread id and have sent a new thread's request
1702        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1703        // But it very well could cause a minor bug in the future that is hard to track down
1704        {
1705            self.fetch(
1706                super::dap_command::StackTraceCommand {
1707                    thread_id: thread_id.0,
1708                    start_frame: None,
1709                    levels: None,
1710                },
1711                move |this, stack_frames, cx| {
1712                    let stack_frames = stack_frames.log_err()?;
1713
1714                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1715                        thread.stack_frame_ids =
1716                            stack_frames.iter().map(|frame| frame.id).collect();
1717                    });
1718                    debug_assert!(
1719                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1720                        "Sent request for thread_id that doesn't exist"
1721                    );
1722
1723                    this.stack_frames.extend(
1724                        stack_frames
1725                            .iter()
1726                            .cloned()
1727                            .map(|frame| (frame.id, StackFrame::from(frame))),
1728                    );
1729
1730                    this.invalidate_command_type::<ScopesCommand>();
1731                    this.invalidate_command_type::<VariablesCommand>();
1732
1733                    cx.emit(SessionEvent::StackTrace);
1734                    cx.notify();
1735                    Some(stack_frames)
1736                },
1737                cx,
1738            );
1739        }
1740
1741        self.threads
1742            .get(&thread_id)
1743            .map(|thread| {
1744                thread
1745                    .stack_frame_ids
1746                    .iter()
1747                    .filter_map(|id| self.stack_frames.get(id))
1748                    .cloned()
1749                    .collect()
1750            })
1751            .unwrap_or_default()
1752    }
1753
1754    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1755        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1756            && self
1757                .requests
1758                .contains_key(&TypeId::of::<StackTraceCommand>())
1759        {
1760            self.fetch(
1761                ScopesCommand { stack_frame_id },
1762                move |this, scopes, cx| {
1763                    let scopes = scopes.log_err()?;
1764
1765                    for scope in scopes .iter(){
1766                        this.variables(scope.variables_reference, cx);
1767                    }
1768
1769                    let entry = this
1770                        .stack_frames
1771                        .entry(stack_frame_id)
1772                        .and_modify(|stack_frame| {
1773                            stack_frame.scopes = scopes.clone();
1774                        });
1775
1776                    cx.emit(SessionEvent::Variables);
1777
1778                    debug_assert!(
1779                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1780                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1781                    );
1782
1783                    Some(scopes)
1784                },
1785                cx,
1786            );
1787        }
1788
1789        self.stack_frames
1790            .get(&stack_frame_id)
1791            .map(|frame| frame.scopes.as_slice())
1792            .unwrap_or_default()
1793    }
1794
1795    pub fn variables(
1796        &mut self,
1797        variables_reference: VariableReference,
1798        cx: &mut Context<Self>,
1799    ) -> Vec<dap::Variable> {
1800        let command = VariablesCommand {
1801            variables_reference,
1802            filter: None,
1803            start: None,
1804            count: None,
1805            format: None,
1806        };
1807
1808        self.fetch(
1809            command,
1810            move |this, variables, cx| {
1811                let variables = variables.log_err()?;
1812                this.variables
1813                    .insert(variables_reference, variables.clone());
1814
1815                cx.emit(SessionEvent::Variables);
1816                Some(variables)
1817            },
1818            cx,
1819        );
1820
1821        self.variables
1822            .get(&variables_reference)
1823            .cloned()
1824            .unwrap_or_default()
1825    }
1826
1827    pub fn set_variable_value(
1828        &mut self,
1829        variables_reference: u64,
1830        name: String,
1831        value: String,
1832        cx: &mut Context<Self>,
1833    ) {
1834        if self.capabilities.supports_set_variable.unwrap_or_default() {
1835            self.request(
1836                SetVariableValueCommand {
1837                    name,
1838                    value,
1839                    variables_reference,
1840                },
1841                move |this, response, cx| {
1842                    let response = response.log_err()?;
1843                    this.invalidate_command_type::<VariablesCommand>();
1844                    cx.notify();
1845                    Some(response)
1846                },
1847                cx,
1848            )
1849            .detach()
1850        }
1851    }
1852
1853    pub fn evaluate(
1854        &mut self,
1855        expression: String,
1856        context: Option<EvaluateArgumentsContext>,
1857        frame_id: Option<u64>,
1858        source: Option<Source>,
1859        cx: &mut Context<Self>,
1860    ) {
1861        self.request(
1862            EvaluateCommand {
1863                expression,
1864                context,
1865                frame_id,
1866                source,
1867            },
1868            |this, response, cx| {
1869                let response = response.log_err()?;
1870                this.output_token.0 += 1;
1871                this.output.push_back(dap::OutputEvent {
1872                    category: None,
1873                    output: response.result.clone(),
1874                    group: None,
1875                    variables_reference: Some(response.variables_reference),
1876                    source: None,
1877                    line: None,
1878                    column: None,
1879                    data: None,
1880                    location_reference: None,
1881                });
1882
1883                this.invalidate_command_type::<ScopesCommand>();
1884                cx.notify();
1885                Some(response)
1886            },
1887            cx,
1888        )
1889        .detach();
1890    }
1891
1892    pub fn location(
1893        &mut self,
1894        reference: u64,
1895        cx: &mut Context<Self>,
1896    ) -> Option<dap::LocationsResponse> {
1897        self.fetch(
1898            LocationsCommand { reference },
1899            move |this, response, _| {
1900                let response = response.log_err()?;
1901                this.locations.insert(reference, response.clone());
1902                Some(response)
1903            },
1904            cx,
1905        );
1906        self.locations.get(&reference).cloned()
1907    }
1908    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1909        let command = DisconnectCommand {
1910            restart: Some(false),
1911            terminate_debuggee: Some(true),
1912            suspend_debuggee: Some(false),
1913        };
1914
1915        self.request(command, Self::empty_response, cx).detach()
1916    }
1917
1918    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1919        if self
1920            .capabilities
1921            .supports_terminate_threads_request
1922            .unwrap_or_default()
1923        {
1924            self.request(
1925                TerminateThreadsCommand {
1926                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1927                },
1928                Self::clear_active_debug_line_response,
1929                cx,
1930            )
1931            .detach();
1932        } else {
1933            self.shutdown(cx).detach();
1934        }
1935    }
1936}
1937
1938fn create_local_session(
1939    breakpoint_store: Entity<BreakpointStore>,
1940    session_id: SessionId,
1941    parent_session: Option<Entity<Session>>,
1942    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1943    initialized_tx: oneshot::Sender<()>,
1944    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1945    mode: LocalMode,
1946    capabilities: Capabilities,
1947    cx: &mut Context<Session>,
1948) -> Session {
1949    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1950        let mut initialized_tx = Some(initialized_tx);
1951        while let Some(message) = message_rx.next().await {
1952            if let Message::Event(event) = message {
1953                if let Events::Initialized(_) = *event {
1954                    if let Some(tx) = initialized_tx.take() {
1955                        tx.send(()).ok();
1956                    }
1957                } else {
1958                    let Ok(_) = this.update(cx, |session, cx| {
1959                        session.handle_dap_event(event, cx);
1960                    }) else {
1961                        break;
1962                    };
1963                }
1964            } else {
1965                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1966                else {
1967                    break;
1968                };
1969            }
1970        }
1971    })];
1972
1973    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1974        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1975            if let Some(local) = (!this.ignore_breakpoints)
1976                .then(|| this.as_local_mut())
1977                .flatten()
1978            {
1979                local
1980                    .send_breakpoints_from_path(path.clone(), *reason, cx)
1981                    .detach();
1982            };
1983        }
1984        BreakpointStoreEvent::BreakpointsCleared(paths) => {
1985            if let Some(local) = (!this.ignore_breakpoints)
1986                .then(|| this.as_local_mut())
1987                .flatten()
1988            {
1989                local.unset_breakpoints_from_paths(paths, cx).detach();
1990            }
1991        }
1992        BreakpointStoreEvent::ActiveDebugLineChanged => {}
1993    })
1994    .detach();
1995
1996    Session {
1997        mode: Mode::Local(mode),
1998        id: session_id,
1999        child_session_ids: HashSet::default(),
2000        parent_id: parent_session.map(|session| session.read(cx).id),
2001        variables: Default::default(),
2002        capabilities,
2003        thread_states: ThreadStates::default(),
2004        output_token: OutputToken(0),
2005        ignore_breakpoints: false,
2006        output: circular_buffer::CircularBuffer::boxed(),
2007        requests: HashMap::default(),
2008        modules: Vec::default(),
2009        loaded_sources: Vec::default(),
2010        threads: IndexMap::default(),
2011        stack_frames: IndexMap::default(),
2012        locations: Default::default(),
2013        _background_tasks,
2014        is_session_terminated: false,
2015    }
2016}