session.rs

   1use crate::project_settings::ProjectSettings;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
  11    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  12    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapAdapterDelegate;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap, IndexSet};
  17use dap::adapters::{DebugAdapter, DebugAdapterBinary};
  18use dap::messages::Response;
  19use dap::{
  20    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  21    SteppingGranularity, StoppedEvent, VariableReference,
  22    adapters::{DapDelegate, DapStatus},
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    DapRegistry, DebugRequestType, ExceptionBreakpointsFilter, ExceptionFilterOptions,
  28    OutputEventCategory,
  29};
  30use futures::channel::oneshot;
  31use futures::{FutureExt, future::Shared};
  32use gpui::{
  33    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
  34};
  35use rpc::AnyProtoClient;
  36use serde_json::{Value, json};
  37use settings::Settings;
  38use smol::stream::StreamExt;
  39use std::any::TypeId;
  40use std::collections::BTreeMap;
  41use std::path::PathBuf;
  42use std::u64;
  43use std::{
  44    any::Any,
  45    collections::hash_map::Entry,
  46    hash::{Hash, Hasher},
  47    path::Path,
  48    sync::Arc,
  49};
  50use task::{DebugAdapterConfig, DebugTaskDefinition};
  51use text::{PointUtf16, ToPointUtf16};
  52use util::{ResultExt, merge_json_value_into};
  53
  54#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  55#[repr(transparent)]
  56pub struct ThreadId(pub u64);
  57
  58impl ThreadId {
  59    pub const MIN: ThreadId = ThreadId(u64::MIN);
  60    pub const MAX: ThreadId = ThreadId(u64::MAX);
  61}
  62
  63impl From<u64> for ThreadId {
  64    fn from(id: u64) -> Self {
  65        Self(id)
  66    }
  67}
  68
  69#[derive(Clone, Debug)]
  70pub struct StackFrame {
  71    pub dap: dap::StackFrame,
  72    pub scopes: Vec<dap::Scope>,
  73}
  74
  75impl From<dap::StackFrame> for StackFrame {
  76    fn from(stack_frame: dap::StackFrame) -> Self {
  77        Self {
  78            scopes: vec![],
  79            dap: stack_frame,
  80        }
  81    }
  82}
  83
  84#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  85pub enum ThreadStatus {
  86    #[default]
  87    Running,
  88    Stopped,
  89    Stepping,
  90    Exited,
  91    Ended,
  92}
  93
  94impl ThreadStatus {
  95    pub fn label(&self) -> &'static str {
  96        match self {
  97            ThreadStatus::Running => "Running",
  98            ThreadStatus::Stopped => "Stopped",
  99            ThreadStatus::Stepping => "Stepping",
 100            ThreadStatus::Exited => "Exited",
 101            ThreadStatus::Ended => "Ended",
 102        }
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub struct Thread {
 108    dap: dap::Thread,
 109    stack_frame_ids: IndexSet<StackFrameId>,
 110    _has_stopped: bool,
 111}
 112
 113impl From<dap::Thread> for Thread {
 114    fn from(dap: dap::Thread) -> Self {
 115        Self {
 116            dap,
 117            stack_frame_ids: Default::default(),
 118            _has_stopped: false,
 119        }
 120    }
 121}
 122
 123type UpstreamProjectId = u64;
 124
 125struct RemoteConnection {
 126    _client: AnyProtoClient,
 127    _upstream_project_id: UpstreamProjectId,
 128}
 129
 130impl RemoteConnection {
 131    fn send_proto_client_request<R: DapCommand>(
 132        &self,
 133        _request: R,
 134        _session_id: SessionId,
 135        cx: &mut App,
 136    ) -> Task<Result<R::Response>> {
 137        // let message = request.to_proto(session_id, self.upstream_project_id);
 138        // let upstream_client = self.client.clone();
 139        cx.background_executor().spawn(async move {
 140            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 141            // let response = upstream_client.request(message).await?;
 142            // request.response_from_proto(response)
 143            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 144        })
 145    }
 146
 147    fn request<R: DapCommand>(
 148        &self,
 149        request: R,
 150        session_id: SessionId,
 151        cx: &mut App,
 152    ) -> Task<Result<R::Response>>
 153    where
 154        <R::DapRequest as dap::requests::Request>::Response: 'static,
 155        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 156    {
 157        return self.send_proto_client_request::<R>(request, session_id, cx);
 158    }
 159}
 160
 161enum Mode {
 162    Local(LocalMode),
 163    Remote(RemoteConnection),
 164}
 165
 166#[derive(Clone)]
 167pub struct LocalMode {
 168    client: Arc<DebugAdapterClient>,
 169    config: DebugAdapterConfig,
 170    adapter: Arc<dyn DebugAdapter>,
 171    breakpoint_store: Entity<BreakpointStore>,
 172    tmp_breakpoint: Option<SourceBreakpoint>,
 173}
 174
 175fn client_source(abs_path: &Path) -> dap::Source {
 176    dap::Source {
 177        name: abs_path
 178            .file_name()
 179            .map(|filename| filename.to_string_lossy().to_string()),
 180        path: Some(abs_path.to_string_lossy().to_string()),
 181        source_reference: None,
 182        presentation_hint: None,
 183        origin: None,
 184        sources: None,
 185        adapter_data: None,
 186        checksums: None,
 187    }
 188}
 189
 190impl LocalMode {
 191    fn new(
 192        debug_adapters: Arc<DapRegistry>,
 193        session_id: SessionId,
 194        parent_session: Option<Entity<Session>>,
 195        breakpoint_store: Entity<BreakpointStore>,
 196        config: DebugAdapterConfig,
 197        delegate: DapAdapterDelegate,
 198        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 199        cx: AsyncApp,
 200    ) -> Task<Result<Self>> {
 201        Self::new_inner(
 202            debug_adapters,
 203            session_id,
 204            parent_session,
 205            breakpoint_store,
 206            config,
 207            delegate,
 208            messages_tx,
 209            async |_, _| {},
 210            cx,
 211        )
 212    }
 213    #[cfg(any(test, feature = "test-support"))]
 214    fn new_fake(
 215        session_id: SessionId,
 216        parent_session: Option<Entity<Session>>,
 217        breakpoint_store: Entity<BreakpointStore>,
 218        config: DebugAdapterConfig,
 219        delegate: DapAdapterDelegate,
 220        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 221        caps: Capabilities,
 222        fail: bool,
 223        cx: AsyncApp,
 224    ) -> Task<Result<Self>> {
 225        use task::DebugRequestDisposition;
 226
 227        let request = match config.request.clone() {
 228            DebugRequestDisposition::UserConfigured(request) => request,
 229            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
 230                match reverse_request_args.request {
 231                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
 232                        DebugRequestType::Launch(task::LaunchConfig {
 233                            program: "".to_owned(),
 234                            cwd: None,
 235                            args: Default::default(),
 236                        })
 237                    }
 238                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
 239                        DebugRequestType::Attach(task::AttachConfig {
 240                            process_id: Some(0),
 241                        })
 242                    }
 243                }
 244            }
 245        };
 246
 247        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
 248            session
 249                .client
 250                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
 251                .await;
 252
 253            let paths = cx
 254                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
 255                .expect("Breakpoint store should exist in all tests that start debuggers");
 256
 257            session
 258                .client
 259                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
 260                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
 261                    if !paths.contains(&p) {
 262                        panic!("Sent breakpoints for path without any")
 263                    }
 264
 265                    Ok(dap::SetBreakpointsResponse {
 266                        breakpoints: Vec::default(),
 267                    })
 268                })
 269                .await;
 270
 271            match request {
 272                dap::DebugRequestType::Launch(_) => {
 273                    if fail {
 274                        session
 275                            .client
 276                            .on_request::<dap::requests::Launch, _>(move |_, _| {
 277                                Err(dap::ErrorResponse {
 278                                    error: Some(dap::Message {
 279                                        id: 1,
 280                                        format: "error".into(),
 281                                        variables: None,
 282                                        send_telemetry: None,
 283                                        show_user: None,
 284                                        url: None,
 285                                        url_label: None,
 286                                    }),
 287                                })
 288                            })
 289                            .await;
 290                    } else {
 291                        session
 292                            .client
 293                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
 294                            .await;
 295                    }
 296                }
 297                dap::DebugRequestType::Attach(attach_config) => {
 298                    if fail {
 299                        session
 300                            .client
 301                            .on_request::<dap::requests::Attach, _>(move |_, _| {
 302                                Err(dap::ErrorResponse {
 303                                    error: Some(dap::Message {
 304                                        id: 1,
 305                                        format: "error".into(),
 306                                        variables: None,
 307                                        send_telemetry: None,
 308                                        show_user: None,
 309                                        url: None,
 310                                        url_label: None,
 311                                    }),
 312                                })
 313                            })
 314                            .await;
 315                    } else {
 316                        session
 317                            .client
 318                            .on_request::<dap::requests::Attach, _>(move |_, args| {
 319                                assert_eq!(
 320                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
 321                                    args.raw
 322                                );
 323
 324                                Ok(())
 325                            })
 326                            .await;
 327                    }
 328                }
 329            }
 330
 331            session
 332                .client
 333                .on_request::<dap::requests::SetExceptionBreakpoints, _>(move |_, _| {
 334                    Ok(dap::SetExceptionBreakpointsResponse { breakpoints: None })
 335                })
 336                .await;
 337
 338            session
 339                .client
 340                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
 341                .await;
 342            session.client.fake_event(Events::Initialized(None)).await;
 343        };
 344        Self::new_inner(
 345            DapRegistry::fake().into(),
 346            session_id,
 347            parent_session,
 348            breakpoint_store,
 349            config,
 350            delegate,
 351            messages_tx,
 352            callback,
 353            cx,
 354        )
 355    }
 356    fn new_inner(
 357        registry: Arc<DapRegistry>,
 358        session_id: SessionId,
 359        parent_session: Option<Entity<Session>>,
 360        breakpoint_store: Entity<BreakpointStore>,
 361        config: DebugAdapterConfig,
 362        delegate: DapAdapterDelegate,
 363        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 364        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 365        cx: AsyncApp,
 366    ) -> Task<Result<Self>> {
 367        cx.spawn(async move |cx| {
 368            let (adapter, binary) =
 369                Self::get_adapter_binary(&registry, &config, &delegate, cx).await?;
 370
 371            let message_handler = Box::new(move |message| {
 372                messages_tx.unbounded_send(message).ok();
 373            });
 374
 375            let client = Arc::new(
 376                if let Some(client) = parent_session
 377                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 378                    .flatten()
 379                {
 380                    client
 381                        .reconnect(session_id, binary, message_handler, cx.clone())
 382                        .await?
 383                } else {
 384                    DebugAdapterClient::start(
 385                        session_id,
 386                        adapter.name(),
 387                        binary,
 388                        message_handler,
 389                        cx.clone(),
 390                    )
 391                    .await
 392                    .with_context(|| "Failed to start communication with debug adapter")?
 393                },
 394            );
 395
 396            let mut session = Self {
 397                client,
 398                adapter,
 399                breakpoint_store,
 400                tmp_breakpoint: None,
 401                config: config.clone(),
 402            };
 403
 404            on_initialized(&mut session, cx.clone()).await;
 405
 406            Ok(session)
 407        })
 408    }
 409
 410    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 411        let tasks: Vec<_> = paths
 412            .into_iter()
 413            .map(|path| {
 414                self.request(
 415                    dap_command::SetBreakpoints {
 416                        source: client_source(path),
 417                        source_modified: None,
 418                        breakpoints: vec![],
 419                    },
 420                    cx.background_executor().clone(),
 421                )
 422            })
 423            .collect();
 424
 425        cx.background_spawn(async move {
 426            futures::future::join_all(tasks)
 427                .await
 428                .iter()
 429                .for_each(|res| match res {
 430                    Ok(_) => {}
 431                    Err(err) => {
 432                        log::warn!("Set breakpoints request failed: {}", err);
 433                    }
 434                });
 435        })
 436    }
 437
 438    fn send_breakpoints_from_path(
 439        &self,
 440        abs_path: Arc<Path>,
 441        reason: BreakpointUpdatedReason,
 442        cx: &mut App,
 443    ) -> Task<()> {
 444        let breakpoints = self
 445            .breakpoint_store
 446            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 447            .into_iter()
 448            .filter(|bp| bp.state.is_enabled())
 449            .chain(self.tmp_breakpoint.clone())
 450            .map(Into::into)
 451            .collect();
 452
 453        let task = self.request(
 454            dap_command::SetBreakpoints {
 455                source: client_source(&abs_path),
 456                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 457                breakpoints,
 458            },
 459            cx.background_executor().clone(),
 460        );
 461
 462        cx.background_spawn(async move {
 463            match task.await {
 464                Ok(_) => {}
 465                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 466            }
 467        })
 468    }
 469
 470    fn send_exception_breakpoints(
 471        &self,
 472        filters: Vec<ExceptionBreakpointsFilter>,
 473        supports_filter_options: bool,
 474        cx: &App,
 475    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 476        let arg = if supports_filter_options {
 477            SetExceptionBreakpoints::WithOptions {
 478                filters: filters
 479                    .into_iter()
 480                    .map(|filter| ExceptionFilterOptions {
 481                        filter_id: filter.filter,
 482                        condition: None,
 483                        mode: None,
 484                    })
 485                    .collect(),
 486            }
 487        } else {
 488            SetExceptionBreakpoints::Plain {
 489                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 490            }
 491        };
 492        self.request(arg, cx.background_executor().clone())
 493    }
 494    fn send_source_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 495        let mut breakpoint_tasks = Vec::new();
 496        let breakpoints = self
 497            .breakpoint_store
 498            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 499
 500        for (path, breakpoints) in breakpoints {
 501            let breakpoints = if ignore_breakpoints {
 502                vec![]
 503            } else {
 504                breakpoints
 505                    .into_iter()
 506                    .filter(|bp| bp.state.is_enabled())
 507                    .map(Into::into)
 508                    .collect()
 509            };
 510
 511            breakpoint_tasks.push(self.request(
 512                dap_command::SetBreakpoints {
 513                    source: client_source(&path),
 514                    source_modified: Some(false),
 515                    breakpoints,
 516                },
 517                cx.background_executor().clone(),
 518            ));
 519        }
 520
 521        cx.background_spawn(async move {
 522            futures::future::join_all(breakpoint_tasks)
 523                .await
 524                .iter()
 525                .for_each(|res| match res {
 526                    Ok(_) => {}
 527                    Err(err) => {
 528                        log::warn!("Set breakpoints request failed: {}", err);
 529                    }
 530                });
 531        })
 532    }
 533
 534    async fn get_adapter_binary(
 535        registry: &Arc<DapRegistry>,
 536        config: &DebugAdapterConfig,
 537        delegate: &DapAdapterDelegate,
 538        cx: &mut AsyncApp,
 539    ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
 540        let adapter = registry
 541            .adapter(&config.adapter)
 542            .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
 543
 544        let binary = cx.update(|cx| {
 545            ProjectSettings::get_global(cx)
 546                .dap
 547                .get(&adapter.name())
 548                .and_then(|s| s.binary.as_ref().map(PathBuf::from))
 549        })?;
 550
 551        let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
 552            Err(error) => {
 553                delegate.update_status(
 554                    adapter.name(),
 555                    DapStatus::Failed {
 556                        error: error.to_string(),
 557                    },
 558                );
 559
 560                return Err(error);
 561            }
 562            Ok(mut binary) => {
 563                delegate.update_status(adapter.name(), DapStatus::None);
 564
 565                let shell_env = delegate.shell_env().await;
 566                let mut envs = binary.envs.unwrap_or_default();
 567                envs.extend(shell_env);
 568                binary.envs = Some(envs);
 569
 570                binary
 571            }
 572        };
 573
 574        Ok((adapter, binary))
 575    }
 576
 577    pub fn label(&self) -> String {
 578        self.config.label.clone()
 579    }
 580
 581    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
 582        let adapter_id = self.adapter.name().to_string();
 583
 584        self.request(Initialize { adapter_id }, cx.background_executor().clone())
 585    }
 586
 587    fn initialize_sequence(
 588        &self,
 589        capabilities: &Capabilities,
 590        initialized_rx: oneshot::Receiver<()>,
 591        cx: &App,
 592    ) -> Task<Result<()>> {
 593        let (mut raw, is_launch) = match &self.config.request {
 594            task::DebugRequestDisposition::UserConfigured(_) => {
 595                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
 596                    debug_assert!(false, "This part of code should be unreachable in practice");
 597                    return Task::ready(Err(anyhow!(
 598                        "Expected debug config conversion to succeed"
 599                    )));
 600                };
 601                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
 602                let raw = self.adapter.request_args(&raw);
 603                (raw, is_launch)
 604            }
 605            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
 606                start_debugging_request_arguments.configuration.clone(),
 607                matches!(
 608                    start_debugging_request_arguments.request,
 609                    dap::StartDebuggingRequestArgumentsRequest::Launch
 610                ),
 611            ),
 612        };
 613
 614        merge_json_value_into(
 615            self.config.initialize_args.clone().unwrap_or(json!({})),
 616            &mut raw,
 617        );
 618        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 619        let launch = if is_launch {
 620            self.request(Launch { raw }, cx.background_executor().clone())
 621        } else {
 622            self.request(Attach { raw }, cx.background_executor().clone())
 623        };
 624
 625        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 626        let exception_filters = capabilities
 627            .exception_breakpoint_filters
 628            .as_ref()
 629            .map(|exception_filters| {
 630                exception_filters
 631                    .iter()
 632                    .filter(|filter| filter.default == Some(true))
 633                    .cloned()
 634                    .collect::<Vec<_>>()
 635            })
 636            .unwrap_or_default();
 637        let supports_exception_filters = capabilities
 638            .supports_exception_filter_options
 639            .unwrap_or_default();
 640        let configuration_sequence = cx.spawn({
 641            let this = self.clone();
 642            async move |cx| {
 643                initialized_rx.await?;
 644                // todo(debugger) figure out if we want to handle a breakpoint response error
 645                // This will probably consist of letting a user know that breakpoints failed to be set
 646                cx.update(|cx| this.send_source_breakpoints(false, cx))?
 647                    .await;
 648                cx.update(|cx| {
 649                    this.send_exception_breakpoints(
 650                        exception_filters,
 651                        supports_exception_filters,
 652                        cx,
 653                    )
 654                })?
 655                .await
 656                .ok();
 657                if configuration_done_supported {
 658                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 659                } else {
 660                    Task::ready(Ok(()))
 661                }
 662                .await
 663            }
 664        });
 665
 666        cx.background_spawn(async move {
 667            futures::future::try_join(launch, configuration_sequence).await?;
 668            Ok(())
 669        })
 670    }
 671
 672    fn request<R: LocalDapCommand>(
 673        &self,
 674        request: R,
 675        executor: BackgroundExecutor,
 676    ) -> Task<Result<R::Response>>
 677    where
 678        <R::DapRequest as dap::requests::Request>::Response: 'static,
 679        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 680    {
 681        let request = Arc::new(request);
 682
 683        let request_clone = request.clone();
 684        let connection = self.client.clone();
 685        let request_task = executor.spawn(async move {
 686            let args = request_clone.to_dap();
 687            connection.request::<R::DapRequest>(args).await
 688        });
 689
 690        executor.spawn(async move {
 691            let response = request.response_from_dap(request_task.await?);
 692            response
 693        })
 694    }
 695}
 696impl From<RemoteConnection> for Mode {
 697    fn from(value: RemoteConnection) -> Self {
 698        Self::Remote(value)
 699    }
 700}
 701
 702impl Mode {
 703    fn request_dap<R: DapCommand>(
 704        &self,
 705        session_id: SessionId,
 706        request: R,
 707        cx: &mut Context<Session>,
 708    ) -> Task<Result<R::Response>>
 709    where
 710        <R::DapRequest as dap::requests::Request>::Response: 'static,
 711        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 712    {
 713        match self {
 714            Mode::Local(debug_adapter_client) => {
 715                debug_adapter_client.request(request, cx.background_executor().clone())
 716            }
 717            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 718        }
 719    }
 720}
 721
 722#[derive(Default)]
 723struct ThreadStates {
 724    global_state: Option<ThreadStatus>,
 725    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 726}
 727
 728impl ThreadStates {
 729    fn stop_all_threads(&mut self) {
 730        self.global_state = Some(ThreadStatus::Stopped);
 731        self.known_thread_states.clear();
 732    }
 733
 734    fn exit_all_threads(&mut self) {
 735        self.global_state = Some(ThreadStatus::Exited);
 736        self.known_thread_states.clear();
 737    }
 738
 739    fn continue_all_threads(&mut self) {
 740        self.global_state = Some(ThreadStatus::Running);
 741        self.known_thread_states.clear();
 742    }
 743
 744    fn stop_thread(&mut self, thread_id: ThreadId) {
 745        self.known_thread_states
 746            .insert(thread_id, ThreadStatus::Stopped);
 747    }
 748
 749    fn continue_thread(&mut self, thread_id: ThreadId) {
 750        self.known_thread_states
 751            .insert(thread_id, ThreadStatus::Running);
 752    }
 753
 754    fn process_step(&mut self, thread_id: ThreadId) {
 755        self.known_thread_states
 756            .insert(thread_id, ThreadStatus::Stepping);
 757    }
 758
 759    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 760        self.thread_state(thread_id)
 761            .unwrap_or(ThreadStatus::Running)
 762    }
 763
 764    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 765        self.known_thread_states
 766            .get(&thread_id)
 767            .copied()
 768            .or(self.global_state)
 769    }
 770
 771    fn exit_thread(&mut self, thread_id: ThreadId) {
 772        self.known_thread_states
 773            .insert(thread_id, ThreadStatus::Exited);
 774    }
 775
 776    fn any_stopped_thread(&self) -> bool {
 777        self.global_state
 778            .is_some_and(|state| state == ThreadStatus::Stopped)
 779            || self
 780                .known_thread_states
 781                .values()
 782                .any(|status| *status == ThreadStatus::Stopped)
 783    }
 784}
 785const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 786
 787type IsEnabled = bool;
 788
 789#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 790pub struct OutputToken(pub usize);
 791/// Represents a current state of a single debug adapter and provides ways to mutate it.
 792pub struct Session {
 793    mode: Mode,
 794    pub(super) capabilities: Capabilities,
 795    id: SessionId,
 796    child_session_ids: HashSet<SessionId>,
 797    parent_id: Option<SessionId>,
 798    ignore_breakpoints: bool,
 799    modules: Vec<dap::Module>,
 800    loaded_sources: Vec<dap::Source>,
 801    output_token: OutputToken,
 802    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 803    threads: IndexMap<ThreadId, Thread>,
 804    thread_states: ThreadStates,
 805    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 806    stack_frames: IndexMap<StackFrameId, StackFrame>,
 807    locations: HashMap<u64, dap::LocationsResponse>,
 808    is_session_terminated: bool,
 809    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 810    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 811    _background_tasks: Vec<Task<()>>,
 812}
 813
 814trait CacheableCommand: Any + Send + Sync {
 815    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 816    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 817    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 818}
 819
 820impl<T> CacheableCommand for T
 821where
 822    T: DapCommand + PartialEq + Eq + Hash,
 823{
 824    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 825        (rhs as &dyn Any)
 826            .downcast_ref::<Self>()
 827            .map_or(false, |rhs| self == rhs)
 828    }
 829
 830    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 831        T::hash(self, &mut hasher);
 832    }
 833
 834    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 835        self
 836    }
 837}
 838
 839pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 840
 841impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 842    fn from(request: T) -> Self {
 843        Self(Arc::new(request))
 844    }
 845}
 846
 847impl PartialEq for RequestSlot {
 848    fn eq(&self, other: &Self) -> bool {
 849        self.0.dyn_eq(other.0.as_ref())
 850    }
 851}
 852
 853impl Eq for RequestSlot {}
 854
 855impl Hash for RequestSlot {
 856    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 857        self.0.dyn_hash(state);
 858        (&*self.0 as &dyn Any).type_id().hash(state)
 859    }
 860}
 861
 862#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 863pub struct CompletionsQuery {
 864    pub query: String,
 865    pub column: u64,
 866    pub line: Option<u64>,
 867    pub frame_id: Option<u64>,
 868}
 869
 870impl CompletionsQuery {
 871    pub fn new(
 872        buffer: &language::Buffer,
 873        cursor_position: language::Anchor,
 874        frame_id: Option<u64>,
 875    ) -> Self {
 876        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 877        Self {
 878            query: buffer.text(),
 879            column: column as u64,
 880            frame_id,
 881            line: Some(row as u64),
 882        }
 883    }
 884}
 885
 886pub enum SessionEvent {
 887    Modules,
 888    LoadedSources,
 889    Stopped(Option<ThreadId>),
 890    StackTrace,
 891    Variables,
 892    Threads,
 893}
 894
 895pub(crate) enum SessionStateEvent {
 896    Shutdown,
 897}
 898
 899impl EventEmitter<SessionEvent> for Session {}
 900impl EventEmitter<SessionStateEvent> for Session {}
 901
 902// local session will send breakpoint updates to DAP for all new breakpoints
 903// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 904// BreakpointStore notifies session on breakpoint changes
 905impl Session {
 906    pub(crate) fn local(
 907        breakpoint_store: Entity<BreakpointStore>,
 908        session_id: SessionId,
 909        parent_session: Option<Entity<Session>>,
 910        delegate: DapAdapterDelegate,
 911        config: DebugAdapterConfig,
 912        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 913        initialized_tx: oneshot::Sender<()>,
 914        debug_adapters: Arc<DapRegistry>,
 915        cx: &mut App,
 916    ) -> Task<Result<Entity<Self>>> {
 917        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 918
 919        cx.spawn(async move |cx| {
 920            let mode = LocalMode::new(
 921                debug_adapters,
 922                session_id,
 923                parent_session.clone(),
 924                breakpoint_store.clone(),
 925                config.clone(),
 926                delegate,
 927                message_tx,
 928                cx.clone(),
 929            )
 930            .await?;
 931
 932            cx.new(|cx| {
 933                create_local_session(
 934                    breakpoint_store,
 935                    session_id,
 936                    parent_session,
 937                    start_debugging_requests_tx,
 938                    initialized_tx,
 939                    message_rx,
 940                    mode,
 941                    cx,
 942                )
 943            })
 944        })
 945    }
 946
 947    #[cfg(any(test, feature = "test-support"))]
 948    pub(crate) fn fake(
 949        breakpoint_store: Entity<BreakpointStore>,
 950        session_id: SessionId,
 951        parent_session: Option<Entity<Session>>,
 952        delegate: DapAdapterDelegate,
 953        config: DebugAdapterConfig,
 954        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 955        initialized_tx: oneshot::Sender<()>,
 956        caps: Capabilities,
 957        fails: bool,
 958        cx: &mut App,
 959    ) -> Task<Result<Entity<Session>>> {
 960        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 961
 962        cx.spawn(async move |cx| {
 963            let mode = LocalMode::new_fake(
 964                session_id,
 965                parent_session.clone(),
 966                breakpoint_store.clone(),
 967                config.clone(),
 968                delegate,
 969                message_tx,
 970                caps,
 971                fails,
 972                cx.clone(),
 973            )
 974            .await?;
 975
 976            cx.new(|cx| {
 977                create_local_session(
 978                    breakpoint_store,
 979                    session_id,
 980                    parent_session,
 981                    start_debugging_requests_tx,
 982                    initialized_tx,
 983                    message_rx,
 984                    mode,
 985                    cx,
 986                )
 987            })
 988        })
 989    }
 990
 991    pub(crate) fn remote(
 992        session_id: SessionId,
 993        client: AnyProtoClient,
 994        upstream_project_id: u64,
 995        ignore_breakpoints: bool,
 996    ) -> Self {
 997        Self {
 998            mode: Mode::Remote(RemoteConnection {
 999                _client: client,
1000                _upstream_project_id: upstream_project_id,
1001            }),
1002            id: session_id,
1003            child_session_ids: HashSet::default(),
1004            parent_id: None,
1005            capabilities: Capabilities::default(),
1006            ignore_breakpoints,
1007            variables: Default::default(),
1008            stack_frames: Default::default(),
1009            thread_states: ThreadStates::default(),
1010            output_token: OutputToken(0),
1011            output: circular_buffer::CircularBuffer::boxed(),
1012            requests: HashMap::default(),
1013            modules: Vec::default(),
1014            loaded_sources: Vec::default(),
1015            threads: IndexMap::default(),
1016            _background_tasks: Vec::default(),
1017            locations: Default::default(),
1018            is_session_terminated: false,
1019            exception_breakpoints: Default::default(),
1020        }
1021    }
1022
1023    pub fn session_id(&self) -> SessionId {
1024        self.id
1025    }
1026
1027    pub fn child_session_ids(&self) -> HashSet<SessionId> {
1028        self.child_session_ids.clone()
1029    }
1030
1031    pub fn add_child_session_id(&mut self, session_id: SessionId) {
1032        self.child_session_ids.insert(session_id);
1033    }
1034
1035    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
1036        self.child_session_ids.remove(&session_id);
1037    }
1038
1039    pub fn parent_id(&self) -> Option<SessionId> {
1040        self.parent_id
1041    }
1042
1043    pub fn capabilities(&self) -> &Capabilities {
1044        &self.capabilities
1045    }
1046
1047    pub fn configuration(&self) -> Option<DebugAdapterConfig> {
1048        if let Mode::Local(local_mode) = &self.mode {
1049            Some(local_mode.config.clone())
1050        } else {
1051            None
1052        }
1053    }
1054
1055    pub fn is_terminated(&self) -> bool {
1056        self.is_session_terminated
1057    }
1058
1059    pub fn is_local(&self) -> bool {
1060        matches!(self.mode, Mode::Local(_))
1061    }
1062
1063    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
1064        match &mut self.mode {
1065            Mode::Local(local_mode) => Some(local_mode),
1066            Mode::Remote(_) => None,
1067        }
1068    }
1069
1070    pub fn as_local(&self) -> Option<&LocalMode> {
1071        match &self.mode {
1072            Mode::Local(local_mode) => Some(local_mode),
1073            Mode::Remote(_) => None,
1074        }
1075    }
1076
1077    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1078        match &self.mode {
1079            Mode::Local(local_mode) => {
1080                let capabilities = local_mode.clone().request_initialization(cx);
1081
1082                cx.spawn(async move |this, cx| {
1083                    let capabilities = capabilities.await?;
1084                    this.update(cx, |session, _| {
1085                        session.capabilities = capabilities;
1086                        let filters = session
1087                            .capabilities
1088                            .exception_breakpoint_filters
1089                            .clone()
1090                            .unwrap_or_default();
1091                        for filter in filters {
1092                            let default = filter.default.unwrap_or_default();
1093                            session
1094                                .exception_breakpoints
1095                                .entry(filter.filter.clone())
1096                                .or_insert_with(|| (filter, default));
1097                        }
1098                    })?;
1099                    Ok(())
1100                })
1101            }
1102            Mode::Remote(_) => Task::ready(Err(anyhow!(
1103                "Cannot send initialize request from remote session"
1104            ))),
1105        }
1106    }
1107
1108    pub(super) fn initialize_sequence(
1109        &mut self,
1110        initialize_rx: oneshot::Receiver<()>,
1111        cx: &mut Context<Self>,
1112    ) -> Task<Result<()>> {
1113        match &self.mode {
1114            Mode::Local(local_mode) => {
1115                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1116            }
1117            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1118        }
1119    }
1120
1121    pub fn run_to_position(
1122        &mut self,
1123        breakpoint: SourceBreakpoint,
1124        active_thread_id: ThreadId,
1125        cx: &mut Context<Self>,
1126    ) {
1127        match &mut self.mode {
1128            Mode::Local(local_mode) => {
1129                if !matches!(
1130                    self.thread_states.thread_state(active_thread_id),
1131                    Some(ThreadStatus::Stopped)
1132                ) {
1133                    return;
1134                };
1135                let path = breakpoint.path.clone();
1136                local_mode.tmp_breakpoint = Some(breakpoint);
1137                let task = local_mode.send_breakpoints_from_path(
1138                    path,
1139                    BreakpointUpdatedReason::Toggled,
1140                    cx,
1141                );
1142
1143                cx.spawn(async move |this, cx| {
1144                    task.await;
1145                    this.update(cx, |this, cx| {
1146                        this.continue_thread(active_thread_id, cx);
1147                    })
1148                })
1149                .detach();
1150            }
1151            Mode::Remote(_) => {}
1152        }
1153    }
1154
1155    pub fn output(
1156        &self,
1157        since: OutputToken,
1158    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1159        if self.output_token.0 == 0 {
1160            return (self.output.range(0..0), OutputToken(0));
1161        };
1162
1163        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1164
1165        let clamped_events_since = events_since.clamp(0, self.output.len());
1166        (
1167            self.output
1168                .range(self.output.len() - clamped_events_since..),
1169            self.output_token,
1170        )
1171    }
1172
1173    pub fn respond_to_client(
1174        &self,
1175        request_seq: u64,
1176        success: bool,
1177        command: String,
1178        body: Option<serde_json::Value>,
1179        cx: &mut Context<Self>,
1180    ) -> Task<Result<()>> {
1181        let Some(local_session) = self.as_local().cloned() else {
1182            unreachable!("Cannot respond to remote client");
1183        };
1184
1185        cx.background_spawn(async move {
1186            local_session
1187                .client
1188                .send_message(Message::Response(Response {
1189                    body,
1190                    success,
1191                    command,
1192                    seq: request_seq + 1,
1193                    request_seq,
1194                    message: None,
1195                }))
1196                .await
1197        })
1198    }
1199
1200    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1201        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1202            let breakpoint = local.tmp_breakpoint.take()?;
1203            let path = breakpoint.path.clone();
1204            Some((local, path))
1205        }) {
1206            local
1207                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1208                .detach();
1209        };
1210
1211        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1212            self.thread_states.stop_all_threads();
1213
1214            self.invalidate_command_type::<StackTraceCommand>();
1215        }
1216
1217        // Event if we stopped all threads we still need to insert the thread_id
1218        // to our own data
1219        if let Some(thread_id) = event.thread_id {
1220            self.thread_states.stop_thread(ThreadId(thread_id));
1221
1222            self.invalidate_state(
1223                &StackTraceCommand {
1224                    thread_id,
1225                    start_frame: None,
1226                    levels: None,
1227                }
1228                .into(),
1229            );
1230        }
1231
1232        self.invalidate_generic();
1233        self.threads.clear();
1234        self.variables.clear();
1235        cx.emit(SessionEvent::Stopped(
1236            event
1237                .thread_id
1238                .map(Into::into)
1239                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1240        ));
1241        cx.notify();
1242    }
1243
1244    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1245        match *event {
1246            Events::Initialized(_) => {
1247                debug_assert!(
1248                    false,
1249                    "Initialized event should have been handled in LocalMode"
1250                );
1251            }
1252            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1253            Events::Continued(event) => {
1254                if event.all_threads_continued.unwrap_or_default() {
1255                    self.thread_states.continue_all_threads();
1256                } else {
1257                    self.thread_states
1258                        .continue_thread(ThreadId(event.thread_id));
1259                }
1260                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1261                self.invalidate_generic();
1262            }
1263            Events::Exited(_event) => {
1264                self.clear_active_debug_line(cx);
1265            }
1266            Events::Terminated(_) => {
1267                self.is_session_terminated = true;
1268                self.clear_active_debug_line(cx);
1269            }
1270            Events::Thread(event) => {
1271                let thread_id = ThreadId(event.thread_id);
1272
1273                match event.reason {
1274                    dap::ThreadEventReason::Started => {
1275                        self.thread_states.continue_thread(thread_id);
1276                    }
1277                    dap::ThreadEventReason::Exited => {
1278                        self.thread_states.exit_thread(thread_id);
1279                    }
1280                    reason => {
1281                        log::error!("Unhandled thread event reason {:?}", reason);
1282                    }
1283                }
1284                self.invalidate_state(&ThreadsCommand.into());
1285                cx.notify();
1286            }
1287            Events::Output(event) => {
1288                if event
1289                    .category
1290                    .as_ref()
1291                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1292                {
1293                    return;
1294                }
1295
1296                self.output.push_back(event);
1297                self.output_token.0 += 1;
1298                cx.notify();
1299            }
1300            Events::Breakpoint(_) => {}
1301            Events::Module(event) => {
1302                match event.reason {
1303                    dap::ModuleEventReason::New => {
1304                        self.modules.push(event.module);
1305                    }
1306                    dap::ModuleEventReason::Changed => {
1307                        if let Some(module) = self
1308                            .modules
1309                            .iter_mut()
1310                            .find(|other| event.module.id == other.id)
1311                        {
1312                            *module = event.module;
1313                        }
1314                    }
1315                    dap::ModuleEventReason::Removed => {
1316                        self.modules.retain(|other| event.module.id != other.id);
1317                    }
1318                }
1319
1320                // todo(debugger): We should only send the invalidate command to downstream clients.
1321                // self.invalidate_state(&ModulesCommand.into());
1322            }
1323            Events::LoadedSource(_) => {
1324                self.invalidate_state(&LoadedSourcesCommand.into());
1325            }
1326            Events::Capabilities(event) => {
1327                self.capabilities = self.capabilities.merge(event.capabilities);
1328                cx.notify();
1329            }
1330            Events::Memory(_) => {}
1331            Events::Process(_) => {}
1332            Events::ProgressEnd(_) => {}
1333            Events::ProgressStart(_) => {}
1334            Events::ProgressUpdate(_) => {}
1335            Events::Invalidated(_) => {}
1336            Events::Other(_) => {}
1337        }
1338    }
1339
1340    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1341    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1342        &mut self,
1343        request: T,
1344        process_result: impl FnOnce(
1345            &mut Self,
1346            Result<T::Response>,
1347            &mut Context<Self>,
1348        ) -> Option<T::Response>
1349        + 'static,
1350        cx: &mut Context<Self>,
1351    ) {
1352        const {
1353            assert!(
1354                T::CACHEABLE,
1355                "Only requests marked as cacheable should invoke `fetch`"
1356            );
1357        }
1358
1359        if !self.thread_states.any_stopped_thread()
1360            && request.type_id() != TypeId::of::<ThreadsCommand>()
1361            || self.is_session_terminated
1362        {
1363            return;
1364        }
1365
1366        let request_map = self
1367            .requests
1368            .entry(std::any::TypeId::of::<T>())
1369            .or_default();
1370
1371        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1372            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1373
1374            let task = Self::request_inner::<Arc<T>>(
1375                &self.capabilities,
1376                self.id,
1377                &self.mode,
1378                command,
1379                process_result,
1380                cx,
1381            );
1382            let task = cx
1383                .background_executor()
1384                .spawn(async move {
1385                    let _ = task.await?;
1386                    Some(())
1387                })
1388                .shared();
1389
1390            vacant.insert(task);
1391            cx.notify();
1392        }
1393    }
1394
1395    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1396        capabilities: &Capabilities,
1397        session_id: SessionId,
1398        mode: &Mode,
1399        request: T,
1400        process_result: impl FnOnce(
1401            &mut Self,
1402            Result<T::Response>,
1403            &mut Context<Self>,
1404        ) -> Option<T::Response>
1405        + 'static,
1406        cx: &mut Context<Self>,
1407    ) -> Task<Option<T::Response>> {
1408        if !T::is_supported(&capabilities) {
1409            log::warn!(
1410                "Attempted to send a DAP request that isn't supported: {:?}",
1411                request
1412            );
1413            let error = Err(anyhow::Error::msg(
1414                "Couldn't complete request because it's not supported",
1415            ));
1416            return cx.spawn(async move |this, cx| {
1417                this.update(cx, |this, cx| process_result(this, error, cx))
1418                    .log_err()
1419                    .flatten()
1420            });
1421        }
1422
1423        let request = mode.request_dap(session_id, request, cx);
1424        cx.spawn(async move |this, cx| {
1425            let result = request.await;
1426            this.update(cx, |this, cx| process_result(this, result, cx))
1427                .log_err()
1428                .flatten()
1429        })
1430    }
1431
1432    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1433        &self,
1434        request: T,
1435        process_result: impl FnOnce(
1436            &mut Self,
1437            Result<T::Response>,
1438            &mut Context<Self>,
1439        ) -> Option<T::Response>
1440        + 'static,
1441        cx: &mut Context<Self>,
1442    ) -> Task<Option<T::Response>> {
1443        Self::request_inner(
1444            &self.capabilities,
1445            self.id,
1446            &self.mode,
1447            request,
1448            process_result,
1449            cx,
1450        )
1451    }
1452
1453    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1454        self.requests.remove(&std::any::TypeId::of::<Command>());
1455    }
1456
1457    fn invalidate_generic(&mut self) {
1458        self.invalidate_command_type::<ModulesCommand>();
1459        self.invalidate_command_type::<LoadedSourcesCommand>();
1460        self.invalidate_command_type::<ThreadsCommand>();
1461    }
1462
1463    fn invalidate_state(&mut self, key: &RequestSlot) {
1464        self.requests
1465            .entry((&*key.0 as &dyn Any).type_id())
1466            .and_modify(|request_map| {
1467                request_map.remove(&key);
1468            });
1469    }
1470
1471    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1472        self.thread_states.thread_status(thread_id)
1473    }
1474
1475    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1476        self.fetch(
1477            dap_command::ThreadsCommand,
1478            |this, result, cx| {
1479                let result = result.log_err()?;
1480
1481                this.threads = result
1482                    .iter()
1483                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1484                    .collect();
1485
1486                this.invalidate_command_type::<StackTraceCommand>();
1487                cx.emit(SessionEvent::Threads);
1488                cx.notify();
1489
1490                Some(result)
1491            },
1492            cx,
1493        );
1494
1495        self.threads
1496            .values()
1497            .map(|thread| {
1498                (
1499                    thread.dap.clone(),
1500                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1501                )
1502            })
1503            .collect()
1504    }
1505
1506    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1507        self.fetch(
1508            dap_command::ModulesCommand,
1509            |this, result, cx| {
1510                let result = result.log_err()?;
1511
1512                this.modules = result.iter().cloned().collect();
1513                cx.emit(SessionEvent::Modules);
1514                cx.notify();
1515
1516                Some(result)
1517            },
1518            cx,
1519        );
1520
1521        &self.modules
1522    }
1523
1524    pub fn ignore_breakpoints(&self) -> bool {
1525        self.ignore_breakpoints
1526    }
1527
1528    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1529        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1530    }
1531
1532    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1533        if self.ignore_breakpoints == ignore {
1534            return Task::ready(());
1535        }
1536
1537        self.ignore_breakpoints = ignore;
1538
1539        if let Some(local) = self.as_local() {
1540            local.send_source_breakpoints(ignore, cx)
1541        } else {
1542            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1543            unimplemented!()
1544        }
1545    }
1546
1547    pub fn exception_breakpoints(
1548        &self,
1549    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1550        self.exception_breakpoints.values()
1551    }
1552
1553    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1554        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1555            *is_enabled = !*is_enabled;
1556            self.send_exception_breakpoints(cx);
1557        }
1558    }
1559
1560    fn send_exception_breakpoints(&mut self, cx: &App) {
1561        if let Some(local) = self.as_local() {
1562            let exception_filters = self
1563                .exception_breakpoints
1564                .values()
1565                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1566                .collect();
1567
1568            let supports_exception_filters = self
1569                .capabilities
1570                .supports_exception_filter_options
1571                .unwrap_or_default();
1572            local
1573                .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1574                .detach_and_log_err(cx);
1575        } else {
1576            debug_assert!(false, "Not implemented");
1577        }
1578    }
1579
1580    pub fn breakpoints_enabled(&self) -> bool {
1581        self.ignore_breakpoints
1582    }
1583
1584    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1585        self.fetch(
1586            dap_command::LoadedSourcesCommand,
1587            |this, result, cx| {
1588                let result = result.log_err()?;
1589                this.loaded_sources = result.iter().cloned().collect();
1590                cx.emit(SessionEvent::LoadedSources);
1591                cx.notify();
1592                Some(result)
1593            },
1594            cx,
1595        );
1596
1597        &self.loaded_sources
1598    }
1599
1600    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1601        res.log_err()?;
1602        Some(())
1603    }
1604
1605    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1606        thread_id: ThreadId,
1607    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1608    {
1609        move |this, response, cx| match response.log_err() {
1610            Some(response) => Some(response),
1611            None => {
1612                this.thread_states.stop_thread(thread_id);
1613                cx.notify();
1614                None
1615            }
1616        }
1617    }
1618
1619    fn clear_active_debug_line_response(
1620        &mut self,
1621        response: Result<()>,
1622        cx: &mut Context<Session>,
1623    ) -> Option<()> {
1624        response.log_err()?;
1625        self.clear_active_debug_line(cx);
1626        Some(())
1627    }
1628
1629    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1630        self.as_local()
1631            .expect("Message handler will only run in local mode")
1632            .breakpoint_store
1633            .update(cx, |store, cx| {
1634                store.remove_active_position(Some(self.id), cx)
1635            });
1636    }
1637
1638    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1639        self.request(
1640            PauseCommand {
1641                thread_id: thread_id.0,
1642            },
1643            Self::empty_response,
1644            cx,
1645        )
1646        .detach();
1647    }
1648
1649    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1650        self.request(
1651            RestartStackFrameCommand { stack_frame_id },
1652            Self::empty_response,
1653            cx,
1654        )
1655        .detach();
1656    }
1657
1658    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1659        if self.capabilities.supports_restart_request.unwrap_or(false) {
1660            self.request(
1661                RestartCommand {
1662                    raw: args.unwrap_or(Value::Null),
1663                },
1664                Self::empty_response,
1665                cx,
1666            )
1667            .detach();
1668        } else {
1669            self.request(
1670                DisconnectCommand {
1671                    restart: Some(false),
1672                    terminate_debuggee: Some(true),
1673                    suspend_debuggee: Some(false),
1674                },
1675                Self::empty_response,
1676                cx,
1677            )
1678            .detach();
1679        }
1680    }
1681
1682    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1683        self.is_session_terminated = true;
1684        self.thread_states.exit_all_threads();
1685        cx.notify();
1686
1687        let task = if self
1688            .capabilities
1689            .supports_terminate_request
1690            .unwrap_or_default()
1691        {
1692            self.request(
1693                TerminateCommand {
1694                    restart: Some(false),
1695                },
1696                Self::clear_active_debug_line_response,
1697                cx,
1698            )
1699        } else {
1700            self.request(
1701                DisconnectCommand {
1702                    restart: Some(false),
1703                    terminate_debuggee: Some(true),
1704                    suspend_debuggee: Some(false),
1705                },
1706                Self::clear_active_debug_line_response,
1707                cx,
1708            )
1709        };
1710
1711        cx.emit(SessionStateEvent::Shutdown);
1712
1713        cx.background_spawn(async move {
1714            let _ = task.await;
1715        })
1716    }
1717
1718    pub fn completions(
1719        &mut self,
1720        query: CompletionsQuery,
1721        cx: &mut Context<Self>,
1722    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1723        let task = self.request(query, |_, result, _| result.log_err(), cx);
1724
1725        cx.background_executor().spawn(async move {
1726            anyhow::Ok(
1727                task.await
1728                    .map(|response| response.targets)
1729                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1730            )
1731        })
1732    }
1733
1734    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1735        self.thread_states.continue_thread(thread_id);
1736        self.request(
1737            ContinueCommand {
1738                args: ContinueArguments {
1739                    thread_id: thread_id.0,
1740                    single_thread: Some(true),
1741                },
1742            },
1743            Self::on_step_response::<ContinueCommand>(thread_id),
1744            cx,
1745        )
1746        .detach();
1747    }
1748
1749    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1750        match self.mode {
1751            Mode::Local(ref local) => Some(local.client.clone()),
1752            Mode::Remote(_) => None,
1753        }
1754    }
1755
1756    pub fn step_over(
1757        &mut self,
1758        thread_id: ThreadId,
1759        granularity: SteppingGranularity,
1760        cx: &mut Context<Self>,
1761    ) {
1762        let supports_single_thread_execution_requests =
1763            self.capabilities.supports_single_thread_execution_requests;
1764        let supports_stepping_granularity = self
1765            .capabilities
1766            .supports_stepping_granularity
1767            .unwrap_or_default();
1768
1769        let command = NextCommand {
1770            inner: StepCommand {
1771                thread_id: thread_id.0,
1772                granularity: supports_stepping_granularity.then(|| granularity),
1773                single_thread: supports_single_thread_execution_requests,
1774            },
1775        };
1776
1777        self.thread_states.process_step(thread_id);
1778        self.request(
1779            command,
1780            Self::on_step_response::<NextCommand>(thread_id),
1781            cx,
1782        )
1783        .detach();
1784    }
1785
1786    pub fn step_in(
1787        &mut self,
1788        thread_id: ThreadId,
1789        granularity: SteppingGranularity,
1790        cx: &mut Context<Self>,
1791    ) {
1792        let supports_single_thread_execution_requests =
1793            self.capabilities.supports_single_thread_execution_requests;
1794        let supports_stepping_granularity = self
1795            .capabilities
1796            .supports_stepping_granularity
1797            .unwrap_or_default();
1798
1799        let command = StepInCommand {
1800            inner: StepCommand {
1801                thread_id: thread_id.0,
1802                granularity: supports_stepping_granularity.then(|| granularity),
1803                single_thread: supports_single_thread_execution_requests,
1804            },
1805        };
1806
1807        self.thread_states.process_step(thread_id);
1808        self.request(
1809            command,
1810            Self::on_step_response::<StepInCommand>(thread_id),
1811            cx,
1812        )
1813        .detach();
1814    }
1815
1816    pub fn step_out(
1817        &mut self,
1818        thread_id: ThreadId,
1819        granularity: SteppingGranularity,
1820        cx: &mut Context<Self>,
1821    ) {
1822        let supports_single_thread_execution_requests =
1823            self.capabilities.supports_single_thread_execution_requests;
1824        let supports_stepping_granularity = self
1825            .capabilities
1826            .supports_stepping_granularity
1827            .unwrap_or_default();
1828
1829        let command = StepOutCommand {
1830            inner: StepCommand {
1831                thread_id: thread_id.0,
1832                granularity: supports_stepping_granularity.then(|| granularity),
1833                single_thread: supports_single_thread_execution_requests,
1834            },
1835        };
1836
1837        self.thread_states.process_step(thread_id);
1838        self.request(
1839            command,
1840            Self::on_step_response::<StepOutCommand>(thread_id),
1841            cx,
1842        )
1843        .detach();
1844    }
1845
1846    pub fn step_back(
1847        &mut self,
1848        thread_id: ThreadId,
1849        granularity: SteppingGranularity,
1850        cx: &mut Context<Self>,
1851    ) {
1852        let supports_single_thread_execution_requests =
1853            self.capabilities.supports_single_thread_execution_requests;
1854        let supports_stepping_granularity = self
1855            .capabilities
1856            .supports_stepping_granularity
1857            .unwrap_or_default();
1858
1859        let command = StepBackCommand {
1860            inner: StepCommand {
1861                thread_id: thread_id.0,
1862                granularity: supports_stepping_granularity.then(|| granularity),
1863                single_thread: supports_single_thread_execution_requests,
1864            },
1865        };
1866
1867        self.thread_states.process_step(thread_id);
1868
1869        self.request(
1870            command,
1871            Self::on_step_response::<StepBackCommand>(thread_id),
1872            cx,
1873        )
1874        .detach();
1875    }
1876
1877    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1878        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1879            && self.requests.contains_key(&ThreadsCommand.type_id())
1880            && self.threads.contains_key(&thread_id)
1881        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1882        // We could still be using an old thread id and have sent a new thread's request
1883        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1884        // But it very well could cause a minor bug in the future that is hard to track down
1885        {
1886            self.fetch(
1887                super::dap_command::StackTraceCommand {
1888                    thread_id: thread_id.0,
1889                    start_frame: None,
1890                    levels: None,
1891                },
1892                move |this, stack_frames, cx| {
1893                    let stack_frames = stack_frames.log_err()?;
1894
1895                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1896                        thread.stack_frame_ids =
1897                            stack_frames.iter().map(|frame| frame.id).collect();
1898                    });
1899                    debug_assert!(
1900                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1901                        "Sent request for thread_id that doesn't exist"
1902                    );
1903
1904                    this.stack_frames.extend(
1905                        stack_frames
1906                            .iter()
1907                            .cloned()
1908                            .map(|frame| (frame.id, StackFrame::from(frame))),
1909                    );
1910
1911                    this.invalidate_command_type::<ScopesCommand>();
1912                    this.invalidate_command_type::<VariablesCommand>();
1913
1914                    cx.emit(SessionEvent::StackTrace);
1915                    cx.notify();
1916                    Some(stack_frames)
1917                },
1918                cx,
1919            );
1920        }
1921
1922        self.threads
1923            .get(&thread_id)
1924            .map(|thread| {
1925                thread
1926                    .stack_frame_ids
1927                    .iter()
1928                    .filter_map(|id| self.stack_frames.get(id))
1929                    .cloned()
1930                    .collect()
1931            })
1932            .unwrap_or_default()
1933    }
1934
1935    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1936        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1937            && self
1938                .requests
1939                .contains_key(&TypeId::of::<StackTraceCommand>())
1940        {
1941            self.fetch(
1942                ScopesCommand { stack_frame_id },
1943                move |this, scopes, cx| {
1944                    let scopes = scopes.log_err()?;
1945
1946                    for scope in scopes .iter(){
1947                        this.variables(scope.variables_reference, cx);
1948                    }
1949
1950                    let entry = this
1951                        .stack_frames
1952                        .entry(stack_frame_id)
1953                        .and_modify(|stack_frame| {
1954                            stack_frame.scopes = scopes.clone();
1955                        });
1956
1957                    cx.emit(SessionEvent::Variables);
1958
1959                    debug_assert!(
1960                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1961                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1962                    );
1963
1964                    Some(scopes)
1965                },
1966                cx,
1967            );
1968        }
1969
1970        self.stack_frames
1971            .get(&stack_frame_id)
1972            .map(|frame| frame.scopes.as_slice())
1973            .unwrap_or_default()
1974    }
1975
1976    pub fn variables(
1977        &mut self,
1978        variables_reference: VariableReference,
1979        cx: &mut Context<Self>,
1980    ) -> Vec<dap::Variable> {
1981        let command = VariablesCommand {
1982            variables_reference,
1983            filter: None,
1984            start: None,
1985            count: None,
1986            format: None,
1987        };
1988
1989        self.fetch(
1990            command,
1991            move |this, variables, cx| {
1992                let variables = variables.log_err()?;
1993                this.variables
1994                    .insert(variables_reference, variables.clone());
1995
1996                cx.emit(SessionEvent::Variables);
1997                Some(variables)
1998            },
1999            cx,
2000        );
2001
2002        self.variables
2003            .get(&variables_reference)
2004            .cloned()
2005            .unwrap_or_default()
2006    }
2007
2008    pub fn set_variable_value(
2009        &mut self,
2010        variables_reference: u64,
2011        name: String,
2012        value: String,
2013        cx: &mut Context<Self>,
2014    ) {
2015        if self.capabilities.supports_set_variable.unwrap_or_default() {
2016            self.request(
2017                SetVariableValueCommand {
2018                    name,
2019                    value,
2020                    variables_reference,
2021                },
2022                move |this, response, cx| {
2023                    let response = response.log_err()?;
2024                    this.invalidate_command_type::<VariablesCommand>();
2025                    cx.notify();
2026                    Some(response)
2027                },
2028                cx,
2029            )
2030            .detach()
2031        }
2032    }
2033
2034    pub fn evaluate(
2035        &mut self,
2036        expression: String,
2037        context: Option<EvaluateArgumentsContext>,
2038        frame_id: Option<u64>,
2039        source: Option<Source>,
2040        cx: &mut Context<Self>,
2041    ) {
2042        self.request(
2043            EvaluateCommand {
2044                expression,
2045                context,
2046                frame_id,
2047                source,
2048            },
2049            |this, response, cx| {
2050                let response = response.log_err()?;
2051                this.output_token.0 += 1;
2052                this.output.push_back(dap::OutputEvent {
2053                    category: None,
2054                    output: response.result.clone(),
2055                    group: None,
2056                    variables_reference: Some(response.variables_reference),
2057                    source: None,
2058                    line: None,
2059                    column: None,
2060                    data: None,
2061                    location_reference: None,
2062                });
2063
2064                this.invalidate_command_type::<ScopesCommand>();
2065                cx.notify();
2066                Some(response)
2067            },
2068            cx,
2069        )
2070        .detach();
2071    }
2072
2073    pub fn location(
2074        &mut self,
2075        reference: u64,
2076        cx: &mut Context<Self>,
2077    ) -> Option<dap::LocationsResponse> {
2078        self.fetch(
2079            LocationsCommand { reference },
2080            move |this, response, _| {
2081                let response = response.log_err()?;
2082                this.locations.insert(reference, response.clone());
2083                Some(response)
2084            },
2085            cx,
2086        );
2087        self.locations.get(&reference).cloned()
2088    }
2089    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2090        let command = DisconnectCommand {
2091            restart: Some(false),
2092            terminate_debuggee: Some(true),
2093            suspend_debuggee: Some(false),
2094        };
2095
2096        self.request(command, Self::empty_response, cx).detach()
2097    }
2098
2099    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2100        if self
2101            .capabilities
2102            .supports_terminate_threads_request
2103            .unwrap_or_default()
2104        {
2105            self.request(
2106                TerminateThreadsCommand {
2107                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2108                },
2109                Self::clear_active_debug_line_response,
2110                cx,
2111            )
2112            .detach();
2113        } else {
2114            self.shutdown(cx).detach();
2115        }
2116    }
2117}
2118
2119fn create_local_session(
2120    breakpoint_store: Entity<BreakpointStore>,
2121    session_id: SessionId,
2122    parent_session: Option<Entity<Session>>,
2123    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
2124    initialized_tx: oneshot::Sender<()>,
2125    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
2126    mode: LocalMode,
2127    cx: &mut Context<Session>,
2128) -> Session {
2129    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
2130        let mut initialized_tx = Some(initialized_tx);
2131        while let Some(message) = message_rx.next().await {
2132            if let Message::Event(event) = message {
2133                if let Events::Initialized(_) = *event {
2134                    if let Some(tx) = initialized_tx.take() {
2135                        tx.send(()).ok();
2136                    }
2137                } else {
2138                    let Ok(_) = this.update(cx, |session, cx| {
2139                        session.handle_dap_event(event, cx);
2140                    }) else {
2141                        break;
2142                    };
2143                }
2144            } else {
2145                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
2146                else {
2147                    break;
2148                };
2149            }
2150        }
2151    })];
2152
2153    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
2154        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2155            if let Some(local) = (!this.ignore_breakpoints)
2156                .then(|| this.as_local_mut())
2157                .flatten()
2158            {
2159                local
2160                    .send_breakpoints_from_path(path.clone(), *reason, cx)
2161                    .detach();
2162            };
2163        }
2164        BreakpointStoreEvent::BreakpointsCleared(paths) => {
2165            if let Some(local) = (!this.ignore_breakpoints)
2166                .then(|| this.as_local_mut())
2167                .flatten()
2168            {
2169                local.unset_breakpoints_from_paths(paths, cx).detach();
2170            }
2171        }
2172        BreakpointStoreEvent::ActiveDebugLineChanged => {}
2173    })
2174    .detach();
2175
2176    Session {
2177        mode: Mode::Local(mode),
2178        id: session_id,
2179        child_session_ids: HashSet::default(),
2180        parent_id: parent_session.map(|session| session.read(cx).id),
2181        variables: Default::default(),
2182        capabilities: Capabilities::default(),
2183        thread_states: ThreadStates::default(),
2184        output_token: OutputToken(0),
2185        ignore_breakpoints: false,
2186        output: circular_buffer::CircularBuffer::boxed(),
2187        requests: HashMap::default(),
2188        modules: Vec::default(),
2189        loaded_sources: Vec::default(),
2190        threads: IndexMap::default(),
2191        stack_frames: IndexMap::default(),
2192        locations: Default::default(),
2193        exception_breakpoints: Default::default(),
2194        _background_tasks,
2195        is_session_terminated: false,
2196    }
2197}