session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use anyhow::{Context as _, Result, anyhow};
  13use collections::{HashMap, HashSet, IndexMap, IndexSet};
  14use dap::adapters::DebugAdapterBinary;
  15use dap::messages::Response;
  16use dap::{
  17    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  18    SteppingGranularity, StoppedEvent, VariableReference,
  19    client::{DebugAdapterClient, SessionId},
  20    messages::{Events, Message},
  21};
  22use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
  23use futures::channel::oneshot;
  24use futures::{FutureExt, future::Shared};
  25use gpui::{
  26    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  27    Task, WeakEntity,
  28};
  29use rpc::AnyProtoClient;
  30use serde_json::{Value, json};
  31use smol::stream::StreamExt;
  32use std::any::TypeId;
  33use std::collections::BTreeMap;
  34use std::u64;
  35use std::{
  36    any::Any,
  37    collections::hash_map::Entry,
  38    hash::{Hash, Hasher},
  39    path::Path,
  40    sync::Arc,
  41};
  42use task::DebugTaskDefinition;
  43use text::{PointUtf16, ToPointUtf16};
  44use util::{ResultExt, merge_json_value_into};
  45
  46#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  47#[repr(transparent)]
  48pub struct ThreadId(pub u64);
  49
  50impl ThreadId {
  51    pub const MIN: ThreadId = ThreadId(u64::MIN);
  52    pub const MAX: ThreadId = ThreadId(u64::MAX);
  53}
  54
  55impl From<u64> for ThreadId {
  56    fn from(id: u64) -> Self {
  57        Self(id)
  58    }
  59}
  60
  61#[derive(Clone, Debug)]
  62pub struct StackFrame {
  63    pub dap: dap::StackFrame,
  64    pub scopes: Vec<dap::Scope>,
  65}
  66
  67impl From<dap::StackFrame> for StackFrame {
  68    fn from(stack_frame: dap::StackFrame) -> Self {
  69        Self {
  70            scopes: vec![],
  71            dap: stack_frame,
  72        }
  73    }
  74}
  75
  76#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  77pub enum ThreadStatus {
  78    #[default]
  79    Running,
  80    Stopped,
  81    Stepping,
  82    Exited,
  83    Ended,
  84}
  85
  86impl ThreadStatus {
  87    pub fn label(&self) -> &'static str {
  88        match self {
  89            ThreadStatus::Running => "Running",
  90            ThreadStatus::Stopped => "Stopped",
  91            ThreadStatus::Stepping => "Stepping",
  92            ThreadStatus::Exited => "Exited",
  93            ThreadStatus::Ended => "Ended",
  94        }
  95    }
  96}
  97
  98#[derive(Debug)]
  99pub struct Thread {
 100    dap: dap::Thread,
 101    stack_frame_ids: IndexSet<StackFrameId>,
 102    _has_stopped: bool,
 103}
 104
 105impl From<dap::Thread> for Thread {
 106    fn from(dap: dap::Thread) -> Self {
 107        Self {
 108            dap,
 109            stack_frame_ids: Default::default(),
 110            _has_stopped: false,
 111        }
 112    }
 113}
 114
 115type UpstreamProjectId = u64;
 116
 117struct RemoteConnection {
 118    _client: AnyProtoClient,
 119    _upstream_project_id: UpstreamProjectId,
 120    _adapter_name: SharedString,
 121}
 122
 123impl RemoteConnection {
 124    fn send_proto_client_request<R: DapCommand>(
 125        &self,
 126        _request: R,
 127        _session_id: SessionId,
 128        cx: &mut App,
 129    ) -> Task<Result<R::Response>> {
 130        // let message = request.to_proto(session_id, self.upstream_project_id);
 131        // let upstream_client = self.client.clone();
 132        cx.background_executor().spawn(async move {
 133            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 134            // let response = upstream_client.request(message).await?;
 135            // request.response_from_proto(response)
 136            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 137        })
 138    }
 139
 140    fn request<R: DapCommand>(
 141        &self,
 142        request: R,
 143        session_id: SessionId,
 144        cx: &mut App,
 145    ) -> Task<Result<R::Response>>
 146    where
 147        <R::DapRequest as dap::requests::Request>::Response: 'static,
 148        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 149    {
 150        return self.send_proto_client_request::<R>(request, session_id, cx);
 151    }
 152}
 153
 154enum Mode {
 155    Local(LocalMode),
 156    Remote(RemoteConnection),
 157}
 158
 159#[derive(Clone)]
 160pub struct LocalMode {
 161    client: Arc<DebugAdapterClient>,
 162    definition: DebugTaskDefinition,
 163    binary: DebugAdapterBinary,
 164    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 165    tmp_breakpoint: Option<SourceBreakpoint>,
 166}
 167
 168fn client_source(abs_path: &Path) -> dap::Source {
 169    dap::Source {
 170        name: abs_path
 171            .file_name()
 172            .map(|filename| filename.to_string_lossy().to_string()),
 173        path: Some(abs_path.to_string_lossy().to_string()),
 174        source_reference: None,
 175        presentation_hint: None,
 176        origin: None,
 177        sources: None,
 178        adapter_data: None,
 179        checksums: None,
 180    }
 181}
 182
 183impl LocalMode {
 184    fn new(
 185        session_id: SessionId,
 186        parent_session: Option<Entity<Session>>,
 187        breakpoint_store: Entity<BreakpointStore>,
 188        config: DebugTaskDefinition,
 189        binary: DebugAdapterBinary,
 190        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 191        cx: AsyncApp,
 192    ) -> Task<Result<Self>> {
 193        Self::new_inner(
 194            session_id,
 195            parent_session,
 196            breakpoint_store,
 197            config,
 198            binary,
 199            messages_tx,
 200            async |_, _| {},
 201            cx,
 202        )
 203    }
 204
 205    fn new_inner(
 206        session_id: SessionId,
 207        parent_session: Option<Entity<Session>>,
 208        breakpoint_store: Entity<BreakpointStore>,
 209        config: DebugTaskDefinition,
 210        binary: DebugAdapterBinary,
 211        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 212        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 213        cx: AsyncApp,
 214    ) -> Task<Result<Self>> {
 215        cx.spawn(async move |cx| {
 216            let message_handler = Box::new(move |message| {
 217                messages_tx.unbounded_send(message).ok();
 218            });
 219
 220            let client = Arc::new(
 221                if let Some(client) = parent_session
 222                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 223                    .flatten()
 224                {
 225                    client
 226                        .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 227                        .await?
 228                } else {
 229                    DebugAdapterClient::start(
 230                        session_id,
 231                        binary.clone(),
 232                        message_handler,
 233                        cx.clone(),
 234                    )
 235                    .await
 236                    .with_context(|| "Failed to start communication with debug adapter")?
 237                },
 238            );
 239
 240            let mut session = Self {
 241                client,
 242                breakpoint_store,
 243                tmp_breakpoint: None,
 244                definition: config,
 245                binary,
 246            };
 247
 248            on_initialized(&mut session, cx.clone()).await;
 249
 250            Ok(session)
 251        })
 252    }
 253
 254    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 255        let tasks: Vec<_> = paths
 256            .into_iter()
 257            .map(|path| {
 258                self.request(
 259                    dap_command::SetBreakpoints {
 260                        source: client_source(path),
 261                        source_modified: None,
 262                        breakpoints: vec![],
 263                    },
 264                    cx.background_executor().clone(),
 265                )
 266            })
 267            .collect();
 268
 269        cx.background_spawn(async move {
 270            futures::future::join_all(tasks)
 271                .await
 272                .iter()
 273                .for_each(|res| match res {
 274                    Ok(_) => {}
 275                    Err(err) => {
 276                        log::warn!("Set breakpoints request failed: {}", err);
 277                    }
 278                });
 279        })
 280    }
 281
 282    fn send_breakpoints_from_path(
 283        &self,
 284        abs_path: Arc<Path>,
 285        reason: BreakpointUpdatedReason,
 286        cx: &mut App,
 287    ) -> Task<()> {
 288        let breakpoints = self
 289            .breakpoint_store
 290            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 291            .into_iter()
 292            .filter(|bp| bp.state.is_enabled())
 293            .chain(self.tmp_breakpoint.clone())
 294            .map(Into::into)
 295            .collect();
 296
 297        let task = self.request(
 298            dap_command::SetBreakpoints {
 299                source: client_source(&abs_path),
 300                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 301                breakpoints,
 302            },
 303            cx.background_executor().clone(),
 304        );
 305
 306        cx.background_spawn(async move {
 307            match task.await {
 308                Ok(_) => {}
 309                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 310            }
 311        })
 312    }
 313
 314    fn send_exception_breakpoints(
 315        &self,
 316        filters: Vec<ExceptionBreakpointsFilter>,
 317        supports_filter_options: bool,
 318        cx: &App,
 319    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 320        let arg = if supports_filter_options {
 321            SetExceptionBreakpoints::WithOptions {
 322                filters: filters
 323                    .into_iter()
 324                    .map(|filter| ExceptionFilterOptions {
 325                        filter_id: filter.filter,
 326                        condition: None,
 327                        mode: None,
 328                    })
 329                    .collect(),
 330            }
 331        } else {
 332            SetExceptionBreakpoints::Plain {
 333                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 334            }
 335        };
 336        self.request(arg, cx.background_executor().clone())
 337    }
 338    fn send_source_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
 339        let mut breakpoint_tasks = Vec::new();
 340        let breakpoints = self
 341            .breakpoint_store
 342            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 343
 344        for (path, breakpoints) in breakpoints {
 345            let breakpoints = if ignore_breakpoints {
 346                vec![]
 347            } else {
 348                breakpoints
 349                    .into_iter()
 350                    .filter(|bp| bp.state.is_enabled())
 351                    .map(Into::into)
 352                    .collect()
 353            };
 354
 355            breakpoint_tasks.push(self.request(
 356                dap_command::SetBreakpoints {
 357                    source: client_source(&path),
 358                    source_modified: Some(false),
 359                    breakpoints,
 360                },
 361                cx.background_executor().clone(),
 362            ));
 363        }
 364
 365        cx.background_spawn(async move {
 366            futures::future::join_all(breakpoint_tasks)
 367                .await
 368                .iter()
 369                .for_each(|res| match res {
 370                    Ok(_) => {}
 371                    Err(err) => {
 372                        log::warn!("Set breakpoints request failed: {}", err);
 373                    }
 374                });
 375        })
 376    }
 377
 378    pub fn label(&self) -> String {
 379        self.definition.label.clone()
 380    }
 381
 382    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
 383        let adapter_id = self.binary.adapter_name.to_string();
 384
 385        self.request(Initialize { adapter_id }, cx.background_executor().clone())
 386    }
 387
 388    fn initialize_sequence(
 389        &self,
 390        capabilities: &Capabilities,
 391        initialized_rx: oneshot::Receiver<()>,
 392        cx: &App,
 393    ) -> Task<Result<()>> {
 394        let mut raw = self.binary.request_args.clone();
 395
 396        merge_json_value_into(
 397            self.definition.initialize_args.clone().unwrap_or(json!({})),
 398            &mut raw.configuration,
 399        );
 400
 401        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 402        let launch = match raw.request {
 403            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
 404                Launch {
 405                    raw: raw.configuration,
 406                },
 407                cx.background_executor().clone(),
 408            ),
 409            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
 410                Attach {
 411                    raw: raw.configuration,
 412                },
 413                cx.background_executor().clone(),
 414            ),
 415        };
 416
 417        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 418        let exception_filters = capabilities
 419            .exception_breakpoint_filters
 420            .as_ref()
 421            .map(|exception_filters| {
 422                exception_filters
 423                    .iter()
 424                    .filter(|filter| filter.default == Some(true))
 425                    .cloned()
 426                    .collect::<Vec<_>>()
 427            })
 428            .unwrap_or_default();
 429        let supports_exception_filters = capabilities
 430            .supports_exception_filter_options
 431            .unwrap_or_default();
 432        let configuration_sequence = cx.spawn({
 433            let this = self.clone();
 434            async move |cx| {
 435                initialized_rx.await?;
 436                // todo(debugger) figure out if we want to handle a breakpoint response error
 437                // This will probably consist of letting a user know that breakpoints failed to be set
 438                cx.update(|cx| this.send_source_breakpoints(false, cx))?
 439                    .await;
 440                cx.update(|cx| {
 441                    this.send_exception_breakpoints(
 442                        exception_filters,
 443                        supports_exception_filters,
 444                        cx,
 445                    )
 446                })?
 447                .await
 448                .ok();
 449                let ret = if configuration_done_supported {
 450                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 451                } else {
 452                    Task::ready(Ok(()))
 453                }
 454                .await;
 455                ret
 456            }
 457        });
 458
 459        cx.background_spawn(async move {
 460            futures::future::try_join(launch, configuration_sequence).await?;
 461            Ok(())
 462        })
 463    }
 464
 465    fn request<R: LocalDapCommand>(
 466        &self,
 467        request: R,
 468        executor: BackgroundExecutor,
 469    ) -> Task<Result<R::Response>>
 470    where
 471        <R::DapRequest as dap::requests::Request>::Response: 'static,
 472        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 473    {
 474        let request = Arc::new(request);
 475
 476        let request_clone = request.clone();
 477        let connection = self.client.clone();
 478        let request_task = executor.spawn(async move {
 479            let args = request_clone.to_dap();
 480            connection.request::<R::DapRequest>(args).await
 481        });
 482
 483        executor.spawn(async move {
 484            let response = request.response_from_dap(request_task.await?);
 485            response
 486        })
 487    }
 488}
 489impl From<RemoteConnection> for Mode {
 490    fn from(value: RemoteConnection) -> Self {
 491        Self::Remote(value)
 492    }
 493}
 494
 495impl Mode {
 496    fn request_dap<R: DapCommand>(
 497        &self,
 498        session_id: SessionId,
 499        request: R,
 500        cx: &mut Context<Session>,
 501    ) -> Task<Result<R::Response>>
 502    where
 503        <R::DapRequest as dap::requests::Request>::Response: 'static,
 504        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 505    {
 506        match self {
 507            Mode::Local(debug_adapter_client) => {
 508                debug_adapter_client.request(request, cx.background_executor().clone())
 509            }
 510            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 511        }
 512    }
 513}
 514
 515#[derive(Default)]
 516struct ThreadStates {
 517    global_state: Option<ThreadStatus>,
 518    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 519}
 520
 521impl ThreadStates {
 522    fn stop_all_threads(&mut self) {
 523        self.global_state = Some(ThreadStatus::Stopped);
 524        self.known_thread_states.clear();
 525    }
 526
 527    fn exit_all_threads(&mut self) {
 528        self.global_state = Some(ThreadStatus::Exited);
 529        self.known_thread_states.clear();
 530    }
 531
 532    fn continue_all_threads(&mut self) {
 533        self.global_state = Some(ThreadStatus::Running);
 534        self.known_thread_states.clear();
 535    }
 536
 537    fn stop_thread(&mut self, thread_id: ThreadId) {
 538        self.known_thread_states
 539            .insert(thread_id, ThreadStatus::Stopped);
 540    }
 541
 542    fn continue_thread(&mut self, thread_id: ThreadId) {
 543        self.known_thread_states
 544            .insert(thread_id, ThreadStatus::Running);
 545    }
 546
 547    fn process_step(&mut self, thread_id: ThreadId) {
 548        self.known_thread_states
 549            .insert(thread_id, ThreadStatus::Stepping);
 550    }
 551
 552    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 553        self.thread_state(thread_id)
 554            .unwrap_or(ThreadStatus::Running)
 555    }
 556
 557    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 558        self.known_thread_states
 559            .get(&thread_id)
 560            .copied()
 561            .or(self.global_state)
 562    }
 563
 564    fn exit_thread(&mut self, thread_id: ThreadId) {
 565        self.known_thread_states
 566            .insert(thread_id, ThreadStatus::Exited);
 567    }
 568
 569    fn any_stopped_thread(&self) -> bool {
 570        self.global_state
 571            .is_some_and(|state| state == ThreadStatus::Stopped)
 572            || self
 573                .known_thread_states
 574                .values()
 575                .any(|status| *status == ThreadStatus::Stopped)
 576    }
 577}
 578const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 579
 580type IsEnabled = bool;
 581
 582#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 583pub struct OutputToken(pub usize);
 584/// Represents a current state of a single debug adapter and provides ways to mutate it.
 585pub struct Session {
 586    mode: Mode,
 587    pub(super) capabilities: Capabilities,
 588    id: SessionId,
 589    child_session_ids: HashSet<SessionId>,
 590    parent_id: Option<SessionId>,
 591    ignore_breakpoints: bool,
 592    modules: Vec<dap::Module>,
 593    loaded_sources: Vec<dap::Source>,
 594    output_token: OutputToken,
 595    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 596    threads: IndexMap<ThreadId, Thread>,
 597    thread_states: ThreadStates,
 598    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 599    stack_frames: IndexMap<StackFrameId, StackFrame>,
 600    locations: HashMap<u64, dap::LocationsResponse>,
 601    is_session_terminated: bool,
 602    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 603    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 604    _background_tasks: Vec<Task<()>>,
 605}
 606
 607trait CacheableCommand: Any + Send + Sync {
 608    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 609    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 610    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 611}
 612
 613impl<T> CacheableCommand for T
 614where
 615    T: DapCommand + PartialEq + Eq + Hash,
 616{
 617    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 618        (rhs as &dyn Any)
 619            .downcast_ref::<Self>()
 620            .map_or(false, |rhs| self == rhs)
 621    }
 622
 623    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 624        T::hash(self, &mut hasher);
 625    }
 626
 627    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 628        self
 629    }
 630}
 631
 632pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 633
 634impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 635    fn from(request: T) -> Self {
 636        Self(Arc::new(request))
 637    }
 638}
 639
 640impl PartialEq for RequestSlot {
 641    fn eq(&self, other: &Self) -> bool {
 642        self.0.dyn_eq(other.0.as_ref())
 643    }
 644}
 645
 646impl Eq for RequestSlot {}
 647
 648impl Hash for RequestSlot {
 649    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 650        self.0.dyn_hash(state);
 651        (&*self.0 as &dyn Any).type_id().hash(state)
 652    }
 653}
 654
 655#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 656pub struct CompletionsQuery {
 657    pub query: String,
 658    pub column: u64,
 659    pub line: Option<u64>,
 660    pub frame_id: Option<u64>,
 661}
 662
 663impl CompletionsQuery {
 664    pub fn new(
 665        buffer: &language::Buffer,
 666        cursor_position: language::Anchor,
 667        frame_id: Option<u64>,
 668    ) -> Self {
 669        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 670        Self {
 671            query: buffer.text(),
 672            column: column as u64,
 673            frame_id,
 674            line: Some(row as u64),
 675        }
 676    }
 677}
 678
 679pub enum SessionEvent {
 680    Modules,
 681    LoadedSources,
 682    Stopped(Option<ThreadId>),
 683    StackTrace,
 684    Variables,
 685    Threads,
 686}
 687
 688pub(super) enum SessionStateEvent {
 689    Shutdown,
 690    Restart,
 691}
 692
 693impl EventEmitter<SessionEvent> for Session {}
 694impl EventEmitter<SessionStateEvent> for Session {}
 695
 696// local session will send breakpoint updates to DAP for all new breakpoints
 697// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 698// BreakpointStore notifies session on breakpoint changes
 699impl Session {
 700    pub(crate) fn local(
 701        breakpoint_store: Entity<BreakpointStore>,
 702        session_id: SessionId,
 703        parent_session: Option<Entity<Session>>,
 704        binary: DebugAdapterBinary,
 705        config: DebugTaskDefinition,
 706        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 707        initialized_tx: oneshot::Sender<()>,
 708        cx: &mut App,
 709    ) -> Task<Result<Entity<Self>>> {
 710        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 711
 712        cx.spawn(async move |cx| {
 713            let mode = LocalMode::new(
 714                session_id,
 715                parent_session.clone(),
 716                breakpoint_store.clone(),
 717                config.clone(),
 718                binary,
 719                message_tx,
 720                cx.clone(),
 721            )
 722            .await?;
 723
 724            cx.new(|cx| {
 725                create_local_session(
 726                    breakpoint_store,
 727                    session_id,
 728                    parent_session,
 729                    start_debugging_requests_tx,
 730                    initialized_tx,
 731                    message_rx,
 732                    mode,
 733                    cx,
 734                )
 735            })
 736        })
 737    }
 738
 739    pub(crate) fn remote(
 740        session_id: SessionId,
 741        client: AnyProtoClient,
 742        upstream_project_id: u64,
 743        ignore_breakpoints: bool,
 744    ) -> Self {
 745        Self {
 746            mode: Mode::Remote(RemoteConnection {
 747                _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
 748                _client: client,
 749                _upstream_project_id: upstream_project_id,
 750            }),
 751            id: session_id,
 752            child_session_ids: HashSet::default(),
 753            parent_id: None,
 754            capabilities: Capabilities::default(),
 755            ignore_breakpoints,
 756            variables: Default::default(),
 757            stack_frames: Default::default(),
 758            thread_states: ThreadStates::default(),
 759            output_token: OutputToken(0),
 760            output: circular_buffer::CircularBuffer::boxed(),
 761            requests: HashMap::default(),
 762            modules: Vec::default(),
 763            loaded_sources: Vec::default(),
 764            threads: IndexMap::default(),
 765            _background_tasks: Vec::default(),
 766            locations: Default::default(),
 767            is_session_terminated: false,
 768            exception_breakpoints: Default::default(),
 769        }
 770    }
 771
 772    pub fn session_id(&self) -> SessionId {
 773        self.id
 774    }
 775
 776    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 777        self.child_session_ids.clone()
 778    }
 779
 780    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 781        self.child_session_ids.insert(session_id);
 782    }
 783
 784    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 785        self.child_session_ids.remove(&session_id);
 786    }
 787
 788    pub fn parent_id(&self) -> Option<SessionId> {
 789        self.parent_id
 790    }
 791
 792    pub fn capabilities(&self) -> &Capabilities {
 793        &self.capabilities
 794    }
 795
 796    pub fn binary(&self) -> &DebugAdapterBinary {
 797        let Mode::Local(local_mode) = &self.mode else {
 798            panic!("Session is not local");
 799        };
 800        &local_mode.binary
 801    }
 802
 803    pub fn adapter_name(&self) -> SharedString {
 804        match &self.mode {
 805            Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(),
 806            Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
 807        }
 808    }
 809
 810    pub fn configuration(&self) -> Option<DebugTaskDefinition> {
 811        if let Mode::Local(local_mode) = &self.mode {
 812            Some(local_mode.definition.clone())
 813        } else {
 814            None
 815        }
 816    }
 817
 818    pub fn is_terminated(&self) -> bool {
 819        self.is_session_terminated
 820    }
 821
 822    pub fn is_local(&self) -> bool {
 823        matches!(self.mode, Mode::Local(_))
 824    }
 825
 826    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 827        match &mut self.mode {
 828            Mode::Local(local_mode) => Some(local_mode),
 829            Mode::Remote(_) => None,
 830        }
 831    }
 832
 833    pub fn as_local(&self) -> Option<&LocalMode> {
 834        match &self.mode {
 835            Mode::Local(local_mode) => Some(local_mode),
 836            Mode::Remote(_) => None,
 837        }
 838    }
 839
 840    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 841        match &self.mode {
 842            Mode::Local(local_mode) => {
 843                let capabilities = local_mode.clone().request_initialization(cx);
 844
 845                cx.spawn(async move |this, cx| {
 846                    let capabilities = capabilities.await?;
 847                    this.update(cx, |session, _| {
 848                        session.capabilities = capabilities;
 849                        let filters = session
 850                            .capabilities
 851                            .exception_breakpoint_filters
 852                            .clone()
 853                            .unwrap_or_default();
 854                        for filter in filters {
 855                            let default = filter.default.unwrap_or_default();
 856                            session
 857                                .exception_breakpoints
 858                                .entry(filter.filter.clone())
 859                                .or_insert_with(|| (filter, default));
 860                        }
 861                    })?;
 862                    Ok(())
 863                })
 864            }
 865            Mode::Remote(_) => Task::ready(Err(anyhow!(
 866                "Cannot send initialize request from remote session"
 867            ))),
 868        }
 869    }
 870
 871    pub(super) fn initialize_sequence(
 872        &mut self,
 873        initialize_rx: oneshot::Receiver<()>,
 874        cx: &mut Context<Self>,
 875    ) -> Task<Result<()>> {
 876        match &self.mode {
 877            Mode::Local(local_mode) => {
 878                local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
 879            }
 880            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
 881        }
 882    }
 883
 884    pub fn run_to_position(
 885        &mut self,
 886        breakpoint: SourceBreakpoint,
 887        active_thread_id: ThreadId,
 888        cx: &mut Context<Self>,
 889    ) {
 890        match &mut self.mode {
 891            Mode::Local(local_mode) => {
 892                if !matches!(
 893                    self.thread_states.thread_state(active_thread_id),
 894                    Some(ThreadStatus::Stopped)
 895                ) {
 896                    return;
 897                };
 898                let path = breakpoint.path.clone();
 899                local_mode.tmp_breakpoint = Some(breakpoint);
 900                let task = local_mode.send_breakpoints_from_path(
 901                    path,
 902                    BreakpointUpdatedReason::Toggled,
 903                    cx,
 904                );
 905
 906                cx.spawn(async move |this, cx| {
 907                    task.await;
 908                    this.update(cx, |this, cx| {
 909                        this.continue_thread(active_thread_id, cx);
 910                    })
 911                })
 912                .detach();
 913            }
 914            Mode::Remote(_) => {}
 915        }
 916    }
 917
 918    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
 919        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
 920    }
 921
 922    pub fn output(
 923        &self,
 924        since: OutputToken,
 925    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 926        if self.output_token.0 == 0 {
 927            return (self.output.range(0..0), OutputToken(0));
 928        };
 929
 930        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 931
 932        let clamped_events_since = events_since.clamp(0, self.output.len());
 933        (
 934            self.output
 935                .range(self.output.len() - clamped_events_since..),
 936            self.output_token,
 937        )
 938    }
 939
 940    pub fn respond_to_client(
 941        &self,
 942        request_seq: u64,
 943        success: bool,
 944        command: String,
 945        body: Option<serde_json::Value>,
 946        cx: &mut Context<Self>,
 947    ) -> Task<Result<()>> {
 948        let Some(local_session) = self.as_local().cloned() else {
 949            unreachable!("Cannot respond to remote client");
 950        };
 951
 952        cx.background_spawn(async move {
 953            local_session
 954                .client
 955                .send_message(Message::Response(Response {
 956                    body,
 957                    success,
 958                    command,
 959                    seq: request_seq + 1,
 960                    request_seq,
 961                    message: None,
 962                }))
 963                .await
 964        })
 965    }
 966
 967    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
 968        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
 969            let breakpoint = local.tmp_breakpoint.take()?;
 970            let path = breakpoint.path.clone();
 971            Some((local, path))
 972        }) {
 973            local
 974                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
 975                .detach();
 976        };
 977
 978        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
 979            self.thread_states.stop_all_threads();
 980
 981            self.invalidate_command_type::<StackTraceCommand>();
 982        }
 983
 984        // Event if we stopped all threads we still need to insert the thread_id
 985        // to our own data
 986        if let Some(thread_id) = event.thread_id {
 987            self.thread_states.stop_thread(ThreadId(thread_id));
 988
 989            self.invalidate_state(
 990                &StackTraceCommand {
 991                    thread_id,
 992                    start_frame: None,
 993                    levels: None,
 994                }
 995                .into(),
 996            );
 997        }
 998
 999        self.invalidate_generic();
