session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
  11    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  12    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapStore;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap};
  17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  18use dap::messages::Response;
  19use dap::requests::{Request, RunInTerminal, StartDebugging};
  20use dap::{
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  28    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  29    StartDebuggingRequestArgumentsRequest, VariablePresentationHint,
  30};
  31use futures::SinkExt;
  32use futures::channel::mpsc::UnboundedSender;
  33use futures::channel::{mpsc, oneshot};
  34use futures::{FutureExt, future::Shared};
  35use gpui::{
  36    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  37    Task, WeakEntity,
  38};
  39
  40use rpc::ErrorExt;
  41use serde_json::Value;
  42use smol::stream::StreamExt;
  43use std::any::TypeId;
  44use std::collections::BTreeMap;
  45use std::u64;
  46use std::{
  47    any::Any,
  48    collections::hash_map::Entry,
  49    hash::{Hash, Hasher},
  50    path::Path,
  51    sync::Arc,
  52};
  53use task::TaskContext;
  54use text::{PointUtf16, ToPointUtf16};
  55use util::ResultExt;
  56use worktree::Worktree;
  57
  58#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  59#[repr(transparent)]
  60pub struct ThreadId(pub u64);
  61
  62impl ThreadId {
  63    pub const MIN: ThreadId = ThreadId(u64::MIN);
  64    pub const MAX: ThreadId = ThreadId(u64::MAX);
  65}
  66
  67impl From<u64> for ThreadId {
  68    fn from(id: u64) -> Self {
  69        Self(id)
  70    }
  71}
  72
  73#[derive(Clone, Debug)]
  74pub struct StackFrame {
  75    pub dap: dap::StackFrame,
  76    pub scopes: Vec<dap::Scope>,
  77}
  78
  79impl From<dap::StackFrame> for StackFrame {
  80    fn from(stack_frame: dap::StackFrame) -> Self {
  81        Self {
  82            scopes: vec![],
  83            dap: stack_frame,
  84        }
  85    }
  86}
  87
  88#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  89pub enum ThreadStatus {
  90    #[default]
  91    Running,
  92    Stopped,
  93    Stepping,
  94    Exited,
  95    Ended,
  96}
  97
  98impl ThreadStatus {
  99    pub fn label(&self) -> &'static str {
 100        match self {
 101            ThreadStatus::Running => "Running",
 102            ThreadStatus::Stopped => "Stopped",
 103            ThreadStatus::Stepping => "Stepping",
 104            ThreadStatus::Exited => "Exited",
 105            ThreadStatus::Ended => "Ended",
 106        }
 107    }
 108}
 109
 110#[derive(Debug)]
 111pub struct Thread {
 112    dap: dap::Thread,
 113    stack_frames: Vec<StackFrame>,
 114    stack_frames_error: Option<anyhow::Error>,
 115    _has_stopped: bool,
 116}
 117
 118impl From<dap::Thread> for Thread {
 119    fn from(dap: dap::Thread) -> Self {
 120        Self {
 121            dap,
 122            stack_frames: Default::default(),
 123            stack_frames_error: None,
 124            _has_stopped: false,
 125        }
 126    }
 127}
 128
 129#[derive(Debug, Clone, PartialEq)]
 130pub struct Watcher {
 131    pub expression: SharedString,
 132    pub value: SharedString,
 133    pub variables_reference: u64,
 134    pub presentation_hint: Option<VariablePresentationHint>,
 135}
 136
 137pub enum Mode {
 138    Building,
 139    Running(RunningMode),
 140}
 141
 142#[derive(Clone)]
 143pub struct RunningMode {
 144    client: Arc<DebugAdapterClient>,
 145    binary: DebugAdapterBinary,
 146    tmp_breakpoint: Option<SourceBreakpoint>,
 147    worktree: WeakEntity<Worktree>,
 148    executor: BackgroundExecutor,
 149    is_started: bool,
 150    has_ever_stopped: bool,
 151    messages_tx: UnboundedSender<Message>,
 152}
 153
 154fn client_source(abs_path: &Path) -> dap::Source {
 155    dap::Source {
 156        name: abs_path
 157            .file_name()
 158            .map(|filename| filename.to_string_lossy().to_string()),
 159        path: Some(abs_path.to_string_lossy().to_string()),
 160        source_reference: None,
 161        presentation_hint: None,
 162        origin: None,
 163        sources: None,
 164        adapter_data: None,
 165        checksums: None,
 166    }
 167}
 168
 169impl RunningMode {
 170    async fn new(
 171        session_id: SessionId,
 172        parent_session: Option<Entity<Session>>,
 173        worktree: WeakEntity<Worktree>,
 174        binary: DebugAdapterBinary,
 175        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 176        cx: &mut AsyncApp,
 177    ) -> Result<Self> {
 178        let message_handler = Box::new({
 179            let messages_tx = messages_tx.clone();
 180            move |message| {
 181                messages_tx.unbounded_send(message).ok();
 182            }
 183        });
 184
 185        let client = if let Some(client) = parent_session
 186            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 187            .flatten()
 188        {
 189            client
 190                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 191                .await?
 192        } else {
 193            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 194        };
 195
 196        Ok(Self {
 197            client: Arc::new(client),
 198            worktree,
 199            tmp_breakpoint: None,
 200            binary,
 201            executor: cx.background_executor().clone(),
 202            is_started: false,
 203            has_ever_stopped: false,
 204            messages_tx,
 205        })
 206    }
 207
 208    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 209        &self.worktree
 210    }
 211
 212    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 213        let tasks: Vec<_> = paths
 214            .into_iter()
 215            .map(|path| {
 216                self.request(dap_command::SetBreakpoints {
 217                    source: client_source(path),
 218                    source_modified: None,
 219                    breakpoints: vec![],
 220                })
 221            })
 222            .collect();
 223
 224        cx.background_spawn(async move {
 225            futures::future::join_all(tasks)
 226                .await
 227                .iter()
 228                .for_each(|res| match res {
 229                    Ok(_) => {}
 230                    Err(err) => {
 231                        log::warn!("Set breakpoints request failed: {}", err);
 232                    }
 233                });
 234        })
 235    }
 236
 237    fn send_breakpoints_from_path(
 238        &self,
 239        abs_path: Arc<Path>,
 240        reason: BreakpointUpdatedReason,
 241        breakpoint_store: &Entity<BreakpointStore>,
 242        cx: &mut App,
 243    ) -> Task<()> {
 244        let breakpoints =
 245            breakpoint_store
 246                .read(cx)
 247                .source_breakpoints_from_path(&abs_path, cx)
 248                .into_iter()
 249                .filter(|bp| bp.state.is_enabled())
 250                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 251                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 252                }))
 253                .map(Into::into)
 254                .collect();
 255
 256        let raw_breakpoints = breakpoint_store
 257            .read(cx)
 258            .breakpoints_from_path(&abs_path)
 259            .into_iter()
 260            .filter(|bp| bp.bp.state.is_enabled())
 261            .collect::<Vec<_>>();
 262
 263        let task = self.request(dap_command::SetBreakpoints {
 264            source: client_source(&abs_path),
 265            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 266            breakpoints,
 267        });
 268        let session_id = self.client.id();
 269        let breakpoint_store = breakpoint_store.downgrade();
 270        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 271            Ok(breakpoints) => {
 272                let breakpoints =
 273                    breakpoints
 274                        .into_iter()
 275                        .zip(raw_breakpoints)
 276                        .filter_map(|(dap_bp, zed_bp)| {
 277                            Some((
 278                                zed_bp,
 279                                BreakpointSessionState {
 280                                    id: dap_bp.id?,
 281                                    verified: dap_bp.verified,
 282                                },
 283                            ))
 284                        });
 285                breakpoint_store
 286                    .update(cx, |this, _| {
 287                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 288                    })
 289                    .ok();
 290            }
 291            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 292        })
 293    }
 294
 295    fn send_exception_breakpoints(
 296        &self,
 297        filters: Vec<ExceptionBreakpointsFilter>,
 298        supports_filter_options: bool,
 299    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 300        let arg = if supports_filter_options {
 301            SetExceptionBreakpoints::WithOptions {
 302                filters: filters
 303                    .into_iter()
 304                    .map(|filter| ExceptionFilterOptions {
 305                        filter_id: filter.filter,
 306                        condition: None,
 307                        mode: None,
 308                    })
 309                    .collect(),
 310            }
 311        } else {
 312            SetExceptionBreakpoints::Plain {
 313                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 314            }
 315        };
 316        self.request(arg)
 317    }
 318
 319    fn send_source_breakpoints(
 320        &self,
 321        ignore_breakpoints: bool,
 322        breakpoint_store: &Entity<BreakpointStore>,
 323        cx: &App,
 324    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 325        let mut breakpoint_tasks = Vec::new();
 326        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 327        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 328        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 329        let session_id = self.client.id();
 330        for (path, breakpoints) in breakpoints {
 331            let breakpoints = if ignore_breakpoints {
 332                vec![]
 333            } else {
 334                breakpoints
 335                    .into_iter()
 336                    .filter(|bp| bp.state.is_enabled())
 337                    .map(Into::into)
 338                    .collect()
 339            };
 340
 341            let raw_breakpoints = raw_breakpoints
 342                .remove(&path)
 343                .unwrap_or_default()
 344                .into_iter()
 345                .filter(|bp| bp.bp.state.is_enabled());
 346            let error_path = path.clone();
 347            let send_request = self
 348                .request(dap_command::SetBreakpoints {
 349                    source: client_source(&path),
 350                    source_modified: Some(false),
 351                    breakpoints,
 352                })
 353                .map(|result| result.map_err(move |e| (error_path, e)));
 354
 355            let task = cx.spawn({
 356                let breakpoint_store = breakpoint_store.downgrade();
 357                async move |cx| {
 358                    let breakpoints = cx.background_spawn(send_request).await?;
 359
 360                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 361                        |(dap_bp, zed_bp)| {
 362                            Some((
 363                                zed_bp,
 364                                BreakpointSessionState {
 365                                    id: dap_bp.id?,
 366                                    verified: dap_bp.verified,
 367                                },
 368                            ))
 369                        },
 370                    );
 371                    breakpoint_store
 372                        .update(cx, |this, _| {
 373                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 374                        })
 375                        .ok();
 376
 377                    Ok(())
 378                }
 379            });
 380            breakpoint_tasks.push(task);
 381        }
 382
 383        cx.background_spawn(async move {
 384            futures::future::join_all(breakpoint_tasks)
 385                .await
 386                .into_iter()
 387                .filter_map(Result::err)
 388                .collect::<HashMap<_, _>>()
 389        })
 390    }
 391
 392    fn initialize_sequence(
 393        &self,
 394        capabilities: &Capabilities,
 395        initialized_rx: oneshot::Receiver<()>,
 396        dap_store: WeakEntity<DapStore>,
 397        cx: &mut Context<Session>,
 398    ) -> Task<Result<()>> {
 399        let raw = self.binary.request_args.clone();
 400
 401        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 402        let launch = match raw.request {
 403            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 404                raw: raw.configuration,
 405            }),
 406            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 407                raw: raw.configuration,
 408            }),
 409        };
 410
 411        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 412        let exception_filters = capabilities
 413            .exception_breakpoint_filters
 414            .as_ref()
 415            .map(|exception_filters| {
 416                exception_filters
 417                    .iter()
 418                    .filter(|filter| filter.default == Some(true))
 419                    .cloned()
 420                    .collect::<Vec<_>>()
 421            })
 422            .unwrap_or_default();
 423        // From spec (on initialization sequence):
 424        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 425        //
 426        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 427        let should_send_exception_breakpoints = capabilities
 428            .exception_breakpoint_filters
 429            .as_ref()
 430            .map_or(false, |filters| !filters.is_empty())
 431            || !configuration_done_supported;
 432        let supports_exception_filters = capabilities
 433            .supports_exception_filter_options
 434            .unwrap_or_default();
 435        let this = self.clone();
 436        let worktree = self.worktree().clone();
 437        let configuration_sequence = cx.spawn({
 438            async move |_, cx| {
 439                let breakpoint_store =
 440                    dap_store.read_with(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
 441                initialized_rx.await?;
 442                let errors_by_path = cx
 443                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 444                    .await;
 445
 446                dap_store.update(cx, |_, cx| {
 447                    let Some(worktree) = worktree.upgrade() else {
 448                        return;
 449                    };
 450
 451                    for (path, error) in &errors_by_path {
 452                        log::error!("failed to set breakpoints for {path:?}: {error}");
 453                    }
 454
 455                    if let Some(failed_path) = errors_by_path.keys().next() {
 456                        let failed_path = failed_path
 457                            .strip_prefix(worktree.read(cx).abs_path())
 458                            .unwrap_or(failed_path)
 459                            .display();
 460                        let message = format!(
 461                            "Failed to set breakpoints for {failed_path}{}",
 462                            match errors_by_path.len() {
 463                                0 => unreachable!(),
 464                                1 => "".into(),
 465                                2 => " and 1 other path".into(),
 466                                n => format!(" and {} other paths", n - 1),
 467                            }
 468                        );
 469                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 470                    }
 471                })?;
 472
 473                if should_send_exception_breakpoints {
 474                    this.send_exception_breakpoints(exception_filters, supports_exception_filters)
 475                        .await
 476                        .ok();
 477                }
 478
 479                let ret = if configuration_done_supported {
 480                    this.request(ConfigurationDone {})
 481                } else {
 482                    Task::ready(Ok(()))
 483                }
 484                .await;
 485                ret
 486            }
 487        });
 488
 489        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 490
 491        cx.spawn(async move |this, cx| {
 492            let result = task.await;
 493
 494            this.update(cx, |this, cx| {
 495                if let Some(this) = this.as_running_mut() {
 496                    this.is_started = true;
 497                    cx.notify();
 498                }
 499            })
 500            .ok();
 501
 502            result?;
 503            anyhow::Ok(())
 504        })
 505    }
 506
 507    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 508        let client = self.client.clone();
 509        let messages_tx = self.messages_tx.clone();
 510        let message_handler = Box::new(move |message| {
 511            messages_tx.unbounded_send(message).ok();
 512        });
 513        if client.should_reconnect_for_ssh() {
 514            Some(cx.spawn(async move |cx| {
 515                client.connect(message_handler, cx).await?;
 516                anyhow::Ok(())
 517            }))
 518        } else {
 519            None
 520        }
 521    }
 522
 523    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 524    where
 525        <R::DapRequest as dap::requests::Request>::Response: 'static,
 526        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 527    {
 528        let request = Arc::new(request);
 529
 530        let request_clone = request.clone();
 531        let connection = self.client.clone();
 532        self.executor.spawn(async move {
 533            let args = request_clone.to_dap();
 534            let response = connection.request::<R::DapRequest>(args).await?;
 535            request.response_from_dap(response)
 536        })
 537    }
 538}
 539
 540impl Mode {
 541    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 542    where
 543        <R::DapRequest as dap::requests::Request>::Response: 'static,
 544        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 545    {
 546        match self {
 547            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 548            Mode::Building => Task::ready(Err(anyhow!(
 549                "no adapter running to send request: {request:?}"
 550            ))),
 551        }
 552    }
 553
 554    /// Did this debug session stop at least once?
 555    pub(crate) fn has_ever_stopped(&self) -> bool {
 556        match self {
 557            Mode::Building => false,
 558            Mode::Running(running_mode) => running_mode.has_ever_stopped,
 559        }
 560    }
 561
 562    fn stopped(&mut self) {
 563        if let Mode::Running(running) = self {
 564            running.has_ever_stopped = true;
 565        }
 566    }
 567}
 568
 569#[derive(Default)]
 570struct ThreadStates {
 571    global_state: Option<ThreadStatus>,
 572    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 573}
 574
 575impl ThreadStates {
 576    fn stop_all_threads(&mut self) {
 577        self.global_state = Some(ThreadStatus::Stopped);
 578        self.known_thread_states.clear();
 579    }
 580
 581    fn exit_all_threads(&mut self) {
 582        self.global_state = Some(ThreadStatus::Exited);
 583        self.known_thread_states.clear();
 584    }
 585
 586    fn continue_all_threads(&mut self) {
 587        self.global_state = Some(ThreadStatus::Running);
 588        self.known_thread_states.clear();
 589    }
 590
 591    fn stop_thread(&mut self, thread_id: ThreadId) {
 592        self.known_thread_states
 593            .insert(thread_id, ThreadStatus::Stopped);
 594    }
 595
 596    fn continue_thread(&mut self, thread_id: ThreadId) {
 597        self.known_thread_states
 598            .insert(thread_id, ThreadStatus::Running);
 599    }
 600
 601    fn process_step(&mut self, thread_id: ThreadId) {
 602        self.known_thread_states
 603            .insert(thread_id, ThreadStatus::Stepping);
 604    }
 605
 606    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 607        self.thread_state(thread_id)
 608            .unwrap_or(ThreadStatus::Running)
 609    }
 610
 611    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 612        self.known_thread_states
 613            .get(&thread_id)
 614            .copied()
 615            .or(self.global_state)
 616    }
 617
 618    fn exit_thread(&mut self, thread_id: ThreadId) {
 619        self.known_thread_states
 620            .insert(thread_id, ThreadStatus::Exited);
 621    }
 622
 623    fn any_stopped_thread(&self) -> bool {
 624        self.global_state
 625            .is_some_and(|state| state == ThreadStatus::Stopped)
 626            || self
 627                .known_thread_states
 628                .values()
 629                .any(|status| *status == ThreadStatus::Stopped)
 630    }
 631}
 632const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 633
 634type IsEnabled = bool;
 635
 636#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 637pub struct OutputToken(pub usize);
 638/// Represents a current state of a single debug adapter and provides ways to mutate it.
 639pub struct Session {
 640    pub mode: Mode,
 641    id: SessionId,
 642    label: SharedString,
 643    adapter: DebugAdapterName,
 644    pub(super) capabilities: Capabilities,
 645    child_session_ids: HashSet<SessionId>,
 646    parent_session: Option<Entity<Session>>,
 647    modules: Vec<dap::Module>,
 648    loaded_sources: Vec<dap::Source>,
 649    output_token: OutputToken,
 650    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 651    threads: IndexMap<ThreadId, Thread>,
 652    thread_states: ThreadStates,
 653    watchers: HashMap<SharedString, Watcher>,
 654    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 655    stack_frames: IndexMap<StackFrameId, StackFrame>,
 656    locations: HashMap<u64, dap::LocationsResponse>,
 657    is_session_terminated: bool,
 658    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 659    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 660    ignore_breakpoints: bool,
 661    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 662    background_tasks: Vec<Task<()>>,
 663    restart_task: Option<Task<()>>,
 664    task_context: TaskContext,
 665}
 666
 667trait CacheableCommand: Any + Send + Sync {
 668    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 669    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 670    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 671}
 672
 673impl<T> CacheableCommand for T
 674where
 675    T: DapCommand + PartialEq + Eq + Hash,
 676{
 677    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 678        (rhs as &dyn Any)
 679            .downcast_ref::<Self>()
 680            .map_or(false, |rhs| self == rhs)
 681    }
 682
 683    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 684        T::hash(self, &mut hasher);
 685    }
 686
 687    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 688        self
 689    }
 690}
 691
 692pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 693
 694impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 695    fn from(request: T) -> Self {
 696        Self(Arc::new(request))
 697    }
 698}
 699
 700impl PartialEq for RequestSlot {
 701    fn eq(&self, other: &Self) -> bool {
 702        self.0.dyn_eq(other.0.as_ref())
 703    }
 704}
 705
 706impl Eq for RequestSlot {}
 707
 708impl Hash for RequestSlot {
 709    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 710        self.0.dyn_hash(state);
 711        (&*self.0 as &dyn Any).type_id().hash(state)
 712    }
 713}
 714
 715#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 716pub struct CompletionsQuery {
 717    pub query: String,
 718    pub column: u64,
 719    pub line: Option<u64>,
 720    pub frame_id: Option<u64>,
 721}
 722
 723impl CompletionsQuery {
 724    pub fn new(
 725        buffer: &language::Buffer,
 726        cursor_position: language::Anchor,
 727        frame_id: Option<u64>,
 728    ) -> Self {
 729        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 730        Self {
 731            query: buffer.text(),
 732            column: column as u64,
 733            frame_id,
 734            line: Some(row as u64),
 735        }
 736    }
 737}
 738
 739#[derive(Debug)]
 740pub enum SessionEvent {
 741    Modules,
 742    LoadedSources,
 743    Stopped(Option<ThreadId>),
 744    StackTrace,
 745    Variables,
 746    Watchers,
 747    Threads,
 748    InvalidateInlineValue,
 749    CapabilitiesLoaded,
 750    RunInTerminal {
 751        request: RunInTerminalRequestArguments,
 752        sender: mpsc::Sender<Result<u32>>,
 753    },
 754    ConsoleOutput,
 755}
 756
 757#[derive(Clone, Debug, PartialEq, Eq)]
 758pub enum SessionStateEvent {
 759    Running,
 760    Shutdown,
 761    Restart,
 762    SpawnChildSession {
 763        request: StartDebuggingRequestArguments,
 764    },
 765}
 766
 767impl EventEmitter<SessionEvent> for Session {}
 768impl EventEmitter<SessionStateEvent> for Session {}
 769
 770// local session will send breakpoint updates to DAP for all new breakpoints
 771// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 772// BreakpointStore notifies session on breakpoint changes
 773impl Session {
 774    pub(crate) fn new(
 775        breakpoint_store: Entity<BreakpointStore>,
 776        session_id: SessionId,
 777        parent_session: Option<Entity<Session>>,
 778        label: SharedString,
 779        adapter: DebugAdapterName,
 780        task_context: TaskContext,
 781        cx: &mut App,
 782    ) -> Entity<Self> {
 783        cx.new::<Self>(|cx| {
 784            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 785                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 786                    if let Some(local) = (!this.ignore_breakpoints)
 787                        .then(|| this.as_running_mut())
 788                        .flatten()
 789                    {
 790                        local
 791                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 792                            .detach();
 793                    };
 794                }
 795                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 796                    if let Some(local) = (!this.ignore_breakpoints)
 797                        .then(|| this.as_running_mut())
 798                        .flatten()
 799                    {
 800                        local.unset_breakpoints_from_paths(paths, cx).detach();
 801                    }
 802                }
 803                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 804            })
 805            .detach();
 806            // cx.on_app_quit(Self::on_app_quit).detach();
 807
 808            let this = Self {
 809                mode: Mode::Building,
 810                id: session_id,
 811                child_session_ids: HashSet::default(),
 812                parent_session,
 813                capabilities: Capabilities::default(),
 814                watchers: HashMap::default(),
 815                variables: Default::default(),
 816                stack_frames: Default::default(),
 817                thread_states: ThreadStates::default(),
 818                output_token: OutputToken(0),
 819                output: circular_buffer::CircularBuffer::boxed(),
 820                requests: HashMap::default(),
 821                modules: Vec::default(),
 822                loaded_sources: Vec::default(),
 823                threads: IndexMap::default(),
 824                background_tasks: Vec::default(),
 825                restart_task: None,
 826                locations: Default::default(),
 827                is_session_terminated: false,
 828                ignore_breakpoints: false,
 829                breakpoint_store,
 830                exception_breakpoints: Default::default(),
 831                label,
 832                adapter,
 833                task_context,
 834            };
 835
 836            this
 837        })
 838    }
 839
 840    pub fn task_context(&self) -> &TaskContext {
 841        &self.task_context
 842    }
 843
 844    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 845        match &self.mode {
 846            Mode::Building => None,
 847            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 848        }
 849    }
 850
 851    pub fn boot(
 852        &mut self,
 853        binary: DebugAdapterBinary,
 854        worktree: Entity<Worktree>,
 855        dap_store: WeakEntity<DapStore>,
 856        cx: &mut Context<Self>,
 857    ) -> Task<Result<()>> {
 858        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 859        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 860
 861        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 862            let mut initialized_tx = Some(initialized_tx);
 863            while let Some(message) = message_rx.next().await {
 864                if let Message::Event(event) = message {
 865                    if let Events::Initialized(_) = *event {
 866                        if let Some(tx) = initialized_tx.take() {
 867                            tx.send(()).ok();
 868                        }
 869                    } else {
 870                        let Ok(_) = this.update(cx, |session, cx| {
 871                            session.handle_dap_event(event, cx);
 872                        }) else {
 873                            break;
 874                        };
 875                    }
 876                } else if let Message::Request(request) = message {
 877                    let Ok(_) = this.update(cx, |this, cx| {
 878                        if request.command == StartDebugging::COMMAND {
 879                            this.handle_start_debugging_request(request, cx)
 880                                .detach_and_log_err(cx);
 881                        } else if request.command == RunInTerminal::COMMAND {
 882                            this.handle_run_in_terminal_request(request, cx)
 883                                .detach_and_log_err(cx);
 884                        }
 885                    }) else {
 886                        break;
 887                    };
 888                }
 889            }
 890        })];
 891        self.background_tasks = background_tasks;
 892        let id = self.id;
 893        let parent_session = self.parent_session.clone();
 894
 895        cx.spawn(async move |this, cx| {
 896            let mode = RunningMode::new(
 897                id,
 898                parent_session,
 899                worktree.downgrade(),
 900                binary.clone(),
 901                message_tx,
 902                cx,
 903            )
 904            .await?;
 905            this.update(cx, |this, cx| {
 906                this.mode = Mode::Running(mode);
 907                cx.emit(SessionStateEvent::Running);
 908            })?;
 909
 910            this.update(cx, |session, cx| session.request_initialize(cx))?
 911                .await?;
 912
 913            let result = this
 914                .update(cx, |session, cx| {
 915                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 916                })?
 917                .await;
 918
 919            if result.is_err() {
 920                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 921
 922                console
 923                    .send(format!(
 924                        "Tried to launch debugger with: {}",
 925                        serde_json::to_string_pretty(&binary.request_args.configuration)
 926                            .unwrap_or_default(),
 927                    ))
 928                    .await
 929                    .ok();
 930            }
 931
 932            result
 933        })
 934    }
 935
 936    pub fn session_id(&self) -> SessionId {
 937        self.id
 938    }
 939
 940    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 941        self.child_session_ids.clone()
 942    }
 943
 944    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 945        self.child_session_ids.insert(session_id);
 946    }
 947
 948    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 949        self.child_session_ids.remove(&session_id);
 950    }
 951
 952    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 953        self.parent_session
 954            .as_ref()
 955            .map(|session| session.read(cx).id)
 956    }
 957
 958    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 959        self.parent_session.as_ref()
 960    }
 961
 962    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
 963        let Some(client) = self.adapter_client() else {
 964            return Task::ready(());
 965        };
 966
 967        let supports_terminate = self
 968            .capabilities
 969            .support_terminate_debuggee
 970            .unwrap_or(false);
 971
 972        cx.background_spawn(async move {
 973            if supports_terminate {
 974                client
 975                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
 976                        restart: Some(false),
 977                    })
 978                    .await
 979                    .ok();
 980            } else {
 981                client
 982                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
 983                        restart: Some(false),
 984                        terminate_debuggee: Some(true),
 985                        suspend_debuggee: Some(false),
 986                    })
 987                    .await
 988                    .ok();
 989            }
 990        })
 991    }
 992
 993    pub fn capabilities(&self) -> &Capabilities {
 994        &self.capabilities
 995    }
 996
 997    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
 998        match &self.mode {
 999            Mode::Building => None,
1000            Mode::Running(running_mode) => Some(&running_mode.binary),
1001        }
1002    }
1003
1004    pub fn adapter(&self) -> DebugAdapterName {
1005        self.adapter.clone()
1006    }
1007
1008    pub fn label(&self) -> SharedString {
1009        self.label.clone()
1010    }
1011
1012    pub fn is_terminated(&self) -> bool {
1013        self.is_session_terminated
1014    }
1015
1016    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1017        let (tx, mut rx) = mpsc::unbounded();
1018
1019        cx.spawn(async move |this, cx| {
1020            while let Some(output) = rx.next().await {
1021                this.update(cx, |this, cx| {
1022                    let event = dap::OutputEvent {
1023                        category: None,
1024                        output,
1025                        group: None,
1026                        variables_reference: None,
1027                        source: None,
1028                        line: None,
1029                        column: None,
1030                        data: None,
1031                        location_reference: None,
1032                    };
1033                    this.push_output(event, cx);
1034                })?;
1035            }
1036            anyhow::Ok(())
1037        })
1038        .detach();
1039
1040        return tx;
1041    }
1042
1043    pub fn is_started(&self) -> bool {
1044        match &self.mode {
1045            Mode::Building => false,
1046            Mode::Running(running) => running.is_started,
1047        }
1048    }
1049
1050    pub fn is_building(&self) -> bool {
1051        matches!(self.mode, Mode::Building)
1052    }
1053
1054    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1055        match &mut self.mode {
1056            Mode::Running(local_mode) => Some(local_mode),
1057            Mode::Building => None,
1058        }
1059    }
1060
1061    pub fn as_running(&self) -> Option<&RunningMode> {
1062        match &self.mode {
1063            Mode::Running(local_mode) => Some(local_mode),
1064            Mode::Building => None,
1065        }
1066    }
1067
1068    fn handle_start_debugging_request(
1069        &mut self,
1070        request: dap::messages::Request,
1071        cx: &mut Context<Self>,
1072    ) -> Task<Result<()>> {
1073        let request_seq = request.seq;
1074
1075        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1076            .arguments
1077            .as_ref()
1078            .map(|value| serde_json::from_value(value.clone()));
1079
1080        let mut success = true;
1081        if let Some(Ok(request)) = launch_request {
1082            cx.emit(SessionStateEvent::SpawnChildSession { request });
1083        } else {
1084            log::error!(
1085                "Failed to parse launch request arguments: {:?}",
1086                request.arguments
1087            );
1088            success = false;
1089        }
1090
1091        cx.spawn(async move |this, cx| {
1092            this.update(cx, |this, cx| {
1093                this.respond_to_client(
1094                    request_seq,
1095                    success,
1096                    StartDebugging::COMMAND.to_string(),
1097                    None,
1098                    cx,
1099                )
1100            })?
1101            .await
1102        })
1103    }
1104
1105    fn handle_run_in_terminal_request(
1106        &mut self,
1107        request: dap::messages::Request,
1108        cx: &mut Context<Self>,
1109    ) -> Task<Result<()>> {
1110        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1111            request.arguments.unwrap_or_default(),
1112        ) {
1113            Ok(args) => args,
1114            Err(error) => {
1115                return cx.spawn(async move |session, cx| {
1116                    let error = serde_json::to_value(dap::ErrorResponse {
1117                        error: Some(dap::Message {
1118                            id: request.seq,
1119                            format: error.to_string(),
1120                            variables: None,
1121                            send_telemetry: None,
1122                            show_user: None,
1123                            url: None,
1124                            url_label: None,
1125                        }),
1126                    })
1127                    .ok();
1128
1129                    session
1130                        .update(cx, |this, cx| {
1131                            this.respond_to_client(
1132                                request.seq,
1133                                false,
1134                                StartDebugging::COMMAND.to_string(),
1135                                error,
1136                                cx,
1137                            )
1138                        })?
1139                        .await?;
1140
1141                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1142                });
1143            }
1144        };
1145
1146        let seq = request.seq;
1147
1148        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1149        cx.emit(SessionEvent::RunInTerminal {
1150            request: request_args,
1151            sender: tx,
1152        });
1153        cx.notify();
1154
1155        cx.spawn(async move |session, cx| {
1156            let result = util::maybe!(async move {
1157                rx.next().await.ok_or_else(|| {
1158                    anyhow!("failed to receive response from spawn terminal".to_string())
1159                })?
1160            })
1161            .await;
1162            let (success, body) = match result {
1163                Ok(pid) => (
1164                    true,
1165                    serde_json::to_value(dap::RunInTerminalResponse {
1166                        process_id: None,
1167                        shell_process_id: Some(pid as u64),
1168                    })
1169                    .ok(),
1170                ),
1171                Err(error) => (
1172                    false,
1173                    serde_json::to_value(dap::ErrorResponse {
1174                        error: Some(dap::Message {
1175                            id: seq,
1176                            format: error.to_string(),
1177                            variables: None,
1178                            send_telemetry: None,
1179                            show_user: None,
1180                            url: None,
1181                            url_label: None,
1182                        }),
1183                    })
1184                    .ok(),
1185                ),
1186            };
1187
1188            session
1189                .update(cx, |session, cx| {
1190                    session.respond_to_client(
1191                        seq,
1192                        success,
1193                        RunInTerminal::COMMAND.to_string(),
1194                        body,
1195                        cx,
1196                    )
1197                })?
1198                .await
1199        })
1200    }
1201
1202    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1203        let adapter_id = self.adapter().to_string();
1204        let request = Initialize { adapter_id };
1205
1206        let Mode::Running(running) = &self.mode else {
1207            return Task::ready(Err(anyhow!(
1208                "Cannot send initialize request, task still building"
1209            )));
1210        };
1211        let mut response = running.request(request.clone());
1212
1213        cx.spawn(async move |this, cx| {
1214            loop {
1215                let capabilities = response.await;
1216                match capabilities {
1217                    Err(e) => {
1218                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1219                            this.as_running()
1220                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1221                        }) else {
1222                            return Err(e);
1223                        };
1224                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1225                        reconnect.await?;
1226
1227                        let Ok(Some(r)) = this.update(cx, |this, _| {
1228                            this.as_running()
1229                                .map(|running| running.request(request.clone()))
1230                        }) else {
1231                            return Err(e);
1232                        };
1233                        response = r
1234                    }
1235                    Ok(capabilities) => {
1236                        this.update(cx, |session, cx| {
1237                            session.capabilities = capabilities;
1238                            let filters = session
1239                                .capabilities
1240                                .exception_breakpoint_filters
1241                                .clone()
1242                                .unwrap_or_default();
1243                            for filter in filters {
1244                                let default = filter.default.unwrap_or_default();
1245                                session
1246                                    .exception_breakpoints
1247                                    .entry(filter.filter.clone())
1248                                    .or_insert_with(|| (filter, default));
1249                            }
1250                            cx.emit(SessionEvent::CapabilitiesLoaded);
1251                        })?;
1252                        return Ok(());
1253                    }
1254                }
1255            }
1256        })
1257    }
1258
1259    pub(super) fn initialize_sequence(
1260        &mut self,
1261        initialize_rx: oneshot::Receiver<()>,
1262        dap_store: WeakEntity<DapStore>,
1263        cx: &mut Context<Self>,
1264    ) -> Task<Result<()>> {
1265        match &self.mode {
1266            Mode::Running(local_mode) => {
1267                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1268            }
1269            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1270        }
1271    }
1272
1273    pub fn run_to_position(
1274        &mut self,
1275        breakpoint: SourceBreakpoint,
1276        active_thread_id: ThreadId,
1277        cx: &mut Context<Self>,
1278    ) {
1279        match &mut self.mode {
1280            Mode::Running(local_mode) => {
1281                if !matches!(
1282                    self.thread_states.thread_state(active_thread_id),
1283                    Some(ThreadStatus::Stopped)
1284                ) {
1285                    return;
1286                };
1287                let path = breakpoint.path.clone();
1288                local_mode.tmp_breakpoint = Some(breakpoint);
1289                let task = local_mode.send_breakpoints_from_path(
1290                    path,
1291                    BreakpointUpdatedReason::Toggled,
1292                    &self.breakpoint_store,
1293                    cx,
1294                );
1295
1296                cx.spawn(async move |this, cx| {
1297                    task.await;
1298                    this.update(cx, |this, cx| {
1299                        this.continue_thread(active_thread_id, cx);
1300                    })
1301                })
1302                .detach();
1303            }
1304            Mode::Building => {}
1305        }
1306    }
1307
1308    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1309        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1310    }
1311
1312    pub fn output(
1313        &self,
1314        since: OutputToken,
1315    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1316        if self.output_token.0 == 0 {
1317            return (self.output.range(0..0), OutputToken(0));
1318        };
1319
1320        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1321
1322        let clamped_events_since = events_since.clamp(0, self.output.len());
1323        (
1324            self.output
1325                .range(self.output.len() - clamped_events_since..),
1326            self.output_token,
1327        )
1328    }
1329
1330    pub fn respond_to_client(
1331        &self,
1332        request_seq: u64,
1333        success: bool,
1334        command: String,
1335        body: Option<serde_json::Value>,
1336        cx: &mut Context<Self>,
1337    ) -> Task<Result<()>> {
1338        let Some(local_session) = self.as_running() else {
1339            unreachable!("Cannot respond to remote client");
1340        };
1341        let client = local_session.client.clone();
1342
1343        cx.background_spawn(async move {
1344            client
1345                .send_message(Message::Response(Response {
1346                    body,
1347                    success,
1348                    command,
1349                    seq: request_seq + 1,
1350                    request_seq,
1351                    message: None,
1352                }))
1353                .await
1354        })
1355    }
1356
1357    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1358        self.mode.stopped();
1359        // todo(debugger): Find a clean way to get around the clone
1360        let breakpoint_store = self.breakpoint_store.clone();
1361        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1362            let breakpoint = local.tmp_breakpoint.take()?;
1363            let path = breakpoint.path.clone();
1364            Some((local, path))
1365        }) {
1366            local
1367                .send_breakpoints_from_path(
1368                    path,
1369                    BreakpointUpdatedReason::Toggled,
1370                    &breakpoint_store,
1371                    cx,
1372                )
1373                .detach();
1374        };
1375
1376        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1377            self.thread_states.stop_all_threads();
1378            self.invalidate_command_type::<StackTraceCommand>();
1379        }
1380
1381        // Event if we stopped all threads we still need to insert the thread_id
1382        // to our own data
1383        if let Some(thread_id) = event.thread_id {
1384            self.thread_states.stop_thread(ThreadId(thread_id));
1385
1386            self.invalidate_state(
1387                &StackTraceCommand {
1388                    thread_id,
1389                    start_frame: None,
1390                    levels: None,
1391                }
1392                .into(),
1393            );
1394        }
1395
1396        self.invalidate_generic();
1397        self.threads.clear();
1398        self.variables.clear();
1399        cx.emit(SessionEvent::Stopped(
1400            event
1401                .thread_id
1402                .map(Into::into)
1403                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1404        ));
1405        cx.emit(SessionEvent::InvalidateInlineValue);
1406        cx.notify();
1407    }
1408
1409    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1410        match *event {
1411            Events::Initialized(_) => {
1412                debug_assert!(
1413                    false,
1414                    "Initialized event should have been handled in LocalMode"
1415                );
1416            }
1417            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1418            Events::Continued(event) => {
1419                if event.all_threads_continued.unwrap_or_default() {
1420                    self.thread_states.continue_all_threads();
1421                    self.breakpoint_store.update(cx, |store, cx| {
1422                        store.remove_active_position(Some(self.session_id()), cx)
1423                    });
1424                } else {
1425                    self.thread_states
1426                        .continue_thread(ThreadId(event.thread_id));
1427                }
1428                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1429                self.invalidate_generic();
1430            }
1431            Events::Exited(_event) => {
1432                self.clear_active_debug_line(cx);
1433            }
1434            Events::Terminated(_) => {
1435                self.shutdown(cx).detach();
1436            }
1437            Events::Thread(event) => {
1438                let thread_id = ThreadId(event.thread_id);
1439
1440                match event.reason {
1441                    dap::ThreadEventReason::Started => {
1442                        self.thread_states.continue_thread(thread_id);
1443                    }
1444                    dap::ThreadEventReason::Exited => {
1445                        self.thread_states.exit_thread(thread_id);
1446                    }
1447                    reason => {
1448                        log::error!("Unhandled thread event reason {:?}", reason);
1449                    }
1450                }
1451                self.invalidate_state(&ThreadsCommand.into());
1452                cx.notify();
1453            }
1454            Events::Output(event) => {
1455                if event
1456                    .category
1457                    .as_ref()
1458                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1459                {
1460                    return;
1461                }
1462
1463                self.push_output(event, cx);
1464                cx.notify();
1465            }
1466            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1467                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1468            }),
1469            Events::Module(event) => {
1470                match event.reason {
1471                    dap::ModuleEventReason::New => {
1472                        self.modules.push(event.module);
1473                    }
1474                    dap::ModuleEventReason::Changed => {
1475                        if let Some(module) = self
1476                            .modules
1477                            .iter_mut()
1478                            .find(|other| event.module.id == other.id)
1479                        {
1480                            *module = event.module;
1481                        }
1482                    }
1483                    dap::ModuleEventReason::Removed => {
1484                        self.modules.retain(|other| event.module.id != other.id);
1485                    }
1486                }
1487
1488                // todo(debugger): We should only send the invalidate command to downstream clients.
1489                // self.invalidate_state(&ModulesCommand.into());
1490            }
1491            Events::LoadedSource(_) => {
1492                self.invalidate_state(&LoadedSourcesCommand.into());
1493            }
1494            Events::Capabilities(event) => {
1495                self.capabilities = self.capabilities.merge(event.capabilities);
1496
1497                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1498                let recent_filters = self
1499                    .capabilities
1500                    .exception_breakpoint_filters
1501                    .iter()
1502                    .flatten()
1503                    .map(|filter| (filter.filter.clone(), filter.clone()))
1504                    .collect::<BTreeMap<_, _>>();
1505                for filter in recent_filters.values() {
1506                    let default = filter.default.unwrap_or_default();
1507                    self.exception_breakpoints
1508                        .entry(filter.filter.clone())
1509                        .or_insert_with(|| (filter.clone(), default));
1510                }
1511                self.exception_breakpoints
1512                    .retain(|k, _| recent_filters.contains_key(k));
1513                if self.is_started() {
1514                    self.send_exception_breakpoints(cx);
1515                }
1516
1517                // Remove the ones that no longer exist.
1518                cx.notify();
1519            }
1520            Events::Memory(_) => {}
1521            Events::Process(_) => {}
1522            Events::ProgressEnd(_) => {}
1523            Events::ProgressStart(_) => {}
1524            Events::ProgressUpdate(_) => {}
1525            Events::Invalidated(_) => {}
1526            Events::Other(_) => {}
1527        }
1528    }
1529
1530    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1531    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1532        &mut self,
1533        request: T,
1534        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1535        cx: &mut Context<Self>,
1536    ) {
1537        const {
1538            assert!(
1539                T::CACHEABLE,
1540                "Only requests marked as cacheable should invoke `fetch`"
1541            );
1542        }
1543
1544        if !self.thread_states.any_stopped_thread()
1545            && request.type_id() != TypeId::of::<ThreadsCommand>()
1546            || self.is_session_terminated
1547        {
1548            return;
1549        }
1550
1551        let request_map = self
1552            .requests
1553            .entry(std::any::TypeId::of::<T>())
1554            .or_default();
1555
1556        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1557            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1558
1559            let task = Self::request_inner::<Arc<T>>(
1560                &self.capabilities,
1561                &self.mode,
1562                command,
1563                |this, result, cx| {
1564                    process_result(this, result, cx);
1565                    None
1566                },
1567                cx,
1568            );
1569            let task = cx
1570                .background_executor()
1571                .spawn(async move {
1572                    let _ = task.await?;
1573                    Some(())
1574                })
1575                .shared();
1576
1577            vacant.insert(task);
1578            cx.notify();
1579        }
1580    }
1581
1582    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1583        capabilities: &Capabilities,
1584        mode: &Mode,
1585        request: T,
1586        process_result: impl FnOnce(
1587            &mut Self,
1588            Result<T::Response>,
1589            &mut Context<Self>,
1590        ) -> Option<T::Response>
1591        + 'static,
1592        cx: &mut Context<Self>,
1593    ) -> Task<Option<T::Response>> {
1594        if !T::is_supported(&capabilities) {
1595            log::warn!(
1596                "Attempted to send a DAP request that isn't supported: {:?}",
1597                request
1598            );
1599            let error = Err(anyhow::Error::msg(
1600                "Couldn't complete request because it's not supported",
1601            ));
1602            return cx.spawn(async move |this, cx| {
1603                this.update(cx, |this, cx| process_result(this, error, cx))
1604                    .ok()
1605                    .flatten()
1606            });
1607        }
1608
1609        let request = mode.request_dap(request);
1610        cx.spawn(async move |this, cx| {
1611            let result = request.await;
1612            this.update(cx, |this, cx| process_result(this, result, cx))
1613                .ok()
1614                .flatten()
1615        })
1616    }
1617
1618    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1619        &self,
1620        request: T,
1621        process_result: impl FnOnce(
1622            &mut Self,
1623            Result<T::Response>,
1624            &mut Context<Self>,
1625        ) -> Option<T::Response>
1626        + 'static,
1627        cx: &mut Context<Self>,
1628    ) -> Task<Option<T::Response>> {
1629        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1630    }
1631
1632    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1633        self.requests.remove(&std::any::TypeId::of::<Command>());
1634    }
1635
1636    fn invalidate_generic(&mut self) {
1637        self.invalidate_command_type::<ModulesCommand>();
1638        self.invalidate_command_type::<LoadedSourcesCommand>();
1639        self.invalidate_command_type::<ThreadsCommand>();
1640    }
1641
1642    fn invalidate_state(&mut self, key: &RequestSlot) {
1643        self.requests
1644            .entry((&*key.0 as &dyn Any).type_id())
1645            .and_modify(|request_map| {
1646                request_map.remove(&key);
1647            });
1648    }
1649
1650    fn push_output(&mut self, event: OutputEvent, cx: &mut Context<Self>) {
1651        self.output.push_back(event);
1652        self.output_token.0 += 1;
1653        cx.emit(SessionEvent::ConsoleOutput);
1654    }
1655
1656    pub fn any_stopped_thread(&self) -> bool {
1657        self.thread_states.any_stopped_thread()
1658    }
1659
1660    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1661        self.thread_states.thread_status(thread_id)
1662    }
1663
1664    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1665        self.fetch(
1666            dap_command::ThreadsCommand,
1667            |this, result, cx| {
1668                let Some(result) = result.log_err() else {
1669                    return;
1670                };
1671
1672                this.threads = result
1673                    .into_iter()
1674                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1675                    .collect();
1676
1677                this.invalidate_command_type::<StackTraceCommand>();
1678                cx.emit(SessionEvent::Threads);
1679                cx.notify();
1680            },
1681            cx,
1682        );
1683
1684        self.threads
1685            .values()
1686            .map(|thread| {
1687                (
1688                    thread.dap.clone(),
1689                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1690                )
1691            })
1692            .collect()
1693    }
1694
1695    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1696        self.fetch(
1697            dap_command::ModulesCommand,
1698            |this, result, cx| {
1699                let Some(result) = result.log_err() else {
1700                    return;
1701                };
1702
1703                this.modules = result;
1704                cx.emit(SessionEvent::Modules);
1705                cx.notify();
1706            },
1707            cx,
1708        );
1709
1710        &self.modules
1711    }
1712
1713    pub fn ignore_breakpoints(&self) -> bool {
1714        self.ignore_breakpoints
1715    }
1716
1717    pub fn toggle_ignore_breakpoints(
1718        &mut self,
1719        cx: &mut App,
1720    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1721        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1722    }
1723
1724    pub(crate) fn set_ignore_breakpoints(
1725        &mut self,
1726        ignore: bool,
1727        cx: &mut App,
1728    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1729        if self.ignore_breakpoints == ignore {
1730            return Task::ready(HashMap::default());
1731        }
1732
1733        self.ignore_breakpoints = ignore;
1734
1735        if let Some(local) = self.as_running() {
1736            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1737        } else {
1738            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1739            unimplemented!()
1740        }
1741    }
1742
1743    pub fn exception_breakpoints(
1744        &self,
1745    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1746        self.exception_breakpoints.values()
1747    }
1748
1749    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1750        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1751            *is_enabled = !*is_enabled;
1752            self.send_exception_breakpoints(cx);
1753        }
1754    }
1755
1756    fn send_exception_breakpoints(&mut self, cx: &App) {
1757        if let Some(local) = self.as_running() {
1758            let exception_filters = self
1759                .exception_breakpoints
1760                .values()
1761                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1762                .collect();
1763
1764            let supports_exception_filters = self
1765                .capabilities
1766                .supports_exception_filter_options
1767                .unwrap_or_default();
1768            local
1769                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1770                .detach_and_log_err(cx);
1771        } else {
1772            debug_assert!(false, "Not implemented");
1773        }
1774    }
1775
1776    pub fn breakpoints_enabled(&self) -> bool {
1777        self.ignore_breakpoints
1778    }
1779
1780    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1781        self.fetch(
1782            dap_command::LoadedSourcesCommand,
1783            |this, result, cx| {
1784                let Some(result) = result.log_err() else {
1785                    return;
1786                };
1787                this.loaded_sources = result;
1788                cx.emit(SessionEvent::LoadedSources);
1789                cx.notify();
1790            },
1791            cx,
1792        );
1793
1794        &self.loaded_sources
1795    }
1796
1797    fn fallback_to_manual_restart(
1798        &mut self,
1799        res: Result<()>,
1800        cx: &mut Context<Self>,
1801    ) -> Option<()> {
1802        if res.log_err().is_none() {
1803            cx.emit(SessionStateEvent::Restart);
1804            return None;
1805        }
1806        Some(())
1807    }
1808
1809    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1810        res.log_err()?;
1811        Some(())
1812    }
1813
1814    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1815        thread_id: ThreadId,
1816    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1817    {
1818        move |this, response, cx| match response.log_err() {
1819            Some(response) => {
1820                this.breakpoint_store.update(cx, |store, cx| {
1821                    store.remove_active_position(Some(this.session_id()), cx)
1822                });
1823                Some(response)
1824            }
1825            None => {
1826                this.thread_states.stop_thread(thread_id);
1827                cx.notify();
1828                None
1829            }
1830        }
1831    }
1832
1833    fn clear_active_debug_line_response(
1834        &mut self,
1835        response: Result<()>,
1836        cx: &mut Context<Session>,
1837    ) -> Option<()> {
1838        response.log_err()?;
1839        self.clear_active_debug_line(cx);
1840        Some(())
1841    }
1842
1843    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1844        self.breakpoint_store.update(cx, |store, cx| {
1845            store.remove_active_position(Some(self.id), cx)
1846        });
1847    }
1848
1849    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1850        self.request(
1851            PauseCommand {
1852                thread_id: thread_id.0,
1853            },
1854            Self::empty_response,
1855            cx,
1856        )
1857        .detach();
1858    }
1859
1860    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1861        self.request(
1862            RestartStackFrameCommand { stack_frame_id },
1863            Self::empty_response,
1864            cx,
1865        )
1866        .detach();
1867    }
1868
1869    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1870        if self.restart_task.is_some() || self.as_running().is_none() {
1871            return;
1872        }
1873
1874        let supports_dap_restart =
1875            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
1876
1877        self.restart_task = Some(cx.spawn(async move |this, cx| {
1878            let _ = this.update(cx, |session, cx| {
1879                if supports_dap_restart {
1880                    session
1881                        .request(
1882                            RestartCommand {
1883                                raw: args.unwrap_or(Value::Null),
1884                            },
1885                            Self::fallback_to_manual_restart,
1886                            cx,
1887                        )
1888                        .detach();
1889                } else {
1890                    cx.emit(SessionStateEvent::Restart);
1891                }
1892            });
1893        }));
1894    }
1895
1896    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1897        if self.is_session_terminated {
1898            return Task::ready(());
1899        }
1900
1901        self.is_session_terminated = true;
1902        self.thread_states.exit_all_threads();
1903        cx.notify();
1904
1905        let task = if self
1906            .capabilities
1907            .supports_terminate_request
1908            .unwrap_or_default()
1909        {
1910            self.request(
1911                TerminateCommand {
1912                    restart: Some(false),
1913                },
1914                Self::clear_active_debug_line_response,
1915                cx,
1916            )
1917        } else {
1918            self.request(
1919                DisconnectCommand {
1920                    restart: Some(false),
1921                    terminate_debuggee: Some(true),
1922                    suspend_debuggee: Some(false),
1923                },
1924                Self::clear_active_debug_line_response,
1925                cx,
1926            )
1927        };
1928
1929        cx.emit(SessionStateEvent::Shutdown);
1930
1931        cx.spawn(async move |this, cx| {
1932            task.await;
1933            let _ = this.update(cx, |this, _| {
1934                if let Some(adapter_client) = this.adapter_client() {
1935                    adapter_client.kill();
1936                }
1937            });
1938        })
1939    }
1940
1941    pub fn completions(
1942        &mut self,
1943        query: CompletionsQuery,
1944        cx: &mut Context<Self>,
1945    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1946        let task = self.request(query, |_, result, _| result.log_err(), cx);
1947
1948        cx.background_executor().spawn(async move {
1949            anyhow::Ok(
1950                task.await
1951                    .map(|response| response.targets)
1952                    .context("failed to fetch completions")?,
1953            )
1954        })
1955    }
1956
1957    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1958        self.thread_states.continue_thread(thread_id);
1959        self.request(
1960            ContinueCommand {
1961                args: ContinueArguments {
1962                    thread_id: thread_id.0,
1963                    single_thread: Some(true),
1964                },
1965            },
1966            Self::on_step_response::<ContinueCommand>(thread_id),
1967            cx,
1968        )
1969        .detach();
1970    }
1971
1972    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1973        match self.mode {
1974            Mode::Running(ref local) => Some(local.client.clone()),
1975            Mode::Building => None,
1976        }
1977    }
1978
1979    pub fn has_ever_stopped(&self) -> bool {
1980        self.mode.has_ever_stopped()
1981    }
1982    pub fn step_over(
1983        &mut self,
1984        thread_id: ThreadId,
1985        granularity: SteppingGranularity,
1986        cx: &mut Context<Self>,
1987    ) {
1988        let supports_single_thread_execution_requests =
1989            self.capabilities.supports_single_thread_execution_requests;
1990        let supports_stepping_granularity = self
1991            .capabilities
1992            .supports_stepping_granularity
1993            .unwrap_or_default();
1994
1995        let command = NextCommand {
1996            inner: StepCommand {
1997                thread_id: thread_id.0,
1998                granularity: supports_stepping_granularity.then(|| granularity),
1999                single_thread: supports_single_thread_execution_requests,
2000            },
2001        };
2002
2003        self.thread_states.process_step(thread_id);
2004        self.request(
2005            command,
2006            Self::on_step_response::<NextCommand>(thread_id),
2007            cx,
2008        )
2009        .detach();
2010    }
2011
2012    pub fn step_in(
2013        &mut self,
2014        thread_id: ThreadId,
2015        granularity: SteppingGranularity,
2016        cx: &mut Context<Self>,
2017    ) {
2018        let supports_single_thread_execution_requests =
2019            self.capabilities.supports_single_thread_execution_requests;
2020        let supports_stepping_granularity = self
2021            .capabilities
2022            .supports_stepping_granularity
2023            .unwrap_or_default();
2024
2025        let command = StepInCommand {
2026            inner: StepCommand {
2027                thread_id: thread_id.0,
2028                granularity: supports_stepping_granularity.then(|| granularity),
2029                single_thread: supports_single_thread_execution_requests,
2030            },
2031        };
2032
2033        self.thread_states.process_step(thread_id);
2034        self.request(
2035            command,
2036            Self::on_step_response::<StepInCommand>(thread_id),
2037            cx,
2038        )
2039        .detach();
2040    }
2041
2042    pub fn step_out(
2043        &mut self,
2044        thread_id: ThreadId,
2045        granularity: SteppingGranularity,
2046        cx: &mut Context<Self>,
2047    ) {
2048        let supports_single_thread_execution_requests =
2049            self.capabilities.supports_single_thread_execution_requests;
2050        let supports_stepping_granularity = self
2051            .capabilities
2052            .supports_stepping_granularity
2053            .unwrap_or_default();
2054
2055        let command = StepOutCommand {
2056            inner: StepCommand {
2057                thread_id: thread_id.0,
2058                granularity: supports_stepping_granularity.then(|| granularity),
2059                single_thread: supports_single_thread_execution_requests,
2060            },
2061        };
2062
2063        self.thread_states.process_step(thread_id);
2064        self.request(
2065            command,
2066            Self::on_step_response::<StepOutCommand>(thread_id),
2067            cx,
2068        )
2069        .detach();
2070    }
2071
2072    pub fn step_back(
2073        &mut self,
2074        thread_id: ThreadId,
2075        granularity: SteppingGranularity,
2076        cx: &mut Context<Self>,
2077    ) {
2078        let supports_single_thread_execution_requests =
2079            self.capabilities.supports_single_thread_execution_requests;
2080        let supports_stepping_granularity = self
2081            .capabilities
2082            .supports_stepping_granularity
2083            .unwrap_or_default();
2084
2085        let command = StepBackCommand {
2086            inner: StepCommand {
2087                thread_id: thread_id.0,
2088                granularity: supports_stepping_granularity.then(|| granularity),
2089                single_thread: supports_single_thread_execution_requests,
2090            },
2091        };
2092
2093        self.thread_states.process_step(thread_id);
2094
2095        self.request(
2096            command,
2097            Self::on_step_response::<StepBackCommand>(thread_id),
2098            cx,
2099        )
2100        .detach();
2101    }
2102
2103    pub fn stack_frames(
2104        &mut self,
2105        thread_id: ThreadId,
2106        cx: &mut Context<Self>,
2107    ) -> Result<Vec<StackFrame>> {
2108        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2109            && self.requests.contains_key(&ThreadsCommand.type_id())
2110            && self.threads.contains_key(&thread_id)
2111        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2112        // We could still be using an old thread id and have sent a new thread's request
2113        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2114        // But it very well could cause a minor bug in the future that is hard to track down
2115        {
2116            self.fetch(
2117                super::dap_command::StackTraceCommand {
2118                    thread_id: thread_id.0,
2119                    start_frame: None,
2120                    levels: None,
2121                },
2122                move |this, stack_frames, cx| {
2123                    let entry =
2124                        this.threads
2125                            .entry(thread_id)
2126                            .and_modify(|thread| match &stack_frames {
2127                                Ok(stack_frames) => {
2128                                    thread.stack_frames = stack_frames
2129                                        .iter()
2130                                        .cloned()
2131                                        .map(StackFrame::from)
2132                                        .collect();
2133                                    thread.stack_frames_error = None;
2134                                }
2135                                Err(error) => {
2136                                    thread.stack_frames.clear();
2137                                    thread.stack_frames_error = Some(error.cloned());
2138                                }
2139                            });
2140                    debug_assert!(
2141                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2142                        "Sent request for thread_id that doesn't exist"
2143                    );
2144                    if let Ok(stack_frames) = stack_frames {
2145                        this.stack_frames.extend(
2146                            stack_frames
2147                                .into_iter()
2148                                .filter(|frame| {
2149                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2150                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2151                                    !(frame.id == 0
2152                                        && frame.line == 0
2153                                        && frame.column == 0
2154                                        && frame.presentation_hint
2155                                            == Some(StackFramePresentationHint::Label))
2156                                })
2157                                .map(|frame| (frame.id, StackFrame::from(frame))),
2158                        );
2159                    }
2160
2161                    this.invalidate_command_type::<ScopesCommand>();
2162                    this.invalidate_command_type::<VariablesCommand>();
2163
2164                    cx.emit(SessionEvent::StackTrace);
2165                },
2166                cx,
2167            );
2168        }
2169
2170        match self.threads.get(&thread_id) {
2171            Some(thread) => {
2172                if let Some(error) = &thread.stack_frames_error {
2173                    Err(error.cloned())
2174                } else {
2175                    Ok(thread.stack_frames.clone())
2176                }
2177            }
2178            None => Ok(Vec::new()),
2179        }
2180    }
2181
2182    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2183        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2184            && self
2185                .requests
2186                .contains_key(&TypeId::of::<StackTraceCommand>())
2187        {
2188            self.fetch(
2189                ScopesCommand { stack_frame_id },
2190                move |this, scopes, cx| {
2191                    let Some(scopes) = scopes.log_err() else {
2192                        return
2193                    };
2194
2195                    for scope in scopes.iter() {
2196                        this.variables(scope.variables_reference, cx);
2197                    }
2198
2199                    let entry = this
2200                        .stack_frames
2201                        .entry(stack_frame_id)
2202                        .and_modify(|stack_frame| {
2203                            stack_frame.scopes = scopes;
2204                        });
2205
2206                    cx.emit(SessionEvent::Variables);
2207
2208                    debug_assert!(
2209                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2210                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2211                    );
2212                },
2213                cx,
2214            );
2215        }
2216
2217        self.stack_frames
2218            .get(&stack_frame_id)
2219            .map(|frame| frame.scopes.as_slice())
2220            .unwrap_or_default()
2221    }
2222
2223    pub fn variables_by_stack_frame_id(
2224        &self,
2225        stack_frame_id: StackFrameId,
2226        globals: bool,
2227        locals: bool,
2228    ) -> Vec<dap::Variable> {
2229        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2230            return Vec::new();
2231        };
2232
2233        stack_frame
2234            .scopes
2235            .iter()
2236            .filter(|scope| {
2237                (scope.name.to_lowercase().contains("local") && locals)
2238                    || (scope.name.to_lowercase().contains("global") && globals)
2239            })
2240            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2241            .flatten()
2242            .cloned()
2243            .collect()
2244    }
2245
2246    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2247        &self.watchers
2248    }
2249
2250    pub fn add_watcher(
2251        &mut self,
2252        expression: SharedString,
2253        frame_id: u64,
2254        cx: &mut Context<Self>,
2255    ) -> Task<Result<()>> {
2256        let request = self.mode.request_dap(EvaluateCommand {
2257            expression: expression.to_string(),
2258            context: Some(EvaluateArgumentsContext::Watch),
2259            frame_id: Some(frame_id),
2260            source: None,
2261        });
2262
2263        cx.spawn(async move |this, cx| {
2264            let response = request.await?;
2265
2266            this.update(cx, |session, cx| {
2267                session.watchers.insert(
2268                    expression.clone(),
2269                    Watcher {
2270                        expression,
2271                        value: response.result.into(),
2272                        variables_reference: response.variables_reference,
2273                        presentation_hint: response.presentation_hint,
2274                    },
2275                );
2276                cx.emit(SessionEvent::Watchers);
2277            })
2278        })
2279    }
2280
2281    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2282        let watches = self.watchers.clone();
2283        for (_, watch) in watches.into_iter() {
2284            self.add_watcher(watch.expression.clone(), frame_id, cx)
2285                .detach();
2286        }
2287    }
2288
2289    pub fn remove_watcher(&mut self, expression: SharedString) {
2290        self.watchers.remove(&expression);
2291    }
2292
2293    pub fn variables(
2294        &mut self,
2295        variables_reference: VariableReference,
2296        cx: &mut Context<Self>,
2297    ) -> Vec<dap::Variable> {
2298        let command = VariablesCommand {
2299            variables_reference,
2300            filter: None,
2301            start: None,
2302            count: None,
2303            format: None,
2304        };
2305
2306        self.fetch(
2307            command,
2308            move |this, variables, cx| {
2309                let Some(variables) = variables.log_err() else {
2310                    return;
2311                };
2312
2313                this.variables.insert(variables_reference, variables);
2314
2315                cx.emit(SessionEvent::Variables);
2316                cx.emit(SessionEvent::InvalidateInlineValue);
2317            },
2318            cx,
2319        );
2320
2321        self.variables
2322            .get(&variables_reference)
2323            .cloned()
2324            .unwrap_or_default()
2325    }
2326
2327    pub fn set_variable_value(
2328        &mut self,
2329        stack_frame_id: u64,
2330        variables_reference: u64,
2331        name: String,
2332        value: String,
2333        cx: &mut Context<Self>,
2334    ) {
2335        if self.capabilities.supports_set_variable.unwrap_or_default() {
2336            self.request(
2337                SetVariableValueCommand {
2338                    name,
2339                    value,
2340                    variables_reference,
2341                },
2342                move |this, response, cx| {
2343                    let response = response.log_err()?;
2344                    this.invalidate_command_type::<VariablesCommand>();
2345                    this.refresh_watchers(stack_frame_id, cx);
2346                    cx.emit(SessionEvent::Variables);
2347                    Some(response)
2348                },
2349                cx,
2350            )
2351            .detach();
2352        }
2353    }
2354
2355    pub fn evaluate(
2356        &mut self,
2357        expression: String,
2358        context: Option<EvaluateArgumentsContext>,
2359        frame_id: Option<u64>,
2360        source: Option<Source>,
2361        cx: &mut Context<Self>,
2362    ) -> Task<()> {
2363        let event = dap::OutputEvent {
2364            category: None,
2365            output: format!("> {expression}"),
2366            group: None,
2367            variables_reference: None,
2368            source: None,
2369            line: None,
2370            column: None,
2371            data: None,
2372            location_reference: None,
2373        };
2374        self.push_output(event, cx);
2375        let request = self.mode.request_dap(EvaluateCommand {
2376            expression,
2377            context,
2378            frame_id,
2379            source,
2380        });
2381        cx.spawn(async move |this, cx| {
2382            let response = request.await;
2383            this.update(cx, |this, cx| {
2384                match response {
2385                    Ok(response) => {
2386                        let event = dap::OutputEvent {
2387                            category: None,
2388                            output: format!("< {}", &response.result),
2389                            group: None,
2390                            variables_reference: Some(response.variables_reference),
2391                            source: None,
2392                            line: None,
2393                            column: None,
2394                            data: None,
2395                            location_reference: None,
2396                        };
2397                        this.push_output(event, cx);
2398                    }
2399                    Err(e) => {
2400                        let event = dap::OutputEvent {
2401                            category: None,
2402                            output: format!("{}", e),
2403                            group: None,
2404                            variables_reference: None,
2405                            source: None,
2406                            line: None,
2407                            column: None,
2408                            data: None,
2409                            location_reference: None,
2410                        };
2411                        this.push_output(event, cx);
2412                    }
2413                };
2414                cx.notify();
2415            })
2416            .ok();
2417        })
2418    }
2419
2420    pub fn location(
2421        &mut self,
2422        reference: u64,
2423        cx: &mut Context<Self>,
2424    ) -> Option<dap::LocationsResponse> {
2425        self.fetch(
2426            LocationsCommand { reference },
2427            move |this, response, _| {
2428                let Some(response) = response.log_err() else {
2429                    return;
2430                };
2431                this.locations.insert(reference, response);
2432            },
2433            cx,
2434        );
2435        self.locations.get(&reference).cloned()
2436    }
2437
2438    pub fn is_attached(&self) -> bool {
2439        let Mode::Running(local_mode) = &self.mode else {
2440            return false;
2441        };
2442        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2443    }
2444
2445    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2446        let command = DisconnectCommand {
2447            restart: Some(false),
2448            terminate_debuggee: Some(false),
2449            suspend_debuggee: Some(false),
2450        };
2451
2452        self.request(command, Self::empty_response, cx).detach()
2453    }
2454
2455    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2456        if self
2457            .capabilities
2458            .supports_terminate_threads_request
2459            .unwrap_or_default()
2460        {
2461            self.request(
2462                TerminateThreadsCommand {
2463                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2464                },
2465                Self::clear_active_debug_line_response,
2466                cx,
2467            )
2468            .detach();
2469        } else {
2470            self.shutdown(cx).detach();
2471        }
2472    }
2473
2474    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2475        self.thread_states.thread_state(thread_id)
2476    }
2477}