session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use anyhow::{Context as _, Result, anyhow};
  13use collections::{HashMap, HashSet, IndexMap, IndexSet};
  14use dap::adapters::DebugAdapterBinary;
  15use dap::messages::Response;
  16use dap::{
  17    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  18    SteppingGranularity, StoppedEvent, VariableReference,
  19    client::{DebugAdapterClient, SessionId},
  20    messages::{Events, Message},
  21};
  22use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
  23use futures::channel::oneshot;
  24use futures::{FutureExt, future::Shared};
  25use gpui::{
  26    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  27    Task, WeakEntity,
  28};
  29use rpc::AnyProtoClient;
  30use serde_json::{Value, json};
  31use smol::stream::StreamExt;
  32use std::any::TypeId;
  33use std::collections::BTreeMap;
  34use std::u64;
  35use std::{
  36    any::Any,
  37    collections::hash_map::Entry,
  38    hash::{Hash, Hasher},
  39    path::Path,
  40    sync::Arc,
  41};
  42use task::DebugTaskDefinition;
  43use text::{PointUtf16, ToPointUtf16};
  44use util::{ResultExt, merge_json_value_into};
  45
  46#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  47#[repr(transparent)]
  48pub struct ThreadId(pub u64);
  49
  50impl ThreadId {
  51    pub const MIN: ThreadId = ThreadId(u64::MIN);
  52    pub const MAX: ThreadId = ThreadId(u64::MAX);
  53}
  54
  55impl From<u64> for ThreadId {
  56    fn from(id: u64) -> Self {
  57        Self(id)
  58    }
  59}
  60
  61#[derive(Clone, Debug)]
  62pub struct StackFrame {
  63    pub dap: dap::StackFrame,
  64    pub scopes: Vec<dap::Scope>,
  65}
  66
  67impl From<dap::StackFrame> for StackFrame {
  68    fn from(stack_frame: dap::StackFrame) -> Self {
  69        Self {
  70            scopes: vec![],
  71            dap: stack_frame,
  72        }
  73    }
  74}
  75
  76#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  77pub enum ThreadStatus {
  78    #[default]
  79    Running,
  80    Stopped,
  81    Stepping,
  82    Exited,
  83    Ended,
  84}
  85
  86impl ThreadStatus {
  87    pub fn label(&self) -> &'static str {
  88        match self {
  89            ThreadStatus::Running => "Running",
  90            ThreadStatus::Stopped => "Stopped",
  91            ThreadStatus::Stepping => "Stepping",
  92            ThreadStatus::Exited => "Exited",
  93            ThreadStatus::Ended => "Ended",
  94        }
  95    }
  96}
  97
  98#[derive(Debug)]
  99pub struct Thread {
 100    dap: dap::Thread,
 101    stack_frame_ids: IndexSet<StackFrameId>,
 102    _has_stopped: bool,
 103}
 104
 105impl From<dap::Thread> for Thread {
 106    fn from(dap: dap::Thread) -> Self {
 107        Self {
 108            dap,
 109            stack_frame_ids: Default::default(),
 110            _has_stopped: false,
 111        }
 112    }
 113}
 114
 115type UpstreamProjectId = u64;
 116
 117struct RemoteConnection {
 118    _client: AnyProtoClient,
 119    _upstream_project_id: UpstreamProjectId,
 120    _adapter_name: SharedString,
 121}
 122
 123impl RemoteConnection {
 124    fn send_proto_client_request<R: DapCommand>(
 125        &self,
 126        _request: R,
 127        _session_id: SessionId,
 128        cx: &mut App,
 129    ) -> Task<Result<R::Response>> {
 130        // let message = request.to_proto(session_id, self.upstream_project_id);
 131        // let upstream_client = self.client.clone();
 132        cx.background_executor().spawn(async move {
 133            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 134            // let response = upstream_client.request(message).await?;
 135            // request.response_from_proto(response)
 136            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 137        })
 138    }
 139
 140    fn request<R: DapCommand>(
 141        &self,
 142        request: R,
 143        session_id: SessionId,
 144        cx: &mut App,
 145    ) -> Task<Result<R::Response>>
 146    where
 147        <R::DapRequest as dap::requests::Request>::Response: 'static,
 148        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 149    {
 150        return self.send_proto_client_request::<R>(request, session_id, cx);
 151    }
 152}
 153
 154enum Mode {
 155    Local(LocalMode),
 156    Remote(RemoteConnection),
 157}
 158
 159#[derive(Clone)]
 160pub struct LocalMode {
 161    client: Arc<DebugAdapterClient>,
 162    definition: DebugTaskDefinition,
 163    binary: DebugAdapterBinary,
 164    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 165    tmp_breakpoint: Option<SourceBreakpoint>,
 166}
 167
 168fn client_source(abs_path: &Path) -> dap::Source {
 169    dap::Source {
 170        name: abs_path
 171            .file_name()
 172            .map(|filename| filename.to_string_lossy().to_string()),
 173        path: Some(abs_path.to_string_lossy().to_string()),
 174        source_reference: None,
 175        presentation_hint: None,
 176        origin: None,
 177        sources: None,
 178        adapter_data: None,
 179        checksums: None,
 180    }
 181}
 182
 183impl LocalMode {
 184    fn new(
 185        session_id: SessionId,
 186        parent_session: Option<Entity<Session>>,
 187        breakpoint_store: Entity<BreakpointStore>,
 188        config: DebugTaskDefinition,
 189        binary: DebugAdapterBinary,
 190        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 191        cx: AsyncApp,
 192    ) -> Task<Result<Self>> {
 193        Self::new_inner(
 194            session_id,
 195            parent_session,
 196            breakpoint_store,
 197            config,
 198            binary,
 199            messages_tx,
 200            async |_, _| {},
 201            cx,
 202        )
 203    }
 204
 205    fn new_inner(
 206        session_id: SessionId,
 207        parent_session: Option<Entity<Session>>,
 208        breakpoint_store: Entity<BreakpointStore>,
 209        config: DebugTaskDefinition,
 210        binary: DebugAdapterBinary,
 211        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 212        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 213        cx: AsyncApp,
 214    ) -> Task<Result<Self>> {
 215        cx.spawn(async move |cx| {
 216            let message_handler = Box::new(move |message| {
 217                messages_tx.unbounded_send(message).ok();
 218            });
 219
 220            let client = Arc::new(
 221                if let Some(client) = parent_session
 222                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 223                    .flatten()
 224                {
 225                    client
 226                        .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 227                        .await?
 228                } else {
 229                    DebugAdapterClient::start(
 230                        session_id,
 231                        binary.clone(),
 232                        message_handler,
 233                        cx.clone(),
 234                    )
 235                    .await
 236                    .with_context(|| "Failed to start communication with debug adapter")?
 237                },
 238            );
 239
 240            let mut session = Self {
 241                client,
 242                breakpoint_store,
 243                tmp_breakpoint: None,
 244                definition: config,
 245                binary,
 246            };
 247
 248            on_initialized(&mut session, cx.clone()).await;
 249
 250            Ok(session)
 251        })
 252    }
 253
 254    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 255        let tasks: Vec<_> = paths
 256            .into_iter()
 257            .map(|path| {
 258                self.request(
 259                    dap_command::SetBreakpoints {
 260                        source: client_source(path),
 261                        source_modified: None,
 262                        breakpoints: vec![],
 263                    },
 264                    cx.background_executor().clone(),
 265                )
 266            })
 267            .collect();
 268
 269        cx.background_spawn(async move {
 270            futures::future::join_all(tasks)
 271                .await
 272                .iter()
 273                .for_each(|res| match res {
 274                    Ok(_) => {}
 275                    Err(err) => {
 276                        log::warn!("Set breakpoints request failed: {}", err);
 277                    }
 278                });
 279        })
 280    }
 281
 282    fn send_breakpoints_from_path(
 283        &self,
 284        abs_path: Arc<Path>,
 285        reason: BreakpointUpdatedReason,
 286        cx: &mut App,
 287    ) -> Task<()> {
 288        let breakpoints = self
 289            .breakpoint_store
 290            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 291            .into_iter()
 292            .filter(|bp| bp.state.is_enabled())
 293            .chain(self.tmp_breakpoint.clone())
 294            .map(Into::into)
 295            .collect();
 296
 297        let task = self.request(
 298            dap_command::SetBreakpoints {
 299                source: client_source(&abs_path),
 300                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 301                breakpoints,
 302            },
 303            cx.background_executor().clone(),
 304        );
 305
 306        cx.background_spawn(async move {
 307            match task.await {
 308                Ok(_) => {}
 309                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 310            }
 311        })
 312    }
 313
 314    fn send_exception_breakpoints(
 315        &self,
 316        filters: Vec<ExceptionBreakpointsFilter>,
 317        supports_filter_options: bool,
 318        cx: &App,
 319    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 320        let arg = if supports_filter_options {
 321            SetExceptionBreakpoints::WithOptions {
 322                filters: filters
 323                    .into_iter()
 324                    .map(|filter| ExceptionFilterOptions {
 325                        filter_id: filter.filter,
 326                        condition: None,
 327                        mode: None,
 328                    })
 329                    .collect(),
 330            }
 331        } else {
 332            SetExceptionBreakpoints::Plain {
 333                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 334            }
 335        };
 336        self.request(arg, cx.background_executor().clone())
 337    }
 338    fn send_source_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 339        let mut breakpoint_tasks = Vec::new();
 340        let breakpoints = self
 341            .breakpoint_store
 342            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 343
 344        for (path, breakpoints) in breakpoints {
 345            let breakpoints = if ignore_breakpoints {
 346                vec![]
 347            } else {
 348                breakpoints
 349                    .into_iter()
 350                    .filter(|bp| bp.state.is_enabled())
 351                    .map(Into::into)
 352                    .collect()
 353            };
 354
 355            breakpoint_tasks.push(self.request(
 356                dap_command::SetBreakpoints {
 357                    source: client_source(&path),
 358                    source_modified: Some(false),
 359                    breakpoints,
 360                },
 361                cx.background_executor().clone(),
 362            ));
 363        }
 364
 365        cx.background_spawn(async move {
 366            futures::future::join_all(breakpoint_tasks)
 367                .await
 368                .iter()
 369                .for_each(|res| match res {
 370                    Ok(_) => {}
 371                    Err(err) => {
 372                        log::warn!("Set breakpoints request failed: {}", err);
 373                    }
 374                });
 375        })
 376    }
 377
 378    pub fn label(&self) -> String {
 379        self.definition.label.clone()
 380    }
 381
 382    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
 383        let adapter_id = self.binary.adapter_name.to_string();
 384
 385        self.request(Initialize { adapter_id }, cx.background_executor().clone())
 386    }
 387
 388    fn initialize_sequence(
 389        &self,
 390        capabilities: &Capabilities,
 391        initialized_rx: oneshot::Receiver<()>,
 392        cx: &App,
 393    ) -> Task<Result<()>> {
 394        let mut raw = self.binary.request_args.clone();
 395
 396        merge_json_value_into(
 397            self.definition.initialize_args.clone().unwrap_or(json!({})),
 398            &mut raw.configuration,
 399        );
 400        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 401        let launch = match raw.request {
 402            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
 403                Launch {
 404                    raw: raw.configuration,
 405                },
 406                cx.background_executor().clone(),
 407            ),
 408            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
 409                Attach {
 410                    raw: raw.configuration,
 411                },
 412                cx.background_executor().clone(),
 413            ),
 414        };
 415
 416        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 417        let exception_filters = capabilities
 418            .exception_breakpoint_filters
 419            .as_ref()
 420            .map(|exception_filters| {
 421                exception_filters
 422                    .iter()
 423                    .filter(|filter| filter.default == Some(true))
 424                    .cloned()
 425                    .collect::<Vec<_>>()
 426            })
 427            .unwrap_or_default();
 428        let supports_exception_filters = capabilities
 429            .supports_exception_filter_options
 430            .unwrap_or_default();
 431        let configuration_sequence = cx.spawn({
 432            let this = self.clone();
 433            async move |cx| {
 434                initialized_rx.await?;
 435                // todo(debugger) figure out if we want to handle a breakpoint response error
 436                // This will probably consist of letting a user know that breakpoints failed to be set
 437                cx.update(|cx| this.send_source_breakpoints(false, cx))?
 438                    .await;
 439                cx.update(|cx| {
 440                    this.send_exception_breakpoints(
 441                        exception_filters,
 442                        supports_exception_filters,
 443                        cx,
 444                    )
 445                })?
 446                .await
 447                .ok();
 448                let ret = if configuration_done_supported {
 449                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 450                } else {
 451                    Task::ready(Ok(()))
 452                }
 453                .await;
 454                ret
 455            }
 456        });
 457
 458        cx.background_spawn(async move {
 459            futures::future::try_join(launch, configuration_sequence).await?;
 460            Ok(())
 461        })
 462    }
 463
 464    fn request<R: LocalDapCommand>(
 465        &self,
 466        request: R,
 467        executor: BackgroundExecutor,
 468    ) -> Task<Result<R::Response>>
 469    where
 470        <R::DapRequest as dap::requests::Request>::Response: 'static,
 471        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 472    {
 473        let request = Arc::new(request);
 474
 475        let request_clone = request.clone();
 476        let connection = self.client.clone();
 477        let request_task = executor.spawn(async move {
 478            let args = request_clone.to_dap();
 479            connection.request::<R::DapRequest>(args).await
 480        });
 481
 482        executor.spawn(async move {
 483            let response = request.response_from_dap(request_task.await?);
 484            response
 485        })
 486    }
 487}
 488impl From<RemoteConnection> for Mode {
 489    fn from(value: RemoteConnection) -> Self {
 490        Self::Remote(value)
 491    }
 492}
 493
 494impl Mode {
 495    fn request_dap<R: DapCommand>(
 496        &self,
 497        session_id: SessionId,
 498        request: R,
 499        cx: &mut Context<Session>,
 500    ) -> Task<Result<R::Response>>
 501    where
 502        <R::DapRequest as dap::requests::Request>::Response: 'static,
 503        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 504    {
 505        match self {
 506            Mode::Local(debug_adapter_client) => {
 507                debug_adapter_client.request(request, cx.background_executor().clone())
 508            }
 509            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 510        }
 511    }
 512}
 513
 514#[derive(Default)]
 515struct ThreadStates {
 516    global_state: Option<ThreadStatus>,
 517    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 518}
 519
 520impl ThreadStates {
 521    fn stop_all_threads(&mut self) {
 522        self.global_state = Some(ThreadStatus::Stopped);
 523        self.known_thread_states.clear();
 524    }
 525
 526    fn exit_all_threads(&mut self) {
 527        self.global_state = Some(ThreadStatus::Exited);
 528        self.known_thread_states.clear();
 529    }
 530
 531    fn continue_all_threads(&mut self) {
 532        self.global_state = Some(ThreadStatus::Running);
 533        self.known_thread_states.clear();
 534    }
 535
 536    fn stop_thread(&mut self, thread_id: ThreadId) {
 537        self.known_thread_states
 538            .insert(thread_id, ThreadStatus::Stopped);
 539    }
 540
 541    fn continue_thread(&mut self, thread_id: ThreadId) {
 542        self.known_thread_states
 543            .insert(thread_id, ThreadStatus::Running);
 544    }
 545
 546    fn process_step(&mut self, thread_id: ThreadId) {
 547        self.known_thread_states
 548            .insert(thread_id, ThreadStatus::Stepping);
 549    }
 550
 551    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 552        self.thread_state(thread_id)
 553            .unwrap_or(ThreadStatus::Running)
 554    }
 555
 556    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 557        self.known_thread_states
 558            .get(&thread_id)
 559            .copied()
 560            .or(self.global_state)
 561    }
 562
 563    fn exit_thread(&mut self, thread_id: ThreadId) {
 564        self.known_thread_states
 565            .insert(thread_id, ThreadStatus::Exited);
 566    }
 567
 568    fn any_stopped_thread(&self) -> bool {
 569        self.global_state
 570            .is_some_and(|state| state == ThreadStatus::Stopped)
 571            || self
 572                .known_thread_states
 573                .values()
 574                .any(|status| *status == ThreadStatus::Stopped)
 575    }
 576}
 577const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 578
 579type IsEnabled = bool;
 580
 581#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 582pub struct OutputToken(pub usize);
 583/// Represents a current state of a single debug adapter and provides ways to mutate it.
 584pub struct Session {
 585    mode: Mode,
 586    pub(super) capabilities: Capabilities,
 587    id: SessionId,
 588    child_session_ids: HashSet<SessionId>,
 589    parent_id: Option<SessionId>,
 590    ignore_breakpoints: bool,
 591    modules: Vec<dap::Module>,
 592    loaded_sources: Vec<dap::Source>,
 593    output_token: OutputToken,
 594    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 595    threads: IndexMap<ThreadId, Thread>,
 596    thread_states: ThreadStates,
 597    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 598    stack_frames: IndexMap<StackFrameId, StackFrame>,
 599    locations: HashMap<u64, dap::LocationsResponse>,
 600    is_session_terminated: bool,
 601    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 602    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 603    _background_tasks: Vec<Task<()>>,
 604}
 605
 606trait CacheableCommand: Any + Send + Sync {
 607    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 608    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 609    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 610}
 611
 612impl<T> CacheableCommand for T
 613where
 614    T: DapCommand + PartialEq + Eq + Hash,
 615{
 616    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 617        (rhs as &dyn Any)
 618            .downcast_ref::<Self>()
 619            .map_or(false, |rhs| self == rhs)
 620    }
 621
 622    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 623        T::hash(self, &mut hasher);
 624    }
 625
 626    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 627        self
 628    }
 629}
 630
 631pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 632
 633impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 634    fn from(request: T) -> Self {
 635        Self(Arc::new(request))
 636    }
 637}
 638
 639impl PartialEq for RequestSlot {
 640    fn eq(&self, other: &Self) -> bool {
 641        self.0.dyn_eq(other.0.as_ref())
 642    }
 643}
 644
 645impl Eq for RequestSlot {}
 646
 647impl Hash for RequestSlot {
 648    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 649        self.0.dyn_hash(state);
 650        (&*self.0 as &dyn Any).type_id().hash(state)
 651    }
 652}
 653
 654#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 655pub struct CompletionsQuery {
 656    pub query: String,
 657    pub column: u64,
 658    pub line: Option<u64>,
 659    pub frame_id: Option<u64>,
 660}
 661
 662impl CompletionsQuery {
 663    pub fn new(
 664        buffer: &language::Buffer,
 665        cursor_position: language::Anchor,
 666        frame_id: Option<u64>,
 667    ) -> Self {
 668        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 669        Self {
 670            query: buffer.text(),
 671            column: column as u64,
 672            frame_id,
 673            line: Some(row as u64),
 674        }
 675    }
 676}
 677
 678pub enum SessionEvent {
 679    Modules,
 680    LoadedSources,
 681    Stopped(Option<ThreadId>),
 682    StackTrace,
 683    Variables,
 684    Threads,
 685}
 686
 687pub(crate) enum SessionStateEvent {
 688    Shutdown,
 689}
 690
 691impl EventEmitter<SessionEvent> for Session {}
 692impl EventEmitter<SessionStateEvent> for Session {}
 693
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session about breakpoint changes.
 697impl Session {
 698    pub(crate) fn local(
 699        breakpoint_store: Entity<BreakpointStore>,
 700        session_id: SessionId,
 701        parent_session: Option<Entity<Session>>,
 702        binary: DebugAdapterBinary,
 703        config: DebugTaskDefinition,
 704        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 705        initialized_tx: oneshot::Sender<()>,
 706        cx: &mut App,
 707    ) -> Task<Result<Entity<Self>>> {
 708        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 709
 710        cx.spawn(async move |cx| {
 711            let mode = LocalMode::new(
 712                session_id,
 713                parent_session.clone(),
 714                breakpoint_store.clone(),
 715                config.clone(),
 716                binary,
 717                message_tx,
 718                cx.clone(),
 719            )
 720            .await?;
 721
 722            cx.new(|cx| {
 723                create_local_session(
 724                    breakpoint_store,
 725                    session_id,
 726                    parent_session,
 727                    start_debugging_requests_tx,
 728                    initialized_tx,
 729                    message_rx,
 730                    mode,
 731                    cx,
 732                )
 733            })
 734        })
 735    }
 736
 737    pub(crate) fn remote(
 738        session_id: SessionId,
 739        client: AnyProtoClient,
 740        upstream_project_id: u64,
 741        ignore_breakpoints: bool,
 742    ) -> Self {
 743        Self {
 744            mode: Mode::Remote(RemoteConnection {
 745                _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
 746                _client: client,
 747                _upstream_project_id: upstream_project_id,
 748            }),
 749            id: session_id,
 750            child_session_ids: HashSet::default(),
 751            parent_id: None,
 752            capabilities: Capabilities::default(),
 753            ignore_breakpoints,
 754            variables: Default::default(),
 755            stack_frames: Default::default(),
 756            thread_states: ThreadStates::default(),
 757            output_token: OutputToken(0),
 758            output: circular_buffer::CircularBuffer::boxed(),
 759            requests: HashMap::default(),
 760            modules: Vec::default(),
 761            loaded_sources: Vec::default(),
 762            threads: IndexMap::default(),
 763            _background_tasks: Vec::default(),
 764            locations: Default::default(),
 765            is_session_terminated: false,
 766            exception_breakpoints: Default::default(),
 767        }
 768    }
 769
 770    pub fn session_id(&self) -> SessionId {
 771        self.id
 772    }
 773
 774    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 775        self.child_session_ids.clone()
 776    }
 777
 778    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 779        self.child_session_ids.insert(session_id);
 780    }
 781
 782    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 783        self.child_session_ids.remove(&session_id);
 784    }
 785
 786    pub fn parent_id(&self) -> Option<SessionId> {
 787        self.parent_id
 788    }
 789
 790    pub fn capabilities(&self) -> &Capabilities {
 791        &self.capabilities
 792    }
 793
 794    pub fn binary(&self) -> &DebugAdapterBinary {
 795        let Mode::Local(local_mode) = &self.mode else {
 796            panic!("Session is not local");
 797        };
 798        &local_mode.binary
 799    }
 800
 801    pub fn adapter_name(&self) -> SharedString {
 802        match &self.mode {
 803            Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(),
 804            Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
 805        }
 806    }
 807
 808    pub fn configuration(&self) -> Option<DebugTaskDefinition> {
 809        if let Mode::Local(local_mode) = &self.mode {
 810            Some(local_mode.definition.clone())
 811        } else {
 812            None
 813        }
 814    }
 815
 816    pub fn is_terminated(&self) -> bool {
 817        self.is_session_terminated
 818    }
 819
 820    pub fn is_local(&self) -> bool {
 821        matches!(self.mode, Mode::Local(_))
 822    }
 823
 824    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 825        match &mut self.mode {
 826            Mode::Local(local_mode) => Some(local_mode),
 827            Mode::Remote(_) => None,
 828        }
 829    }
 830
 831    pub fn as_local(&self) -> Option<&LocalMode> {
 832        match &self.mode {
 833            Mode::Local(local_mode) => Some(local_mode),
 834            Mode::Remote(_) => None,
 835        }
 836    }
 837
 838    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 839        match &self.mode {
 840            Mode::Local(local_mode) => {
 841                let capabilities = local_mode.clone().request_initialization(cx);
 842
 843                cx.spawn(async move |this, cx| {
 844                    let capabilities = capabilities.await?;
 845                    this.update(cx, |session, _| {
 846                        session.capabilities = capabilities;
 847                        let filters = session
 848                            .capabilities
 849                            .exception_breakpoint_filters
 850                            .clone()
 851                            .unwrap_or_default();
 852                        for filter in filters {
 853                            let default = filter.default.unwrap_or_default();
 854                            session
 855                                .exception_breakpoints
 856                                .entry(filter.filter.clone())
 857                                .or_insert_with(|| (filter, default));
 858                        }
 859                    })?;
 860                    Ok(())
 861                })
 862            }
 863            Mode::Remote(_) => Task::ready(Err(anyhow!(
 864                "Cannot send initialize request from remote session"
 865            ))),
 866        }
 867    }
 868
 869    pub(super) fn initialize_sequence(
 870        &mut self,
 871        initialize_rx: oneshot::Receiver<()>,
 872        cx: &mut Context<Self>,
 873    ) -> Task<Result<()>> {
 874        match &self.mode {
 875            Mode::Local(local_mode) => {
 876                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
 877            }
 878            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
 879        }
 880    }
 881
 882    pub fn run_to_position(
 883        &mut self,
 884        breakpoint: SourceBreakpoint,
 885        active_thread_id: ThreadId,
 886        cx: &mut Context<Self>,
 887    ) {
 888        match &mut self.mode {
 889            Mode::Local(local_mode) => {
 890                if !matches!(
 891                    self.thread_states.thread_state(active_thread_id),
 892                    Some(ThreadStatus::Stopped)
 893                ) {
 894                    return;
 895                };
 896                let path = breakpoint.path.clone();
 897                local_mode.tmp_breakpoint = Some(breakpoint);
 898                let task = local_mode.send_breakpoints_from_path(
 899                    path,
 900                    BreakpointUpdatedReason::Toggled,
 901                    cx,
 902                );
 903
 904                cx.spawn(async move |this, cx| {
 905                    task.await;
 906                    this.update(cx, |this, cx| {
 907                        this.continue_thread(active_thread_id, cx);
 908                    })
 909                })
 910                .detach();
 911            }
 912            Mode::Remote(_) => {}
 913        }
 914    }
 915
 916    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
 917        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
 918    }
 919
 920    pub fn output(
 921        &self,
 922        since: OutputToken,
 923    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 924        if self.output_token.0 == 0 {
 925            return (self.output.range(0..0), OutputToken(0));
 926        };
 927
 928        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 929
 930        let clamped_events_since = events_since.clamp(0, self.output.len());
 931        (
 932            self.output
 933                .range(self.output.len() - clamped_events_since..),
 934            self.output_token,
 935        )
 936    }
 937
 938    pub fn respond_to_client(
 939        &self,
 940        request_seq: u64,
 941        success: bool,
 942        command: String,
 943        body: Option<serde_json::Value>,
 944        cx: &mut Context<Self>,
 945    ) -> Task<Result<()>> {
 946        let Some(local_session) = self.as_local().cloned() else {
 947            unreachable!("Cannot respond to remote client");
 948        };
 949
 950        cx.background_spawn(async move {
 951            local_session
 952                .client
 953                .send_message(Message::Response(Response {
 954                    body,
 955                    success,
 956                    command,
 957                    seq: request_seq + 1,
 958                    request_seq,
 959                    message: None,
 960                }))
 961                .await
 962        })
 963    }
 964
 965    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
 966        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
 967            let breakpoint = local.tmp_breakpoint.take()?;
 968            let path = breakpoint.path.clone();
 969            Some((local, path))
 970        }) {
 971            local
 972                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
 973                .detach();
 974        };
 975
 976        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
 977            self.thread_states.stop_all_threads();
 978
 979            self.invalidate_command_type::<StackTraceCommand>();
 980        }
 981
 982        // Event if we stopped all threads we still need to insert the thread_id
 983        // to our own data
 984        if let Some(thread_id) = event.thread_id {
 985            self.thread_states.stop_thread(ThreadId(thread_id));
 986
 987            self.invalidate_state(
 988                &StackTraceCommand {
 989                    thread_id,
 990                    start_frame: None,
 991                    levels: None,
 992                }
 993                .into(),
 994            );
 995        }
 996
 997        self.invalidate_generic();
 998        self.threads.clear();
 999        self.variables.clear();
