session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
  11    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  12    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapStore;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap};
  17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  18use dap::messages::Response;
  19use dap::requests::{Request, RunInTerminal, StartDebugging};
  20use dap::{
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  28    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  29    StartDebuggingRequestArgumentsRequest, VariablePresentationHint,
  30};
  31use futures::SinkExt;
  32use futures::channel::mpsc::UnboundedSender;
  33use futures::channel::{mpsc, oneshot};
  34use futures::{FutureExt, future::Shared};
  35use gpui::{
  36    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  37    Task, WeakEntity,
  38};
  39
  40use rpc::ErrorExt;
  41use serde_json::Value;
  42use smol::stream::StreamExt;
  43use std::any::TypeId;
  44use std::collections::BTreeMap;
  45use std::u64;
  46use std::{
  47    any::Any,
  48    collections::hash_map::Entry,
  49    hash::{Hash, Hasher},
  50    path::Path,
  51    sync::Arc,
  52};
  53use task::TaskContext;
  54use text::{PointUtf16, ToPointUtf16};
  55use util::ResultExt;
  56use worktree::Worktree;
  57
  58#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  59#[repr(transparent)]
  60pub struct ThreadId(pub u64);
  61
  62impl ThreadId {
  63    pub const MIN: ThreadId = ThreadId(u64::MIN);
  64    pub const MAX: ThreadId = ThreadId(u64::MAX);
  65}
  66
  67impl From<u64> for ThreadId {
  68    fn from(id: u64) -> Self {
  69        Self(id)
  70    }
  71}
  72
  73#[derive(Clone, Debug)]
  74pub struct StackFrame {
  75    pub dap: dap::StackFrame,
  76    pub scopes: Vec<dap::Scope>,
  77}
  78
  79impl From<dap::StackFrame> for StackFrame {
  80    fn from(stack_frame: dap::StackFrame) -> Self {
  81        Self {
  82            scopes: vec![],
  83            dap: stack_frame,
  84        }
  85    }
  86}
  87
  88#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  89pub enum ThreadStatus {
  90    #[default]
  91    Running,
  92    Stopped,
  93    Stepping,
  94    Exited,
  95    Ended,
  96}
  97
  98impl ThreadStatus {
  99    pub fn label(&self) -> &'static str {
 100        match self {
 101            ThreadStatus::Running => "Running",
 102            ThreadStatus::Stopped => "Stopped",
 103            ThreadStatus::Stepping => "Stepping",
 104            ThreadStatus::Exited => "Exited",
 105            ThreadStatus::Ended => "Ended",
 106        }
 107    }
 108}
 109
 110#[derive(Debug)]
 111pub struct Thread {
 112    dap: dap::Thread,
 113    stack_frames: Vec<StackFrame>,
 114    stack_frames_error: Option<anyhow::Error>,
 115    _has_stopped: bool,
 116}
 117
 118impl From<dap::Thread> for Thread {
 119    fn from(dap: dap::Thread) -> Self {
 120        Self {
 121            dap,
 122            stack_frames: Default::default(),
 123            stack_frames_error: None,
 124            _has_stopped: false,
 125        }
 126    }
 127}
 128
 129#[derive(Debug, Clone, PartialEq)]
 130pub struct Watcher {
 131    pub expression: SharedString,
 132    pub value: SharedString,
 133    pub variables_reference: u64,
 134    pub presentation_hint: Option<VariablePresentationHint>,
 135}
 136
 137pub enum Mode {
 138    Building,
 139    Running(RunningMode),
 140}
 141
 142#[derive(Clone)]
 143pub struct RunningMode {
 144    client: Arc<DebugAdapterClient>,
 145    binary: DebugAdapterBinary,
 146    tmp_breakpoint: Option<SourceBreakpoint>,
 147    worktree: WeakEntity<Worktree>,
 148    executor: BackgroundExecutor,
 149    is_started: bool,
 150    has_ever_stopped: bool,
 151    messages_tx: UnboundedSender<Message>,
 152}
 153
 154fn client_source(abs_path: &Path) -> dap::Source {
 155    dap::Source {
 156        name: abs_path
 157            .file_name()
 158            .map(|filename| filename.to_string_lossy().to_string()),
 159        path: Some(abs_path.to_string_lossy().to_string()),
 160        source_reference: None,
 161        presentation_hint: None,
 162        origin: None,
 163        sources: None,
 164        adapter_data: None,
 165        checksums: None,
 166    }
 167}
 168
 169impl RunningMode {
 170    async fn new(
 171        session_id: SessionId,
 172        parent_session: Option<Entity<Session>>,
 173        worktree: WeakEntity<Worktree>,
 174        binary: DebugAdapterBinary,
 175        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 176        cx: &mut AsyncApp,
 177    ) -> Result<Self> {
 178        let message_handler = Box::new({
 179            let messages_tx = messages_tx.clone();
 180            move |message| {
 181                messages_tx.unbounded_send(message).ok();
 182            }
 183        });
 184
 185        let client = if let Some(client) = parent_session
 186            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 187            .flatten()
 188        {
 189            client
 190                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 191                .await?
 192        } else {
 193            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 194        };
 195
 196        Ok(Self {
 197            client: Arc::new(client),
 198            worktree,
 199            tmp_breakpoint: None,
 200            binary,
 201            executor: cx.background_executor().clone(),
 202            is_started: false,
 203            has_ever_stopped: false,
 204            messages_tx,
 205        })
 206    }
 207
 208    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 209        &self.worktree
 210    }
 211
 212    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 213        let tasks: Vec<_> = paths
 214            .into_iter()
 215            .map(|path| {
 216                self.request(dap_command::SetBreakpoints {
 217                    source: client_source(path),
 218                    source_modified: None,
 219                    breakpoints: vec![],
 220                })
 221            })
 222            .collect();
 223
 224        cx.background_spawn(async move {
 225            futures::future::join_all(tasks)
 226                .await
 227                .iter()
 228                .for_each(|res| match res {
 229                    Ok(_) => {}
 230                    Err(err) => {
 231                        log::warn!("Set breakpoints request failed: {}", err);
 232                    }
 233                });
 234        })
 235    }
 236
 237    fn send_breakpoints_from_path(
 238        &self,
 239        abs_path: Arc<Path>,
 240        reason: BreakpointUpdatedReason,
 241        breakpoint_store: &Entity<BreakpointStore>,
 242        cx: &mut App,
 243    ) -> Task<()> {
 244        let breakpoints =
 245            breakpoint_store
 246                .read(cx)
 247                .source_breakpoints_from_path(&abs_path, cx)
 248                .into_iter()
 249                .filter(|bp| bp.state.is_enabled())
 250                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 251                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 252                }))
 253                .map(Into::into)
 254                .collect();
 255
 256        let raw_breakpoints = breakpoint_store
 257            .read(cx)
 258            .breakpoints_from_path(&abs_path)
 259            .into_iter()
 260            .filter(|bp| bp.bp.state.is_enabled())
 261            .collect::<Vec<_>>();
 262
 263        let task = self.request(dap_command::SetBreakpoints {
 264            source: client_source(&abs_path),
 265            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 266            breakpoints,
 267        });
 268        let session_id = self.client.id();
 269        let breakpoint_store = breakpoint_store.downgrade();
 270        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 271            Ok(breakpoints) => {
 272                let breakpoints =
 273                    breakpoints
 274                        .into_iter()
 275                        .zip(raw_breakpoints)
 276                        .filter_map(|(dap_bp, zed_bp)| {
 277                            Some((
 278                                zed_bp,
 279                                BreakpointSessionState {
 280                                    id: dap_bp.id?,
 281                                    verified: dap_bp.verified,
 282                                },
 283                            ))
 284                        });
 285                breakpoint_store
 286                    .update(cx, |this, _| {
 287                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 288                    })
 289                    .ok();
 290            }
 291            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 292        })
 293    }
 294
 295    fn send_exception_breakpoints(
 296        &self,
 297        filters: Vec<ExceptionBreakpointsFilter>,
 298        supports_filter_options: bool,
 299    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 300        let arg = if supports_filter_options {
 301            SetExceptionBreakpoints::WithOptions {
 302                filters: filters
 303                    .into_iter()
 304                    .map(|filter| ExceptionFilterOptions {
 305                        filter_id: filter.filter,
 306                        condition: None,
 307                        mode: None,
 308                    })
 309                    .collect(),
 310            }
 311        } else {
 312            SetExceptionBreakpoints::Plain {
 313                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 314            }
 315        };
 316        self.request(arg)
 317    }
 318
 319    fn send_source_breakpoints(
 320        &self,
 321        ignore_breakpoints: bool,
 322        breakpoint_store: &Entity<BreakpointStore>,
 323        cx: &App,
 324    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 325        let mut breakpoint_tasks = Vec::new();
 326        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 327        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 328        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 329        let session_id = self.client.id();
 330        for (path, breakpoints) in breakpoints {
 331            let breakpoints = if ignore_breakpoints {
 332                vec![]
 333            } else {
 334                breakpoints
 335                    .into_iter()
 336                    .filter(|bp| bp.state.is_enabled())
 337                    .map(Into::into)
 338                    .collect()
 339            };
 340
 341            let raw_breakpoints = raw_breakpoints
 342                .remove(&path)
 343                .unwrap_or_default()
 344                .into_iter()
 345                .filter(|bp| bp.bp.state.is_enabled());
 346            let error_path = path.clone();
 347            let send_request = self
 348                .request(dap_command::SetBreakpoints {
 349                    source: client_source(&path),
 350                    source_modified: Some(false),
 351                    breakpoints,
 352                })
 353                .map(|result| result.map_err(move |e| (error_path, e)));
 354
 355            let task = cx.spawn({
 356                let breakpoint_store = breakpoint_store.downgrade();
 357                async move |cx| {
 358                    let breakpoints = cx.background_spawn(send_request).await?;
 359
 360                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 361                        |(dap_bp, zed_bp)| {
 362                            Some((
 363                                zed_bp,
 364                                BreakpointSessionState {
 365                                    id: dap_bp.id?,
 366                                    verified: dap_bp.verified,
 367                                },
 368                            ))
 369                        },
 370                    );
 371                    breakpoint_store
 372                        .update(cx, |this, _| {
 373                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 374                        })
 375                        .ok();
 376
 377                    Ok(())
 378                }
 379            });
 380            breakpoint_tasks.push(task);
 381        }
 382
 383        cx.background_spawn(async move {
 384            futures::future::join_all(breakpoint_tasks)
 385                .await
 386                .into_iter()
 387                .filter_map(Result::err)
 388                .collect::<HashMap<_, _>>()
 389        })
 390    }
 391
 392    fn initialize_sequence(
 393        &self,
 394        capabilities: &Capabilities,
 395        initialized_rx: oneshot::Receiver<()>,
 396        dap_store: WeakEntity<DapStore>,
 397        cx: &mut Context<Session>,
 398    ) -> Task<Result<()>> {
 399        let raw = self.binary.request_args.clone();
 400
 401        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 402        let launch = match raw.request {
 403            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 404                raw: raw.configuration,
 405            }),
 406            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 407                raw: raw.configuration,
 408            }),
 409        };
 410
 411        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 412        // From spec (on initialization sequence):
 413        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 414        //
 415        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 416        let should_send_exception_breakpoints = capabilities
 417            .exception_breakpoint_filters
 418            .as_ref()
 419            .map_or(false, |filters| !filters.is_empty())
 420            || !configuration_done_supported;
 421        let supports_exception_filters = capabilities
 422            .supports_exception_filter_options
 423            .unwrap_or_default();
 424        let this = self.clone();
 425        let worktree = self.worktree().clone();
 426        let mut filters = capabilities
 427            .exception_breakpoint_filters
 428            .clone()
 429            .unwrap_or_default();
 430        let configuration_sequence = cx.spawn({
 431            async move |session, cx| {
 432                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 433                let (breakpoint_store, adapter_defaults) =
 434                    dap_store.read_with(cx, |dap_store, _| {
 435                        (
 436                            dap_store.breakpoint_store().clone(),
 437                            dap_store.adapter_options(&adapter_name),
 438                        )
 439                    })?;
 440                initialized_rx.await?;
 441                let errors_by_path = cx
 442                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 443                    .await;
 444
 445                dap_store.update(cx, |_, cx| {
 446                    let Some(worktree) = worktree.upgrade() else {
 447                        return;
 448                    };
 449
 450                    for (path, error) in &errors_by_path {
 451                        log::error!("failed to set breakpoints for {path:?}: {error}");
 452                    }
 453
 454                    if let Some(failed_path) = errors_by_path.keys().next() {
 455                        let failed_path = failed_path
 456                            .strip_prefix(worktree.read(cx).abs_path())
 457                            .unwrap_or(failed_path)
 458                            .display();
 459                        let message = format!(
 460                            "Failed to set breakpoints for {failed_path}{}",
 461                            match errors_by_path.len() {
 462                                0 => unreachable!(),
 463                                1 => "".into(),
 464                                2 => " and 1 other path".into(),
 465                                n => format!(" and {} other paths", n - 1),
 466                            }
 467                        );
 468                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 469                    }
 470                })?;
 471
 472                if should_send_exception_breakpoints {
 473                    _ = session.update(cx, |this, _| {
 474                        filters.retain(|filter| {
 475                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 476                                defaults
 477                                    .exception_breakpoints
 478                                    .get(&filter.filter)
 479                                    .map(|options| options.enabled)
 480                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 481                            } else {
 482                                filter.default.unwrap_or_default()
 483                            };
 484                            this.exception_breakpoints
 485                                .entry(filter.filter.clone())
 486                                .or_insert_with(|| (filter.clone(), is_enabled));
 487                            is_enabled
 488                        });
 489                    });
 490
 491                    this.send_exception_breakpoints(filters, supports_exception_filters)
 492                        .await
 493                        .ok();
 494                }
 495
 496                let ret = if configuration_done_supported {
 497                    this.request(ConfigurationDone {})
 498                } else {
 499                    Task::ready(Ok(()))
 500                }
 501                .await;
 502                ret
 503            }
 504        });
 505
 506        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 507
 508        cx.spawn(async move |this, cx| {
 509            let result = task.await;
 510
 511            this.update(cx, |this, cx| {
 512                if let Some(this) = this.as_running_mut() {
 513                    this.is_started = true;
 514                    cx.notify();
 515                }
 516            })
 517            .ok();
 518
 519            result?;
 520            anyhow::Ok(())
 521        })
 522    }
 523
 524    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 525        let client = self.client.clone();
 526        let messages_tx = self.messages_tx.clone();
 527        let message_handler = Box::new(move |message| {
 528            messages_tx.unbounded_send(message).ok();
 529        });
 530        if client.should_reconnect_for_ssh() {
 531            Some(cx.spawn(async move |cx| {
 532                client.connect(message_handler, cx).await?;
 533                anyhow::Ok(())
 534            }))
 535        } else {
 536            None
 537        }
 538    }
 539
 540    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 541    where
 542        <R::DapRequest as dap::requests::Request>::Response: 'static,
 543        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 544    {
 545        let request = Arc::new(request);
 546
 547        let request_clone = request.clone();
 548        let connection = self.client.clone();
 549        self.executor.spawn(async move {
 550            let args = request_clone.to_dap();
 551            let response = connection.request::<R::DapRequest>(args).await?;
 552            request.response_from_dap(response)
 553        })
 554    }
 555}
 556
 557impl Mode {
 558    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 559    where
 560        <R::DapRequest as dap::requests::Request>::Response: 'static,
 561        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 562    {
 563        match self {
 564            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 565            Mode::Building => Task::ready(Err(anyhow!(
 566                "no adapter running to send request: {request:?}"
 567            ))),
 568        }
 569    }
 570
 571    /// Did this debug session stop at least once?
 572    pub(crate) fn has_ever_stopped(&self) -> bool {
 573        match self {
 574            Mode::Building => false,
 575            Mode::Running(running_mode) => running_mode.has_ever_stopped,
 576        }
 577    }
 578
 579    fn stopped(&mut self) {
 580        if let Mode::Running(running) = self {
 581            running.has_ever_stopped = true;
 582        }
 583    }
 584}
 585
 586#[derive(Default)]
 587struct ThreadStates {
 588    global_state: Option<ThreadStatus>,
 589    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 590}
 591
 592impl ThreadStates {
 593    fn stop_all_threads(&mut self) {
 594        self.global_state = Some(ThreadStatus::Stopped);
 595        self.known_thread_states.clear();
 596    }
 597
 598    fn exit_all_threads(&mut self) {
 599        self.global_state = Some(ThreadStatus::Exited);
 600        self.known_thread_states.clear();
 601    }
 602
 603    fn continue_all_threads(&mut self) {
 604        self.global_state = Some(ThreadStatus::Running);
 605        self.known_thread_states.clear();
 606    }
 607
 608    fn stop_thread(&mut self, thread_id: ThreadId) {
 609        self.known_thread_states
 610            .insert(thread_id, ThreadStatus::Stopped);
 611    }
 612
 613    fn continue_thread(&mut self, thread_id: ThreadId) {
 614        self.known_thread_states
 615            .insert(thread_id, ThreadStatus::Running);
 616    }
 617
 618    fn process_step(&mut self, thread_id: ThreadId) {
 619        self.known_thread_states
 620            .insert(thread_id, ThreadStatus::Stepping);
 621    }
 622
 623    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 624        self.thread_state(thread_id)
 625            .unwrap_or(ThreadStatus::Running)
 626    }
 627
 628    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 629        self.known_thread_states
 630            .get(&thread_id)
 631            .copied()
 632            .or(self.global_state)
 633    }
 634
 635    fn exit_thread(&mut self, thread_id: ThreadId) {
 636        self.known_thread_states
 637            .insert(thread_id, ThreadStatus::Exited);
 638    }
 639
 640    fn any_stopped_thread(&self) -> bool {
 641        self.global_state
 642            .is_some_and(|state| state == ThreadStatus::Stopped)
 643            || self
 644                .known_thread_states
 645                .values()
 646                .any(|status| *status == ThreadStatus::Stopped)
 647    }
 648}
 649const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 650
 651type IsEnabled = bool;
 652
 653#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 654pub struct OutputToken(pub usize);
 655/// Represents a current state of a single debug adapter and provides ways to mutate it.
 656pub struct Session {
 657    pub mode: Mode,
 658    id: SessionId,
 659    label: SharedString,
 660    adapter: DebugAdapterName,
 661    pub(super) capabilities: Capabilities,
 662    child_session_ids: HashSet<SessionId>,
 663    parent_session: Option<Entity<Session>>,
 664    modules: Vec<dap::Module>,
 665    loaded_sources: Vec<dap::Source>,
 666    output_token: OutputToken,
 667    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 668    threads: IndexMap<ThreadId, Thread>,
 669    thread_states: ThreadStates,
 670    watchers: HashMap<SharedString, Watcher>,
 671    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 672    stack_frames: IndexMap<StackFrameId, StackFrame>,
 673    locations: HashMap<u64, dap::LocationsResponse>,
 674    is_session_terminated: bool,
 675    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 676    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 677    ignore_breakpoints: bool,
 678    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 679    background_tasks: Vec<Task<()>>,
 680    task_context: TaskContext,
 681}
 682
 683trait CacheableCommand: Any + Send + Sync {
 684    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 685    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 686    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 687}
 688
 689impl<T> CacheableCommand for T
 690where
 691    T: DapCommand + PartialEq + Eq + Hash,
 692{
 693    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 694        (rhs as &dyn Any)
 695            .downcast_ref::<Self>()
 696            .map_or(false, |rhs| self == rhs)
 697    }
 698
 699    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 700        T::hash(self, &mut hasher);
 701    }
 702
 703    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 704        self
 705    }
 706}
 707
 708pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 709
 710impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 711    fn from(request: T) -> Self {
 712        Self(Arc::new(request))
 713    }
 714}
 715
 716impl PartialEq for RequestSlot {
 717    fn eq(&self, other: &Self) -> bool {
 718        self.0.dyn_eq(other.0.as_ref())
 719    }
 720}
 721
 722impl Eq for RequestSlot {}
 723
 724impl Hash for RequestSlot {
 725    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 726        self.0.dyn_hash(state);
 727        (&*self.0 as &dyn Any).type_id().hash(state)
 728    }
 729}
 730
 731#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 732pub struct CompletionsQuery {
 733    pub query: String,
 734    pub column: u64,
 735    pub line: Option<u64>,
 736    pub frame_id: Option<u64>,
 737}
 738
 739impl CompletionsQuery {
 740    pub fn new(
 741        buffer: &language::Buffer,
 742        cursor_position: language::Anchor,
 743        frame_id: Option<u64>,
 744    ) -> Self {
 745        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 746        Self {
 747            query: buffer.text(),
 748            column: column as u64,
 749            frame_id,
 750            line: Some(row as u64),
 751        }
 752    }
 753}
 754
 755#[derive(Debug)]
 756pub enum SessionEvent {
 757    Modules,
 758    LoadedSources,
 759    Stopped(Option<ThreadId>),
 760    StackTrace,
 761    Variables,
 762    Watchers,
 763    Threads,
 764    InvalidateInlineValue,
 765    CapabilitiesLoaded,
 766    RunInTerminal {
 767        request: RunInTerminalRequestArguments,
 768        sender: mpsc::Sender<Result<u32>>,
 769    },
 770    ConsoleOutput,
 771}
 772
 773#[derive(Clone, Debug, PartialEq, Eq)]
 774pub enum SessionStateEvent {
 775    Running,
 776    Shutdown,
 777    Restart,
 778    SpawnChildSession {
 779        request: StartDebuggingRequestArguments,
 780    },
 781}
 782
 783impl EventEmitter<SessionEvent> for Session {}
 784impl EventEmitter<SessionStateEvent> for Session {}
 785
 786// local session will send breakpoint updates to DAP for all new breakpoints
 787// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 788// BreakpointStore notifies session on breakpoint changes
 789impl Session {
 790    pub(crate) fn new(
 791        breakpoint_store: Entity<BreakpointStore>,
 792        session_id: SessionId,
 793        parent_session: Option<Entity<Session>>,
 794        label: SharedString,
 795        adapter: DebugAdapterName,
 796        task_context: TaskContext,
 797        cx: &mut App,
 798    ) -> Entity<Self> {
 799        cx.new::<Self>(|cx| {
 800            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 801                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 802                    if let Some(local) = (!this.ignore_breakpoints)
 803                        .then(|| this.as_running_mut())
 804                        .flatten()
 805                    {
 806                        local
 807                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 808                            .detach();
 809                    };
 810                }
 811                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 812                    if let Some(local) = (!this.ignore_breakpoints)
 813                        .then(|| this.as_running_mut())
 814                        .flatten()
 815                    {
 816                        local.unset_breakpoints_from_paths(paths, cx).detach();
 817                    }
 818                }
 819                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 820            })
 821            .detach();
 822            // cx.on_app_quit(Self::on_app_quit).detach();
 823
 824            let this = Self {
 825                mode: Mode::Building,
 826                id: session_id,
 827                child_session_ids: HashSet::default(),
 828                parent_session,
 829                capabilities: Capabilities::default(),
 830                watchers: HashMap::default(),
 831                variables: Default::default(),
 832                stack_frames: Default::default(),
 833                thread_states: ThreadStates::default(),
 834                output_token: OutputToken(0),
 835                output: circular_buffer::CircularBuffer::boxed(),
 836                requests: HashMap::default(),
 837                modules: Vec::default(),
 838                loaded_sources: Vec::default(),
 839                threads: IndexMap::default(),
 840                background_tasks: Vec::default(),
 841                locations: Default::default(),
 842                is_session_terminated: false,
 843                ignore_breakpoints: false,
 844                breakpoint_store,
 845                exception_breakpoints: Default::default(),
 846                label,
 847                adapter,
 848                task_context,
 849            };
 850
 851            this
 852        })
 853    }
 854
 855    pub fn task_context(&self) -> &TaskContext {
 856        &self.task_context
 857    }
 858
 859    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 860        match &self.mode {
 861            Mode::Building => None,
 862            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 863        }
 864    }
 865
 866    pub fn boot(
 867        &mut self,
 868        binary: DebugAdapterBinary,
 869        worktree: Entity<Worktree>,
 870        dap_store: WeakEntity<DapStore>,
 871        cx: &mut Context<Self>,
 872    ) -> Task<Result<()>> {
 873        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 874        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 875
 876        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 877            let mut initialized_tx = Some(initialized_tx);
 878            while let Some(message) = message_rx.next().await {
 879                if let Message::Event(event) = message {
 880                    if let Events::Initialized(_) = *event {
 881                        if let Some(tx) = initialized_tx.take() {
 882                            tx.send(()).ok();
 883                        }
 884                    } else {
 885                        let Ok(_) = this.update(cx, |session, cx| {
 886                            session.handle_dap_event(event, cx);
 887                        }) else {
 888                            break;
 889                        };
 890                    }
 891                } else if let Message::Request(request) = message {
 892                    let Ok(_) = this.update(cx, |this, cx| {
 893                        if request.command == StartDebugging::COMMAND {
 894                            this.handle_start_debugging_request(request, cx)
 895                                .detach_and_log_err(cx);
 896                        } else if request.command == RunInTerminal::COMMAND {
 897                            this.handle_run_in_terminal_request(request, cx)
 898                                .detach_and_log_err(cx);
 899                        }
 900                    }) else {
 901                        break;
 902                    };
 903                }
 904            }
 905        })];
 906        self.background_tasks = background_tasks;
 907        let id = self.id;
 908        let parent_session = self.parent_session.clone();
 909
 910        cx.spawn(async move |this, cx| {
 911            let mode = RunningMode::new(
 912                id,
 913                parent_session,
 914                worktree.downgrade(),
 915                binary.clone(),
 916                message_tx,
 917                cx,
 918            )
 919            .await?;
 920            this.update(cx, |this, cx| {
 921                this.mode = Mode::Running(mode);
 922                cx.emit(SessionStateEvent::Running);
 923            })?;
 924
 925            this.update(cx, |session, cx| session.request_initialize(cx))?
 926                .await?;
 927
 928            let result = this
 929                .update(cx, |session, cx| {
 930                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 931                })?
 932                .await;
 933
 934            if result.is_err() {
 935                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 936
 937                console
 938                    .send(format!(
 939                        "Tried to launch debugger with: {}",
 940                        serde_json::to_string_pretty(&binary.request_args.configuration)
 941                            .unwrap_or_default(),
 942                    ))
 943                    .await
 944                    .ok();
 945            }
 946
 947            result
 948        })
 949    }
 950
 951    pub fn session_id(&self) -> SessionId {
 952        self.id
 953    }
 954
 955    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 956        self.child_session_ids.clone()
 957    }
 958
 959    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 960        self.child_session_ids.insert(session_id);
 961    }
 962
 963    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 964        self.child_session_ids.remove(&session_id);
 965    }
 966
 967    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 968        self.parent_session
 969            .as_ref()
 970            .map(|session| session.read(cx).id)
 971    }
 972
 973    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 974        self.parent_session.as_ref()
 975    }
 976
 977    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
 978        let Some(client) = self.adapter_client() else {
 979            return Task::ready(());
 980        };
 981
 982        let supports_terminate = self
 983            .capabilities
 984            .support_terminate_debuggee
 985            .unwrap_or(false);
 986
 987        cx.background_spawn(async move {
 988            if supports_terminate {
 989                client
 990                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
 991                        restart: Some(false),
 992                    })
 993                    .await
 994                    .ok();
 995            } else {
 996                client
 997                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
 998                        restart: Some(false),
 999                        terminate_debuggee: Some(true),
1000                        suspend_debuggee: Some(false),
1001                    })
1002                    .await
1003                    .ok();
1004            }
1005        })
1006    }
1007
1008    pub fn capabilities(&self) -> &Capabilities {
1009        &self.capabilities
1010    }
1011
1012    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1013        match &self.mode {
1014            Mode::Building => None,
1015            Mode::Running(running_mode) => Some(&running_mode.binary),
1016        }
1017    }
1018
1019    pub fn adapter(&self) -> DebugAdapterName {
1020        self.adapter.clone()
1021    }
1022
1023    pub fn label(&self) -> SharedString {
1024        self.label.clone()
1025    }
1026
1027    pub fn is_terminated(&self) -> bool {
1028        self.is_session_terminated
1029    }
1030
1031    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1032        let (tx, mut rx) = mpsc::unbounded();
1033
1034        cx.spawn(async move |this, cx| {
1035            while let Some(output) = rx.next().await {
1036                this.update(cx, |this, _| {
1037                    let event = dap::OutputEvent {
1038                        category: None,
1039                        output,
1040                        group: None,
1041                        variables_reference: None,
1042                        source: None,
1043                        line: None,
1044                        column: None,
1045                        data: None,
1046                        location_reference: None,
1047                    };
1048                    this.push_output(event);
1049                })?;
1050            }
1051            anyhow::Ok(())
1052        })
1053        .detach();
1054
1055        return tx;
1056    }
1057
1058    pub fn is_started(&self) -> bool {
1059        match &self.mode {
1060            Mode::Building => false,
1061            Mode::Running(running) => running.is_started,
1062        }
1063    }
1064
1065    pub fn is_building(&self) -> bool {
1066        matches!(self.mode, Mode::Building)
1067    }
1068
1069    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1070        match &mut self.mode {
1071            Mode::Running(local_mode) => Some(local_mode),
1072            Mode::Building => None,
1073        }
1074    }
1075
1076    pub fn as_running(&self) -> Option<&RunningMode> {
1077        match &self.mode {
1078            Mode::Running(local_mode) => Some(local_mode),
1079            Mode::Building => None,
1080        }
1081    }
1082
1083    fn handle_start_debugging_request(
1084        &mut self,
1085        request: dap::messages::Request,
1086        cx: &mut Context<Self>,
1087    ) -> Task<Result<()>> {
1088        let request_seq = request.seq;
1089
1090        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1091            .arguments
1092            .as_ref()
1093            .map(|value| serde_json::from_value(value.clone()));
1094
1095        let mut success = true;
1096        if let Some(Ok(request)) = launch_request {
1097            cx.emit(SessionStateEvent::SpawnChildSession { request });
1098        } else {
1099            log::error!(
1100                "Failed to parse launch request arguments: {:?}",
1101                request.arguments
1102            );
1103            success = false;
1104        }
1105
1106        cx.spawn(async move |this, cx| {
1107            this.update(cx, |this, cx| {
1108                this.respond_to_client(
1109                    request_seq,
1110                    success,
1111                    StartDebugging::COMMAND.to_string(),
1112                    None,
1113                    cx,
1114                )
1115            })?
1116            .await
1117        })
1118    }
1119
1120    fn handle_run_in_terminal_request(
1121        &mut self,
1122        request: dap::messages::Request,
1123        cx: &mut Context<Self>,
1124    ) -> Task<Result<()>> {
1125        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1126            request.arguments.unwrap_or_default(),
1127        ) {
1128            Ok(args) => args,
1129            Err(error) => {
1130                return cx.spawn(async move |session, cx| {
1131                    let error = serde_json::to_value(dap::ErrorResponse {
1132                        error: Some(dap::Message {
1133                            id: request.seq,
1134                            format: error.to_string(),
1135                            variables: None,
1136                            send_telemetry: None,
1137                            show_user: None,
1138                            url: None,
1139                            url_label: None,
1140                        }),
1141                    })
1142                    .ok();
1143
1144                    session
1145                        .update(cx, |this, cx| {
1146                            this.respond_to_client(
1147                                request.seq,
1148                                false,
1149                                StartDebugging::COMMAND.to_string(),
1150                                error,
1151                                cx,
1152                            )
1153                        })?
1154                        .await?;
1155
1156                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1157                });
1158            }
1159        };
1160
1161        let seq = request.seq;
1162
1163        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1164        cx.emit(SessionEvent::RunInTerminal {
1165            request: request_args,
1166            sender: tx,
1167        });
1168        cx.notify();
1169
1170        cx.spawn(async move |session, cx| {
1171            let result = util::maybe!(async move {
1172                rx.next().await.ok_or_else(|| {
1173                    anyhow!("failed to receive response from spawn terminal".to_string())
1174                })?
1175            })
1176            .await;
1177            let (success, body) = match result {
1178                Ok(pid) => (
1179                    true,
1180                    serde_json::to_value(dap::RunInTerminalResponse {
1181                        process_id: None,
1182                        shell_process_id: Some(pid as u64),
1183                    })
1184                    .ok(),
1185                ),
1186                Err(error) => (
1187                    false,
1188                    serde_json::to_value(dap::ErrorResponse {
1189                        error: Some(dap::Message {
1190                            id: seq,
1191                            format: error.to_string(),
1192                            variables: None,
1193                            send_telemetry: None,
1194                            show_user: None,
1195                            url: None,
1196                            url_label: None,
1197                        }),
1198                    })
1199                    .ok(),
1200                ),
1201            };
1202
1203            session
1204                .update(cx, |session, cx| {
1205                    session.respond_to_client(
1206                        seq,
1207                        success,
1208                        RunInTerminal::COMMAND.to_string(),
1209                        body,
1210                        cx,
1211                    )
1212                })?
1213                .await
1214        })
1215    }
1216
1217    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1218        let adapter_id = self.adapter().to_string();
1219        let request = Initialize { adapter_id };
1220
1221        let Mode::Running(running) = &self.mode else {
1222            return Task::ready(Err(anyhow!(
1223                "Cannot send initialize request, task still building"
1224            )));
1225        };
1226        let mut response = running.request(request.clone());
1227
1228        cx.spawn(async move |this, cx| {
1229            loop {
1230                let capabilities = response.await;
1231                match capabilities {
1232                    Err(e) => {
1233                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1234                            this.as_running()
1235                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1236                        }) else {
1237                            return Err(e);
1238                        };
1239                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1240                        reconnect.await?;
1241
1242                        let Ok(Some(r)) = this.update(cx, |this, _| {
1243                            this.as_running()
1244                                .map(|running| running.request(request.clone()))
1245                        }) else {
1246                            return Err(e);
1247                        };
1248                        response = r
1249                    }
1250                    Ok(capabilities) => {
1251                        this.update(cx, |session, cx| {
1252                            session.capabilities = capabilities;
1253
1254                            cx.emit(SessionEvent::CapabilitiesLoaded);
1255                        })?;
1256                        return Ok(());
1257                    }
1258                }
1259            }
1260        })
1261    }
1262
1263    pub(super) fn initialize_sequence(
1264        &mut self,
1265        initialize_rx: oneshot::Receiver<()>,
1266        dap_store: WeakEntity<DapStore>,
1267        cx: &mut Context<Self>,
1268    ) -> Task<Result<()>> {
1269        match &self.mode {
1270            Mode::Running(local_mode) => {
1271                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1272            }
1273            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1274        }
1275    }
1276
1277    pub fn run_to_position(
1278        &mut self,
1279        breakpoint: SourceBreakpoint,
1280        active_thread_id: ThreadId,
1281        cx: &mut Context<Self>,
1282    ) {
1283        match &mut self.mode {
1284            Mode::Running(local_mode) => {
1285                if !matches!(
1286                    self.thread_states.thread_state(active_thread_id),
1287                    Some(ThreadStatus::Stopped)
1288                ) {
1289                    return;
1290                };
1291                let path = breakpoint.path.clone();
1292                local_mode.tmp_breakpoint = Some(breakpoint);
1293                let task = local_mode.send_breakpoints_from_path(
1294                    path,
1295                    BreakpointUpdatedReason::Toggled,
1296                    &self.breakpoint_store,
1297                    cx,
1298                );
1299
1300                cx.spawn(async move |this, cx| {
1301                    task.await;
1302                    this.update(cx, |this, cx| {
1303                        this.continue_thread(active_thread_id, cx);
1304                    })
1305                })
1306                .detach();
1307            }
1308            Mode::Building => {}
1309        }
1310    }
1311
1312    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1313        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1314    }
1315
1316    pub fn output(
1317        &self,
1318        since: OutputToken,
1319    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1320        if self.output_token.0 == 0 {
1321            return (self.output.range(0..0), OutputToken(0));
1322        };
1323
1324        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1325
1326        let clamped_events_since = events_since.clamp(0, self.output.len());
1327        (
1328            self.output
1329                .range(self.output.len() - clamped_events_since..),
1330            self.output_token,
1331        )
1332    }
1333
1334    pub fn respond_to_client(
1335        &self,
1336        request_seq: u64,
1337        success: bool,
1338        command: String,
1339        body: Option<serde_json::Value>,
1340        cx: &mut Context<Self>,
1341    ) -> Task<Result<()>> {
1342        let Some(local_session) = self.as_running() else {
1343            unreachable!("Cannot respond to remote client");
1344        };
1345        let client = local_session.client.clone();
1346
1347        cx.background_spawn(async move {
1348            client
1349                .send_message(Message::Response(Response {
1350                    body,
1351                    success,
1352                    command,
1353                    seq: request_seq + 1,
1354                    request_seq,
1355                    message: None,
1356                }))
1357                .await
1358        })
1359    }
1360
1361    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1362        self.mode.stopped();
1363        // todo(debugger): Find a clean way to get around the clone
1364        let breakpoint_store = self.breakpoint_store.clone();
1365        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1366            let breakpoint = local.tmp_breakpoint.take()?;
1367            let path = breakpoint.path.clone();
1368            Some((local, path))
1369        }) {
1370            local
1371                .send_breakpoints_from_path(
1372                    path,
1373                    BreakpointUpdatedReason::Toggled,
1374                    &breakpoint_store,
1375                    cx,
1376                )
1377                .detach();
1378        };
1379
1380        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1381            self.thread_states.stop_all_threads();
1382            self.invalidate_command_type::<StackTraceCommand>();
1383        }
1384
1385        // Event if we stopped all threads we still need to insert the thread_id
1386        // to our own data
1387        if let Some(thread_id) = event.thread_id {
1388            self.thread_states.stop_thread(ThreadId(thread_id));
1389
1390            self.invalidate_state(
1391                &StackTraceCommand {
1392                    thread_id,
1393                    start_frame: None,
1394                    levels: None,
1395                }
1396                .into(),
1397            );
1398        }
1399
1400        self.invalidate_generic();
1401        self.threads.clear();
1402        self.variables.clear();
1403        cx.emit(SessionEvent::Stopped(
1404            event
1405                .thread_id
1406                .map(Into::into)
1407                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1408        ));
1409        cx.emit(SessionEvent::InvalidateInlineValue);
1410        cx.notify();
1411    }
1412
1413    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1414        match *event {
1415            Events::Initialized(_) => {
1416                debug_assert!(
1417                    false,
1418                    "Initialized event should have been handled in LocalMode"
1419                );
1420            }
1421            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1422            Events::Continued(event) => {
1423                if event.all_threads_continued.unwrap_or_default() {
1424                    self.thread_states.continue_all_threads();
1425                    self.breakpoint_store.update(cx, |store, cx| {
1426                        store.remove_active_position(Some(self.session_id()), cx)
1427                    });
1428                } else {
1429                    self.thread_states
1430                        .continue_thread(ThreadId(event.thread_id));
1431                }
1432                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1433                self.invalidate_generic();
1434            }
1435            Events::Exited(_event) => {
1436                self.clear_active_debug_line(cx);
1437            }
1438            Events::Terminated(_) => {
1439                self.shutdown(cx).detach();
1440            }
1441            Events::Thread(event) => {
1442                let thread_id = ThreadId(event.thread_id);
1443
1444                match event.reason {
1445                    dap::ThreadEventReason::Started => {
1446                        self.thread_states.continue_thread(thread_id);
1447                    }
1448                    dap::ThreadEventReason::Exited => {
1449                        self.thread_states.exit_thread(thread_id);
1450                    }
1451                    reason => {
1452                        log::error!("Unhandled thread event reason {:?}", reason);
1453                    }
1454                }
1455                self.invalidate_state(&ThreadsCommand.into());
1456                cx.notify();
1457            }
1458            Events::Output(event) => {
1459                if event
1460                    .category
1461                    .as_ref()
1462                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1463                {
1464                    return;
1465                }
1466
1467                self.push_output(event);
1468                cx.notify();
1469            }
1470            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1471                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1472            }),
1473            Events::Module(event) => {
1474                match event.reason {
1475                    dap::ModuleEventReason::New => {
1476                        self.modules.push(event.module);
1477                    }
1478                    dap::ModuleEventReason::Changed => {
1479                        if let Some(module) = self
1480                            .modules
1481                            .iter_mut()
1482                            .find(|other| event.module.id == other.id)
1483                        {
1484                            *module = event.module;
1485                        }
1486                    }
1487                    dap::ModuleEventReason::Removed => {
1488                        self.modules.retain(|other| event.module.id != other.id);
1489                    }
1490                }
1491
1492                // todo(debugger): We should only send the invalidate command to downstream clients.
1493                // self.invalidate_state(&ModulesCommand.into());
1494            }
1495            Events::LoadedSource(_) => {
1496                self.invalidate_state(&LoadedSourcesCommand.into());
1497            }
1498            Events::Capabilities(event) => {
1499                self.capabilities = self.capabilities.merge(event.capabilities);
1500
1501                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1502                let recent_filters = self
1503                    .capabilities
1504                    .exception_breakpoint_filters
1505                    .iter()
1506                    .flatten()
1507                    .map(|filter| (filter.filter.clone(), filter.clone()))
1508                    .collect::<BTreeMap<_, _>>();
1509                for filter in recent_filters.values() {
1510                    let default = filter.default.unwrap_or_default();
1511                    self.exception_breakpoints
1512                        .entry(filter.filter.clone())
1513                        .or_insert_with(|| (filter.clone(), default));
1514                }
1515                self.exception_breakpoints
1516                    .retain(|k, _| recent_filters.contains_key(k));
1517                if self.is_started() {
1518                    self.send_exception_breakpoints(cx);
1519                }
1520
1521                // Remove the ones that no longer exist.
1522                cx.notify();
1523            }
1524            Events::Memory(_) => {}
1525            Events::Process(_) => {}
1526            Events::ProgressEnd(_) => {}
1527            Events::ProgressStart(_) => {}
1528            Events::ProgressUpdate(_) => {}
1529            Events::Invalidated(_) => {}
1530            Events::Other(_) => {}
1531        }
1532    }
1533
1534    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1535    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1536        &mut self,
1537        request: T,
1538        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1539        cx: &mut Context<Self>,
1540    ) {
1541        const {
1542            assert!(
1543                T::CACHEABLE,
1544                "Only requests marked as cacheable should invoke `fetch`"
1545            );
1546        }
1547
1548        if !self.thread_states.any_stopped_thread()
1549            && request.type_id() != TypeId::of::<ThreadsCommand>()
1550            || self.is_session_terminated
1551        {
1552            return;
1553        }
1554
1555        let request_map = self
1556            .requests
1557            .entry(std::any::TypeId::of::<T>())
1558            .or_default();
1559
1560        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1561            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1562
1563            let task = Self::request_inner::<Arc<T>>(
1564                &self.capabilities,
1565                &self.mode,
1566                command,
1567                |this, result, cx| {
1568                    process_result(this, result, cx);
1569                    None
1570                },
1571                cx,
1572            );
1573            let task = cx
1574                .background_executor()
1575                .spawn(async move {
1576                    let _ = task.await?;
1577                    Some(())
1578                })
1579                .shared();
1580
1581            vacant.insert(task);
1582            cx.notify();
1583        }
1584    }
1585
1586    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1587        capabilities: &Capabilities,
1588        mode: &Mode,
1589        request: T,
1590        process_result: impl FnOnce(
1591            &mut Self,
1592            Result<T::Response>,
1593            &mut Context<Self>,
1594        ) -> Option<T::Response>
1595        + 'static,
1596        cx: &mut Context<Self>,
1597    ) -> Task<Option<T::Response>> {
1598        if !T::is_supported(&capabilities) {
1599            log::warn!(
1600                "Attempted to send a DAP request that isn't supported: {:?}",
1601                request
1602            );
1603            let error = Err(anyhow::Error::msg(
1604                "Couldn't complete request because it's not supported",
1605            ));
1606            return cx.spawn(async move |this, cx| {
1607                this.update(cx, |this, cx| process_result(this, error, cx))
1608                    .ok()
1609                    .flatten()
1610            });
1611        }
1612
1613        let request = mode.request_dap(request);
1614        cx.spawn(async move |this, cx| {
1615            let result = request.await;
1616            this.update(cx, |this, cx| process_result(this, result, cx))
1617                .ok()
1618                .flatten()
1619        })
1620    }
1621
1622    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1623        &self,
1624        request: T,
1625        process_result: impl FnOnce(
1626            &mut Self,
1627            Result<T::Response>,
1628            &mut Context<Self>,
1629        ) -> Option<T::Response>
1630        + 'static,
1631        cx: &mut Context<Self>,
1632    ) -> Task<Option<T::Response>> {
1633        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1634    }
1635
1636    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1637        self.requests.remove(&std::any::TypeId::of::<Command>());
1638    }
1639
1640    fn invalidate_generic(&mut self) {
1641        self.invalidate_command_type::<ModulesCommand>();
1642        self.invalidate_command_type::<LoadedSourcesCommand>();
1643        self.invalidate_command_type::<ThreadsCommand>();
1644    }
1645
1646    fn invalidate_state(&mut self, key: &RequestSlot) {
1647        self.requests
1648            .entry((&*key.0 as &dyn Any).type_id())
1649            .and_modify(|request_map| {
1650                request_map.remove(&key);
1651            });
1652    }
1653
1654    fn push_output(&mut self, event: OutputEvent) {
1655        self.output.push_back(event);
1656        self.output_token.0 += 1;
1657    }
1658
1659    pub fn any_stopped_thread(&self) -> bool {
1660        self.thread_states.any_stopped_thread()
1661    }
1662
1663    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1664        self.thread_states.thread_status(thread_id)
1665    }
1666
1667    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1668        self.fetch(
1669            dap_command::ThreadsCommand,
1670            |this, result, cx| {
1671                let Some(result) = result.log_err() else {
1672                    return;
1673                };
1674
1675                this.threads = result
1676                    .into_iter()
1677                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1678                    .collect();
1679
1680                this.invalidate_command_type::<StackTraceCommand>();
1681                cx.emit(SessionEvent::Threads);
1682                cx.notify();
1683            },
1684            cx,
1685        );
1686
1687        self.threads
1688            .values()
1689            .map(|thread| {
1690                (
1691                    thread.dap.clone(),
1692                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1693                )
1694            })
1695            .collect()
1696    }
1697
1698    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1699        self.fetch(
1700            dap_command::ModulesCommand,
1701            |this, result, cx| {
1702                let Some(result) = result.log_err() else {
1703                    return;
1704                };
1705
1706                this.modules = result;
1707                cx.emit(SessionEvent::Modules);
1708                cx.notify();
1709            },
1710            cx,
1711        );
1712
1713        &self.modules
1714    }
1715
1716    pub fn ignore_breakpoints(&self) -> bool {
1717        self.ignore_breakpoints
1718    }
1719
1720    pub fn toggle_ignore_breakpoints(
1721        &mut self,
1722        cx: &mut App,
1723    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1724        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1725    }
1726
1727    pub(crate) fn set_ignore_breakpoints(
1728        &mut self,
1729        ignore: bool,
1730        cx: &mut App,
1731    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1732        if self.ignore_breakpoints == ignore {
1733            return Task::ready(HashMap::default());
1734        }
1735
1736        self.ignore_breakpoints = ignore;
1737
1738        if let Some(local) = self.as_running() {
1739            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1740        } else {
1741            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1742            unimplemented!()
1743        }
1744    }
1745
1746    pub fn exception_breakpoints(
1747        &self,
1748    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1749        self.exception_breakpoints.values()
1750    }
1751
1752    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1753        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1754            *is_enabled = !*is_enabled;
1755            self.send_exception_breakpoints(cx);
1756        }
1757    }
1758
1759    fn send_exception_breakpoints(&mut self, cx: &App) {
1760        if let Some(local) = self.as_running() {
1761            let exception_filters = self
1762                .exception_breakpoints
1763                .values()
1764                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1765                .collect();
1766
1767            let supports_exception_filters = self
1768                .capabilities
1769                .supports_exception_filter_options
1770                .unwrap_or_default();
1771            local
1772                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1773                .detach_and_log_err(cx);
1774        } else {
1775            debug_assert!(false, "Not implemented");
1776        }
1777    }
1778
1779    pub fn breakpoints_enabled(&self) -> bool {
1780        self.ignore_breakpoints
1781    }
1782
1783    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1784        self.fetch(
1785            dap_command::LoadedSourcesCommand,
1786            |this, result, cx| {
1787                let Some(result) = result.log_err() else {
1788                    return;
1789                };
1790                this.loaded_sources = result;
1791                cx.emit(SessionEvent::LoadedSources);
1792                cx.notify();
1793            },
1794            cx,
1795        );
1796
1797        &self.loaded_sources
1798    }
1799
1800    fn fallback_to_manual_restart(
1801        &mut self,
1802        res: Result<()>,
1803        cx: &mut Context<Self>,
1804    ) -> Option<()> {
1805        if res.log_err().is_none() {
1806            cx.emit(SessionStateEvent::Restart);
1807            return None;
1808        }
1809        Some(())
1810    }
1811
1812    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1813        res.log_err()?;
1814        Some(())
1815    }
1816
1817    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1818        thread_id: ThreadId,
1819    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1820    {
1821        move |this, response, cx| match response.log_err() {
1822            Some(response) => {
1823                this.breakpoint_store.update(cx, |store, cx| {
1824                    store.remove_active_position(Some(this.session_id()), cx)
1825                });
1826                Some(response)
1827            }
1828            None => {
1829                this.thread_states.stop_thread(thread_id);
1830                cx.notify();
1831                None
1832            }
1833        }
1834    }
1835
1836    fn clear_active_debug_line_response(
1837        &mut self,
1838        response: Result<()>,
1839        cx: &mut Context<Session>,
1840    ) -> Option<()> {
1841        response.log_err()?;
1842        self.clear_active_debug_line(cx);
1843        Some(())
1844    }
1845
1846    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1847        self.breakpoint_store.update(cx, |store, cx| {
1848            store.remove_active_position(Some(self.id), cx)
1849        });
1850    }
1851
1852    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1853        self.request(
1854            PauseCommand {
1855                thread_id: thread_id.0,
1856            },
1857            Self::empty_response,
1858            cx,
1859        )
1860        .detach();
1861    }
1862
1863    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1864        self.request(
1865            RestartStackFrameCommand { stack_frame_id },
1866            Self::empty_response,
1867            cx,
1868        )
1869        .detach();
1870    }
1871
1872    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1873        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1874            self.request(
1875                RestartCommand {
1876                    raw: args.unwrap_or(Value::Null),
1877                },
1878                Self::fallback_to_manual_restart,
1879                cx,
1880            )
1881            .detach();
1882        } else {
1883            cx.emit(SessionStateEvent::Restart);
1884        }
1885    }
1886
1887    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1888        if self.is_session_terminated {
1889            return Task::ready(());
1890        }
1891
1892        self.is_session_terminated = true;
1893        self.thread_states.exit_all_threads();
1894        cx.notify();
1895
1896        let task = if self
1897            .capabilities
1898            .supports_terminate_request
1899            .unwrap_or_default()
1900        {
1901            self.request(
1902                TerminateCommand {
1903                    restart: Some(false),
1904                },
1905                Self::clear_active_debug_line_response,
1906                cx,
1907            )
1908        } else {
1909            self.request(
1910                DisconnectCommand {
1911                    restart: Some(false),
1912                    terminate_debuggee: Some(true),
1913                    suspend_debuggee: Some(false),
1914                },
1915                Self::clear_active_debug_line_response,
1916                cx,
1917            )
1918        };
1919
1920        cx.emit(SessionStateEvent::Shutdown);
1921
1922        cx.spawn(async move |_, _| {
1923            task.await;
1924        })
1925    }
1926
1927    pub fn completions(
1928        &mut self,
1929        query: CompletionsQuery,
1930        cx: &mut Context<Self>,
1931    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1932        let task = self.request(query, |_, result, _| result.log_err(), cx);
1933
1934        cx.background_executor().spawn(async move {
1935            anyhow::Ok(
1936                task.await
1937                    .map(|response| response.targets)
1938                    .context("failed to fetch completions")?,
1939            )
1940        })
1941    }
1942
1943    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1944        let supports_single_thread_execution_requests =
1945            self.capabilities.supports_single_thread_execution_requests;
1946        self.thread_states.continue_thread(thread_id);
1947        self.request(
1948            ContinueCommand {
1949                args: ContinueArguments {
1950                    thread_id: thread_id.0,
1951                    single_thread: supports_single_thread_execution_requests,
1952                },
1953            },
1954            Self::on_step_response::<ContinueCommand>(thread_id),
1955            cx,
1956        )
1957        .detach();
1958    }
1959
1960    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1961        match self.mode {
1962            Mode::Running(ref local) => Some(local.client.clone()),
1963            Mode::Building => None,
1964        }
1965    }
1966
1967    pub fn has_ever_stopped(&self) -> bool {
1968        self.mode.has_ever_stopped()
1969    }
1970    pub fn step_over(
1971        &mut self,
1972        thread_id: ThreadId,
1973        granularity: SteppingGranularity,
1974        cx: &mut Context<Self>,
1975    ) {
1976        let supports_single_thread_execution_requests =
1977            self.capabilities.supports_single_thread_execution_requests;
1978        let supports_stepping_granularity = self
1979            .capabilities
1980            .supports_stepping_granularity
1981            .unwrap_or_default();
1982
1983        let command = NextCommand {
1984            inner: StepCommand {
1985                thread_id: thread_id.0,
1986                granularity: supports_stepping_granularity.then(|| granularity),
1987                single_thread: supports_single_thread_execution_requests,
1988            },
1989        };
1990
1991        self.thread_states.process_step(thread_id);
1992        self.request(
1993            command,
1994            Self::on_step_response::<NextCommand>(thread_id),
1995            cx,
1996        )
1997        .detach();
1998    }
1999
2000    pub fn step_in(
2001        &mut self,
2002        thread_id: ThreadId,
2003        granularity: SteppingGranularity,
2004        cx: &mut Context<Self>,
2005    ) {
2006        let supports_single_thread_execution_requests =
2007            self.capabilities.supports_single_thread_execution_requests;
2008        let supports_stepping_granularity = self
2009            .capabilities
2010            .supports_stepping_granularity
2011            .unwrap_or_default();
2012
2013        let command = StepInCommand {
2014            inner: StepCommand {
2015                thread_id: thread_id.0,
2016                granularity: supports_stepping_granularity.then(|| granularity),
2017                single_thread: supports_single_thread_execution_requests,
2018            },
2019        };
2020
2021        self.thread_states.process_step(thread_id);
2022        self.request(
2023            command,
2024            Self::on_step_response::<StepInCommand>(thread_id),
2025            cx,
2026        )
2027        .detach();
2028    }
2029
2030    pub fn step_out(
2031        &mut self,
2032        thread_id: ThreadId,
2033        granularity: SteppingGranularity,
2034        cx: &mut Context<Self>,
2035    ) {
2036        let supports_single_thread_execution_requests =
2037            self.capabilities.supports_single_thread_execution_requests;
2038        let supports_stepping_granularity = self
2039            .capabilities
2040            .supports_stepping_granularity
2041            .unwrap_or_default();
2042
2043        let command = StepOutCommand {
2044            inner: StepCommand {
2045                thread_id: thread_id.0,
2046                granularity: supports_stepping_granularity.then(|| granularity),
2047                single_thread: supports_single_thread_execution_requests,
2048            },
2049        };
2050
2051        self.thread_states.process_step(thread_id);
2052        self.request(
2053            command,
2054            Self::on_step_response::<StepOutCommand>(thread_id),
2055            cx,
2056        )
2057        .detach();
2058    }
2059
2060    pub fn step_back(
2061        &mut self,
2062        thread_id: ThreadId,
2063        granularity: SteppingGranularity,
2064        cx: &mut Context<Self>,
2065    ) {
2066        let supports_single_thread_execution_requests =
2067            self.capabilities.supports_single_thread_execution_requests;
2068        let supports_stepping_granularity = self
2069            .capabilities
2070            .supports_stepping_granularity
2071            .unwrap_or_default();
2072
2073        let command = StepBackCommand {
2074            inner: StepCommand {
2075                thread_id: thread_id.0,
2076                granularity: supports_stepping_granularity.then(|| granularity),
2077                single_thread: supports_single_thread_execution_requests,
2078            },
2079        };
2080
2081        self.thread_states.process_step(thread_id);
2082
2083        self.request(
2084            command,
2085            Self::on_step_response::<StepBackCommand>(thread_id),
2086            cx,
2087        )
2088        .detach();
2089    }
2090
2091    pub fn stack_frames(
2092        &mut self,
2093        thread_id: ThreadId,
2094        cx: &mut Context<Self>,
2095    ) -> Result<Vec<StackFrame>> {
2096        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2097            && self.requests.contains_key(&ThreadsCommand.type_id())
2098            && self.threads.contains_key(&thread_id)
2099        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2100        // We could still be using an old thread id and have sent a new thread's request
2101        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2102        // But it very well could cause a minor bug in the future that is hard to track down
2103        {
2104            self.fetch(
2105                super::dap_command::StackTraceCommand {
2106                    thread_id: thread_id.0,
2107                    start_frame: None,
2108                    levels: None,
2109                },
2110                move |this, stack_frames, cx| {
2111                    let entry =
2112                        this.threads
2113                            .entry(thread_id)
2114                            .and_modify(|thread| match &stack_frames {
2115                                Ok(stack_frames) => {
2116                                    thread.stack_frames = stack_frames
2117                                        .iter()
2118                                        .cloned()
2119                                        .map(StackFrame::from)
2120                                        .collect();
2121                                    thread.stack_frames_error = None;
2122                                }
2123                                Err(error) => {
2124                                    thread.stack_frames.clear();
2125                                    thread.stack_frames_error = Some(error.cloned());
2126                                }
2127                            });
2128                    debug_assert!(
2129                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2130                        "Sent request for thread_id that doesn't exist"
2131                    );
2132                    if let Ok(stack_frames) = stack_frames {
2133                        this.stack_frames.extend(
2134                            stack_frames
2135                                .into_iter()
2136                                .filter(|frame| {
2137                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2138                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2139                                    !(frame.id == 0
2140                                        && frame.line == 0
2141                                        && frame.column == 0
2142                                        && frame.presentation_hint
2143                                            == Some(StackFramePresentationHint::Label))
2144                                })
2145                                .map(|frame| (frame.id, StackFrame::from(frame))),
2146                        );
2147                    }
2148
2149                    this.invalidate_command_type::<ScopesCommand>();
2150                    this.invalidate_command_type::<VariablesCommand>();
2151
2152                    cx.emit(SessionEvent::StackTrace);
2153                },
2154                cx,
2155            );
2156        }
2157
2158        match self.threads.get(&thread_id) {
2159            Some(thread) => {
2160                if let Some(error) = &thread.stack_frames_error {
2161                    Err(error.cloned())
2162                } else {
2163                    Ok(thread.stack_frames.clone())
2164                }
2165            }
2166            None => Ok(Vec::new()),
2167        }
2168    }
2169
2170    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2171        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2172            && self
2173                .requests
2174                .contains_key(&TypeId::of::<StackTraceCommand>())
2175        {
2176            self.fetch(
2177                ScopesCommand { stack_frame_id },
2178                move |this, scopes, cx| {
2179                    let Some(scopes) = scopes.log_err() else {
2180                        return
2181                    };
2182
2183                    for scope in scopes.iter() {
2184                        this.variables(scope.variables_reference, cx);
2185                    }
2186
2187                    let entry = this
2188                        .stack_frames
2189                        .entry(stack_frame_id)
2190                        .and_modify(|stack_frame| {
2191                            stack_frame.scopes = scopes;
2192                        });
2193
2194                    cx.emit(SessionEvent::Variables);
2195
2196                    debug_assert!(
2197                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2198                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2199                    );
2200                },
2201                cx,
2202            );
2203        }
2204
2205        self.stack_frames
2206            .get(&stack_frame_id)
2207            .map(|frame| frame.scopes.as_slice())
2208            .unwrap_or_default()
2209    }
2210
2211    pub fn variables_by_stack_frame_id(
2212        &self,
2213        stack_frame_id: StackFrameId,
2214        globals: bool,
2215        locals: bool,
2216    ) -> Vec<dap::Variable> {
2217        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2218            return Vec::new();
2219        };
2220
2221        stack_frame
2222            .scopes
2223            .iter()
2224            .filter(|scope| {
2225                (scope.name.to_lowercase().contains("local") && locals)
2226                    || (scope.name.to_lowercase().contains("global") && globals)
2227            })
2228            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2229            .flatten()
2230            .cloned()
2231            .collect()
2232    }
2233
2234    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2235        &self.watchers
2236    }
2237
2238    pub fn add_watcher(
2239        &mut self,
2240        expression: SharedString,
2241        frame_id: u64,
2242        cx: &mut Context<Self>,
2243    ) -> Task<Result<()>> {
2244        let request = self.mode.request_dap(EvaluateCommand {
2245            expression: expression.to_string(),
2246            context: Some(EvaluateArgumentsContext::Watch),
2247            frame_id: Some(frame_id),
2248            source: None,
2249        });
2250
2251        cx.spawn(async move |this, cx| {
2252            let response = request.await?;
2253
2254            this.update(cx, |session, cx| {
2255                session.watchers.insert(
2256                    expression.clone(),
2257                    Watcher {
2258                        expression,
2259                        value: response.result.into(),
2260                        variables_reference: response.variables_reference,
2261                        presentation_hint: response.presentation_hint,
2262                    },
2263                );
2264                cx.emit(SessionEvent::Watchers);
2265            })
2266        })
2267    }
2268
2269    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2270        let watches = self.watchers.clone();
2271        for (_, watch) in watches.into_iter() {
2272            self.add_watcher(watch.expression.clone(), frame_id, cx)
2273                .detach();
2274        }
2275    }
2276
2277    pub fn remove_watcher(&mut self, expression: SharedString) {
2278        self.watchers.remove(&expression);
2279    }
2280
2281    pub fn variables(
2282        &mut self,
2283        variables_reference: VariableReference,
2284        cx: &mut Context<Self>,
2285    ) -> Vec<dap::Variable> {
2286        let command = VariablesCommand {
2287            variables_reference,
2288            filter: None,
2289            start: None,
2290            count: None,
2291            format: None,
2292        };
2293
2294        self.fetch(
2295            command,
2296            move |this, variables, cx| {
2297                let Some(variables) = variables.log_err() else {
2298                    return;
2299                };
2300
2301                this.variables.insert(variables_reference, variables);
2302
2303                cx.emit(SessionEvent::Variables);
2304                cx.emit(SessionEvent::InvalidateInlineValue);
2305            },
2306            cx,
2307        );
2308
2309        self.variables
2310            .get(&variables_reference)
2311            .cloned()
2312            .unwrap_or_default()
2313    }
2314
2315    pub fn set_variable_value(
2316        &mut self,
2317        stack_frame_id: u64,
2318        variables_reference: u64,
2319        name: String,
2320        value: String,
2321        cx: &mut Context<Self>,
2322    ) {
2323        if self.capabilities.supports_set_variable.unwrap_or_default() {
2324            self.request(
2325                SetVariableValueCommand {
2326                    name,
2327                    value,
2328                    variables_reference,
2329                },
2330                move |this, response, cx| {
2331                    let response = response.log_err()?;
2332                    this.invalidate_command_type::<VariablesCommand>();
2333                    this.refresh_watchers(stack_frame_id, cx);
2334                    cx.emit(SessionEvent::Variables);
2335                    Some(response)
2336                },
2337                cx,
2338            )
2339            .detach();
2340        }
2341    }
2342
2343    pub fn evaluate(
2344        &mut self,
2345        expression: String,
2346        context: Option<EvaluateArgumentsContext>,
2347        frame_id: Option<u64>,
2348        source: Option<Source>,
2349        cx: &mut Context<Self>,
2350    ) -> Task<()> {
2351        let event = dap::OutputEvent {
2352            category: None,
2353            output: format!("> {expression}"),
2354            group: None,
2355            variables_reference: None,
2356            source: None,
2357            line: None,
2358            column: None,
2359            data: None,
2360            location_reference: None,
2361        };
2362        self.push_output(event);
2363        let request = self.mode.request_dap(EvaluateCommand {
2364            expression,
2365            context,
2366            frame_id,
2367            source,
2368        });
2369        cx.spawn(async move |this, cx| {
2370            let response = request.await;
2371            this.update(cx, |this, cx| {
2372                match response {
2373                    Ok(response) => {
2374                        let event = dap::OutputEvent {
2375                            category: None,
2376                            output: format!("< {}", &response.result),
2377                            group: None,
2378                            variables_reference: Some(response.variables_reference),
2379                            source: None,
2380                            line: None,
2381                            column: None,
2382                            data: None,
2383                            location_reference: None,
2384                        };
2385                        this.push_output(event);
2386                    }
2387                    Err(e) => {
2388                        let event = dap::OutputEvent {
2389                            category: None,
2390                            output: format!("{}", e),
2391                            group: None,
2392                            variables_reference: None,
2393                            source: None,
2394                            line: None,
2395                            column: None,
2396                            data: None,
2397                            location_reference: None,
2398                        };
2399                        this.push_output(event);
2400                    }
2401                };
2402                cx.notify();
2403            })
2404            .ok();
2405        })
2406    }
2407
2408    pub fn location(
2409        &mut self,
2410        reference: u64,
2411        cx: &mut Context<Self>,
2412    ) -> Option<dap::LocationsResponse> {
2413        self.fetch(
2414            LocationsCommand { reference },
2415            move |this, response, _| {
2416                let Some(response) = response.log_err() else {
2417                    return;
2418                };
2419                this.locations.insert(reference, response);
2420            },
2421            cx,
2422        );
2423        self.locations.get(&reference).cloned()
2424    }
2425
2426    pub fn is_attached(&self) -> bool {
2427        let Mode::Running(local_mode) = &self.mode else {
2428            return false;
2429        };
2430        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2431    }
2432
2433    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2434        let command = DisconnectCommand {
2435            restart: Some(false),
2436            terminate_debuggee: Some(false),
2437            suspend_debuggee: Some(false),
2438        };
2439
2440        self.request(command, Self::empty_response, cx).detach()
2441    }
2442
2443    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2444        if self
2445            .capabilities
2446            .supports_terminate_threads_request
2447            .unwrap_or_default()
2448        {
2449            self.request(
2450                TerminateThreadsCommand {
2451                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2452                },
2453                Self::clear_active_debug_line_response,
2454                cx,
2455            )
2456            .detach();
2457        } else {
2458            self.shutdown(cx).detach();
2459        }
2460    }
2461
2462    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2463        self.thread_states.thread_state(thread_id)
2464    }
2465}