session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
  11    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  12    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapAdapterDelegate;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap, IndexSet};
  17use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  18use dap::messages::Response;
  19use dap::{
  20    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  21    SteppingGranularity, StoppedEvent, VariableReference,
  22    adapters::{DapDelegate, DapStatus},
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    DapRegistry, DebugRequestType, ExceptionBreakpointsFilter, ExceptionFilterOptions,
  28    OutputEventCategory,
  29};
  30use futures::channel::oneshot;
  31use futures::{FutureExt, future::Shared};
  32use gpui::{
  33    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  34    Task, WeakEntity,
  35};
  36use rpc::AnyProtoClient;
  37use serde_json::{Value, json};
  38use settings::Settings;
  39use smol::stream::StreamExt;
  40use std::any::TypeId;
  41use std::collections::BTreeMap;
  42use std::path::PathBuf;
  43use std::u64;
  44use std::{
  45    any::Any,
  46    collections::hash_map::Entry,
  47    hash::{Hash, Hasher},
  48    path::Path,
  49    sync::Arc,
  50};
  51use task::{DebugAdapterConfig, DebugTaskDefinition};
  52use text::{PointUtf16, ToPointUtf16};
  53use util::{ResultExt, merge_json_value_into};
  54
  55#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  56#[repr(transparent)]
  57pub struct ThreadId(pub u64);
  58
  59impl ThreadId {
  60    pub const MIN: ThreadId = ThreadId(u64::MIN);
  61    pub const MAX: ThreadId = ThreadId(u64::MAX);
  62}
  63
  64impl From<u64> for ThreadId {
  65    fn from(id: u64) -> Self {
  66        Self(id)
  67    }
  68}
  69
  70#[derive(Clone, Debug)]
  71pub struct StackFrame {
  72    pub dap: dap::StackFrame,
  73    pub scopes: Vec<dap::Scope>,
  74}
  75
  76impl From<dap::StackFrame> for StackFrame {
  77    fn from(stack_frame: dap::StackFrame) -> Self {
  78        Self {
  79            scopes: vec![],
  80            dap: stack_frame,
  81        }
  82    }
  83}
  84
  85#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  86pub enum ThreadStatus {
  87    #[default]
  88    Running,
  89    Stopped,
  90    Stepping,
  91    Exited,
  92    Ended,
  93}
  94
  95impl ThreadStatus {
  96    pub fn label(&self) -> &'static str {
  97        match self {
  98            ThreadStatus::Running => "Running",
  99            ThreadStatus::Stopped => "Stopped",
 100            ThreadStatus::Stepping => "Stepping",
 101            ThreadStatus::Exited => "Exited",
 102            ThreadStatus::Ended => "Ended",
 103        }
 104    }
 105}
 106
 107#[derive(Debug)]
 108pub struct Thread {
 109    dap: dap::Thread,
 110    stack_frame_ids: IndexSet<StackFrameId>,
 111    _has_stopped: bool,
 112}
 113
 114impl From<dap::Thread> for Thread {
 115    fn from(dap: dap::Thread) -> Self {
 116        Self {
 117            dap,
 118            stack_frame_ids: Default::default(),
 119            _has_stopped: false,
 120        }
 121    }
 122}
 123
 124type UpstreamProjectId = u64;
 125
 126struct RemoteConnection {
 127    _client: AnyProtoClient,
 128    _upstream_project_id: UpstreamProjectId,
 129    _adapter_name: SharedString,
 130}
 131
 132impl RemoteConnection {
 133    fn send_proto_client_request<R: DapCommand>(
 134        &self,
 135        _request: R,
 136        _session_id: SessionId,
 137        cx: &mut App,
 138    ) -> Task<Result<R::Response>> {
 139        // let message = request.to_proto(session_id, self.upstream_project_id);
 140        // let upstream_client = self.client.clone();
 141        cx.background_executor().spawn(async move {
 142            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 143            // let response = upstream_client.request(message).await?;
 144            // request.response_from_proto(response)
 145            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 146        })
 147    }
 148
 149    fn request<R: DapCommand>(
 150        &self,
 151        request: R,
 152        session_id: SessionId,
 153        cx: &mut App,
 154    ) -> Task<Result<R::Response>>
 155    where
 156        <R::DapRequest as dap::requests::Request>::Response: 'static,
 157        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 158    {
 159        return self.send_proto_client_request::<R>(request, session_id, cx);
 160    }
 161}
 162
 163enum Mode {
 164    Local(LocalMode),
 165    Remote(RemoteConnection),
 166}
 167
 168#[derive(Clone)]
 169pub struct LocalMode {
 170    client: Arc<DebugAdapterClient>,
 171    config: DebugAdapterConfig,
 172    adapter: Arc<dyn DebugAdapter>,
 173    breakpoint_store: Entity<BreakpointStore>,
 174    tmp_breakpoint: Option<SourceBreakpoint>,
 175}
 176
 177fn client_source(abs_path: &Path) -> dap::Source {
 178    dap::Source {
 179        name: abs_path
 180            .file_name()
 181            .map(|filename| filename.to_string_lossy().to_string()),
 182        path: Some(abs_path.to_string_lossy().to_string()),
 183        source_reference: None,
 184        presentation_hint: None,
 185        origin: None,
 186        sources: None,
 187        adapter_data: None,
 188        checksums: None,
 189    }
 190}
 191
 192impl LocalMode {
 193    fn new(
 194        debug_adapters: Arc<DapRegistry>,
 195        session_id: SessionId,
 196        parent_session: Option<Entity<Session>>,
 197        breakpoint_store: Entity<BreakpointStore>,
 198        config: DebugAdapterConfig,
 199        delegate: DapAdapterDelegate,
 200        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 201        cx: AsyncApp,
 202    ) -> Task<Result<Self>> {
 203        Self::new_inner(
 204            debug_adapters,
 205            session_id,
 206            parent_session,
 207            breakpoint_store,
 208            config,
 209            delegate,
 210            messages_tx,
 211            async |_, _| {},
 212            cx,
 213        )
 214    }
 215    #[cfg(any(test, feature = "test-support"))]
 216    fn new_fake(
 217        session_id: SessionId,
 218        parent_session: Option<Entity<Session>>,
 219        breakpoint_store: Entity<BreakpointStore>,
 220        config: DebugAdapterConfig,
 221        delegate: DapAdapterDelegate,
 222        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 223        caps: Capabilities,
 224        fail: bool,
 225        cx: AsyncApp,
 226    ) -> Task<Result<Self>> {
 227        use task::DebugRequestDisposition;
 228
 229        let request = match config.request.clone() {
 230            DebugRequestDisposition::UserConfigured(request) => request,
 231            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
 232                match reverse_request_args.request {
 233                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
 234                        DebugRequestType::Launch(task::LaunchConfig {
 235                            program: "".to_owned(),
 236                            cwd: None,
 237                            args: Default::default(),
 238                        })
 239                    }
 240                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
 241                        DebugRequestType::Attach(task::AttachConfig {
 242                            process_id: Some(0),
 243                        })
 244                    }
 245                }
 246            }
 247        };
 248
 249        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
 250            session
 251                .client
 252                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 253                .await;
 254
 255            let paths = cx
 256                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
 257                .expect("Breakpoint store should exist in all tests that start debuggers");
 258
 259            session
 260                .client
 261                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
 262                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
 263                    if !paths.contains(&p) {
 264                        panic!("Sent breakpoints for path without any")
 265                    }
 266
 267                    Ok(dap::SetBreakpointsResponse {
 268                        breakpoints: Vec::default(),
 269                    })
 270                })
 271                .await;
 272
 273            match request {
 274                dap::DebugRequestType::Launch(_) => {
 275                    if fail {
 276                        session
 277                            .client
 278                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 279                                Err(dap::ErrorResponse {
 280                                    error: Some(dap::Message {
 281                                        id: 1,
 282                                        format: "error".into(),
 283                                        variables: None,
 284                                        send_telemetry: None,
 285                                        show_user: None,
 286                                        url: None,
 287                                        url_label: None,
 288                                    }),
 289                                })
 290                            })
 291                            .await;
 292                    } else {
 293                        session
 294                            .client
 295                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 296                            .await;
 297                    }
 298                }
 299                dap::DebugRequestType::Attach(attach_config) => {
 300                    if fail {
 301                        session
 302                            .client
 303                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 304                                Err(dap::ErrorResponse {
 305                                    error: Some(dap::Message {
 306                                        id: 1,
 307                                        format: "error".into(),
 308                                        variables: None,
 309                                        send_telemetry: None,
 310                                        show_user: None,
 311                                        url: None,
 312                                        url_label: None,
 313                                    }),
 314                                })
 315                            })
 316                            .await;
 317                    } else {
 318                        session
 319                            .client
 320                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 321                                assert_eq!(
 322                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 323                                    args.raw
 324                                );
 325
 326                                Ok(())
 327                            })
 328                            .await;
 329                    }
 330                }
 331            }
 332
 333            session
 334                .client
 335                .on_request::<dap::requests::SetExceptionBreakpoints, _>(move |_, _| {
 336                    Ok(dap::SetExceptionBreakpointsResponse { breakpoints: None })
 337                })
 338                .await;
 339
 340            session
 341                .client
 342                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
 343                .await;
 344            session.client.fake_event(Events::Initialized(None)).await;
 345        };
 346        Self::new_inner(
 347            DapRegistry::fake().into(),
 348            session_id,
 349            parent_session,
 350            breakpoint_store,
 351            config,
 352            delegate,
 353            messages_tx,
 354            callback,
 355            cx,
 356        )
 357    }
 358    fn new_inner(
 359        registry: Arc<DapRegistry>,
 360        session_id: SessionId,
 361        parent_session: Option<Entity<Session>>,
 362        breakpoint_store: Entity<BreakpointStore>,
 363        config: DebugAdapterConfig,
 364        delegate: DapAdapterDelegate,
 365        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 366        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 367        cx: AsyncApp,
 368    ) -> Task<Result<Self>> {
 369        cx.spawn(async move |cx| {
 370            let (adapter, binary) =
 371                Self::get_adapter_binary(&registry, &config, &delegate, cx).await?;
 372
 373            let message_handler = Box::new(move |message| {
 374                messages_tx.unbounded_send(message).ok();
 375            });
 376
 377            let client = Arc::new(
 378                if let Some(client) = parent_session
 379                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 380                    .flatten()
 381                {
 382                    client
 383                        .reconnect(session_id, binary, message_handler, cx.clone())
 384                        .await?
 385                } else {
 386                    DebugAdapterClient::start(
 387                        session_id,
 388                        adapter.name(),
 389                        binary,
 390                        message_handler,
 391                        cx.clone(),
 392                    )
 393                    .await
 394                    .with_context(|| "Failed to start communication with debug adapter")?
 395                },
 396            );
 397
 398            let mut session = Self {
 399                client,
 400                adapter,
 401                breakpoint_store,
 402                tmp_breakpoint: None,
 403                config: config.clone(),
 404            };
 405
 406            on_initialized(&mut session, cx.clone()).await;
 407
 408            Ok(session)
 409        })
 410    }
 411
 412    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 413        let tasks: Vec<_> = paths
 414            .into_iter()
 415            .map(|path| {
 416                self.request(
 417                    dap_command::SetBreakpoints {
 418                        source: client_source(path),
 419                        source_modified: None,
 420                        breakpoints: vec![],
 421                    },
 422                    cx.background_executor().clone(),
 423                )
 424            })
 425            .collect();
 426
 427        cx.background_spawn(async move {
 428            futures::future::join_all(tasks)
 429                .await
 430                .iter()
 431                .for_each(|res| match res {
 432                    Ok(_) => {}
 433                    Err(err) => {
 434                        log::warn!("Set breakpoints request failed: {}", err);
 435                    }
 436                });
 437        })
 438    }
 439
 440    fn send_breakpoints_from_path(
 441        &self,
 442        abs_path: Arc<Path>,
 443        reason: BreakpointUpdatedReason,
 444        cx: &mut App,
 445    ) -> Task<()> {
 446        let breakpoints = self
 447            .breakpoint_store
 448            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 449            .into_iter()
 450            .filter(|bp| bp.state.is_enabled())
 451            .chain(self.tmp_breakpoint.clone())
 452            .map(Into::into)
 453            .collect();
 454
 455        let task = self.request(
 456            dap_command::SetBreakpoints {
 457                source: client_source(&abs_path),
 458                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 459                breakpoints,
 460            },
 461            cx.background_executor().clone(),
 462        );
 463
 464        cx.background_spawn(async move {
 465            match task.await {
 466                Ok(_) => {}
 467                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 468            }
 469        })
 470    }
 471
 472    fn send_exception_breakpoints(
 473        &self,
 474        filters: Vec<ExceptionBreakpointsFilter>,
 475        supports_filter_options: bool,
 476        cx: &App,
 477    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 478        let arg = if supports_filter_options {
 479            SetExceptionBreakpoints::WithOptions {
 480                filters: filters
 481                    .into_iter()
 482                    .map(|filter| ExceptionFilterOptions {
 483                        filter_id: filter.filter,
 484                        condition: None,
 485                        mode: None,
 486                    })
 487                    .collect(),
 488            }
 489        } else {
 490            SetExceptionBreakpoints::Plain {
 491                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 492            }
 493        };
 494        self.request(arg, cx.background_executor().clone())
 495    }
 496    fn send_source_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 497        let mut breakpoint_tasks = Vec::new();
 498        let breakpoints = self
 499            .breakpoint_store
 500            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 501
 502        for (path, breakpoints) in breakpoints {
 503            let breakpoints = if ignore_breakpoints {
 504                vec![]
 505            } else {
 506                breakpoints
 507                    .into_iter()
 508                    .filter(|bp| bp.state.is_enabled())
 509                    .map(Into::into)
 510                    .collect()
 511            };
 512
 513            breakpoint_tasks.push(self.request(
 514                dap_command::SetBreakpoints {
 515                    source: client_source(&path),
 516                    source_modified: Some(false),
 517                    breakpoints,
 518                },
 519                cx.background_executor().clone(),
 520            ));
 521        }
 522
 523        cx.background_spawn(async move {
 524            futures::future::join_all(breakpoint_tasks)
 525                .await
 526                .iter()
 527                .for_each(|res| match res {
 528                    Ok(_) => {}
 529                    Err(err) => {
 530                        log::warn!("Set breakpoints request failed: {}", err);
 531                    }
 532                });
 533        })
 534    }
 535
 536    async fn get_adapter_binary(
 537        registry: &Arc<DapRegistry>,
 538        config: &DebugAdapterConfig,
 539        delegate: &DapAdapterDelegate,
 540        cx: &mut AsyncApp,
 541    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 542        let adapter = registry
 543            .adapter(&config.adapter)
 544            .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
 545
 546        let binary = cx.update(|cx| {
 547            ProjectSettings::get_global(cx)
 548                .dap
 549                .get(&adapter.name())
 550                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 551        })?;
 552
 553        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 554            Err(error) => {
 555                delegate.update_status(
 556                    adapter.name(),
 557                    DapStatus::Failed {
 558                        error: error.to_string(),
 559                    },
 560                );
 561
 562                return Err(error);
 563            }
 564            Ok(mut binary) => {
 565                delegate.update_status(adapter.name(), DapStatus::None);
 566
 567                let shell_env = delegate.shell_env().await;
 568                let mut envs = binary.envs.unwrap_or_default();
 569                envs.extend(shell_env);
 570                binary.envs = Some(envs);
 571
 572                binary
 573            }
 574        };
 575
 576        Ok((adapter, binary))
 577    }
 578
 579    pub fn label(&self) -> String {
 580        self.config.label.clone()
 581    }
 582
 583    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
 584        let adapter_id = self.adapter.name().to_string();
 585
 586        self.request(Initialize { adapter_id }, cx.background_executor().clone())
 587    }
 588
 589    fn initialize_sequence(
 590        &self,
 591        capabilities: &Capabilities,
 592        initialized_rx: oneshot::Receiver<()>,
 593        cx: &App,
 594    ) -> Task<Result<()>> {
 595        let (mut raw, is_launch) = match &self.config.request {
 596            task::DebugRequestDisposition::UserConfigured(_) => {
 597                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
 598                    debug_assert!(false, "This part of code should be unreachable in practice");
 599                    return Task::ready(Err(anyhow!(
 600                        "Expected debug config conversion to succeed"
 601                    )));
 602                };
 603                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
 604                let raw = self.adapter.request_args(&raw);
 605                (raw, is_launch)
 606            }
 607            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
 608                start_debugging_request_arguments.configuration.clone(),
 609                matches!(
 610                    start_debugging_request_arguments.request,
 611                    dap::StartDebuggingRequestArgumentsRequest::Launch
 612                ),
 613            ),
 614        };
 615
 616        merge_json_value_into(
 617            self.config.initialize_args.clone().unwrap_or(json!({})),
 618            &mut raw,
 619        );
 620        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 621        let launch = if is_launch {
 622            self.request(Launch { raw }, cx.background_executor().clone())
 623        } else {
 624            self.request(Attach { raw }, cx.background_executor().clone())
 625        };
 626
 627        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 628        let exception_filters = capabilities
 629            .exception_breakpoint_filters
 630            .as_ref()
 631            .map(|exception_filters| {
 632                exception_filters
 633                    .iter()
 634                    .filter(|filter| filter.default == Some(true))
 635                    .cloned()
 636                    .collect::<Vec<_>>()
 637            })
 638            .unwrap_or_default();
 639        let supports_exception_filters = capabilities
 640            .supports_exception_filter_options
 641            .unwrap_or_default();
 642        let configuration_sequence = cx.spawn({
 643            let this = self.clone();
 644            async move |cx| {
 645                initialized_rx.await?;
 646                // todo(debugger) figure out if we want to handle a breakpoint response error
 647                // This will probably consist of letting a user know that breakpoints failed to be set
 648                cx.update(|cx| this.send_source_breakpoints(false, cx))?
 649                    .await;
 650                cx.update(|cx| {
 651                    this.send_exception_breakpoints(
 652                        exception_filters,
 653                        supports_exception_filters,
 654                        cx,
 655                    )
 656                })?
 657                .await
 658                .ok();
 659                if configuration_done_supported {
 660                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 661                } else {
 662                    Task::ready(Ok(()))
 663                }
 664                .await
 665            }
 666        });
 667
 668        cx.background_spawn(async move {
 669            futures::future::try_join(launch, configuration_sequence).await?;
 670            Ok(())
 671        })
 672    }
 673
 674    fn request<R: LocalDapCommand>(
 675        &self,
 676        request: R,
 677        executor: BackgroundExecutor,
 678    ) -> Task<Result<R::Response>>
 679    where
 680        <R::DapRequest as dap::requests::Request>::Response: 'static,
 681        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 682    {
 683        let request = Arc::new(request);
 684
 685        let request_clone = request.clone();
 686        let connection = self.client.clone();
 687        let request_task = executor.spawn(async move {
 688            let args = request_clone.to_dap();
 689            connection.request::<R::DapRequest>(args).await
 690        });
 691
 692        executor.spawn(async move {
 693            let response = request.response_from_dap(request_task.await?);
 694            response
 695        })
 696    }
 697}
 698impl From<RemoteConnection> for Mode {
 699    fn from(value: RemoteConnection) -> Self {
 700        Self::Remote(value)
 701    }
 702}
 703
 704impl Mode {
 705    fn request_dap<R: DapCommand>(
 706        &self,
 707        session_id: SessionId,
 708        request: R,
 709        cx: &mut Context<Session>,
 710    ) -> Task<Result<R::Response>>
 711    where
 712        <R::DapRequest as dap::requests::Request>::Response: 'static,
 713        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 714    {
 715        match self {
 716            Mode::Local(debug_adapter_client) => {
 717                debug_adapter_client.request(request, cx.background_executor().clone())
 718            }
 719            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 720        }
 721    }
 722}
 723
 724#[derive(Default)]
 725struct ThreadStates {
 726    global_state: Option<ThreadStatus>,
 727    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 728}
 729
 730impl ThreadStates {
 731    fn stop_all_threads(&mut self) {
 732        self.global_state = Some(ThreadStatus::Stopped);
 733        self.known_thread_states.clear();
 734    }
 735
 736    fn exit_all_threads(&mut self) {
 737        self.global_state = Some(ThreadStatus::Exited);
 738        self.known_thread_states.clear();
 739    }
 740
 741    fn continue_all_threads(&mut self) {
 742        self.global_state = Some(ThreadStatus::Running);
 743        self.known_thread_states.clear();
 744    }
 745
 746    fn stop_thread(&mut self, thread_id: ThreadId) {
 747        self.known_thread_states
 748            .insert(thread_id, ThreadStatus::Stopped);
 749    }
 750
 751    fn continue_thread(&mut self, thread_id: ThreadId) {
 752        self.known_thread_states
 753            .insert(thread_id, ThreadStatus::Running);
 754    }
 755
 756    fn process_step(&mut self, thread_id: ThreadId) {
 757        self.known_thread_states
 758            .insert(thread_id, ThreadStatus::Stepping);
 759    }
 760
 761    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 762        self.thread_state(thread_id)
 763            .unwrap_or(ThreadStatus::Running)
 764    }
 765
 766    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 767        self.known_thread_states
 768            .get(&thread_id)
 769            .copied()
 770            .or(self.global_state)
 771    }
 772
 773    fn exit_thread(&mut self, thread_id: ThreadId) {
 774        self.known_thread_states
 775            .insert(thread_id, ThreadStatus::Exited);
 776    }
 777
 778    fn any_stopped_thread(&self) -> bool {
 779        self.global_state
 780            .is_some_and(|state| state == ThreadStatus::Stopped)
 781            || self
 782                .known_thread_states
 783                .values()
 784                .any(|status| *status == ThreadStatus::Stopped)
 785    }
 786}
 787const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 788
 789type IsEnabled = bool;
 790
 791#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 792pub struct OutputToken(pub usize);
 793/// Represents a current state of a single debug adapter and provides ways to mutate it.
 794pub struct Session {
 795    mode: Mode,
 796    pub(super) capabilities: Capabilities,
 797    id: SessionId,
 798    child_session_ids: HashSet<SessionId>,
 799    parent_id: Option<SessionId>,
 800    ignore_breakpoints: bool,
 801    modules: Vec<dap::Module>,
 802    loaded_sources: Vec<dap::Source>,
 803    output_token: OutputToken,
 804    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 805    threads: IndexMap<ThreadId, Thread>,
 806    thread_states: ThreadStates,
 807    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 808    stack_frames: IndexMap<StackFrameId, StackFrame>,
 809    locations: HashMap<u64, dap::LocationsResponse>,
 810    is_session_terminated: bool,
 811    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 812    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 813    _background_tasks: Vec<Task<()>>,
 814}
 815
 816trait CacheableCommand: Any + Send + Sync {
 817    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 818    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 819    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 820}
 821
 822impl<T> CacheableCommand for T
 823where
 824    T: DapCommand + PartialEq + Eq + Hash,
 825{
 826    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 827        (rhs as &dyn Any)
 828            .downcast_ref::<Self>()
 829            .map_or(false, |rhs| self == rhs)
 830    }
 831
 832    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 833        T::hash(self, &mut hasher);
 834    }
 835
 836    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 837        self
 838    }
 839}
 840
 841pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 842
 843impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 844    fn from(request: T) -> Self {
 845        Self(Arc::new(request))
 846    }
 847}
 848
 849impl PartialEq for RequestSlot {
 850    fn eq(&self, other: &Self) -> bool {
 851        self.0.dyn_eq(other.0.as_ref())
 852    }
 853}
 854
 855impl Eq for RequestSlot {}
 856
 857impl Hash for RequestSlot {
 858    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 859        self.0.dyn_hash(state);
 860        (&*self.0 as &dyn Any).type_id().hash(state)
 861    }
 862}
 863
 864#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 865pub struct CompletionsQuery {
 866    pub query: String,
 867    pub column: u64,
 868    pub line: Option<u64>,
 869    pub frame_id: Option<u64>,
 870}
 871
 872impl CompletionsQuery {
 873    pub fn new(
 874        buffer: &language::Buffer,
 875        cursor_position: language::Anchor,
 876        frame_id: Option<u64>,
 877    ) -> Self {
 878        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 879        Self {
 880            query: buffer.text(),
 881            column: column as u64,
 882            frame_id,
 883            line: Some(row as u64),
 884        }
 885    }
 886}
 887
 888pub enum SessionEvent {
 889    Modules,
 890    LoadedSources,
 891    Stopped(Option<ThreadId>),
 892    StackTrace,
 893    Variables,
 894    Threads,
 895}
 896
 897pub(crate) enum SessionStateEvent {
 898    Shutdown,
 899}
 900
 901impl EventEmitter<SessionEvent> for Session {}
 902impl EventEmitter<SessionStateEvent> for Session {}
 903
 904// local session will send breakpoint updates to DAP for all new breakpoints
 905// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 906// BreakpointStore notifies session on breakpoint changes
 907impl Session {
 908    pub(crate) fn local(
 909        breakpoint_store: Entity<BreakpointStore>,
 910        session_id: SessionId,
 911        parent_session: Option<Entity<Session>>,
 912        delegate: DapAdapterDelegate,
 913        config: DebugAdapterConfig,
 914        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 915        initialized_tx: oneshot::Sender<()>,
 916        debug_adapters: Arc<DapRegistry>,
 917        cx: &mut App,
 918    ) -> Task<Result<Entity<Self>>> {
 919        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 920
 921        cx.spawn(async move |cx| {
 922            let mode = LocalMode::new(
 923                debug_adapters,
 924                session_id,
 925                parent_session.clone(),
 926                breakpoint_store.clone(),
 927                config.clone(),
 928                delegate,
 929                message_tx,
 930                cx.clone(),
 931            )
 932            .await?;
 933
 934            cx.new(|cx| {
 935                create_local_session(
 936                    breakpoint_store,
 937                    session_id,
 938                    parent_session,
 939                    start_debugging_requests_tx,
 940                    initialized_tx,
 941                    message_rx,
 942                    mode,
 943                    cx,
 944                )
 945            })
 946        })
 947    }
 948
 949    #[cfg(any(test, feature = "test-support"))]
 950    pub(crate) fn fake(
 951        breakpoint_store: Entity<BreakpointStore>,
 952        session_id: SessionId,
 953        parent_session: Option<Entity<Session>>,
 954        delegate: DapAdapterDelegate,
 955        config: DebugAdapterConfig,
 956        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 957        initialized_tx: oneshot::Sender<()>,
 958        caps: Capabilities,
 959        fails: bool,
 960        cx: &mut App,
 961    ) -> Task<Result<Entity<Session>>> {
 962        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 963
 964        cx.spawn(async move |cx| {
 965            let mode = LocalMode::new_fake(
 966                session_id,
 967                parent_session.clone(),
 968                breakpoint_store.clone(),
 969                config.clone(),
 970                delegate,
 971                message_tx,
 972                caps,
 973                fails,
 974                cx.clone(),
 975            )
 976            .await?;
 977
 978            cx.new(|cx| {
 979                create_local_session(
 980                    breakpoint_store,
 981                    session_id,
 982                    parent_session,
 983                    start_debugging_requests_tx,
 984                    initialized_tx,
 985                    message_rx,
 986                    mode,
 987                    cx,
 988                )
 989            })
 990        })
 991    }
 992
 993    pub(crate) fn remote(
 994        session_id: SessionId,
 995        client: AnyProtoClient,
 996        upstream_project_id: u64,
 997        ignore_breakpoints: bool,
 998    ) -> Self {
 999        Self {
1000            mode: Mode::Remote(RemoteConnection {
1001                _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
1002                _client: client,
1003                _upstream_project_id: upstream_project_id,
1004            }),
1005            id: session_id,
1006            child_session_ids: HashSet::default(),
1007            parent_id: None,
1008            capabilities: Capabilities::default(),
1009            ignore_breakpoints,
1010            variables: Default::default(),
1011            stack_frames: Default::default(),
1012            thread_states: ThreadStates::default(),
1013            output_token: OutputToken(0),
1014            output: circular_buffer::CircularBuffer::boxed(),
1015            requests: HashMap::default(),
1016            modules: Vec::default(),
1017            loaded_sources: Vec::default(),
1018            threads: IndexMap::default(),
1019            _background_tasks: Vec::default(),
1020            locations: Default::default(),
1021            is_session_terminated: false,
1022            exception_breakpoints: Default::default(),
1023        }
1024    }
1025
1026    pub fn session_id(&self) -> SessionId {
1027        self.id
1028    }
1029
1030    pub fn child_session_ids(&self) -> HashSet<SessionId> {
1031        self.child_session_ids.clone()
1032    }
1033
1034    pub fn add_child_session_id(&mut self, session_id: SessionId) {
1035        self.child_session_ids.insert(session_id);
1036    }
1037
1038    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
1039        self.child_session_ids.remove(&session_id);
1040    }
1041
1042    pub fn parent_id(&self) -> Option<SessionId> {
1043        self.parent_id
1044    }
1045
1046    pub fn capabilities(&self) -> &Capabilities {
1047        &self.capabilities
1048    }
1049
1050    pub fn adapter_name(&self) -> SharedString {
1051        match &self.mode {
1052            Mode::Local(local_mode) => local_mode.adapter.name().into(),
1053            Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
1054        }
1055    }
1056
1057    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
1058        if let Mode::Local(local_mode) = &self.mode {
1059            Some(local_mode.config.clone())
1060        } else {
1061            None
1062        }
1063    }
1064
1065    pub fn is_terminated(&self) -> bool {
1066        self.is_session_terminated
1067    }
1068
1069    pub fn is_local(&self) -> bool {
1070        matches!(self.mode, Mode::Local(_))
1071    }
1072
1073    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
1074        match &mut self.mode {
1075            Mode::Local(local_mode) => Some(local_mode),
1076            Mode::Remote(_) => None,
1077        }
1078    }
1079
1080    pub fn as_local(&self) -> Option<&LocalMode> {
1081        match &self.mode {
1082            Mode::Local(local_mode) => Some(local_mode),
1083            Mode::Remote(_) => None,
1084        }
1085    }
1086
1087    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1088        match &self.mode {
1089            Mode::Local(local_mode) => {
1090                let capabilities = local_mode.clone().request_initialization(cx);
1091
1092                cx.spawn(async move |this, cx| {
1093                    let capabilities = capabilities.await?;
1094                    this.update(cx, |session, _| {
1095                        session.capabilities = capabilities;
1096                        let filters = session
1097                            .capabilities
1098                            .exception_breakpoint_filters
1099                            .clone()
1100                            .unwrap_or_default();
1101                        for filter in filters {
1102                            let default = filter.default.unwrap_or_default();
1103                            session
1104                                .exception_breakpoints
1105                                .entry(filter.filter.clone())
1106                                .or_insert_with(|| (filter, default));
1107                        }
1108                    })?;
1109                    Ok(())
1110                })
1111            }
1112            Mode::Remote(_) => Task::ready(Err(anyhow!(
1113                "Cannot send initialize request from remote session"
1114            ))),
1115        }
1116    }
1117
1118    pub(super) fn initialize_sequence(
1119        &mut self,
1120        initialize_rx: oneshot::Receiver<()>,
1121        cx: &mut Context<Self>,
1122    ) -> Task<Result<()>> {
1123        match &self.mode {
1124            Mode::Local(local_mode) => {
1125                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1126            }
1127            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1128        }
1129    }
1130
1131    pub fn run_to_position(
1132        &mut self,
1133        breakpoint: SourceBreakpoint,
1134        active_thread_id: ThreadId,
1135        cx: &mut Context<Self>,
1136    ) {
1137        match &mut self.mode {
1138            Mode::Local(local_mode) => {
1139                if !matches!(
1140                    self.thread_states.thread_state(active_thread_id),
1141                    Some(ThreadStatus::Stopped)
1142                ) {
1143                    return;
1144                };
1145                let path = breakpoint.path.clone();
1146                local_mode.tmp_breakpoint = Some(breakpoint);
1147                let task = local_mode.send_breakpoints_from_path(
1148                    path,
1149                    BreakpointUpdatedReason::Toggled,
1150                    cx,
1151                );
1152
1153                cx.spawn(async move |this, cx| {
1154                    task.await;
1155                    this.update(cx, |this, cx| {
1156                        this.continue_thread(active_thread_id, cx);
1157                    })
1158                })
1159                .detach();
1160            }
1161            Mode::Remote(_) => {}
1162        }
1163    }
1164
1165    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1166        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1167    }
1168
1169    pub fn output(
1170        &self,
1171        since: OutputToken,
1172    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1173        if self.output_token.0 == 0 {
1174            return (self.output.range(0..0), OutputToken(0));
1175        };
1176
1177        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1178
1179        let clamped_events_since = events_since.clamp(0, self.output.len());
1180        (
1181            self.output
1182                .range(self.output.len() - clamped_events_since..),
1183            self.output_token,
1184        )
1185    }
1186
1187    pub fn respond_to_client(
1188        &self,
1189        request_seq: u64,
1190        success: bool,
1191        command: String,
1192        body: Option<serde_json::Value>,
1193        cx: &mut Context<Self>,
1194    ) -> Task<Result<()>> {
1195        let Some(local_session) = self.as_local().cloned() else {
1196            unreachable!("Cannot respond to remote client");
1197        };
1198
1199        cx.background_spawn(async move {
1200            local_session
1201                .client
1202                .send_message(Message::Response(Response {
1203                    body,
1204                    success,
1205                    command,
1206                    seq: request_seq + 1,
1207                    request_seq,
1208                    message: None,
1209                }))
1210                .await
1211        })
1212    }
1213
1214    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1215        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1216            let breakpoint = local.tmp_breakpoint.take()?;
1217            let path = breakpoint.path.clone();
1218            Some((local, path))
1219        }) {
1220            local
1221                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1222                .detach();
1223        };
1224
1225        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1226            self.thread_states.stop_all_threads();
1227
1228            self.invalidate_command_type::<StackTraceCommand>();
1229        }
1230
1231        // Event if we stopped all threads we still need to insert the thread_id
1232        // to our own data
1233        if let Some(thread_id) = event.thread_id {
1234            self.thread_states.stop_thread(ThreadId(thread_id));
1235
1236            self.invalidate_state(
1237                &StackTraceCommand {
1238                    thread_id,
1239                    start_frame: None,
1240                    levels: None,
1241                }
1242                .into(),
1243            );
1244        }
1245
1246        self.invalidate_generic();
1247        self.threads.clear();
1248        self.variables.clear();
1249        cx.emit(SessionEvent::Stopped(
1250            event
1251                .thread_id
1252                .map(Into::into)
1253                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1254        ));
1255        cx.notify();
1256    }
1257
1258    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1259        match *event {
1260            Events::Initialized(_) => {
1261                debug_assert!(
1262                    false,
1263                    "Initialized event should have been handled in LocalMode"
1264                );
1265            }
1266            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1267            Events::Continued(event) => {
1268                if event.all_threads_continued.unwrap_or_default() {
1269                    self.thread_states.continue_all_threads();
1270                } else {
1271                    self.thread_states
1272                        .continue_thread(ThreadId(event.thread_id));
1273                }
1274                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1275                self.invalidate_generic();
1276            }
1277            Events::Exited(_event) => {
1278                self.clear_active_debug_line(cx);
1279            }
1280            Events::Terminated(_) => {
1281                self.is_session_terminated = true;
1282                self.clear_active_debug_line(cx);
1283            }
1284            Events::Thread(event) => {
1285                let thread_id = ThreadId(event.thread_id);
1286
1287                match event.reason {
1288                    dap::ThreadEventReason::Started => {
1289                        self.thread_states.continue_thread(thread_id);
1290                    }
1291                    dap::ThreadEventReason::Exited => {
1292                        self.thread_states.exit_thread(thread_id);
1293                    }
1294                    reason => {
1295                        log::error!("Unhandled thread event reason {:?}", reason);
1296                    }
1297                }
1298                self.invalidate_state(&ThreadsCommand.into());
1299                cx.notify();
1300            }
1301            Events::Output(event) => {
1302                if event
1303                    .category
1304                    .as_ref()
1305                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1306                {
1307                    return;
1308                }
1309
1310                self.output.push_back(event);
1311                self.output_token.0 += 1;
1312                cx.notify();
1313            }
1314            Events::Breakpoint(_) => {}
1315            Events::Module(event) => {
1316                match event.reason {
1317                    dap::ModuleEventReason::New => {
1318                        self.modules.push(event.module);
1319                    }
1320                    dap::ModuleEventReason::Changed => {
1321                        if let Some(module) = self
1322                            .modules
1323                            .iter_mut()
1324                            .find(|other| event.module.id == other.id)
1325                        {
1326                            *module = event.module;
1327                        }
1328                    }
1329                    dap::ModuleEventReason::Removed => {
1330                        self.modules.retain(|other| event.module.id != other.id);
1331                    }
1332                }
1333
1334                // todo(debugger): We should only send the invalidate command to downstream clients.
1335                // self.invalidate_state(&ModulesCommand.into());
1336            }
1337            Events::LoadedSource(_) => {
1338                self.invalidate_state(&LoadedSourcesCommand.into());
1339            }
1340            Events::Capabilities(event) => {
1341                self.capabilities = self.capabilities.merge(event.capabilities);
1342                cx.notify();
1343            }
1344            Events::Memory(_) => {}
1345            Events::Process(_) => {}
1346            Events::ProgressEnd(_) => {}
1347            Events::ProgressStart(_) => {}
1348            Events::ProgressUpdate(_) => {}
1349            Events::Invalidated(_) => {}
1350            Events::Other(_) => {}
1351        }
1352    }
1353
1354    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1355    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1356        &mut self,
1357        request: T,
1358        process_result: impl FnOnce(
1359            &mut Self,
1360            Result<T::Response>,
1361            &mut Context<Self>,
1362        ) -> Option<T::Response>
1363        + 'static,
1364        cx: &mut Context<Self>,
1365    ) {
1366        const {
1367            assert!(
1368                T::CACHEABLE,
1369                "Only requests marked as cacheable should invoke `fetch`"
1370            );
1371        }
1372
1373        if !self.thread_states.any_stopped_thread()
1374            && request.type_id() != TypeId::of::<ThreadsCommand>()
1375            || self.is_session_terminated
1376        {
1377            return;
1378        }
1379
1380        let request_map = self
1381            .requests
1382            .entry(std::any::TypeId::of::<T>())
1383            .or_default();
1384
1385        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1386            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1387
1388            let task = Self::request_inner::<Arc<T>>(
1389                &self.capabilities,
1390                self.id,
1391                &self.mode,
1392                command,
1393                process_result,
1394                cx,
1395            );
1396            let task = cx
1397                .background_executor()
1398                .spawn(async move {
1399                    let _ = task.await?;
1400                    Some(())
1401                })
1402                .shared();
1403
1404            vacant.insert(task);
1405            cx.notify();
1406        }
1407    }
1408
1409    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1410        capabilities: &Capabilities,
1411        session_id: SessionId,
1412        mode: &Mode,
1413        request: T,
1414        process_result: impl FnOnce(
1415            &mut Self,
1416            Result<T::Response>,
1417            &mut Context<Self>,
1418        ) -> Option<T::Response>
1419        + 'static,
1420        cx: &mut Context<Self>,
1421    ) -> Task<Option<T::Response>> {
1422        if !T::is_supported(&capabilities) {
1423            log::warn!(
1424                "Attempted to send a DAP request that isn't supported: {:?}",
1425                request
1426            );
1427            let error = Err(anyhow::Error::msg(
1428                "Couldn't complete request because it's not supported",
1429            ));
1430            return cx.spawn(async move |this, cx| {
1431                this.update(cx, |this, cx| process_result(this, error, cx))
1432                    .log_err()
1433                    .flatten()
1434            });
1435        }
1436
1437        let request = mode.request_dap(session_id, request, cx);
1438        cx.spawn(async move |this, cx| {
1439            let result = request.await;
1440            this.update(cx, |this, cx| process_result(this, result, cx))
1441                .log_err()
1442                .flatten()
1443        })
1444    }
1445
1446    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1447        &self,
1448        request: T,
1449        process_result: impl FnOnce(
1450            &mut Self,
1451            Result<T::Response>,
1452            &mut Context<Self>,
1453        ) -> Option<T::Response>
1454        + 'static,
1455        cx: &mut Context<Self>,
1456    ) -> Task<Option<T::Response>> {
1457        Self::request_inner(
1458            &self.capabilities,
1459            self.id,
1460            &self.mode,
1461            request,
1462            process_result,
1463            cx,
1464        )
1465    }
1466
1467    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1468        self.requests.remove(&std::any::TypeId::of::<Command>());
1469    }
1470
1471    fn invalidate_generic(&mut self) {
1472        self.invalidate_command_type::<ModulesCommand>();
1473        self.invalidate_command_type::<LoadedSourcesCommand>();
1474        self.invalidate_command_type::<ThreadsCommand>();
1475    }
1476
1477    fn invalidate_state(&mut self, key: &RequestSlot) {
1478        self.requests
1479            .entry((&*key.0 as &dyn Any).type_id())
1480            .and_modify(|request_map| {
1481                request_map.remove(&key);
1482            });
1483    }
1484
1485    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1486        self.thread_states.thread_status(thread_id)
1487    }
1488
1489    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1490        self.fetch(
1491            dap_command::ThreadsCommand,
1492            |this, result, cx| {
1493                let result = result.log_err()?;
1494
1495                this.threads = result
1496                    .iter()
1497                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1498                    .collect();
1499
1500                this.invalidate_command_type::<StackTraceCommand>();
1501                cx.emit(SessionEvent::Threads);
1502                cx.notify();
1503
1504                Some(result)
1505            },
1506            cx,
1507        );
1508
1509        self.threads
1510            .values()
1511            .map(|thread| {
1512                (
1513                    thread.dap.clone(),
1514                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1515                )
1516            })
1517            .collect()
1518    }
1519
1520    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1521        self.fetch(
1522            dap_command::ModulesCommand,
1523            |this, result, cx| {
1524                let result = result.log_err()?;
1525
1526                this.modules = result.iter().cloned().collect();
1527                cx.emit(SessionEvent::Modules);
1528                cx.notify();
1529
1530                Some(result)
1531            },
1532            cx,
1533        );
1534
1535        &self.modules
1536    }
1537
1538    pub fn ignore_breakpoints(&self) -> bool {
1539        self.ignore_breakpoints
1540    }
1541
1542    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1543        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1544    }
1545
1546    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1547        if self.ignore_breakpoints == ignore {
1548            return Task::ready(());
1549        }
1550
1551        self.ignore_breakpoints = ignore;
1552
1553        if let Some(local) = self.as_local() {
1554            local.send_source_breakpoints(ignore, cx)
1555        } else {
1556            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1557            unimplemented!()
1558        }
1559    }
1560
1561    pub fn exception_breakpoints(
1562        &self,
1563    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1564        self.exception_breakpoints.values()
1565    }
1566
1567    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1568        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1569            *is_enabled = !*is_enabled;
1570            self.send_exception_breakpoints(cx);
1571        }
1572    }
1573
1574    fn send_exception_breakpoints(&mut self, cx: &App) {
1575        if let Some(local) = self.as_local() {
1576            let exception_filters = self
1577                .exception_breakpoints
1578                .values()
1579                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1580                .collect();
1581
1582            let supports_exception_filters = self
1583                .capabilities
1584                .supports_exception_filter_options
1585                .unwrap_or_default();
1586            local
1587                .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1588                .detach_and_log_err(cx);
1589        } else {
1590            debug_assert!(false, "Not implemented");
1591        }
1592    }
1593
1594    pub fn breakpoints_enabled(&self) -> bool {
1595        self.ignore_breakpoints
1596    }
1597
1598    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1599        self.fetch(
1600            dap_command::LoadedSourcesCommand,
1601            |this, result, cx| {
1602                let result = result.log_err()?;
1603                this.loaded_sources = result.iter().cloned().collect();
1604                cx.emit(SessionEvent::LoadedSources);
1605                cx.notify();
1606                Some(result)
1607            },
1608            cx,
1609        );
1610
1611        &self.loaded_sources
1612    }
1613
1614    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1615        res.log_err()?;
1616        Some(())
1617    }
1618
1619    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1620        thread_id: ThreadId,
1621    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1622    {
1623        move |this, response, cx| match response.log_err() {
1624            Some(response) => Some(response),
1625            None => {
1626                this.thread_states.stop_thread(thread_id);
1627                cx.notify();
1628                None
1629            }
1630        }
1631    }
1632
1633    fn clear_active_debug_line_response(
1634        &mut self,
1635        response: Result<()>,
1636        cx: &mut Context<Session>,
1637    ) -> Option<()> {
1638        response.log_err()?;
1639        self.clear_active_debug_line(cx);
1640        Some(())
1641    }
1642
1643    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1644        self.as_local()
1645            .expect("Message handler will only run in local mode")
1646            .breakpoint_store
1647            .update(cx, |store, cx| {
1648                store.remove_active_position(Some(self.id), cx)
1649            });
1650    }
1651
1652    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1653        self.request(
1654            PauseCommand {
1655                thread_id: thread_id.0,
1656            },
1657            Self::empty_response,
1658            cx,
1659        )
1660        .detach();
1661    }
1662
1663    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1664        self.request(
1665            RestartStackFrameCommand { stack_frame_id },
1666            Self::empty_response,
1667            cx,
1668        )
1669        .detach();
1670    }
1671
1672    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1673        if self.capabilities.supports_restart_request.unwrap_or(false) {
1674            self.request(
1675                RestartCommand {
1676                    raw: args.unwrap_or(Value::Null),
1677                },
1678                Self::empty_response,
1679                cx,
1680            )
1681            .detach();
1682        } else {
1683            self.request(
1684                DisconnectCommand {
1685                    restart: Some(false),
1686                    terminate_debuggee: Some(true),
1687                    suspend_debuggee: Some(false),
1688                },
1689                Self::empty_response,
1690                cx,
1691            )
1692            .detach();
1693        }
1694    }
1695
1696    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1697        self.is_session_terminated = true;
1698        self.thread_states.exit_all_threads();
1699        cx.notify();
1700
1701        let task = if self
1702            .capabilities
1703            .supports_terminate_request
1704            .unwrap_or_default()
1705        {
1706            self.request(
1707                TerminateCommand {
1708                    restart: Some(false),
1709                },
1710                Self::clear_active_debug_line_response,
1711                cx,
1712            )
1713        } else {
1714            self.request(
1715                DisconnectCommand {
1716                    restart: Some(false),
1717                    terminate_debuggee: Some(true),
1718                    suspend_debuggee: Some(false),
1719                },
1720                Self::clear_active_debug_line_response,
1721                cx,
1722            )
1723        };
1724
1725        cx.emit(SessionStateEvent::Shutdown);
1726
1727        cx.background_spawn(async move {
1728            let _ = task.await;
1729        })
1730    }
1731
1732    pub fn completions(
1733        &mut self,
1734        query: CompletionsQuery,
1735        cx: &mut Context<Self>,
1736    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1737        let task = self.request(query, |_, result, _| result.log_err(), cx);
1738
1739        cx.background_executor().spawn(async move {
1740            anyhow::Ok(
1741                task.await
1742                    .map(|response| response.targets)
1743                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1744            )
1745        })
1746    }
1747
1748    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1749        self.thread_states.continue_thread(thread_id);
1750        self.request(
1751            ContinueCommand {
1752                args: ContinueArguments {
1753                    thread_id: thread_id.0,
1754                    single_thread: Some(true),
1755                },
1756            },
1757            Self::on_step_response::<ContinueCommand>(thread_id),
1758            cx,
1759        )
1760        .detach();
1761    }
1762
1763    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1764        match self.mode {
1765            Mode::Local(ref local) => Some(local.client.clone()),
1766            Mode::Remote(_) => None,
1767        }
1768    }
1769
1770    pub fn step_over(
1771        &mut self,
1772        thread_id: ThreadId,
1773        granularity: SteppingGranularity,
1774        cx: &mut Context<Self>,
1775    ) {
1776        let supports_single_thread_execution_requests =
1777            self.capabilities.supports_single_thread_execution_requests;
1778        let supports_stepping_granularity = self
1779            .capabilities
1780            .supports_stepping_granularity
1781            .unwrap_or_default();
1782
1783        let command = NextCommand {
1784            inner: StepCommand {
1785                thread_id: thread_id.0,
1786                granularity: supports_stepping_granularity.then(|| granularity),
1787                single_thread: supports_single_thread_execution_requests,
1788            },
1789        };
1790
1791        self.thread_states.process_step(thread_id);
1792        self.request(
1793            command,
1794            Self::on_step_response::<NextCommand>(thread_id),
1795            cx,
1796        )
1797        .detach();
1798    }
1799
1800    pub fn step_in(
1801        &mut self,
1802        thread_id: ThreadId,
1803        granularity: SteppingGranularity,
1804        cx: &mut Context<Self>,
1805    ) {
1806        let supports_single_thread_execution_requests =
1807            self.capabilities.supports_single_thread_execution_requests;
1808        let supports_stepping_granularity = self
1809            .capabilities
1810            .supports_stepping_granularity
1811            .unwrap_or_default();
1812
1813        let command = StepInCommand {
1814            inner: StepCommand {
1815                thread_id: thread_id.0,
1816                granularity: supports_stepping_granularity.then(|| granularity),
1817                single_thread: supports_single_thread_execution_requests,
1818            },
1819        };
1820
1821        self.thread_states.process_step(thread_id);
1822        self.request(
1823            command,
1824            Self::on_step_response::<StepInCommand>(thread_id),
1825            cx,
1826        )
1827        .detach();
1828    }
1829
1830    pub fn step_out(
1831        &mut self,
1832        thread_id: ThreadId,
1833        granularity: SteppingGranularity,
1834        cx: &mut Context<Self>,
1835    ) {
1836        let supports_single_thread_execution_requests =
1837            self.capabilities.supports_single_thread_execution_requests;
1838        let supports_stepping_granularity = self
1839            .capabilities
1840            .supports_stepping_granularity
1841            .unwrap_or_default();
1842
1843        let command = StepOutCommand {
1844            inner: StepCommand {
1845                thread_id: thread_id.0,
1846                granularity: supports_stepping_granularity.then(|| granularity),
1847                single_thread: supports_single_thread_execution_requests,
1848            },
1849        };
1850
1851        self.thread_states.process_step(thread_id);
1852        self.request(
1853            command,
1854            Self::on_step_response::<StepOutCommand>(thread_id),
1855            cx,
1856        )
1857        .detach();
1858    }
1859
1860    pub fn step_back(
1861        &mut self,
1862        thread_id: ThreadId,
1863        granularity: SteppingGranularity,
1864        cx: &mut Context<Self>,
1865    ) {
1866        let supports_single_thread_execution_requests =
1867            self.capabilities.supports_single_thread_execution_requests;
1868        let supports_stepping_granularity = self
1869            .capabilities
1870            .supports_stepping_granularity
1871            .unwrap_or_default();
1872
1873        let command = StepBackCommand {
1874            inner: StepCommand {
1875                thread_id: thread_id.0,
1876                granularity: supports_stepping_granularity.then(|| granularity),
1877                single_thread: supports_single_thread_execution_requests,
1878            },
1879        };
1880
1881        self.thread_states.process_step(thread_id);
1882
1883        self.request(
1884            command,
1885            Self::on_step_response::<StepBackCommand>(thread_id),
1886            cx,
1887        )
1888        .detach();
1889    }
1890
1891    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1892        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1893            && self.requests.contains_key(&ThreadsCommand.type_id())
1894            && self.threads.contains_key(&thread_id)
1895        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1896        // We could still be using an old thread id and have sent a new thread's request
1897        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1898        // But it very well could cause a minor bug in the future that is hard to track down
1899        {
1900            self.fetch(
1901                super::dap_command::StackTraceCommand {
1902                    thread_id: thread_id.0,
1903                    start_frame: None,
1904                    levels: None,
1905                },
1906                move |this, stack_frames, cx| {
1907                    let stack_frames = stack_frames.log_err()?;
1908
1909                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1910                        thread.stack_frame_ids =
1911                            stack_frames.iter().map(|frame| frame.id).collect();
1912                    });
1913                    debug_assert!(
1914                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1915                        "Sent request for thread_id that doesn't exist"
1916                    );
1917
1918                    this.stack_frames.extend(
1919                        stack_frames
1920                            .iter()
1921                            .cloned()
1922                            .map(|frame| (frame.id, StackFrame::from(frame))),
1923                    );
1924
1925                    this.invalidate_command_type::<ScopesCommand>();
1926                    this.invalidate_command_type::<VariablesCommand>();
1927
1928                    cx.emit(SessionEvent::StackTrace);
1929                    cx.notify();
1930                    Some(stack_frames)
1931                },
1932                cx,
1933            );
1934        }
1935
1936        self.threads
1937            .get(&thread_id)
1938            .map(|thread| {
1939                thread
1940                    .stack_frame_ids
1941                    .iter()
1942                    .filter_map(|id| self.stack_frames.get(id))
1943                    .cloned()
1944                    .collect()
1945            })
1946            .unwrap_or_default()
1947    }
1948
1949    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1950        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1951            && self
1952                .requests
1953                .contains_key(&TypeId::of::<StackTraceCommand>())
1954        {
1955            self.fetch(
1956                ScopesCommand { stack_frame_id },
1957                move |this, scopes, cx| {
1958                    let scopes = scopes.log_err()?;
1959
1960                    for scope in scopes .iter(){
1961                        this.variables(scope.variables_reference, cx);
1962                    }
1963
1964                    let entry = this
1965                        .stack_frames
1966                        .entry(stack_frame_id)
1967                        .and_modify(|stack_frame| {
1968                            stack_frame.scopes = scopes.clone();
1969                        });
1970
1971                    cx.emit(SessionEvent::Variables);
1972
1973                    debug_assert!(
1974                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1975                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1976                    );
1977
1978                    Some(scopes)
1979                },
1980                cx,
1981            );
1982        }
1983
1984        self.stack_frames
1985            .get(&stack_frame_id)
1986            .map(|frame| frame.scopes.as_slice())
1987            .unwrap_or_default()
1988    }
1989
1990    pub fn variables(
1991        &mut self,
1992        variables_reference: VariableReference,
1993        cx: &mut Context<Self>,
1994    ) -> Vec<dap::Variable> {
1995        let command = VariablesCommand {
1996            variables_reference,
1997            filter: None,
1998            start: None,
1999            count: None,
2000            format: None,
2001        };
2002
2003        self.fetch(
2004            command,
2005            move |this, variables, cx| {
2006                let variables = variables.log_err()?;
2007                this.variables
2008                    .insert(variables_reference, variables.clone());
2009
2010                cx.emit(SessionEvent::Variables);
2011                Some(variables)
2012            },
2013            cx,
2014        );
2015
2016        self.variables
2017            .get(&variables_reference)
2018            .cloned()
2019            .unwrap_or_default()
2020    }
2021
2022    pub fn set_variable_value(
2023        &mut self,
2024        variables_reference: u64,
2025        name: String,
2026        value: String,
2027        cx: &mut Context<Self>,
2028    ) {
2029        if self.capabilities.supports_set_variable.unwrap_or_default() {
2030            self.request(
2031                SetVariableValueCommand {
2032                    name,
2033                    value,
2034                    variables_reference,
2035                },
2036                move |this, response, cx| {
2037                    let response = response.log_err()?;
2038                    this.invalidate_command_type::<VariablesCommand>();
2039                    cx.notify();
2040                    Some(response)
2041                },
2042                cx,
2043            )
2044            .detach()
2045        }
2046    }
2047
2048    pub fn evaluate(
2049        &mut self,
2050        expression: String,
2051        context: Option<EvaluateArgumentsContext>,
2052        frame_id: Option<u64>,
2053        source: Option<Source>,
2054        cx: &mut Context<Self>,
2055    ) {
2056        self.request(
2057            EvaluateCommand {
2058                expression,
2059                context,
2060                frame_id,
2061                source,
2062            },
2063            |this, response, cx| {
2064                let response = response.log_err()?;
2065                this.output_token.0 += 1;
2066                this.output.push_back(dap::OutputEvent {
2067                    category: None,
2068                    output: response.result.clone(),
2069                    group: None,
2070                    variables_reference: Some(response.variables_reference),
2071                    source: None,
2072                    line: None,
2073                    column: None,
2074                    data: None,
2075                    location_reference: None,
2076                });
2077
2078                this.invalidate_command_type::<ScopesCommand>();
2079                cx.notify();
2080                Some(response)
2081            },
2082            cx,
2083        )
2084        .detach();
2085    }
2086
2087    pub fn location(
2088        &mut self,
2089        reference: u64,
2090        cx: &mut Context<Self>,
2091    ) -> Option<dap::LocationsResponse> {
2092        self.fetch(
2093            LocationsCommand { reference },
2094            move |this, response, _| {
2095                let response = response.log_err()?;
2096                this.locations.insert(reference, response.clone());
2097                Some(response)
2098            },
2099            cx,
2100        );
2101        self.locations.get(&reference).cloned()
2102    }
2103    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2104        let command = DisconnectCommand {
2105            restart: Some(false),
2106            terminate_debuggee: Some(true),
2107            suspend_debuggee: Some(false),
2108        };
2109
2110        self.request(command, Self::empty_response, cx).detach()
2111    }
2112
2113    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2114        if self
2115            .capabilities
2116            .supports_terminate_threads_request
2117            .unwrap_or_default()
2118        {
2119            self.request(
2120                TerminateThreadsCommand {
2121                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2122                },
2123                Self::clear_active_debug_line_response,
2124                cx,
2125            )
2126            .detach();
2127        } else {
2128            self.shutdown(cx).detach();
2129        }
2130    }
2131}
2132
2133fn create_local_session(
2134    breakpoint_store: Entity<BreakpointStore>,
2135    session_id: SessionId,
2136    parent_session: Option<Entity<Session>>,
2137    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
2138    initialized_tx: oneshot::Sender<()>,
2139    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
2140    mode: LocalMode,
2141    cx: &mut Context<Session>,
2142) -> Session {
2143    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
2144        let mut initialized_tx = Some(initialized_tx);
2145        while let Some(message) = message_rx.next().await {
2146            if let Message::Event(event) = message {
2147                if let Events::Initialized(_) = *event {
2148                    if let Some(tx) = initialized_tx.take() {
2149                        tx.send(()).ok();
2150                    }
2151                } else {
2152                    let Ok(_) = this.update(cx, |session, cx| {
2153                        session.handle_dap_event(event, cx);
2154                    }) else {
2155                        break;
2156                    };
2157                }
2158            } else {
2159                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
2160                else {
2161                    break;
2162                };
2163            }
2164        }
2165    })];
2166
2167    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
2168        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2169            if let Some(local) = (!this.ignore_breakpoints)
2170                .then(|| this.as_local_mut())
2171                .flatten()
2172            {
2173                local
2174                    .send_breakpoints_from_path(path.clone(), *reason, cx)
2175                    .detach();
2176            };
2177        }
2178        BreakpointStoreEvent::BreakpointsCleared(paths) => {
2179            if let Some(local) = (!this.ignore_breakpoints)
2180                .then(|| this.as_local_mut())
2181                .flatten()
2182            {
2183                local.unset_breakpoints_from_paths(paths, cx).detach();
2184            }
2185        }
2186        BreakpointStoreEvent::ActiveDebugLineChanged => {}
2187    })
2188    .detach();
2189
2190    Session {
2191        mode: Mode::Local(mode),
2192        id: session_id,
2193        child_session_ids: HashSet::default(),
2194        parent_id: parent_session.map(|session| session.read(cx).id),
2195        variables: Default::default(),
2196        capabilities: Capabilities::default(),
2197        thread_states: ThreadStates::default(),
2198        output_token: OutputToken(0),
2199        ignore_breakpoints: false,
2200        output: circular_buffer::CircularBuffer::boxed(),
2201        requests: HashMap::default(),
2202        modules: Vec::default(),
2203        loaded_sources: Vec::default(),
2204        threads: IndexMap::default(),
2205        stack_frames: IndexMap::default(),
2206        locations: Default::default(),
2207        exception_breakpoints: Default::default(),
2208        _background_tasks,
2209        is_session_terminated: false,
2210    }
2211}