1000        cx.emit(SessionEvent::Stopped(
1001            event
1002                .thread_id
1003                .map(Into::into)
1004                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1005        ));
1006        cx.notify();
1007    }
1008
1009    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1010        match *event {
1011            Events::Initialized(_) => {
1012                debug_assert!(
1013                    false,
1014                    "Initialized event should have been handled in LocalMode"
1015                );
1016            }
1017            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1018            Events::Continued(event) => {
1019                if event.all_threads_continued.unwrap_or_default() {
1020                    self.thread_states.continue_all_threads();
1021                } else {
1022                    self.thread_states
1023                        .continue_thread(ThreadId(event.thread_id));
1024                }
1025                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1026                self.invalidate_generic();
1027            }
1028            Events::Exited(_event) => {
1029                self.clear_active_debug_line(cx);
1030            }
1031            Events::Terminated(_) => {
1032                self.is_session_terminated = true;
1033                self.clear_active_debug_line(cx);
1034            }
1035            Events::Thread(event) => {
1036                let thread_id = ThreadId(event.thread_id);
1037
1038                match event.reason {
1039                    dap::ThreadEventReason::Started => {
1040                        self.thread_states.continue_thread(thread_id);
1041                    }
1042                    dap::ThreadEventReason::Exited => {
1043                        self.thread_states.exit_thread(thread_id);
1044                    }
1045                    reason => {
1046                        log::error!("Unhandled thread event reason {:?}", reason);
1047                    }
1048                }
1049                self.invalidate_state(&ThreadsCommand.into());
1050                cx.notify();
1051            }
1052            Events::Output(event) => {
1053                if event
1054                    .category
1055                    .as_ref()
1056                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1057                {
1058                    return;
1059                }
1060
1061                self.output.push_back(event);
1062                self.output_token.0 += 1;
1063                cx.notify();
1064            }
1065            Events::Breakpoint(_) => {}
1066            Events::Module(event) => {
1067                match event.reason {
1068                    dap::ModuleEventReason::New => {
1069                        self.modules.push(event.module);
1070                    }
1071                    dap::ModuleEventReason::Changed => {
1072                        if let Some(module) = self
1073                            .modules
1074                            .iter_mut()
1075                            .find(|other| event.module.id == other.id)
1076                        {
1077                            *module = event.module;
1078                        }
1079                    }
1080                    dap::ModuleEventReason::Removed => {
1081                        self.modules.retain(|other| event.module.id != other.id);
1082                    }
1083                }
1084
1085                // todo(debugger): We should only send the invalidate command to downstream clients.
1086                // self.invalidate_state(&ModulesCommand.into());
1087            }
1088            Events::LoadedSource(_) => {
1089                self.invalidate_state(&LoadedSourcesCommand.into());
1090            }
1091            Events::Capabilities(event) => {
1092                self.capabilities = self.capabilities.merge(event.capabilities);
1093                cx.notify();
1094            }
1095            Events::Memory(_) => {}
1096            Events::Process(_) => {}
1097            Events::ProgressEnd(_) => {}
1098            Events::ProgressStart(_) => {}
1099            Events::ProgressUpdate(_) => {}
1100            Events::Invalidated(_) => {}
1101            Events::Other(_) => {}
1102        }
1103    }
1104
1105    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1106    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1107        &mut self,
1108        request: T,
1109        process_result: impl FnOnce(
1110            &mut Self,
1111            Result<T::Response>,
1112            &mut Context<Self>,
1113        ) -> Option<T::Response>
1114        + 'static,
1115        cx: &mut Context<Self>,
1116    ) {
1117        const {
1118            assert!(
1119                T::CACHEABLE,
1120                "Only requests marked as cacheable should invoke `fetch`"
1121            );
1122        }
1123
1124        if !self.thread_states.any_stopped_thread()
1125            && request.type_id() != TypeId::of::<ThreadsCommand>()
1126            || self.is_session_terminated
1127        {
1128            return;
1129        }
1130
1131        let request_map = self
1132            .requests
1133            .entry(std::any::TypeId::of::<T>())
1134            .or_default();
1135
1136        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1137            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1138
1139            let task = Self::request_inner::<Arc<T>>(
1140                &self.capabilities,
1141                self.id,
1142                &self.mode,
1143                command,
1144                process_result,
1145                cx,
1146            );
1147            let task = cx
1148                .background_executor()
1149                .spawn(async move {
1150                    let _ = task.await?;
1151                    Some(())
1152                })
1153                .shared();
1154
1155            vacant.insert(task);
1156            cx.notify();
1157        }
1158    }
1159
1160    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1161        capabilities: &Capabilities,
1162        session_id: SessionId,
1163        mode: &Mode,
1164        request: T,
1165        process_result: impl FnOnce(
1166            &mut Self,
1167            Result<T::Response>,
1168            &mut Context<Self>,
1169        ) -> Option<T::Response>
1170        + 'static,
1171        cx: &mut Context<Self>,
1172    ) -> Task<Option<T::Response>> {
1173        if !T::is_supported(&capabilities) {
1174            log::warn!(
1175                "Attempted to send a DAP request that isn't supported: {:?}",
1176                request
1177            );
1178            let error = Err(anyhow::Error::msg(
1179                "Couldn't complete request because it's not supported",
1180            ));
1181            return cx.spawn(async move |this, cx| {
1182                this.update(cx, |this, cx| process_result(this, error, cx))
1183                    .log_err()
1184                    .flatten()
1185            });
1186        }
1187
1188        let request = mode.request_dap(session_id, request, cx);
1189        cx.spawn(async move |this, cx| {
1190            let result = request.await;
1191            this.update(cx, |this, cx| process_result(this, result, cx))
1192                .log_err()
1193                .flatten()
1194        })
1195    }
1196
1197    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1198        &self,
1199        request: T,
1200        process_result: impl FnOnce(
1201            &mut Self,
1202            Result<T::Response>,
1203            &mut Context<Self>,
1204        ) -> Option<T::Response>
1205        + 'static,
1206        cx: &mut Context<Self>,
1207    ) -> Task<Option<T::Response>> {
1208        Self::request_inner(
1209            &self.capabilities,
1210            self.id,
1211            &self.mode,
1212            request,
1213            process_result,
1214            cx,
1215        )
1216    }
1217
1218    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1219        self.requests.remove(&std::any::TypeId::of::<Command>());
1220    }
1221
1222    fn invalidate_generic(&mut self) {
1223        self.invalidate_command_type::<ModulesCommand>();
1224        self.invalidate_command_type::<LoadedSourcesCommand>();
1225        self.invalidate_command_type::<ThreadsCommand>();
1226    }
1227
1228    fn invalidate_state(&mut self, key: &RequestSlot) {
1229        self.requests
1230            .entry((&*key.0 as &dyn Any).type_id())
1231            .and_modify(|request_map| {
1232                request_map.remove(&key);
1233            });
1234    }
1235
1236    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1237        self.thread_states.thread_status(thread_id)
1238    }
1239
1240    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1241        self.fetch(
1242            dap_command::ThreadsCommand,
1243            |this, result, cx| {
1244                let result = result.log_err()?;
1245
1246                this.threads = result
1247                    .iter()
1248                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1249                    .collect();
1250
1251                this.invalidate_command_type::<StackTraceCommand>();
1252                cx.emit(SessionEvent::Threads);
1253                cx.notify();
1254
1255                Some(result)
1256            },
1257            cx,
1258        );
1259
1260        self.threads
1261            .values()
1262            .map(|thread| {
1263                (
1264                    thread.dap.clone(),
1265                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1266                )
1267            })
1268            .collect()
1269    }
1270
1271    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1272        self.fetch(
1273            dap_command::ModulesCommand,
1274            |this, result, cx| {
1275                let result = result.log_err()?;
1276
1277                this.modules = result.iter().cloned().collect();
1278                cx.emit(SessionEvent::Modules);
1279                cx.notify();
1280
1281                Some(result)
1282            },
1283            cx,
1284        );
1285
1286        &self.modules
1287    }
1288
1289    pub fn ignore_breakpoints(&self) -> bool {
1290        self.ignore_breakpoints
1291    }
1292
1293    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1294        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1295    }
1296
1297    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1298        if self.ignore_breakpoints == ignore {
1299            return Task::ready(());
1300        }
1301
1302        self.ignore_breakpoints = ignore;
1303
1304        if let Some(local) = self.as_local() {
1305            local.send_source_breakpoints(ignore, cx)
1306        } else {
1307            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1308            unimplemented!()
1309        }
1310    }
1311
1312    pub fn exception_breakpoints(
1313        &self,
1314    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1315        self.exception_breakpoints.values()
1316    }
1317
1318    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1319        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1320            *is_enabled = !*is_enabled;
1321            self.send_exception_breakpoints(cx);
1322        }
1323    }
1324
1325    fn send_exception_breakpoints(&mut self, cx: &App) {
1326        if let Some(local) = self.as_local() {
1327            let exception_filters = self
1328                .exception_breakpoints
1329                .values()
1330                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1331                .collect();
1332
1333            let supports_exception_filters = self
1334                .capabilities
1335                .supports_exception_filter_options
1336                .unwrap_or_default();
1337            local
1338                .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1339                .detach_and_log_err(cx);
1340        } else {
1341            debug_assert!(false, "Not implemented");
1342        }
1343    }
1344
1345    pub fn breakpoints_enabled(&self) -> bool {
1346        self.ignore_breakpoints
1347    }
1348
1349    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1350        self.fetch(
1351            dap_command::LoadedSourcesCommand,
1352            |this, result, cx| {
1353                let result = result.log_err()?;
1354                this.loaded_sources = result.iter().cloned().collect();
1355                cx.emit(SessionEvent::LoadedSources);
1356                cx.notify();
1357                Some(result)
1358            },
1359            cx,
1360        );
1361
1362        &self.loaded_sources
1363    }
1364
1365    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1366        res.log_err()?;
1367        Some(())
1368    }
1369
1370    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1371        thread_id: ThreadId,
1372    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1373    {
1374        move |this, response, cx| match response.log_err() {
1375            Some(response) => Some(response),
1376            None => {
1377                this.thread_states.stop_thread(thread_id);
1378                cx.notify();
1379                None
1380            }
1381        }
1382    }
1383
1384    fn clear_active_debug_line_response(
1385        &mut self,
1386        response: Result<()>,
1387        cx: &mut Context<Session>,
1388    ) -> Option<()> {
1389        response.log_err()?;
1390        self.clear_active_debug_line(cx);
1391        Some(())
1392    }
1393
1394    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1395        self.as_local()
1396            .expect("Message handler will only run in local mode")
1397            .breakpoint_store
1398            .update(cx, |store, cx| {
1399                store.remove_active_position(Some(self.id), cx)
1400            });
1401    }
1402
1403    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1404        self.request(
1405            PauseCommand {
1406                thread_id: thread_id.0,
1407            },
1408            Self::empty_response,
1409            cx,
1410        )
1411        .detach();
1412    }
1413
1414    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1415        self.request(
1416            RestartStackFrameCommand { stack_frame_id },
1417            Self::empty_response,
1418            cx,
1419        )
1420        .detach();
1421    }
1422
1423    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1424        if self.capabilities.supports_restart_request.unwrap_or(false) {
1425            self.request(
1426                RestartCommand {
1427                    raw: args.unwrap_or(Value::Null),
1428                },
1429                Self::empty_response,
1430                cx,
1431            )
1432            .detach();
1433        } else {
1434            self.request(
1435                DisconnectCommand {
1436                    restart: Some(false),
1437                    terminate_debuggee: Some(true),
1438                    suspend_debuggee: Some(false),
1439                },
1440                Self::empty_response,
1441                cx,
1442            )
1443            .detach();
1444        }
1445    }
1446
1447    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1448        self.is_session_terminated = true;
1449        self.thread_states.exit_all_threads();
1450        cx.notify();
1451
1452        let task = if self
1453            .capabilities
1454            .supports_terminate_request
1455            .unwrap_or_default()
1456        {
1457            self.request(
1458                TerminateCommand {
1459                    restart: Some(false),
1460                },
1461                Self::clear_active_debug_line_response,
1462                cx,
1463            )
1464        } else {
1465            self.request(
1466                DisconnectCommand {
1467                    restart: Some(false),
1468                    terminate_debuggee: Some(true),
1469                    suspend_debuggee: Some(false),
1470                },
1471                Self::clear_active_debug_line_response,
1472                cx,
1473            )
1474        };
1475
1476        cx.emit(SessionStateEvent::Shutdown);
1477
1478        cx.background_spawn(async move {
1479            let _ = task.await;
1480        })
1481    }
1482
1483    pub fn completions(
1484        &mut self,
1485        query: CompletionsQuery,
1486        cx: &mut Context<Self>,
1487    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1488        let task = self.request(query, |_, result, _| result.log_err(), cx);
1489
1490        cx.background_executor().spawn(async move {
1491            anyhow::Ok(
1492                task.await
1493                    .map(|response| response.targets)
1494                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1495            )
1496        })
1497    }
1498
1499    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1500        self.thread_states.continue_thread(thread_id);
1501        self.request(
1502            ContinueCommand {
1503                args: ContinueArguments {
1504                    thread_id: thread_id.0,
1505                    single_thread: Some(true),
1506                },
1507            },
1508            Self::on_step_response::<ContinueCommand>(thread_id),
1509            cx,
1510        )
1511        .detach();
1512    }
1513
1514    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1515        match self.mode {
1516            Mode::Local(ref local) => Some(local.client.clone()),
1517            Mode::Remote(_) => None,
1518        }
1519    }
1520
1521    pub fn step_over(
1522        &mut self,
1523        thread_id: ThreadId,
1524        granularity: SteppingGranularity,
1525        cx: &mut Context<Self>,
1526    ) {
1527        let supports_single_thread_execution_requests =
1528            self.capabilities.supports_single_thread_execution_requests;
1529        let supports_stepping_granularity = self
1530            .capabilities
1531            .supports_stepping_granularity
1532            .unwrap_or_default();
1533
1534        let command = NextCommand {
1535            inner: StepCommand {
1536                thread_id: thread_id.0,
1537                granularity: supports_stepping_granularity.then(|| granularity),
1538                single_thread: supports_single_thread_execution_requests,
1539            },
1540        };
1541
1542        self.thread_states.process_step(thread_id);
1543        self.request(
1544            command,
1545            Self::on_step_response::<NextCommand>(thread_id),
1546            cx,
1547        )
1548        .detach();
1549    }
1550
1551    pub fn step_in(
1552        &mut self,
1553        thread_id: ThreadId,
1554        granularity: SteppingGranularity,
1555        cx: &mut Context<Self>,
1556    ) {
1557        let supports_single_thread_execution_requests =
1558            self.capabilities.supports_single_thread_execution_requests;
1559        let supports_stepping_granularity = self
1560            .capabilities
1561            .supports_stepping_granularity
1562            .unwrap_or_default();
1563
1564        let command = StepInCommand {
1565            inner: StepCommand {
1566                thread_id: thread_id.0,
1567                granularity: supports_stepping_granularity.then(|| granularity),
1568                single_thread: supports_single_thread_execution_requests,
1569            },
1570        };
1571
1572        self.thread_states.process_step(thread_id);
1573        self.request(
1574            command,
1575            Self::on_step_response::<StepInCommand>(thread_id),
1576            cx,
1577        )
1578        .detach();
1579    }
1580
1581    pub fn step_out(
1582        &mut self,
1583        thread_id: ThreadId,
1584        granularity: SteppingGranularity,
1585        cx: &mut Context<Self>,
1586    ) {
1587        let supports_single_thread_execution_requests =
1588            self.capabilities.supports_single_thread_execution_requests;
1589        let supports_stepping_granularity = self
1590            .capabilities
1591            .supports_stepping_granularity
1592            .unwrap_or_default();
1593
1594        let command = StepOutCommand {
1595            inner: StepCommand {
1596                thread_id: thread_id.0,
1597                granularity: supports_stepping_granularity.then(|| granularity),
1598                single_thread: supports_single_thread_execution_requests,
1599            },
1600        };
1601
1602        self.thread_states.process_step(thread_id);
1603        self.request(
1604            command,
1605            Self::on_step_response::<StepOutCommand>(thread_id),
1606            cx,
1607        )
1608        .detach();
1609    }
1610
1611    pub fn step_back(
1612        &mut self,
1613        thread_id: ThreadId,
1614        granularity: SteppingGranularity,
1615        cx: &mut Context<Self>,
1616    ) {
1617        let supports_single_thread_execution_requests =
1618            self.capabilities.supports_single_thread_execution_requests;
1619        let supports_stepping_granularity = self
1620            .capabilities
1621            .supports_stepping_granularity
1622            .unwrap_or_default();
1623
1624        let command = StepBackCommand {
1625            inner: StepCommand {
1626                thread_id: thread_id.0,
1627                granularity: supports_stepping_granularity.then(|| granularity),
1628                single_thread: supports_single_thread_execution_requests,
1629            },
1630        };
1631
1632        self.thread_states.process_step(thread_id);
1633
1634        self.request(
1635            command,
1636            Self::on_step_response::<StepBackCommand>(thread_id),
1637            cx,
1638        )
1639        .detach();
1640    }
1641
1642    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1643        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1644            && self.requests.contains_key(&ThreadsCommand.type_id())
1645            && self.threads.contains_key(&thread_id)
1646        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1647        // We could still be using an old thread id and have sent a new thread's request
1648        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1649        // But it very well could cause a minor bug in the future that is hard to track down
1650        {
1651            self.fetch(
1652                super::dap_command::StackTraceCommand {
1653                    thread_id: thread_id.0,
1654                    start_frame: None,
1655                    levels: None,
1656                },
1657                move |this, stack_frames, cx| {
1658                    let stack_frames = stack_frames.log_err()?;
1659
1660                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1661                        thread.stack_frame_ids =
1662                            stack_frames.iter().map(|frame| frame.id).collect();
1663                    });
1664                    debug_assert!(
1665                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1666                        "Sent request for thread_id that doesn't exist"
1667                    );
1668
1669                    this.stack_frames.extend(
1670                        stack_frames
1671                            .iter()
1672                            .cloned()
1673                            .map(|frame| (frame.id, StackFrame::from(frame))),
1674                    );
1675
1676                    this.invalidate_command_type::<ScopesCommand>();
1677                    this.invalidate_command_type::<VariablesCommand>();
1678
1679                    cx.emit(SessionEvent::StackTrace);
1680                    cx.notify();
1681                    Some(stack_frames)
1682                },
1683                cx,
1684            );
1685        }
1686
1687        self.threads
1688            .get(&thread_id)
1689            .map(|thread| {
1690                thread
1691                    .stack_frame_ids
1692                    .iter()
1693                    .filter_map(|id| self.stack_frames.get(id))
1694                    .cloned()
1695                    .collect()
1696            })
1697            .unwrap_or_default()
1698    }
1699
1700    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1701        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1702            && self
1703                .requests
1704                .contains_key(&TypeId::of::<StackTraceCommand>())
1705        {
1706            self.fetch(
1707                ScopesCommand { stack_frame_id },
1708                move |this, scopes, cx| {
1709                    let scopes = scopes.log_err()?;
1710
1711                    for scope in scopes .iter(){
1712                        this.variables(scope.variables_reference, cx);
1713                    }
1714
1715                    let entry = this
1716                        .stack_frames
1717                        .entry(stack_frame_id)
1718                        .and_modify(|stack_frame| {
1719                            stack_frame.scopes = scopes.clone();
1720                        });
1721
1722                    cx.emit(SessionEvent::Variables);
1723
1724                    debug_assert!(
1725                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1726                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1727                    );
1728
1729                    Some(scopes)
1730                },
1731                cx,
1732            );
1733        }
1734
1735        self.stack_frames
1736            .get(&stack_frame_id)
1737            .map(|frame| frame.scopes.as_slice())
1738            .unwrap_or_default()
1739    }
1740
1741    pub fn variables(
1742        &mut self,
1743        variables_reference: VariableReference,
1744        cx: &mut Context<Self>,
1745    ) -> Vec<dap::Variable> {
1746        let command = VariablesCommand {
1747            variables_reference,
1748            filter: None,
1749            start: None,
1750            count: None,
1751            format: None,
1752        };
1753
1754        self.fetch(
1755            command,
1756            move |this, variables, cx| {
1757                let variables = variables.log_err()?;
1758                this.variables
1759                    .insert(variables_reference, variables.clone());
1760
1761                cx.emit(SessionEvent::Variables);
1762                Some(variables)
1763            },
1764            cx,
1765        );
1766
1767        self.variables
1768            .get(&variables_reference)
1769            .cloned()
1770            .unwrap_or_default()
1771    }
1772
1773    pub fn set_variable_value(
1774        &mut self,
1775        variables_reference: u64,
1776        name: String,
1777        value: String,
1778        cx: &mut Context<Self>,
1779    ) {
1780        if self.capabilities.supports_set_variable.unwrap_or_default() {
1781            self.request(
1782                SetVariableValueCommand {
1783                    name,
1784                    value,
1785                    variables_reference,
1786                },
1787                move |this, response, cx| {
1788                    let response = response.log_err()?;
1789                    this.invalidate_command_type::<VariablesCommand>();
1790                    cx.notify();
1791                    Some(response)
1792                },
1793                cx,
1794            )
1795            .detach()
1796        }
1797    }
1798
1799    pub fn evaluate(
1800        &mut self,
1801        expression: String,
1802        context: Option<EvaluateArgumentsContext>,
1803        frame_id: Option<u64>,
1804        source: Option<Source>,
1805        cx: &mut Context<Self>,
1806    ) {
1807        self.request(
1808            EvaluateCommand {
1809                expression,
1810                context,
1811                frame_id,
1812                source,
1813            },
1814            |this, response, cx| {
1815                let response = response.log_err()?;
1816                this.output_token.0 += 1;
1817                this.output.push_back(dap::OutputEvent {
1818                    category: None,
1819                    output: response.result.clone(),
1820                    group: None,
1821                    variables_reference: Some(response.variables_reference),
1822                    source: None,
1823                    line: None,
1824                    column: None,
1825                    data: None,
1826                    location_reference: None,
1827                });
1828
1829                this.invalidate_command_type::<ScopesCommand>();
1830                cx.notify();
1831                Some(response)
1832            },
1833            cx,
1834        )
1835        .detach();
1836    }
1837
1838    pub fn location(
1839        &mut self,
1840        reference: u64,
1841        cx: &mut Context<Self>,
1842    ) -> Option<dap::LocationsResponse> {
1843        self.fetch(
1844            LocationsCommand { reference },
1845            move |this, response, _| {
1846                let response = response.log_err()?;
1847                this.locations.insert(reference, response.clone());
1848                Some(response)
1849            },
1850            cx,
1851        );
1852        self.locations.get(&reference).cloned()
1853    }
1854    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1855        let command = DisconnectCommand {
1856            restart: Some(false),
1857            terminate_debuggee: Some(true),
1858            suspend_debuggee: Some(false),
1859        };
1860
1861        self.request(command, Self::empty_response, cx).detach()
1862    }
1863
1864    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1865        if self
1866            .capabilities
1867            .supports_terminate_threads_request
1868            .unwrap_or_default()
1869        {
1870            self.request(
1871                TerminateThreadsCommand {
1872                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1873                },
1874                Self::clear_active_debug_line_response,
1875                cx,
1876            )
1877            .detach();
1878        } else {
1879            self.shutdown(cx).detach();
1880        }
1881    }
1882}
1883
1884fn create_local_session(
1885    breakpoint_store: Entity<BreakpointStore>,
1886    session_id: SessionId,
1887    parent_session: Option<Entity<Session>>,
1888    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1889    initialized_tx: oneshot::Sender<()>,
1890    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1891    mode: LocalMode,
1892    cx: &mut Context<Session>,
1893) -> Session {
1894    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1895        let mut initialized_tx = Some(initialized_tx);
1896        while let Some(message) = message_rx.next().await {
1897            if let Message::Event(event) = message {
1898                if let Events::Initialized(_) = *event {
1899                    if let Some(tx) = initialized_tx.take() {
1900                        tx.send(()).ok();
1901                    }
1902                } else {
1903                    let Ok(_) = this.update(cx, |session, cx| {
1904                        session.handle_dap_event(event, cx);
1905                    }) else {
1906                        break;
1907                    };
1908                }
1909            } else {
1910                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1911                else {
1912                    break;
1913                };
1914            }
1915        }
1916    })];
1917
1918    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1919        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1920            if let Some(local) = (!this.ignore_breakpoints)
1921                .then(|| this.as_local_mut())
1922                .flatten()
1923            {
1924                local
1925                    .send_breakpoints_from_path(path.clone(), *reason, cx)
1926                    .detach();
1927            };
1928        }
1929        BreakpointStoreEvent::BreakpointsCleared(paths) => {
1930            if let Some(local) = (!this.ignore_breakpoints)
1931                .then(|| this.as_local_mut())
1932                .flatten()
1933            {
1934                local.unset_breakpoints_from_paths(paths, cx).detach();
1935            }
1936        }
1937        BreakpointStoreEvent::ActiveDebugLineChanged => {}
1938    })
1939    .detach();
1940
1941    Session {
1942        mode: Mode::Local(mode),
1943        id: session_id,
1944        child_session_ids: HashSet::default(),
1945        parent_id: parent_session.map(|session| session.read(cx).id),
1946        variables: Default::default(),
1947        capabilities: Capabilities::default(),
1948        thread_states: ThreadStates::default(),
1949        output_token: OutputToken(0),
1950        ignore_breakpoints: false,
1951        output: circular_buffer::CircularBuffer::boxed(),
1952        requests: HashMap::default(),
1953        modules: Vec::default(),
1954        loaded_sources: Vec::default(),
1955        threads: IndexMap::default(),
1956        stack_frames: IndexMap::default(),
1957        locations: Default::default(),
1958        exception_breakpoints: Default::default(),
1959        _background_tasks,
1960        is_session_terminated: false,
1961    }
1962}