1000        self.threads.clear();
1001        self.variables.clear();
1002        cx.emit(SessionEvent::Stopped(
1003            event
1004                .thread_id
1005                .map(Into::into)
1006                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1007        ));
1008        cx.notify();
1009    }
1010
1011    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1012        match *event {
1013            Events::Initialized(_) => {
1014                debug_assert!(
1015                    false,
1016                    "Initialized event should have been handled in LocalMode"
1017                );
1018            }
1019            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1020            Events::Continued(event) => {
1021                if event.all_threads_continued.unwrap_or_default() {
1022                    self.thread_states.continue_all_threads();
1023                } else {
1024                    self.thread_states
1025                        .continue_thread(ThreadId(event.thread_id));
1026                }
1027                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1028                self.invalidate_generic();
1029            }
1030            Events::Exited(_event) => {
1031                self.clear_active_debug_line(cx);
1032            }
1033            Events::Terminated(_) => {
1034                self.is_session_terminated = true;
1035                self.clear_active_debug_line(cx);
1036            }
1037            Events::Thread(event) => {
1038                let thread_id = ThreadId(event.thread_id);
1039
1040                match event.reason {
1041                    dap::ThreadEventReason::Started => {
1042                        self.thread_states.continue_thread(thread_id);
1043                    }
1044                    dap::ThreadEventReason::Exited => {
1045                        self.thread_states.exit_thread(thread_id);
1046                    }
1047                    reason => {
1048                        log::error!("Unhandled thread event reason {:?}", reason);
1049                    }
1050                }
1051                self.invalidate_state(&ThreadsCommand.into());
1052                cx.notify();
1053            }
1054            Events::Output(event) => {
1055                if event
1056                    .category
1057                    .as_ref()
1058                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1059                {
1060                    return;
1061                }
1062
1063                self.output.push_back(event);
1064                self.output_token.0 += 1;
1065                cx.notify();
1066            }
1067            Events::Breakpoint(_) => {}
1068            Events::Module(event) => {
1069                match event.reason {
1070                    dap::ModuleEventReason::New => {
1071                        self.modules.push(event.module);
1072                    }
1073                    dap::ModuleEventReason::Changed => {
1074                        if let Some(module) = self
1075                            .modules
1076                            .iter_mut()
1077                            .find(|other| event.module.id == other.id)
1078                        {
1079                            *module = event.module;
1080                        }
1081                    }
1082                    dap::ModuleEventReason::Removed => {
1083                        self.modules.retain(|other| event.module.id != other.id);
1084                    }
1085                }
1086
1087                // todo(debugger): We should only send the invalidate command to downstream clients.
1088                // self.invalidate_state(&ModulesCommand.into());
1089            }
1090            Events::LoadedSource(_) => {
1091                self.invalidate_state(&LoadedSourcesCommand.into());
1092            }
1093            Events::Capabilities(event) => {
1094                self.capabilities = self.capabilities.merge(event.capabilities);
1095                cx.notify();
1096            }
1097            Events::Memory(_) => {}
1098            Events::Process(_) => {}
1099            Events::ProgressEnd(_) => {}
1100            Events::ProgressStart(_) => {}
1101            Events::ProgressUpdate(_) => {}
1102            Events::Invalidated(_) => {}
1103            Events::Other(_) => {}
1104        }
1105    }
1106
1107    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1108    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1109        &mut self,
1110        request: T,
1111        process_result: impl FnOnce(
1112            &mut Self,
1113            Result<T::Response>,
1114            &mut Context<Self>,
1115        ) -> Option<T::Response>
1116        + 'static,
1117        cx: &mut Context<Self>,
1118    ) {
1119        const {
1120            assert!(
1121                T::CACHEABLE,
1122                "Only requests marked as cacheable should invoke `fetch`"
1123            );
1124        }
1125
1126        if !self.thread_states.any_stopped_thread()
1127            && request.type_id() != TypeId::of::<ThreadsCommand>()
1128            || self.is_session_terminated
1129        {
1130            return;
1131        }
1132
1133        let request_map = self
1134            .requests
1135            .entry(std::any::TypeId::of::<T>())
1136            .or_default();
1137
1138        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1139            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1140
1141            let task = Self::request_inner::<Arc<T>>(
1142                &self.capabilities,
1143                self.id,
1144                &self.mode,
1145                command,
1146                process_result,
1147                cx,
1148            );
1149            let task = cx
1150                .background_executor()
1151                .spawn(async move {
1152                    let _ = task.await?;
1153                    Some(())
1154                })
1155                .shared();
1156
1157            vacant.insert(task);
1158            cx.notify();
1159        }
1160    }
1161
1162    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1163        capabilities: &Capabilities,
1164        session_id: SessionId,
1165        mode: &Mode,
1166        request: T,
1167        process_result: impl FnOnce(
1168            &mut Self,
1169            Result<T::Response>,
1170            &mut Context<Self>,
1171        ) -> Option<T::Response>
1172        + 'static,
1173        cx: &mut Context<Self>,
1174    ) -> Task<Option<T::Response>> {
1175        if !T::is_supported(&capabilities) {
1176            log::warn!(
1177                "Attempted to send a DAP request that isn't supported: {:?}",
1178                request
1179            );
1180            let error = Err(anyhow::Error::msg(
1181                "Couldn't complete request because it's not supported",
1182            ));
1183            return cx.spawn(async move |this, cx| {
1184                this.update(cx, |this, cx| process_result(this, error, cx))
1185                    .log_err()
1186                    .flatten()
1187            });
1188        }
1189
1190        let request = mode.request_dap(session_id, request, cx);
1191        cx.spawn(async move |this, cx| {
1192            let result = request.await;
1193            this.update(cx, |this, cx| process_result(this, result, cx))
1194                .log_err()
1195                .flatten()
1196        })
1197    }
1198
1199    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1200        &self,
1201        request: T,
1202        process_result: impl FnOnce(
1203            &mut Self,
1204            Result<T::Response>,
1205            &mut Context<Self>,
1206        ) -> Option<T::Response>
1207        + 'static,
1208        cx: &mut Context<Self>,
1209    ) -> Task<Option<T::Response>> {
1210        Self::request_inner(
1211            &self.capabilities,
1212            self.id,
1213            &self.mode,
1214            request,
1215            process_result,
1216            cx,
1217        )
1218    }
1219
1220    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1221        self.requests.remove(&std::any::TypeId::of::<Command>());
1222    }
1223
1224    fn invalidate_generic(&mut self) {
1225        self.invalidate_command_type::<ModulesCommand>();
1226        self.invalidate_command_type::<LoadedSourcesCommand>();
1227        self.invalidate_command_type::<ThreadsCommand>();
1228    }
1229
1230    fn invalidate_state(&mut self, key: &RequestSlot) {
1231        self.requests
1232            .entry((&*key.0 as &dyn Any).type_id())
1233            .and_modify(|request_map| {
1234                request_map.remove(&key);
1235            });
1236    }
1237
1238    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1239        self.thread_states.thread_status(thread_id)
1240    }
1241
1242    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1243        self.fetch(
1244            dap_command::ThreadsCommand,
1245            |this, result, cx| {
1246                let result = result.log_err()?;
1247
1248                this.threads = result
1249                    .iter()
1250                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1251                    .collect();
1252
1253                this.invalidate_command_type::<StackTraceCommand>();
1254                cx.emit(SessionEvent::Threads);
1255                cx.notify();
1256
1257                Some(result)
1258            },
1259            cx,
1260        );
1261
1262        self.threads
1263            .values()
1264            .map(|thread| {
1265                (
1266                    thread.dap.clone(),
1267                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1268                )
1269            })
1270            .collect()
1271    }
1272
1273    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1274        self.fetch(
1275            dap_command::ModulesCommand,
1276            |this, result, cx| {
1277                let result = result.log_err()?;
1278
1279                this.modules = result.iter().cloned().collect();
1280                cx.emit(SessionEvent::Modules);
1281                cx.notify();
1282
1283                Some(result)
1284            },
1285            cx,
1286        );
1287
1288        &self.modules
1289    }
1290
1291    pub fn ignore_breakpoints(&self) -> bool {
1292        self.ignore_breakpoints
1293    }
1294
1295    pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1296        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1297    }
1298
1299    pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1300        if self.ignore_breakpoints == ignore {
1301            return Task::ready(());
1302        }
1303
1304        self.ignore_breakpoints = ignore;
1305
1306        if let Some(local) = self.as_local() {
1307            local.send_source_breakpoints(ignore, cx)
1308        } else {
1309            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1310            unimplemented!()
1311        }
1312    }
1313
1314    pub fn exception_breakpoints(
1315        &self,
1316    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1317        self.exception_breakpoints.values()
1318    }
1319
1320    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1321        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1322            *is_enabled = !*is_enabled;
1323            self.send_exception_breakpoints(cx);
1324        }
1325    }
1326
1327    fn send_exception_breakpoints(&mut self, cx: &App) {
1328        if let Some(local) = self.as_local() {
1329            let exception_filters = self
1330                .exception_breakpoints
1331                .values()
1332                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1333                .collect();
1334
1335            let supports_exception_filters = self
1336                .capabilities
1337                .supports_exception_filter_options
1338                .unwrap_or_default();
1339            local
1340                .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1341                .detach_and_log_err(cx);
1342        } else {
1343            debug_assert!(false, "Not implemented");
1344        }
1345    }
1346
1347    pub fn breakpoints_enabled(&self) -> bool {
1348        self.ignore_breakpoints
1349    }
1350
1351    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1352        self.fetch(
1353            dap_command::LoadedSourcesCommand,
1354            |this, result, cx| {
1355                let result = result.log_err()?;
1356                this.loaded_sources = result.iter().cloned().collect();
1357                cx.emit(SessionEvent::LoadedSources);
1358                cx.notify();
1359                Some(result)
1360            },
1361            cx,
1362        );
1363
1364        &self.loaded_sources
1365    }
1366
1367    fn fallback_to_manual_restart(
1368        &mut self,
1369        res: Result<()>,
1370        cx: &mut Context<Self>,
1371    ) -> Option<()> {
1372        if res.log_err().is_none() {
1373            cx.emit(SessionStateEvent::Restart);
1374            return None;
1375        }
1376        Some(())
1377    }
1378
1379    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1380        res.log_err()?;
1381        Some(())
1382    }
1383
1384    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1385        thread_id: ThreadId,
1386    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1387    {
1388        move |this, response, cx| match response.log_err() {
1389            Some(response) => Some(response),
1390            None => {
1391                this.thread_states.stop_thread(thread_id);
1392                cx.notify();
1393                None
1394            }
1395        }
1396    }
1397
1398    fn clear_active_debug_line_response(
1399        &mut self,
1400        response: Result<()>,
1401        cx: &mut Context<Session>,
1402    ) -> Option<()> {
1403        response.log_err()?;
1404        self.clear_active_debug_line(cx);
1405        Some(())
1406    }
1407
1408    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1409        self.as_local()
1410            .expect("Message handler will only run in local mode")
1411            .breakpoint_store
1412            .update(cx, |store, cx| {
1413                store.remove_active_position(Some(self.id), cx)
1414            });
1415    }
1416
1417    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1418        self.request(
1419            PauseCommand {
1420                thread_id: thread_id.0,
1421            },
1422            Self::empty_response,
1423            cx,
1424        )
1425        .detach();
1426    }
1427
1428    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1429        self.request(
1430            RestartStackFrameCommand { stack_frame_id },
1431            Self::empty_response,
1432            cx,
1433        )
1434        .detach();
1435    }
1436
1437    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1438        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1439            self.request(
1440                RestartCommand {
1441                    raw: args.unwrap_or(Value::Null),
1442                },
1443                Self::fallback_to_manual_restart,
1444                cx,
1445            )
1446            .detach();
1447        } else {
1448            cx.emit(SessionStateEvent::Restart);
1449        }
1450    }
1451
1452    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1453        self.is_session_terminated = true;
1454        self.thread_states.exit_all_threads();
1455        cx.notify();
1456
1457        let task = if self
1458            .capabilities
1459            .supports_terminate_request
1460            .unwrap_or_default()
1461        {
1462            self.request(
1463                TerminateCommand {
1464                    restart: Some(false),
1465                },
1466                Self::clear_active_debug_line_response,
1467                cx,
1468            )
1469        } else {
1470            self.request(
1471                DisconnectCommand {
1472                    restart: Some(false),
1473                    terminate_debuggee: Some(true),
1474                    suspend_debuggee: Some(false),
1475                },
1476                Self::clear_active_debug_line_response,
1477                cx,
1478            )
1479        };
1480
1481        cx.emit(SessionStateEvent::Shutdown);
1482
1483        let debug_client = self.adapter_client();
1484
1485        cx.background_spawn(async move {
1486            let _ = task.await;
1487
1488            if let Some(client) = debug_client {
1489                client.shutdown().await.log_err();
1490            }
1491        })
1492    }
1493
1494    pub fn completions(
1495        &mut self,
1496        query: CompletionsQuery,
1497        cx: &mut Context<Self>,
1498    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1499        let task = self.request(query, |_, result, _| result.log_err(), cx);
1500
1501        cx.background_executor().spawn(async move {
1502            anyhow::Ok(
1503                task.await
1504                    .map(|response| response.targets)
1505                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1506            )
1507        })
1508    }
1509
1510    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1511        self.thread_states.continue_thread(thread_id);
1512        self.request(
1513            ContinueCommand {
1514                args: ContinueArguments {
1515                    thread_id: thread_id.0,
1516                    single_thread: Some(true),
1517                },
1518            },
1519            Self::on_step_response::<ContinueCommand>(thread_id),
1520            cx,
1521        )
1522        .detach();
1523    }
1524
1525    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1526        match self.mode {
1527            Mode::Local(ref local) => Some(local.client.clone()),
1528            Mode::Remote(_) => None,
1529        }
1530    }
1531
1532    pub fn step_over(
1533        &mut self,
1534        thread_id: ThreadId,
1535        granularity: SteppingGranularity,
1536        cx: &mut Context<Self>,
1537    ) {
1538        let supports_single_thread_execution_requests =
1539            self.capabilities.supports_single_thread_execution_requests;
1540        let supports_stepping_granularity = self
1541            .capabilities
1542            .supports_stepping_granularity
1543            .unwrap_or_default();
1544
1545        let command = NextCommand {
1546            inner: StepCommand {
1547                thread_id: thread_id.0,
1548                granularity: supports_stepping_granularity.then(|| granularity),
1549                single_thread: supports_single_thread_execution_requests,
1550            },
1551        };
1552
1553        self.thread_states.process_step(thread_id);
1554        self.request(
1555            command,
1556            Self::on_step_response::<NextCommand>(thread_id),
1557            cx,
1558        )
1559        .detach();
1560    }
1561
1562    pub fn step_in(
1563        &mut self,
1564        thread_id: ThreadId,
1565        granularity: SteppingGranularity,
1566        cx: &mut Context<Self>,
1567    ) {
1568        let supports_single_thread_execution_requests =
1569            self.capabilities.supports_single_thread_execution_requests;
1570        let supports_stepping_granularity = self
1571            .capabilities
1572            .supports_stepping_granularity
1573            .unwrap_or_default();
1574
1575        let command = StepInCommand {
1576            inner: StepCommand {
1577                thread_id: thread_id.0,
1578                granularity: supports_stepping_granularity.then(|| granularity),
1579                single_thread: supports_single_thread_execution_requests,
1580            },
1581        };
1582
1583        self.thread_states.process_step(thread_id);
1584        self.request(
1585            command,
1586            Self::on_step_response::<StepInCommand>(thread_id),
1587            cx,
1588        )
1589        .detach();
1590    }
1591
1592    pub fn step_out(
1593        &mut self,
1594        thread_id: ThreadId,
1595        granularity: SteppingGranularity,
1596        cx: &mut Context<Self>,
1597    ) {
1598        let supports_single_thread_execution_requests =
1599            self.capabilities.supports_single_thread_execution_requests;
1600        let supports_stepping_granularity = self
1601            .capabilities
1602            .supports_stepping_granularity
1603            .unwrap_or_default();
1604
1605        let command = StepOutCommand {
1606            inner: StepCommand {
1607                thread_id: thread_id.0,
1608                granularity: supports_stepping_granularity.then(|| granularity),
1609                single_thread: supports_single_thread_execution_requests,
1610            },
1611        };
1612
1613        self.thread_states.process_step(thread_id);
1614        self.request(
1615            command,
1616            Self::on_step_response::<StepOutCommand>(thread_id),
1617            cx,
1618        )
1619        .detach();
1620    }
1621
1622    pub fn step_back(
1623        &mut self,
1624        thread_id: ThreadId,
1625        granularity: SteppingGranularity,
1626        cx: &mut Context<Self>,
1627    ) {
1628        let supports_single_thread_execution_requests =
1629            self.capabilities.supports_single_thread_execution_requests;
1630        let supports_stepping_granularity = self
1631            .capabilities
1632            .supports_stepping_granularity
1633            .unwrap_or_default();
1634
1635        let command = StepBackCommand {
1636            inner: StepCommand {
1637                thread_id: thread_id.0,
1638                granularity: supports_stepping_granularity.then(|| granularity),
1639                single_thread: supports_single_thread_execution_requests,
1640            },
1641        };
1642
1643        self.thread_states.process_step(thread_id);
1644
1645        self.request(
1646            command,
1647            Self::on_step_response::<StepBackCommand>(thread_id),
1648            cx,
1649        )
1650        .detach();
1651    }
1652
1653    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1654        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1655            && self.requests.contains_key(&ThreadsCommand.type_id())
1656            && self.threads.contains_key(&thread_id)
1657        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1658        // We could still be using an old thread id and have sent a new thread's request
1659        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1660        // But it very well could cause a minor bug in the future that is hard to track down
1661        {
1662            self.fetch(
1663                super::dap_command::StackTraceCommand {
1664                    thread_id: thread_id.0,
1665                    start_frame: None,
1666                    levels: None,
1667                },
1668                move |this, stack_frames, cx| {
1669                    let stack_frames = stack_frames.log_err()?;
1670
1671                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1672                        thread.stack_frame_ids =
1673                            stack_frames.iter().map(|frame| frame.id).collect();
1674                    });
1675                    debug_assert!(
1676                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1677                        "Sent request for thread_id that doesn't exist"
1678                    );
1679
1680                    this.stack_frames.extend(
1681                        stack_frames
1682                            .iter()
1683                            .cloned()
1684                            .map(|frame| (frame.id, StackFrame::from(frame))),
1685                    );
1686
1687                    this.invalidate_command_type::<ScopesCommand>();
1688                    this.invalidate_command_type::<VariablesCommand>();
1689
1690                    cx.emit(SessionEvent::StackTrace);
1691                    cx.notify();
1692                    Some(stack_frames)
1693                },
1694                cx,
1695            );
1696        }
1697
1698        self.threads
1699            .get(&thread_id)
1700            .map(|thread| {
1701                thread
1702                    .stack_frame_ids
1703                    .iter()
1704                    .filter_map(|id| self.stack_frames.get(id))
1705                    .cloned()
1706                    .collect()
1707            })
1708            .unwrap_or_default()
1709    }
1710
1711    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1712        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1713            && self
1714                .requests
1715                .contains_key(&TypeId::of::<StackTraceCommand>())
1716        {
1717            self.fetch(
1718                ScopesCommand { stack_frame_id },
1719                move |this, scopes, cx| {
1720                    let scopes = scopes.log_err()?;
1721
1722                    for scope in scopes .iter(){
1723                        this.variables(scope.variables_reference, cx);
1724                    }
1725
1726                    let entry = this
1727                        .stack_frames
1728                        .entry(stack_frame_id)
1729                        .and_modify(|stack_frame| {
1730                            stack_frame.scopes = scopes.clone();
1731                        });
1732
1733                    cx.emit(SessionEvent::Variables);
1734
1735                    debug_assert!(
1736                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1737                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1738                    );
1739
1740                    Some(scopes)
1741                },
1742                cx,
1743            );
1744        }
1745
1746        self.stack_frames
1747            .get(&stack_frame_id)
1748            .map(|frame| frame.scopes.as_slice())
1749            .unwrap_or_default()
1750    }
1751
1752    pub fn variables(
1753        &mut self,
1754        variables_reference: VariableReference,
1755        cx: &mut Context<Self>,
1756    ) -> Vec<dap::Variable> {
1757        let command = VariablesCommand {
1758            variables_reference,
1759            filter: None,
1760            start: None,
1761            count: None,
1762            format: None,
1763        };
1764
1765        self.fetch(
1766            command,
1767            move |this, variables, cx| {
1768                let variables = variables.log_err()?;
1769                this.variables
1770                    .insert(variables_reference, variables.clone());
1771
1772                cx.emit(SessionEvent::Variables);
1773                Some(variables)
1774            },
1775            cx,
1776        );
1777
1778        self.variables
1779            .get(&variables_reference)
1780            .cloned()
1781            .unwrap_or_default()
1782    }
1783
1784    pub fn set_variable_value(
1785        &mut self,
1786        variables_reference: u64,
1787        name: String,
1788        value: String,
1789        cx: &mut Context<Self>,
1790    ) {
1791        if self.capabilities.supports_set_variable.unwrap_or_default() {
1792            self.request(
1793                SetVariableValueCommand {
1794                    name,
1795                    value,
1796                    variables_reference,
1797                },
1798                move |this, response, cx| {
1799                    let response = response.log_err()?;
1800                    this.invalidate_command_type::<VariablesCommand>();
1801                    cx.notify();
1802                    Some(response)
1803                },
1804                cx,
1805            )
1806            .detach()
1807        }
1808    }
1809
1810    pub fn evaluate(
1811        &mut self,
1812        expression: String,
1813        context: Option<EvaluateArgumentsContext>,
1814        frame_id: Option<u64>,
1815        source: Option<Source>,
1816        cx: &mut Context<Self>,
1817    ) {
1818        self.request(
1819            EvaluateCommand {
1820                expression,
1821                context,
1822                frame_id,
1823                source,
1824            },
1825            |this, response, cx| {
1826                let response = response.log_err()?;
1827                this.output_token.0 += 1;
1828                this.output.push_back(dap::OutputEvent {
1829                    category: None,
1830                    output: response.result.clone(),
1831                    group: None,
1832                    variables_reference: Some(response.variables_reference),
1833                    source: None,
1834                    line: None,
1835                    column: None,
1836                    data: None,
1837                    location_reference: None,
1838                });
1839
1840                this.invalidate_command_type::<ScopesCommand>();
1841                cx.notify();
1842                Some(response)
1843            },
1844            cx,
1845        )
1846        .detach();
1847    }
1848
1849    pub fn location(
1850        &mut self,
1851        reference: u64,
1852        cx: &mut Context<Self>,
1853    ) -> Option<dap::LocationsResponse> {
1854        self.fetch(
1855            LocationsCommand { reference },
1856            move |this, response, _| {
1857                let response = response.log_err()?;
1858                this.locations.insert(reference, response.clone());
1859                Some(response)
1860            },
1861            cx,
1862        );
1863        self.locations.get(&reference).cloned()
1864    }
1865    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1866        let command = DisconnectCommand {
1867            restart: Some(false),
1868            terminate_debuggee: Some(true),
1869            suspend_debuggee: Some(false),
1870        };
1871
1872        self.request(command, Self::empty_response, cx).detach()
1873    }
1874
1875    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1876        if self
1877            .capabilities
1878            .supports_terminate_threads_request
1879            .unwrap_or_default()
1880        {
1881            self.request(
1882                TerminateThreadsCommand {
1883                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1884                },
1885                Self::clear_active_debug_line_response,
1886                cx,
1887            )
1888            .detach();
1889        } else {
1890            self.shutdown(cx).detach();
1891        }
1892    }
1893}
1894
1895fn create_local_session(
1896    breakpoint_store: Entity<BreakpointStore>,
1897    session_id: SessionId,
1898    parent_session: Option<Entity<Session>>,
1899    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1900    initialized_tx: oneshot::Sender<()>,
1901    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1902    mode: LocalMode,
1903    cx: &mut Context<Session>,
1904) -> Session {
1905    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1906        let mut initialized_tx = Some(initialized_tx);
1907        while let Some(message) = message_rx.next().await {
1908            if let Message::Event(event) = message {
1909                if let Events::Initialized(_) = *event {
1910                    if let Some(tx) = initialized_tx.take() {
1911                        tx.send(()).ok();
1912                    }
1913                } else {
1914                    let Ok(_) = this.update(cx, |session, cx| {
1915                        session.handle_dap_event(event, cx);
1916                    }) else {
1917                        break;
1918                    };
1919                }
1920            } else {
1921                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1922                else {
1923                    break;
1924                };
1925            }
1926        }
1927    })];
1928
1929    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1930        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1931            if let Some(local) = (!this.ignore_breakpoints)
1932                .then(|| this.as_local_mut())
1933                .flatten()
1934            {
1935                local
1936                    .send_breakpoints_from_path(path.clone(), *reason, cx)
1937                    .detach();
1938            };
1939        }
1940        BreakpointStoreEvent::BreakpointsCleared(paths) => {
1941            if let Some(local) = (!this.ignore_breakpoints)
1942                .then(|| this.as_local_mut())
1943                .flatten()
1944            {
1945                local.unset_breakpoints_from_paths(paths, cx).detach();
1946            }
1947        }
1948        BreakpointStoreEvent::ActiveDebugLineChanged => {}
1949    })
1950    .detach();
1951
1952    Session {
1953        mode: Mode::Local(mode),
1954        id: session_id,
1955        child_session_ids: HashSet::default(),
1956        parent_id: parent_session.map(|session| session.read(cx).id),
1957        variables: Default::default(),
1958        capabilities: Capabilities::default(),
1959        thread_states: ThreadStates::default(),
1960        output_token: OutputToken(0),
1961        ignore_breakpoints: false,
1962        output: circular_buffer::CircularBuffer::boxed(),
1963        requests: HashMap::default(),
1964        modules: Vec::default(),
1965        loaded_sources: Vec::default(),
1966        threads: IndexMap::default(),
1967        stack_frames: IndexMap::default(),
1968        locations: Default::default(),
1969        exception_breakpoints: Default::default(),
1970        _background_tasks,
1971        is_session_terminated: false,
1972    }
1973}