session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
  11    StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
  12    VariablesCommand,
  13};
  14use super::dap_store::DapAdapterDelegate;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap, IndexSet};
  17use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  18use dap::messages::Response;
  19use dap::{
  20    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  21    SteppingGranularity, StoppedEvent, VariableReference,
  22    adapters::{DapDelegate, DapStatus},
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
  27use futures::channel::oneshot;
  28use futures::{FutureExt, future::Shared};
  29use gpui::{
  30    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  31};
  32use rpc::AnyProtoClient;
  33use serde_json::{Value, json};
  34use settings::Settings;
  35use smol::stream::StreamExt;
  36use std::any::TypeId;
  37use std::path::PathBuf;
  38use std::u64;
  39use std::{
  40    any::Any,
  41    collections::hash_map::Entry,
  42    hash::{Hash, Hasher},
  43    path::Path,
  44    sync::Arc,
  45};
  46use task::{DebugAdapterConfig, DebugTaskDefinition};
  47use text::{PointUtf16, ToPointUtf16};
  48use util::{ResultExt, merge_json_value_into};
  49
  50#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  51#[repr(transparent)]
  52pub struct ThreadId(pub u64);
  53
  54impl ThreadId {
  55    pub const MIN: ThreadId = ThreadId(u64::MIN);
  56    pub const MAX: ThreadId = ThreadId(u64::MAX);
  57}
  58
  59impl From<u64> for ThreadId {
  60    fn from(id: u64) -> Self {
  61        Self(id)
  62    }
  63}
  64
  65#[derive(Clone, Debug)]
  66pub struct StackFrame {
  67    pub dap: dap::StackFrame,
  68    pub scopes: Vec<dap::Scope>,
  69}
  70
  71impl From<dap::StackFrame> for StackFrame {
  72    fn from(stack_frame: dap::StackFrame) -> Self {
  73        Self {
  74            scopes: vec![],
  75            dap: stack_frame,
  76        }
  77    }
  78}
  79
  80#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  81pub enum ThreadStatus {
  82    #[default]
  83    Running,
  84    Stopped,
  85    Stepping,
  86    Exited,
  87    Ended,
  88}
  89
  90impl ThreadStatus {
  91    pub fn label(&self) -> &'static str {
  92        match self {
  93            ThreadStatus::Running => "Running",
  94            ThreadStatus::Stopped => "Stopped",
  95            ThreadStatus::Stepping => "Stepping",
  96            ThreadStatus::Exited => "Exited",
  97            ThreadStatus::Ended => "Ended",
  98        }
  99    }
 100}
 101
 102#[derive(Debug)]
 103pub struct Thread {
 104    dap: dap::Thread,
 105    stack_frame_ids: IndexSet<StackFrameId>,
 106    _has_stopped: bool,
 107}
 108
 109impl From<dap::Thread> for Thread {
 110    fn from(dap: dap::Thread) -> Self {
 111        Self {
 112            dap,
 113            stack_frame_ids: Default::default(),
 114            _has_stopped: false,
 115        }
 116    }
 117}
 118
 119type UpstreamProjectId = u64;
 120
 121struct RemoteConnection {
 122    _client: AnyProtoClient,
 123    _upstream_project_id: UpstreamProjectId,
 124}
 125
 126impl RemoteConnection {
 127    fn send_proto_client_request<R: DapCommand>(
 128        &self,
 129        _request: R,
 130        _session_id: SessionId,
 131        cx: &mut App,
 132    ) -> Task<Result<R::Response>> {
 133        // let message = request.to_proto(session_id, self.upstream_project_id);
 134        // let upstream_client = self.client.clone();
 135        cx.background_executor().spawn(async move {
 136            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 137            // let response = upstream_client.request(message).await?;
 138            // request.response_from_proto(response)
 139            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 140        })
 141    }
 142
 143    fn request<R: DapCommand>(
 144        &self,
 145        request: R,
 146        session_id: SessionId,
 147        cx: &mut App,
 148    ) -> Task<Result<R::Response>>
 149    where
 150        <R::DapRequest as dap::requests::Request>::Response: 'static,
 151        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 152    {
 153        return self.send_proto_client_request::<R>(request, session_id, cx);
 154    }
 155}
 156
 157enum Mode {
 158    Local(LocalMode),
 159    Remote(RemoteConnection),
 160}
 161
 162#[derive(Clone)]
 163pub struct LocalMode {
 164    client: Arc<DebugAdapterClient>,
 165    config: DebugAdapterConfig,
 166    adapter: Arc<dyn DebugAdapter>,
 167    breakpoint_store: Entity<BreakpointStore>,
 168    tmp_breakpoint: Option<SourceBreakpoint>,
 169}
 170
 171fn client_source(abs_path: &Path) -> dap::Source {
 172    dap::Source {
 173        name: abs_path
 174            .file_name()
 175            .map(|filename| filename.to_string_lossy().to_string()),
 176        path: Some(abs_path.to_string_lossy().to_string()),
 177        source_reference: None,
 178        presentation_hint: None,
 179        origin: None,
 180        sources: None,
 181        adapter_data: None,
 182        checksums: None,
 183    }
 184}
 185
 186impl LocalMode {
 187    fn new(
 188        debug_adapters: Arc<DapRegistry>,
 189        session_id: SessionId,
 190        parent_session: Option<Entity<Session>>,
 191        breakpoint_store: Entity<BreakpointStore>,
 192        config: DebugAdapterConfig,
 193        delegate: DapAdapterDelegate,
 194        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 195        cx: AsyncApp,
 196    ) -> Task<Result<Self>> {
 197        Self::new_inner(
 198            debug_adapters,
 199            session_id,
 200            parent_session,
 201            breakpoint_store,
 202            config,
 203            delegate,
 204            messages_tx,
 205            async |_, _| {},
 206            cx,
 207        )
 208    }
 209    #[cfg(any(test, feature = "test-support"))]
 210    fn new_fake(
 211        session_id: SessionId,
 212        parent_session: Option<Entity<Session>>,
 213        breakpoint_store: Entity<BreakpointStore>,
 214        config: DebugAdapterConfig,
 215        delegate: DapAdapterDelegate,
 216        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 217        caps: Capabilities,
 218        fail: bool,
 219        cx: AsyncApp,
 220    ) -> Task<Result<Self>> {
 221        use task::DebugRequestDisposition;
 222
 223        let request = match config.request.clone() {
 224            DebugRequestDisposition::UserConfigured(request) => request,
 225            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
 226                match reverse_request_args.request {
 227                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
 228                        DebugRequestType::Launch(task::LaunchConfig {
 229                            program: "".to_owned(),
 230                            cwd: None,
 231                            args: Default::default(),
 232                        })
 233                    }
 234                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
 235                        DebugRequestType::Attach(task::AttachConfig {
 236                            process_id: Some(0),
 237                        })
 238                    }
 239                }
 240            }
 241        };
 242
 243        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
 244            session
 245                .client
 246                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 247                .await;
 248
 249            let paths = cx
 250                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
 251                .expect("Breakpoint store should exist in all tests that start debuggers");
 252
 253            session
 254                .client
 255                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
 256                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
 257                    if !paths.contains(&p) {
 258                        panic!("Sent breakpoints for path without any")
 259                    }
 260
 261                    Ok(dap::SetBreakpointsResponse {
 262                        breakpoints: Vec::default(),
 263                    })
 264                })
 265                .await;
 266
 267            match request {
 268                dap::DebugRequestType::Launch(_) => {
 269                    if fail {
 270                        session
 271                            .client
 272                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 273                                Err(dap::ErrorResponse {
 274                                    error: Some(dap::Message {
 275                                        id: 1,
 276                                        format: "error".into(),
 277                                        variables: None,
 278                                        send_telemetry: None,
 279                                        show_user: None,
 280                                        url: None,
 281                                        url_label: None,
 282                                    }),
 283                                })
 284                            })
 285                            .await;
 286                    } else {
 287                        session
 288                            .client
 289                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 290                            .await;
 291                    }
 292                }
 293                dap::DebugRequestType::Attach(attach_config) => {
 294                    if fail {
 295                        session
 296                            .client
 297                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 298                                Err(dap::ErrorResponse {
 299                                    error: Some(dap::Message {
 300                                        id: 1,
 301                                        format: "error".into(),
 302                                        variables: None,
 303                                        send_telemetry: None,
 304                                        show_user: None,
 305                                        url: None,
 306                                        url_label: None,
 307                                    }),
 308                                })
 309                            })
 310                            .await;
 311                    } else {
 312                        session
 313                            .client
 314                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 315                                assert_eq!(
 316                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 317                                    args.raw
 318                                );
 319
 320                                Ok(())
 321                            })
 322                            .await;
 323                    }
 324                }
 325            }
 326
 327            session
 328                .client
 329                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
 330                .await;
 331            session.client.fake_event(Events::Initialized(None)).await;
 332        };
 333        Self::new_inner(
 334            DapRegistry::fake().into(),
 335            session_id,
 336            parent_session,
 337            breakpoint_store,
 338            config,
 339            delegate,
 340            messages_tx,
 341            callback,
 342            cx,
 343        )
 344    }
 345    fn new_inner(
 346        registry: Arc<DapRegistry>,
 347        session_id: SessionId,
 348        parent_session: Option<Entity<Session>>,
 349        breakpoint_store: Entity<BreakpointStore>,
 350        config: DebugAdapterConfig,
 351        delegate: DapAdapterDelegate,
 352        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 353        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 354        cx: AsyncApp,
 355    ) -> Task<Result<Self>> {
 356        cx.spawn(async move |cx| {
 357            let (adapter, binary) =
 358                Self::get_adapter_binary(&registry, &config, &delegate, cx).await?;
 359
 360            let message_handler = Box::new(move |message| {
 361                messages_tx.unbounded_send(message).ok();
 362            });
 363
 364            let client = Arc::new(
 365                if let Some(client) = parent_session
 366                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 367                    .flatten()
 368                {
 369                    client
 370                        .reconnect(session_id, binary, message_handler, cx.clone())
 371                        .await?
 372                } else {
 373                    DebugAdapterClient::start(
 374                        session_id,
 375                        adapter.name(),
 376                        binary,
 377                        message_handler,
 378                        cx.clone(),
 379                    )
 380                    .await
 381                    .with_context(|| "Failed to start communication with debug adapter")?
 382                },
 383            );
 384
 385            let mut session = Self {
 386                client,
 387                adapter,
 388                breakpoint_store,
 389                tmp_breakpoint: None,
 390                config: config.clone(),
 391            };
 392
 393            on_initialized(&mut session, cx.clone()).await;
 394
 395            Ok(session)
 396        })
 397    }
 398
 399    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 400        let tasks: Vec<_> = paths
 401            .into_iter()
 402            .map(|path| {
 403                self.request(
 404                    dap_command::SetBreakpoints {
 405                        source: client_source(path),
 406                        source_modified: None,
 407                        breakpoints: vec![],
 408                    },
 409                    cx.background_executor().clone(),
 410                )
 411            })
 412            .collect();
 413
 414        cx.background_spawn(async move {
 415            futures::future::join_all(tasks)
 416                .await
 417                .iter()
 418                .for_each(|res| match res {
 419                    Ok(_) => {}
 420                    Err(err) => {
 421                        log::warn!("Set breakpoints request failed: {}", err);
 422                    }
 423                });
 424        })
 425    }
 426
 427    fn send_breakpoints_from_path(
 428        &self,
 429        abs_path: Arc<Path>,
 430        reason: BreakpointUpdatedReason,
 431        cx: &mut App,
 432    ) -> Task<()> {
 433        let breakpoints = self
 434            .breakpoint_store
 435            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 436            .into_iter()
 437            .filter(|bp| bp.state.is_enabled())
 438            .chain(self.tmp_breakpoint.clone())
 439            .map(Into::into)
 440            .collect();
 441
 442        let task = self.request(
 443            dap_command::SetBreakpoints {
 444                source: client_source(&abs_path),
 445                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 446                breakpoints,
 447            },
 448            cx.background_executor().clone(),
 449        );
 450
 451        cx.background_spawn(async move {
 452            match task.await {
 453                Ok(_) => {}
 454                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 455            }
 456        })
 457    }
 458
 459    fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 460        let mut breakpoint_tasks = Vec::new();
 461        let breakpoints = self
 462            .breakpoint_store
 463            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 464
 465        for (path, breakpoints) in breakpoints {
 466            let breakpoints = if ignore_breakpoints {
 467                vec![]
 468            } else {
 469                breakpoints
 470                    .into_iter()
 471                    .filter(|bp| bp.state.is_enabled())
 472                    .map(Into::into)
 473                    .collect()
 474            };
 475
 476            breakpoint_tasks.push(self.request(
 477                dap_command::SetBreakpoints {
 478                    source: client_source(&path),
 479                    source_modified: Some(false),
 480                    breakpoints,
 481                },
 482                cx.background_executor().clone(),
 483            ));
 484        }
 485
 486        cx.background_spawn(async move {
 487            futures::future::join_all(breakpoint_tasks)
 488                .await
 489                .iter()
 490                .for_each(|res| match res {
 491                    Ok(_) => {}
 492                    Err(err) => {
 493                        log::warn!("Set breakpoints request failed: {}", err);
 494                    }
 495                });
 496        })
 497    }
 498
 499    async fn get_adapter_binary(
 500        registry: &Arc<DapRegistry>,
 501        config: &DebugAdapterConfig,
 502        delegate: &DapAdapterDelegate,
 503        cx: &mut AsyncApp,
 504    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 505        let adapter = registry
 506            .adapter(&config.adapter)
 507            .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
 508
 509        let binary = cx.update(|cx| {
 510            ProjectSettings::get_global(cx)
 511                .dap
 512                .get(&adapter.name())
 513                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 514        })?;
 515
 516        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 517            Err(error) => {
 518                delegate.update_status(
 519                    adapter.name(),
 520                    DapStatus::Failed {
 521                        error: error.to_string(),
 522                    },
 523                );
 524
 525                return Err(error);
 526            }
 527            Ok(mut binary) => {
 528                delegate.update_status(adapter.name(), DapStatus::None);
 529
 530                let shell_env = delegate.shell_env().await;
 531                let mut envs = binary.envs.unwrap_or_default();
 532                envs.extend(shell_env);
 533                binary.envs = Some(envs);
 534
 535                binary
 536            }
 537        };
 538
 539        Ok((adapter, binary))
 540    }
 541
 542    pub fn label(&self) -> String {
 543        self.config.label.clone()
 544    }
 545
 546    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
 547        let adapter_id = self.adapter.name().to_string();
 548
 549        self.request(Initialize { adapter_id }, cx.background_executor().clone())
 550    }
 551
 552    fn initialize_sequence(
 553        &self,
 554        capabilities: &Capabilities,
 555        initialized_rx: oneshot::Receiver<()>,
 556        cx: &App,
 557    ) -> Task<Result<()>> {
 558        let (mut raw, is_launch) = match &self.config.request {
 559            task::DebugRequestDisposition::UserConfigured(_) => {
 560                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
 561                    debug_assert!(false, "This part of code should be unreachable in practice");
 562                    return Task::ready(Err(anyhow!(
 563                        "Expected debug config conversion to succeed"
 564                    )));
 565                };
 566                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
 567                let raw = self.adapter.request_args(&raw);
 568                (raw, is_launch)
 569            }
 570            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
 571                start_debugging_request_arguments.configuration.clone(),
 572                matches!(
 573                    start_debugging_request_arguments.request,
 574                    dap::StartDebuggingRequestArgumentsRequest::Launch
 575                ),
 576            ),
 577        };
 578
 579        merge_json_value_into(
 580            self.config.initialize_args.clone().unwrap_or(json!({})),
 581            &mut raw,
 582        );
 583        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 584        let launch = if is_launch {
 585            self.request(Launch { raw }, cx.background_executor().clone())
 586        } else {
 587            self.request(Attach { raw }, cx.background_executor().clone())
 588        };
 589
 590        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 591
 592        let configuration_sequence = cx.spawn({
 593            let this = self.clone();
 594            async move |cx| {
 595                initialized_rx.await?;
 596                // todo(debugger) figure out if we want to handle a breakpoint response error
 597                // This will probably consist of letting a user know that breakpoints failed to be set
 598                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
 599
 600                if configuration_done_supported {
 601                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 602                } else {
 603                    Task::ready(Ok(()))
 604                }
 605                .await
 606            }
 607        });
 608
 609        cx.background_spawn(async move {
 610            futures::future::try_join(launch, configuration_sequence).await?;
 611            Ok(())
 612        })
 613    }
 614
 615    fn request<R: LocalDapCommand>(
 616        &self,
 617        request: R,
 618        executor: BackgroundExecutor,
 619    ) -> Task<Result<R::Response>>
 620    where
 621        <R::DapRequest as dap::requests::Request>::Response: 'static,
 622        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 623    {
 624        let request = Arc::new(request);
 625
 626        let request_clone = request.clone();
 627        let connection = self.client.clone();
 628        let request_task = executor.spawn(async move {
 629            let args = request_clone.to_dap();
 630            connection.request::<R::DapRequest>(args).await
 631        });
 632
 633        executor.spawn(async move {
 634            let response = request.response_from_dap(request_task.await?);
 635            response
 636        })
 637    }
 638}
 639impl From<RemoteConnection> for Mode {
 640    fn from(value: RemoteConnection) -> Self {
 641        Self::Remote(value)
 642    }
 643}
 644
 645impl Mode {
 646    fn request_dap<R: DapCommand>(
 647        &self,
 648        session_id: SessionId,
 649        request: R,
 650        cx: &mut Context<Session>,
 651    ) -> Task<Result<R::Response>>
 652    where
 653        <R::DapRequest as dap::requests::Request>::Response: 'static,
 654        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 655    {
 656        match self {
 657            Mode::Local(debug_adapter_client) => {
 658                debug_adapter_client.request(request, cx.background_executor().clone())
 659            }
 660            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 661        }
 662    }
 663}
 664
 665#[derive(Default)]
 666struct ThreadStates {
 667    global_state: Option<ThreadStatus>,
 668    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 669}
 670
 671impl ThreadStates {
 672    fn stop_all_threads(&mut self) {
 673        self.global_state = Some(ThreadStatus::Stopped);
 674        self.known_thread_states.clear();
 675    }
 676
 677    fn exit_all_threads(&mut self) {
 678        self.global_state = Some(ThreadStatus::Exited);
 679        self.known_thread_states.clear();
 680    }
 681
 682    fn continue_all_threads(&mut self) {
 683        self.global_state = Some(ThreadStatus::Running);
 684        self.known_thread_states.clear();
 685    }
 686
 687    fn stop_thread(&mut self, thread_id: ThreadId) {
 688        self.known_thread_states
 689            .insert(thread_id, ThreadStatus::Stopped);
 690    }
 691
 692    fn continue_thread(&mut self, thread_id: ThreadId) {
 693        self.known_thread_states
 694            .insert(thread_id, ThreadStatus::Running);
 695    }
 696
 697    fn process_step(&mut self, thread_id: ThreadId) {
 698        self.known_thread_states
 699            .insert(thread_id, ThreadStatus::Stepping);
 700    }
 701
 702    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 703        self.thread_state(thread_id)
 704            .unwrap_or(ThreadStatus::Running)
 705    }
 706
 707    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 708        self.known_thread_states
 709            .get(&thread_id)
 710            .copied()
 711            .or(self.global_state)
 712    }
 713
 714    fn exit_thread(&mut self, thread_id: ThreadId) {
 715        self.known_thread_states
 716            .insert(thread_id, ThreadStatus::Exited);
 717    }
 718
 719    fn any_stopped_thread(&self) -> bool {
 720        self.global_state
 721            .is_some_and(|state| state == ThreadStatus::Stopped)
 722            || self
 723                .known_thread_states
 724                .values()
 725                .any(|status| *status == ThreadStatus::Stopped)
 726    }
 727}
 728const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 729
 730#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 731pub struct OutputToken(pub usize);
 732/// Represents a current state of a single debug adapter and provides ways to mutate it.
 733pub struct Session {
 734    mode: Mode,
 735    pub(super) capabilities: Capabilities,
 736    id: SessionId,
 737    child_session_ids: HashSet<SessionId>,
 738    parent_id: Option<SessionId>,
 739    ignore_breakpoints: bool,
 740    modules: Vec<dap::Module>,
 741    loaded_sources: Vec<dap::Source>,
 742    output_token: OutputToken,
 743    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 744    threads: IndexMap<ThreadId, Thread>,
 745    thread_states: ThreadStates,
 746    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 747    stack_frames: IndexMap<StackFrameId, StackFrame>,
 748    locations: HashMap<u64, dap::LocationsResponse>,
 749    is_session_terminated: bool,
 750    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 751    _background_tasks: Vec<Task<()>>,
 752}
 753
 754trait CacheableCommand: Any + Send + Sync {
 755    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 756    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 757    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 758}
 759
 760impl<T> CacheableCommand for T
 761where
 762    T: DapCommand + PartialEq + Eq + Hash,
 763{
 764    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 765        (rhs as &dyn Any)
 766            .downcast_ref::<Self>()
 767            .map_or(false, |rhs| self == rhs)
 768    }
 769
 770    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 771        T::hash(self, &mut hasher);
 772    }
 773
 774    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 775        self
 776    }
 777}
 778
 779pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 780
 781impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 782    fn from(request: T) -> Self {
 783        Self(Arc::new(request))
 784    }
 785}
 786
 787impl PartialEq for RequestSlot {
 788    fn eq(&self, other: &Self) -> bool {
 789        self.0.dyn_eq(other.0.as_ref())
 790    }
 791}
 792
 793impl Eq for RequestSlot {}
 794
 795impl Hash for RequestSlot {
 796    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 797        self.0.dyn_hash(state);
 798        (&*self.0 as &dyn Any).type_id().hash(state)
 799    }
 800}
 801
 802#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 803pub struct CompletionsQuery {
 804    pub query: String,
 805    pub column: u64,
 806    pub line: Option<u64>,
 807    pub frame_id: Option<u64>,
 808}
 809
 810impl CompletionsQuery {
 811    pub fn new(
 812        buffer: &language::Buffer,
 813        cursor_position: language::Anchor,
 814        frame_id: Option<u64>,
 815    ) -> Self {
 816        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 817        Self {
 818            query: buffer.text(),
 819            column: column as u64,
 820            frame_id,
 821            line: Some(row as u64),
 822        }
 823    }
 824}
 825
 826pub enum SessionEvent {
 827    Modules,
 828    LoadedSources,
 829    Stopped(Option<ThreadId>),
 830    StackTrace,
 831    Variables,
 832    Threads,
 833}
 834
 835pub(crate) enum SessionStateEvent {
 836    Shutdown,
 837}
 838
 839impl EventEmitter<SessionEvent> for Session {}
 840impl EventEmitter<SessionStateEvent> for Session {}
 841
 842// local session will send breakpoint updates to DAP for all new breakpoints
 843// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 844// BreakpointStore notifies session on breakpoint changes
 845impl Session {
 846    pub(crate) fn local(
 847        breakpoint_store: Entity<BreakpointStore>,
 848        session_id: SessionId,
 849        parent_session: Option<Entity<Session>>,
 850        delegate: DapAdapterDelegate,
 851        config: DebugAdapterConfig,
 852        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 853        initialized_tx: oneshot::Sender<()>,
 854        debug_adapters: Arc<DapRegistry>,
 855        cx: &mut App,
 856    ) -> Task<Result<Entity<Self>>> {
 857        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 858
 859        cx.spawn(async move |cx| {
 860            let mode = LocalMode::new(
 861                debug_adapters,
 862                session_id,
 863                parent_session.clone(),
 864                breakpoint_store.clone(),
 865                config.clone(),
 866                delegate,
 867                message_tx,
 868                cx.clone(),
 869            )
 870            .await?;
 871
 872            cx.new(|cx| {
 873                create_local_session(
 874                    breakpoint_store,
 875                    session_id,
 876                    parent_session,
 877                    start_debugging_requests_tx,
 878                    initialized_tx,
 879                    message_rx,
 880                    mode,
 881                    cx,
 882                )
 883            })
 884        })
 885    }
 886
 887    #[cfg(any(test, feature = "test-support"))]
 888    pub(crate) fn fake(
 889        breakpoint_store: Entity<BreakpointStore>,
 890        session_id: SessionId,
 891        parent_session: Option<Entity<Session>>,
 892        delegate: DapAdapterDelegate,
 893        config: DebugAdapterConfig,
 894        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 895        initialized_tx: oneshot::Sender<()>,
 896        caps: Capabilities,
 897        fails: bool,
 898        cx: &mut App,
 899    ) -> Task<Result<Entity<Session>>> {
 900        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 901
 902        cx.spawn(async move |cx| {
 903            let mode = LocalMode::new_fake(
 904                session_id,
 905                parent_session.clone(),
 906                breakpoint_store.clone(),
 907                config.clone(),
 908                delegate,
 909                message_tx,
 910                caps,
 911                fails,
 912                cx.clone(),
 913            )
 914            .await?;
 915
 916            cx.new(|cx| {
 917                create_local_session(
 918                    breakpoint_store,
 919                    session_id,
 920                    parent_session,
 921                    start_debugging_requests_tx,
 922                    initialized_tx,
 923                    message_rx,
 924                    mode,
 925                    cx,
 926                )
 927            })
 928        })
 929    }
 930
 931    pub(crate) fn remote(
 932        session_id: SessionId,
 933        client: AnyProtoClient,
 934        upstream_project_id: u64,
 935        ignore_breakpoints: bool,
 936    ) -> Self {
 937        Self {
 938            mode: Mode::Remote(RemoteConnection {
 939                _client: client,
 940                _upstream_project_id: upstream_project_id,
 941            }),
 942            id: session_id,
 943            child_session_ids: HashSet::default(),
 944            parent_id: None,
 945            capabilities: Capabilities::default(),
 946            ignore_breakpoints,
 947            variables: Default::default(),
 948            stack_frames: Default::default(),
 949            thread_states: ThreadStates::default(),
 950            output_token: OutputToken(0),
 951            output: circular_buffer::CircularBuffer::boxed(),
 952            requests: HashMap::default(),
 953            modules: Vec::default(),
 954            loaded_sources: Vec::default(),
 955            threads: IndexMap::default(),
 956            _background_tasks: Vec::default(),
 957            locations: Default::default(),
 958            is_session_terminated: false,
 959        }
 960    }
 961
 962    pub fn session_id(&self) -> SessionId {
 963        self.id
 964    }
 965
 966    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 967        self.child_session_ids.clone()
 968    }
 969
 970    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 971        self.child_session_ids.insert(session_id);
 972    }
 973
 974    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 975        self.child_session_ids.remove(&session_id);
 976    }
 977
 978    pub fn parent_id(&self) -> Option<SessionId> {
 979        self.parent_id
 980    }
 981
 982    pub fn capabilities(&self) -> &Capabilities {
 983        &self.capabilities
 984    }
 985
 986    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
 987        if let Mode::Local(local_mode) = &self.mode {
 988            Some(local_mode.config.clone())
 989        } else {
 990            None
 991        }
 992    }
 993
 994    pub fn is_terminated(&self) -> bool {
 995        self.is_session_terminated
 996    }
 997
 998    pub fn is_local(&self) -> bool {
 999        matches!(self.mode, Mode::Local(_))
1000    }
1001
1002    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
1003        match &mut self.mode {
1004            Mode::Local(local_mode) => Some(local_mode),
1005            Mode::Remote(_) => None,
1006        }
1007    }
1008
1009    pub fn as_local(&self) -> Option<&LocalMode> {
1010        match &self.mode {
1011            Mode::Local(local_mode) => Some(local_mode),
1012            Mode::Remote(_) => None,
1013        }
1014    }
1015
1016    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1017        match &self.mode {
1018            Mode::Local(local_mode) => {
1019                let capabilities = local_mode.clone().request_initialization(cx);
1020
1021                cx.spawn(async move |this, cx| {
1022                    let capabilities = capabilities.await?;
1023                    this.update(cx, |session, _| {
1024                        session.capabilities = capabilities;
1025                    })?;
1026                    Ok(())
1027                })
1028            }
1029            Mode::Remote(_) => Task::ready(Err(anyhow!(
1030                "Cannot send initialize request from remote session"
1031            ))),
1032        }
1033    }
1034
1035    pub(super) fn initialize_sequence(
1036        &mut self,
1037        initialize_rx: oneshot::Receiver<()>,
1038        cx: &mut Context<Self>,
1039    ) -> Task<Result<()>> {
1040        match &self.mode {
1041            Mode::Local(local_mode) => {
1042                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1043            }
1044            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1045        }
1046    }
1047
1048    pub fn run_to_position(
1049        &mut self,
1050        breakpoint: SourceBreakpoint,
1051        active_thread_id: ThreadId,
1052        cx: &mut Context<Self>,
1053    ) {
1054        match &mut self.mode {
1055            Mode::Local(local_mode) => {
1056                if !matches!(
1057                    self.thread_states.thread_state(active_thread_id),
1058                    Some(ThreadStatus::Stopped)
1059                ) {
1060                    return;
1061                };
1062                let path = breakpoint.path.clone();
1063                local_mode.tmp_breakpoint = Some(breakpoint);
1064                let task = local_mode.send_breakpoints_from_path(
1065                    path,
1066                    BreakpointUpdatedReason::Toggled,
1067                    cx,
1068                );
1069
1070                cx.spawn(async move |this, cx| {
1071                    task.await;
1072                    this.update(cx, |this, cx| {
1073                        this.continue_thread(active_thread_id, cx);
1074                    })
1075                })
1076                .detach();
1077            }
1078            Mode::Remote(_) => {}
1079        }
1080    }
1081
1082    pub fn output(
1083        &self,
1084        since: OutputToken,
1085    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1086        if self.output_token.0 == 0 {
1087            return (self.output.range(0..0), OutputToken(0));
1088        };
1089
1090        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1091
1092        let clamped_events_since = events_since.clamp(0, self.output.len());
1093        (
1094            self.output
1095                .range(self.output.len() - clamped_events_since..),
1096            self.output_token,
1097        )
1098    }
1099
1100    pub fn respond_to_client(
1101        &self,
1102        request_seq: u64,
1103        success: bool,
1104        command: String,
1105        body: Option<serde_json::Value>,
1106        cx: &mut Context<Self>,
1107    ) -> Task<Result<()>> {
1108        let Some(local_session) = self.as_local().cloned() else {
1109            unreachable!("Cannot respond to remote client");
1110        };
1111
1112        cx.background_spawn(async move {
1113            local_session
1114                .client
1115                .send_message(Message::Response(Response {
1116                    body,
1117                    success,
1118                    command,
1119                    seq: request_seq + 1,
1120                    request_seq,
1121                    message: None,
1122                }))
1123                .await
1124        })
1125    }
1126
1127    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1128        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1129            let breakpoint = local.tmp_breakpoint.take()?;
1130            let path = breakpoint.path.clone();
1131            Some((local, path))
1132        }) {
1133            local
1134                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1135                .detach();
1136        };
1137
1138        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1139            self.thread_states.stop_all_threads();
1140
1141            self.invalidate_command_type::<StackTraceCommand>();
1142        }
1143
1144        // Event if we stopped all threads we still need to insert the thread_id
1145        // to our own data
1146        if let Some(thread_id) = event.thread_id {
1147            self.thread_states.stop_thread(ThreadId(thread_id));
1148
1149            self.invalidate_state(
1150                &StackTraceCommand {
1151                    thread_id,
1152                    start_frame: None,
1153                    levels: None,
1154                }
1155                .into(),
1156            );
1157        }
1158
1159        self.invalidate_generic();
1160        self.threads.clear();
1161        self.variables.clear();
1162        cx.emit(SessionEvent::Stopped(
1163            event
1164                .thread_id
1165                .map(Into::into)
1166                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1167        ));
1168        cx.notify();
1169    }
1170
1171    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1172        match *event {
1173            Events::Initialized(_) => {
1174                debug_assert!(
1175                    false,
1176                    "Initialized event should have been handled in LocalMode"
1177                );
1178            }
1179            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1180            Events::Continued(event) => {
1181                if event.all_threads_continued.unwrap_or_default() {
1182                    self.thread_states.continue_all_threads();
1183                } else {
1184                    self.thread_states
1185                        .continue_thread(ThreadId(event.thread_id));
1186                }
1187                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1188                self.invalidate_generic();
1189            }
1190            Events::Exited(_event) => {
1191                self.clear_active_debug_line(cx);
1192            }
1193            Events::Terminated(_) => {
1194                self.is_session_terminated = true;
1195                self.clear_active_debug_line(cx);
1196            }
1197            Events::Thread(event) => {
1198                let thread_id = ThreadId(event.thread_id);
1199
1200                match event.reason {
1201                    dap::ThreadEventReason::Started => {
1202                        self.thread_states.continue_thread(thread_id);
1203                    }
1204                    dap::ThreadEventReason::Exited => {
1205                        self.thread_states.exit_thread(thread_id);
1206                    }
1207                    reason => {
1208                        log::error!("Unhandled thread event reason {:?}", reason);
1209                    }
1210                }
1211                self.invalidate_state(&ThreadsCommand.into());
1212                cx.notify();
1213            }
1214            Events::Output(event) => {
1215                if event
1216                    .category
1217                    .as_ref()
1218                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1219                {
1220                    return;
1221                }
1222
1223                self.output.push_back(event);
1224                self.output_token.0 += 1;
1225                cx.notify();
1226            }
1227            Events::Breakpoint(_) => {}
1228            Events::Module(event) => {
1229                match event.reason {
1230                    dap::ModuleEventReason::New => {
1231                        self.modules.push(event.module);
1232                    }
1233                    dap::ModuleEventReason::Changed => {
1234                        if let Some(module) = self
1235                            .modules
1236                            .iter_mut()
1237                            .find(|other| event.module.id == other.id)
1238                        {
1239                            *module = event.module;
1240                        }
1241                    }
1242                    dap::ModuleEventReason::Removed => {
1243                        self.modules.retain(|other| event.module.id != other.id);
1244                    }
1245                }
1246
1247                // todo(debugger): We should only send the invalidate command to downstream clients.
1248                // self.invalidate_state(&ModulesCommand.into());
1249            }
1250            Events::LoadedSource(_) => {
1251                self.invalidate_state(&LoadedSourcesCommand.into());
1252            }
1253            Events::Capabilities(event) => {
1254                self.capabilities = self.capabilities.merge(event.capabilities);
1255                cx.notify();
1256            }
1257            Events::Memory(_) => {}
1258            Events::Process(_) => {}
1259            Events::ProgressEnd(_) => {}
1260            Events::ProgressStart(_) => {}
1261            Events::ProgressUpdate(_) => {}
1262            Events::Invalidated(_) => {}
1263            Events::Other(_) => {}
1264        }
1265    }
1266
1267    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1268    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1269        &mut self,
1270        request: T,
1271        process_result: impl FnOnce(
1272            &mut Self,
1273            Result<T::Response>,
1274            &mut Context<Self>,
1275        ) -> Option<T::Response>
1276        + 'static,
1277        cx: &mut Context<Self>,
1278    ) {
1279        const {
1280            assert!(
1281                T::CACHEABLE,
1282                "Only requests marked as cacheable should invoke `fetch`"
1283            );
1284        }
1285
1286        if !self.thread_states.any_stopped_thread()
1287            && request.type_id() != TypeId::of::<ThreadsCommand>()
1288            || self.is_session_terminated
1289        {
1290            return;
1291        }
1292
1293        let request_map = self
1294            .requests
1295            .entry(std::any::TypeId::of::<T>())
1296            .or_default();
1297
1298        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1299            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1300
1301            let task = Self::request_inner::<Arc<T>>(
1302                &self.capabilities,
1303                self.id,
1304                &self.mode,
1305                command,
1306                process_result,
1307                cx,
1308            );
1309            let task = cx
1310                .background_executor()
1311                .spawn(async move {
1312                    let _ = task.await?;
1313                    Some(())
1314                })
1315                .shared();
1316
1317            vacant.insert(task);
1318            cx.notify();
1319        }
1320    }
1321
1322    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1323        capabilities: &Capabilities,
1324        session_id: SessionId,
1325        mode: &Mode,
1326        request: T,
1327        process_result: impl FnOnce(
1328            &mut Self,
1329            Result<T::Response>,
1330            &mut Context<Self>,
1331        ) -> Option<T::Response>
1332        + 'static,
1333        cx: &mut Context<Self>,
1334    ) -> Task<Option<T::Response>> {
1335        if !T::is_supported(&capabilities) {
1336            log::warn!(
1337                "Attempted to send a DAP request that isn't supported: {:?}",
1338                request
1339            );
1340            let error = Err(anyhow::Error::msg(
1341                "Couldn't complete request because it's not supported",
1342            ));
1343            return cx.spawn(async move |this, cx| {
1344                this.update(cx, |this, cx| process_result(this, error, cx))
1345                    .log_err()
1346                    .flatten()
1347            });
1348        }
1349
1350        let request = mode.request_dap(session_id, request, cx);
1351        cx.spawn(async move |this, cx| {
1352            let result = request.await;
1353            this.update(cx, |this, cx| process_result(this, result, cx))
1354                .log_err()
1355                .flatten()
1356        })
1357    }
1358
1359    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1360        &self,
1361        request: T,
1362        process_result: impl FnOnce(
1363            &mut Self,
1364            Result<T::Response>,
1365            &mut Context<Self>,
1366        ) -> Option<T::Response>
1367        + 'static,
1368        cx: &mut Context<Self>,
1369    ) -> Task<Option<T::Response>> {
1370        Self::request_inner(
1371            &self.capabilities,
1372            self.id,
1373            &self.mode,
1374            request,
1375            process_result,
1376            cx,
1377        )
1378    }
1379
1380    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1381        self.requests.remove(&std::any::TypeId::of::<Command>());
1382    }
1383
1384    fn invalidate_generic(&mut self) {
1385        self.invalidate_command_type::<ModulesCommand>();
1386        self.invalidate_command_type::<LoadedSourcesCommand>();
1387        self.invalidate_command_type::<ThreadsCommand>();
1388    }
1389
1390    fn invalidate_state(&mut self, key: &RequestSlot) {
1391        self.requests
1392            .entry((&*key.0 as &dyn Any).type_id())
1393            .and_modify(|request_map| {
1394                request_map.remove(&key);
1395            });
1396    }
1397
1398    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1399        self.thread_states.thread_status(thread_id)
1400    }
1401
1402    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1403        self.fetch(
1404            dap_command::ThreadsCommand,
1405            |this, result, cx| {
1406                let result = result.log_err()?;
1407
1408                this.threads = result
1409                    .iter()
1410                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1411                    .collect();
1412
1413                this.invalidate_command_type::<StackTraceCommand>();
1414                cx.emit(SessionEvent::Threads);
1415                cx.notify();
1416
1417                Some(result)
1418            },
1419            cx,
1420        );
1421
1422        self.threads
1423            .values()
1424            .map(|thread| {
1425                (
1426                    thread.dap.clone(),
1427                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1428                )
1429            })
1430            .collect()
1431    }
1432
1433    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1434        self.fetch(
1435            dap_command::ModulesCommand,
1436            |this, result, cx| {
1437                let result = result.log_err()?;
1438
1439                this.modules = result.iter().cloned().collect();
1440                cx.emit(SessionEvent::Modules);
1441                cx.notify();
1442
1443                Some(result)
1444            },
1445            cx,
1446        );
1447
1448        &self.modules
1449    }
1450
1451    pub fn ignore_breakpoints(&self) -> bool {
1452        self.ignore_breakpoints
1453    }
1454
1455    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1456        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1457    }
1458
1459    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1460        if self.ignore_breakpoints == ignore {
1461            return Task::ready(());
1462        }
1463
1464        self.ignore_breakpoints = ignore;
1465
1466        if let Some(local) = self.as_local() {
1467            local.send_all_breakpoints(ignore, cx)
1468        } else {
1469            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1470            unimplemented!()
1471        }
1472    }
1473
1474    pub fn breakpoints_enabled(&self) -> bool {
1475        self.ignore_breakpoints
1476    }
1477
1478    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1479        self.fetch(
1480            dap_command::LoadedSourcesCommand,
1481            |this, result, cx| {
1482                let result = result.log_err()?;
1483                this.loaded_sources = result.iter().cloned().collect();
1484                cx.emit(SessionEvent::LoadedSources);
1485                cx.notify();
1486                Some(result)
1487            },
1488            cx,
1489        );
1490
1491        &self.loaded_sources
1492    }
1493
1494    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1495        res.log_err()?;
1496        Some(())
1497    }
1498
1499    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1500        thread_id: ThreadId,
1501    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1502    {
1503        move |this, response, cx| match response.log_err() {
1504            Some(response) => Some(response),
1505            None => {
1506                this.thread_states.stop_thread(thread_id);
1507                cx.notify();
1508                None
1509            }
1510        }
1511    }
1512
1513    fn clear_active_debug_line_response(
1514        &mut self,
1515        response: Result<()>,
1516        cx: &mut Context<Session>,
1517    ) -> Option<()> {
1518        response.log_err()?;
1519        self.clear_active_debug_line(cx);
1520        Some(())
1521    }
1522
1523    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1524        self.as_local()
1525            .expect("Message handler will only run in local mode")
1526            .breakpoint_store
1527            .update(cx, |store, cx| {
1528                store.remove_active_position(Some(self.id), cx)
1529            });
1530    }
1531
1532    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1533        self.request(
1534            PauseCommand {
1535                thread_id: thread_id.0,
1536            },
1537            Self::empty_response,
1538            cx,
1539        )
1540        .detach();
1541    }
1542
1543    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1544        self.request(
1545            RestartStackFrameCommand { stack_frame_id },
1546            Self::empty_response,
1547            cx,
1548        )
1549        .detach();
1550    }
1551
1552    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1553        if self.capabilities.supports_restart_request.unwrap_or(false) {
1554            self.request(
1555                RestartCommand {
1556                    raw: args.unwrap_or(Value::Null),
1557                },
1558                Self::empty_response,
1559                cx,
1560            )
1561            .detach();
1562        } else {
1563            self.request(
1564                DisconnectCommand {
1565                    restart: Some(false),
1566                    terminate_debuggee: Some(true),
1567                    suspend_debuggee: Some(false),
1568                },
1569                Self::empty_response,
1570                cx,
1571            )
1572            .detach();
1573        }
1574    }
1575
1576    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1577        self.is_session_terminated = true;
1578        self.thread_states.exit_all_threads();
1579        cx.notify();
1580
1581        let task = if self
1582            .capabilities
1583            .supports_terminate_request
1584            .unwrap_or_default()
1585        {
1586            self.request(
1587                TerminateCommand {
1588                    restart: Some(false),
1589                },
1590                Self::clear_active_debug_line_response,
1591                cx,
1592            )
1593        } else {
1594            self.request(
1595                DisconnectCommand {
1596                    restart: Some(false),
1597                    terminate_debuggee: Some(true),
1598                    suspend_debuggee: Some(false),
1599                },
1600                Self::clear_active_debug_line_response,
1601                cx,
1602            )
1603        };
1604
1605        cx.emit(SessionStateEvent::Shutdown);
1606
1607        cx.background_spawn(async move {
1608            let _ = task.await;
1609        })
1610    }
1611
1612    pub fn completions(
1613        &mut self,
1614        query: CompletionsQuery,
1615        cx: &mut Context<Self>,
1616    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1617        let task = self.request(query, |_, result, _| result.log_err(), cx);
1618
1619        cx.background_executor().spawn(async move {
1620            anyhow::Ok(
1621                task.await
1622                    .map(|response| response.targets)
1623                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1624            )
1625        })
1626    }
1627
1628    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1629        self.thread_states.continue_thread(thread_id);
1630        self.request(
1631            ContinueCommand {
1632                args: ContinueArguments {
1633                    thread_id: thread_id.0,
1634                    single_thread: Some(true),
1635                },
1636            },
1637            Self::on_step_response::<ContinueCommand>(thread_id),
1638            cx,
1639        )
1640        .detach();
1641    }
1642
1643    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1644        match self.mode {
1645            Mode::Local(ref local) => Some(local.client.clone()),
1646            Mode::Remote(_) => None,
1647        }
1648    }
1649
1650    pub fn step_over(
1651        &mut self,
1652        thread_id: ThreadId,
1653        granularity: SteppingGranularity,
1654        cx: &mut Context<Self>,
1655    ) {
1656        let supports_single_thread_execution_requests =
1657            self.capabilities.supports_single_thread_execution_requests;
1658        let supports_stepping_granularity = self
1659            .capabilities
1660            .supports_stepping_granularity
1661            .unwrap_or_default();
1662
1663        let command = NextCommand {
1664            inner: StepCommand {
1665                thread_id: thread_id.0,
1666                granularity: supports_stepping_granularity.then(|| granularity),
1667                single_thread: supports_single_thread_execution_requests,
1668            },
1669        };
1670
1671        self.thread_states.process_step(thread_id);
1672        self.request(
1673            command,
1674            Self::on_step_response::<NextCommand>(thread_id),
1675            cx,
1676        )
1677        .detach();
1678    }
1679
1680    pub fn step_in(
1681        &mut self,
1682        thread_id: ThreadId,
1683        granularity: SteppingGranularity,
1684        cx: &mut Context<Self>,
1685    ) {
1686        let supports_single_thread_execution_requests =
1687            self.capabilities.supports_single_thread_execution_requests;
1688        let supports_stepping_granularity = self
1689            .capabilities
1690            .supports_stepping_granularity
1691            .unwrap_or_default();
1692
1693        let command = StepInCommand {
1694            inner: StepCommand {
1695                thread_id: thread_id.0,
1696                granularity: supports_stepping_granularity.then(|| granularity),
1697                single_thread: supports_single_thread_execution_requests,
1698            },
1699        };
1700
1701        self.thread_states.process_step(thread_id);
1702        self.request(
1703            command,
1704            Self::on_step_response::<StepInCommand>(thread_id),
1705            cx,
1706        )
1707        .detach();
1708    }
1709
1710    pub fn step_out(
1711        &mut self,
1712        thread_id: ThreadId,
1713        granularity: SteppingGranularity,
1714        cx: &mut Context<Self>,
1715    ) {
1716        let supports_single_thread_execution_requests =
1717            self.capabilities.supports_single_thread_execution_requests;
1718        let supports_stepping_granularity = self
1719            .capabilities
1720            .supports_stepping_granularity
1721            .unwrap_or_default();
1722
1723        let command = StepOutCommand {
1724            inner: StepCommand {
1725                thread_id: thread_id.0,
1726                granularity: supports_stepping_granularity.then(|| granularity),
1727                single_thread: supports_single_thread_execution_requests,
1728            },
1729        };
1730
1731        self.thread_states.process_step(thread_id);
1732        self.request(
1733            command,
1734            Self::on_step_response::<StepOutCommand>(thread_id),
1735            cx,
1736        )
1737        .detach();
1738    }
1739
1740    pub fn step_back(
1741        &mut self,
1742        thread_id: ThreadId,
1743        granularity: SteppingGranularity,
1744        cx: &mut Context<Self>,
1745    ) {
1746        let supports_single_thread_execution_requests =
1747            self.capabilities.supports_single_thread_execution_requests;
1748        let supports_stepping_granularity = self
1749            .capabilities
1750            .supports_stepping_granularity
1751            .unwrap_or_default();
1752
1753        let command = StepBackCommand {
1754            inner: StepCommand {
1755                thread_id: thread_id.0,
1756                granularity: supports_stepping_granularity.then(|| granularity),
1757                single_thread: supports_single_thread_execution_requests,
1758            },
1759        };
1760
1761        self.thread_states.process_step(thread_id);
1762
1763        self.request(
1764            command,
1765            Self::on_step_response::<StepBackCommand>(thread_id),
1766            cx,
1767        )
1768        .detach();
1769    }
1770
1771    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1772        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1773            && self.requests.contains_key(&ThreadsCommand.type_id())
1774            && self.threads.contains_key(&thread_id)
1775        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1776        // We could still be using an old thread id and have sent a new thread's request
1777        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1778        // But it very well could cause a minor bug in the future that is hard to track down
1779        {
1780            self.fetch(
1781                super::dap_command::StackTraceCommand {
1782                    thread_id: thread_id.0,
1783                    start_frame: None,
1784                    levels: None,
1785                },
1786                move |this, stack_frames, cx| {
1787                    let stack_frames = stack_frames.log_err()?;
1788
1789                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1790                        thread.stack_frame_ids =
1791                            stack_frames.iter().map(|frame| frame.id).collect();
1792                    });
1793                    debug_assert!(
1794                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1795                        "Sent request for thread_id that doesn't exist"
1796                    );
1797
1798                    this.stack_frames.extend(
1799                        stack_frames
1800                            .iter()
1801                            .cloned()
1802                            .map(|frame| (frame.id, StackFrame::from(frame))),
1803                    );
1804
1805                    this.invalidate_command_type::<ScopesCommand>();
1806                    this.invalidate_command_type::<VariablesCommand>();
1807
1808                    cx.emit(SessionEvent::StackTrace);
1809                    cx.notify();
1810                    Some(stack_frames)
1811                },
1812                cx,
1813            );
1814        }
1815
1816        self.threads
1817            .get(&thread_id)
1818            .map(|thread| {
1819                thread
1820                    .stack_frame_ids
1821                    .iter()
1822                    .filter_map(|id| self.stack_frames.get(id))
1823                    .cloned()
1824                    .collect()
1825            })
1826            .unwrap_or_default()
1827    }
1828
1829    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1830        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1831            && self
1832                .requests
1833                .contains_key(&TypeId::of::<StackTraceCommand>())
1834        {
1835            self.fetch(
1836                ScopesCommand { stack_frame_id },
1837                move |this, scopes, cx| {
1838                    let scopes = scopes.log_err()?;
1839
1840                    for scope in scopes .iter(){
1841                        this.variables(scope.variables_reference, cx);
1842                    }
1843
1844                    let entry = this
1845                        .stack_frames
1846                        .entry(stack_frame_id)
1847                        .and_modify(|stack_frame| {
1848                            stack_frame.scopes = scopes.clone();
1849                        });
1850
1851                    cx.emit(SessionEvent::Variables);
1852
1853                    debug_assert!(
1854                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1855                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1856                    );
1857
1858                    Some(scopes)
1859                },
1860                cx,
1861            );
1862        }
1863
1864        self.stack_frames
1865            .get(&stack_frame_id)
1866            .map(|frame| frame.scopes.as_slice())
1867            .unwrap_or_default()
1868    }
1869
1870    pub fn variables(
1871        &mut self,
1872        variables_reference: VariableReference,
1873        cx: &mut Context<Self>,
1874    ) -> Vec<dap::Variable> {
1875        let command = VariablesCommand {
1876            variables_reference,
1877            filter: None,
1878            start: None,
1879            count: None,
1880            format: None,
1881        };
1882
1883        self.fetch(
1884            command,
1885            move |this, variables, cx| {
1886                let variables = variables.log_err()?;
1887                this.variables
1888                    .insert(variables_reference, variables.clone());
1889
1890                cx.emit(SessionEvent::Variables);
1891                Some(variables)
1892            },
1893            cx,
1894        );
1895
1896        self.variables
1897            .get(&variables_reference)
1898            .cloned()
1899            .unwrap_or_default()
1900    }
1901
1902    pub fn set_variable_value(
1903        &mut self,
1904        variables_reference: u64,
1905        name: String,
1906        value: String,
1907        cx: &mut Context<Self>,
1908    ) {
1909        if self.capabilities.supports_set_variable.unwrap_or_default() {
1910            self.request(
1911                SetVariableValueCommand {
1912                    name,
1913                    value,
1914                    variables_reference,
1915                },
1916                move |this, response, cx| {
1917                    let response = response.log_err()?;
1918                    this.invalidate_command_type::<VariablesCommand>();
1919                    cx.notify();
1920                    Some(response)
1921                },
1922                cx,
1923            )
1924            .detach()
1925        }
1926    }
1927
1928    pub fn evaluate(
1929        &mut self,
1930        expression: String,
1931        context: Option<EvaluateArgumentsContext>,
1932        frame_id: Option<u64>,
1933        source: Option<Source>,
1934        cx: &mut Context<Self>,
1935    ) {
1936        self.request(
1937            EvaluateCommand {
1938                expression,
1939                context,
1940                frame_id,
1941                source,
1942            },
1943            |this, response, cx| {
1944                let response = response.log_err()?;
1945                this.output_token.0 += 1;
1946                this.output.push_back(dap::OutputEvent {
1947                    category: None,
1948                    output: response.result.clone(),
1949                    group: None,
1950                    variables_reference: Some(response.variables_reference),
1951                    source: None,
1952                    line: None,
1953                    column: None,
1954                    data: None,
1955                    location_reference: None,
1956                });
1957
1958                this.invalidate_command_type::<ScopesCommand>();
1959                cx.notify();
1960                Some(response)
1961            },
1962            cx,
1963        )
1964        .detach();
1965    }
1966
1967    pub fn location(
1968        &mut self,
1969        reference: u64,
1970        cx: &mut Context<Self>,
1971    ) -> Option<dap::LocationsResponse> {
1972        self.fetch(
1973            LocationsCommand { reference },
1974            move |this, response, _| {
1975                let response = response.log_err()?;
1976                this.locations.insert(reference, response.clone());
1977                Some(response)
1978            },
1979            cx,
1980        );
1981        self.locations.get(&reference).cloned()
1982    }
1983    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1984        let command = DisconnectCommand {
1985            restart: Some(false),
1986            terminate_debuggee: Some(true),
1987            suspend_debuggee: Some(false),
1988        };
1989
1990        self.request(command, Self::empty_response, cx).detach()
1991    }
1992
1993    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1994        if self
1995            .capabilities
1996            .supports_terminate_threads_request
1997            .unwrap_or_default()
1998        {
1999            self.request(
2000                TerminateThreadsCommand {
2001                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2002                },
2003                Self::clear_active_debug_line_response,
2004                cx,
2005            )
2006            .detach();
2007        } else {
2008            self.shutdown(cx).detach();
2009        }
2010    }
2011}
2012
2013fn create_local_session(
2014    breakpoint_store: Entity<BreakpointStore>,
2015    session_id: SessionId,
2016    parent_session: Option<Entity<Session>>,
2017    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
2018    initialized_tx: oneshot::Sender<()>,
2019    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
2020    mode: LocalMode,
2021    cx: &mut Context<Session>,
2022) -> Session {
2023    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
2024        let mut initialized_tx = Some(initialized_tx);
2025        while let Some(message) = message_rx.next().await {
2026            if let Message::Event(event) = message {
2027                if let Events::Initialized(_) = *event {
2028                    if let Some(tx) = initialized_tx.take() {
2029                        tx.send(()).ok();
2030                    }
2031                } else {
2032                    let Ok(_) = this.update(cx, |session, cx| {
2033                        session.handle_dap_event(event, cx);
2034                    }) else {
2035                        break;
2036                    };
2037                }
2038            } else {
2039                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
2040                else {
2041                    break;
2042                };
2043            }
2044        }
2045    })];
2046
2047    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
2048        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2049            if let Some(local) = (!this.ignore_breakpoints)
2050                .then(|| this.as_local_mut())
2051                .flatten()
2052            {
2053                local
2054                    .send_breakpoints_from_path(path.clone(), *reason, cx)
2055                    .detach();
2056            };
2057        }
2058        BreakpointStoreEvent::BreakpointsCleared(paths) => {
2059            if let Some(local) = (!this.ignore_breakpoints)
2060                .then(|| this.as_local_mut())
2061                .flatten()
2062            {
2063                local.unset_breakpoints_from_paths(paths, cx).detach();
2064            }
2065        }
2066        BreakpointStoreEvent::ActiveDebugLineChanged => {}
2067    })
2068    .detach();
2069
2070    Session {
2071        mode: Mode::Local(mode),
2072        id: session_id,
2073        child_session_ids: HashSet::default(),
2074        parent_id: parent_session.map(|session| session.read(cx).id),
2075        variables: Default::default(),
2076        capabilities: Capabilities::default(),
2077        thread_states: ThreadStates::default(),
2078        output_token: OutputToken(0),
2079        ignore_breakpoints: false,
2080        output: circular_buffer::CircularBuffer::boxed(),
2081        requests: HashMap::default(),
2082        modules: Vec::default(),
2083        loaded_sources: Vec::default(),
2084        threads: IndexMap::default(),
2085        stack_frames: IndexMap::default(),
2086        locations: Default::default(),
2087        _background_tasks,
2088        is_session_terminated: false,
2089    }
2090}