session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DisconnectCommand, EvaluateCommand,
   8    Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand, ModulesCommand,
   9    NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand, ScopesCommand,
  10    SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand, StepBackCommand,
  11    StepCommand, StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand,
  12    ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapStore;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap};
  17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  18use dap::messages::Response;
  19use dap::requests::{Request, RunInTerminal, StartDebugging};
  20use dap::{
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  28    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  29    StartDebuggingRequestArgumentsRequest, VariablePresentationHint,
  30};
  31use futures::SinkExt;
  32use futures::channel::mpsc::UnboundedSender;
  33use futures::channel::{mpsc, oneshot};
  34use futures::{FutureExt, future::Shared};
  35use gpui::{
  36    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  37    Task, WeakEntity,
  38};
  39
  40use rpc::ErrorExt;
  41use serde_json::Value;
  42use smol::stream::StreamExt;
  43use std::any::TypeId;
  44use std::collections::BTreeMap;
  45use std::u64;
  46use std::{
  47    any::Any,
  48    collections::hash_map::Entry,
  49    hash::{Hash, Hasher},
  50    path::Path,
  51    sync::Arc,
  52};
  53use task::TaskContext;
  54use text::{PointUtf16, ToPointUtf16};
  55use util::ResultExt;
  56use worktree::Worktree;
  57
  58#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  59#[repr(transparent)]
  60pub struct ThreadId(pub u64);
  61
  62impl ThreadId {
  63    pub const MIN: ThreadId = ThreadId(u64::MIN);
  64    pub const MAX: ThreadId = ThreadId(u64::MAX);
  65}
  66
  67impl From<u64> for ThreadId {
  68    fn from(id: u64) -> Self {
  69        Self(id)
  70    }
  71}
  72
  73#[derive(Clone, Debug)]
  74pub struct StackFrame {
  75    pub dap: dap::StackFrame,
  76    pub scopes: Vec<dap::Scope>,
  77}
  78
  79impl From<dap::StackFrame> for StackFrame {
  80    fn from(stack_frame: dap::StackFrame) -> Self {
  81        Self {
  82            scopes: vec![],
  83            dap: stack_frame,
  84        }
  85    }
  86}
  87
  88#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  89pub enum ThreadStatus {
  90    #[default]
  91    Running,
  92    Stopped,
  93    Stepping,
  94    Exited,
  95    Ended,
  96}
  97
  98impl ThreadStatus {
  99    pub fn label(&self) -> &'static str {
 100        match self {
 101            ThreadStatus::Running => "Running",
 102            ThreadStatus::Stopped => "Stopped",
 103            ThreadStatus::Stepping => "Stepping",
 104            ThreadStatus::Exited => "Exited",
 105            ThreadStatus::Ended => "Ended",
 106        }
 107    }
 108}
 109
 110#[derive(Debug)]
 111pub struct Thread {
 112    dap: dap::Thread,
 113    stack_frames: Vec<StackFrame>,
 114    stack_frames_error: Option<anyhow::Error>,
 115    _has_stopped: bool,
 116}
 117
 118impl From<dap::Thread> for Thread {
 119    fn from(dap: dap::Thread) -> Self {
 120        Self {
 121            dap,
 122            stack_frames: Default::default(),
 123            stack_frames_error: None,
 124            _has_stopped: false,
 125        }
 126    }
 127}
 128
 129#[derive(Debug, Clone, PartialEq)]
 130pub struct Watcher {
 131    pub expression: SharedString,
 132    pub value: SharedString,
 133    pub variables_reference: u64,
 134    pub presentation_hint: Option<VariablePresentationHint>,
 135}
 136
 137pub enum SessionState {
 138    Building(Option<Task<Result<()>>>),
 139    Running(RunningMode),
 140}
 141
 142#[derive(Clone)]
 143pub struct RunningMode {
 144    client: Arc<DebugAdapterClient>,
 145    binary: DebugAdapterBinary,
 146    tmp_breakpoint: Option<SourceBreakpoint>,
 147    worktree: WeakEntity<Worktree>,
 148    executor: BackgroundExecutor,
 149    is_started: bool,
 150    has_ever_stopped: bool,
 151    messages_tx: UnboundedSender<Message>,
 152}
 153
 154#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
 155pub struct SessionQuirks {
 156    pub compact: bool,
 157    pub prefer_thread_name: bool,
 158}
 159
 160fn client_source(abs_path: &Path) -> dap::Source {
 161    dap::Source {
 162        name: abs_path
 163            .file_name()
 164            .map(|filename| filename.to_string_lossy().to_string()),
 165        path: Some(abs_path.to_string_lossy().to_string()),
 166        source_reference: None,
 167        presentation_hint: None,
 168        origin: None,
 169        sources: None,
 170        adapter_data: None,
 171        checksums: None,
 172    }
 173}
 174
 175impl RunningMode {
 176    async fn new(
 177        session_id: SessionId,
 178        parent_session: Option<Entity<Session>>,
 179        worktree: WeakEntity<Worktree>,
 180        binary: DebugAdapterBinary,
 181        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 182        cx: &mut AsyncApp,
 183    ) -> Result<Self> {
 184        let message_handler = Box::new({
 185            let messages_tx = messages_tx.clone();
 186            move |message| {
 187                messages_tx.unbounded_send(message).ok();
 188            }
 189        });
 190
 191        let client = if let Some(client) = parent_session
 192            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 193            .flatten()
 194        {
 195            client
 196                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 197                .await?
 198        } else {
 199            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 200        };
 201
 202        Ok(Self {
 203            client: Arc::new(client),
 204            worktree,
 205            tmp_breakpoint: None,
 206            binary,
 207            executor: cx.background_executor().clone(),
 208            is_started: false,
 209            has_ever_stopped: false,
 210            messages_tx,
 211        })
 212    }
 213
 214    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 215        &self.worktree
 216    }
 217
 218    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 219        let tasks: Vec<_> = paths
 220            .into_iter()
 221            .map(|path| {
 222                self.request(dap_command::SetBreakpoints {
 223                    source: client_source(path),
 224                    source_modified: None,
 225                    breakpoints: vec![],
 226                })
 227            })
 228            .collect();
 229
 230        cx.background_spawn(async move {
 231            futures::future::join_all(tasks)
 232                .await
 233                .iter()
 234                .for_each(|res| match res {
 235                    Ok(_) => {}
 236                    Err(err) => {
 237                        log::warn!("Set breakpoints request failed: {}", err);
 238                    }
 239                });
 240        })
 241    }
 242
 243    fn send_breakpoints_from_path(
 244        &self,
 245        abs_path: Arc<Path>,
 246        reason: BreakpointUpdatedReason,
 247        breakpoint_store: &Entity<BreakpointStore>,
 248        cx: &mut App,
 249    ) -> Task<()> {
 250        let breakpoints =
 251            breakpoint_store
 252                .read(cx)
 253                .source_breakpoints_from_path(&abs_path, cx)
 254                .into_iter()
 255                .filter(|bp| bp.state.is_enabled())
 256                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 257                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 258                }))
 259                .map(Into::into)
 260                .collect();
 261
 262        let raw_breakpoints = breakpoint_store
 263            .read(cx)
 264            .breakpoints_from_path(&abs_path)
 265            .into_iter()
 266            .filter(|bp| bp.bp.state.is_enabled())
 267            .collect::<Vec<_>>();
 268
 269        let task = self.request(dap_command::SetBreakpoints {
 270            source: client_source(&abs_path),
 271            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 272            breakpoints,
 273        });
 274        let session_id = self.client.id();
 275        let breakpoint_store = breakpoint_store.downgrade();
 276        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 277            Ok(breakpoints) => {
 278                let breakpoints =
 279                    breakpoints
 280                        .into_iter()
 281                        .zip(raw_breakpoints)
 282                        .filter_map(|(dap_bp, zed_bp)| {
 283                            Some((
 284                                zed_bp,
 285                                BreakpointSessionState {
 286                                    id: dap_bp.id?,
 287                                    verified: dap_bp.verified,
 288                                },
 289                            ))
 290                        });
 291                breakpoint_store
 292                    .update(cx, |this, _| {
 293                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 294                    })
 295                    .ok();
 296            }
 297            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 298        })
 299    }
 300
 301    fn send_exception_breakpoints(
 302        &self,
 303        filters: Vec<ExceptionBreakpointsFilter>,
 304        supports_filter_options: bool,
 305    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 306        let arg = if supports_filter_options {
 307            SetExceptionBreakpoints::WithOptions {
 308                filters: filters
 309                    .into_iter()
 310                    .map(|filter| ExceptionFilterOptions {
 311                        filter_id: filter.filter,
 312                        condition: None,
 313                        mode: None,
 314                    })
 315                    .collect(),
 316            }
 317        } else {
 318            SetExceptionBreakpoints::Plain {
 319                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 320            }
 321        };
 322        self.request(arg)
 323    }
 324
 325    fn send_source_breakpoints(
 326        &self,
 327        ignore_breakpoints: bool,
 328        breakpoint_store: &Entity<BreakpointStore>,
 329        cx: &App,
 330    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 331        let mut breakpoint_tasks = Vec::new();
 332        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 333        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 334        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 335        let session_id = self.client.id();
 336        for (path, breakpoints) in breakpoints {
 337            let breakpoints = if ignore_breakpoints {
 338                vec![]
 339            } else {
 340                breakpoints
 341                    .into_iter()
 342                    .filter(|bp| bp.state.is_enabled())
 343                    .map(Into::into)
 344                    .collect()
 345            };
 346
 347            let raw_breakpoints = raw_breakpoints
 348                .remove(&path)
 349                .unwrap_or_default()
 350                .into_iter()
 351                .filter(|bp| bp.bp.state.is_enabled());
 352            let error_path = path.clone();
 353            let send_request = self
 354                .request(dap_command::SetBreakpoints {
 355                    source: client_source(&path),
 356                    source_modified: Some(false),
 357                    breakpoints,
 358                })
 359                .map(|result| result.map_err(move |e| (error_path, e)));
 360
 361            let task = cx.spawn({
 362                let breakpoint_store = breakpoint_store.downgrade();
 363                async move |cx| {
 364                    let breakpoints = cx.background_spawn(send_request).await?;
 365
 366                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 367                        |(dap_bp, zed_bp)| {
 368                            Some((
 369                                zed_bp,
 370                                BreakpointSessionState {
 371                                    id: dap_bp.id?,
 372                                    verified: dap_bp.verified,
 373                                },
 374                            ))
 375                        },
 376                    );
 377                    breakpoint_store
 378                        .update(cx, |this, _| {
 379                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 380                        })
 381                        .ok();
 382
 383                    Ok(())
 384                }
 385            });
 386            breakpoint_tasks.push(task);
 387        }
 388
 389        cx.background_spawn(async move {
 390            futures::future::join_all(breakpoint_tasks)
 391                .await
 392                .into_iter()
 393                .filter_map(Result::err)
 394                .collect::<HashMap<_, _>>()
 395        })
 396    }
 397
 398    fn initialize_sequence(
 399        &self,
 400        capabilities: &Capabilities,
 401        initialized_rx: oneshot::Receiver<()>,
 402        dap_store: WeakEntity<DapStore>,
 403        cx: &mut Context<Session>,
 404    ) -> Task<Result<()>> {
 405        let raw = self.binary.request_args.clone();
 406
 407        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 408        let launch = match raw.request {
 409            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 410                raw: raw.configuration,
 411            }),
 412            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 413                raw: raw.configuration,
 414            }),
 415        };
 416
 417        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 418        // From spec (on initialization sequence):
 419        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
 420        //
 421        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
 422        let should_send_exception_breakpoints = capabilities
 423            .exception_breakpoint_filters
 424            .as_ref()
 425            .map_or(false, |filters| !filters.is_empty())
 426            || !configuration_done_supported;
 427        let supports_exception_filters = capabilities
 428            .supports_exception_filter_options
 429            .unwrap_or_default();
 430        let this = self.clone();
 431        let worktree = self.worktree().clone();
 432        let mut filters = capabilities
 433            .exception_breakpoint_filters
 434            .clone()
 435            .unwrap_or_default();
 436        let configuration_sequence = cx.spawn({
 437            async move |session, cx| {
 438                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
 439                let (breakpoint_store, adapter_defaults) =
 440                    dap_store.read_with(cx, |dap_store, _| {
 441                        (
 442                            dap_store.breakpoint_store().clone(),
 443                            dap_store.adapter_options(&adapter_name),
 444                        )
 445                    })?;
 446                initialized_rx.await?;
 447                let errors_by_path = cx
 448                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 449                    .await;
 450
 451                dap_store.update(cx, |_, cx| {
 452                    let Some(worktree) = worktree.upgrade() else {
 453                        return;
 454                    };
 455
 456                    for (path, error) in &errors_by_path {
 457                        log::error!("failed to set breakpoints for {path:?}: {error}");
 458                    }
 459
 460                    if let Some(failed_path) = errors_by_path.keys().next() {
 461                        let failed_path = failed_path
 462                            .strip_prefix(worktree.read(cx).abs_path())
 463                            .unwrap_or(failed_path)
 464                            .display();
 465                        let message = format!(
 466                            "Failed to set breakpoints for {failed_path}{}",
 467                            match errors_by_path.len() {
 468                                0 => unreachable!(),
 469                                1 => "".into(),
 470                                2 => " and 1 other path".into(),
 471                                n => format!(" and {} other paths", n - 1),
 472                            }
 473                        );
 474                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 475                    }
 476                })?;
 477
 478                if should_send_exception_breakpoints {
 479                    _ = session.update(cx, |this, _| {
 480                        filters.retain(|filter| {
 481                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
 482                                defaults
 483                                    .exception_breakpoints
 484                                    .get(&filter.filter)
 485                                    .map(|options| options.enabled)
 486                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
 487                            } else {
 488                                filter.default.unwrap_or_default()
 489                            };
 490                            this.exception_breakpoints
 491                                .entry(filter.filter.clone())
 492                                .or_insert_with(|| (filter.clone(), is_enabled));
 493                            is_enabled
 494                        });
 495                    });
 496
 497                    this.send_exception_breakpoints(filters, supports_exception_filters)
 498                        .await
 499                        .ok();
 500                }
 501
 502                let ret = if configuration_done_supported {
 503                    this.request(ConfigurationDone {})
 504                } else {
 505                    Task::ready(Ok(()))
 506                }
 507                .await;
 508                ret
 509            }
 510        });
 511
 512        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 513
 514        cx.spawn(async move |this, cx| {
 515            let result = task.await;
 516
 517            this.update(cx, |this, cx| {
 518                if let Some(this) = this.as_running_mut() {
 519                    this.is_started = true;
 520                    cx.notify();
 521                }
 522            })
 523            .ok();
 524
 525            result?;
 526            anyhow::Ok(())
 527        })
 528    }
 529
 530    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 531        let client = self.client.clone();
 532        let messages_tx = self.messages_tx.clone();
 533        let message_handler = Box::new(move |message| {
 534            messages_tx.unbounded_send(message).ok();
 535        });
 536        if client.should_reconnect_for_ssh() {
 537            Some(cx.spawn(async move |cx| {
 538                client.connect(message_handler, cx).await?;
 539                anyhow::Ok(())
 540            }))
 541        } else {
 542            None
 543        }
 544    }
 545
 546    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 547    where
 548        <R::DapRequest as dap::requests::Request>::Response: 'static,
 549        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 550    {
 551        let request = Arc::new(request);
 552
 553        let request_clone = request.clone();
 554        let connection = self.client.clone();
 555        self.executor.spawn(async move {
 556            let args = request_clone.to_dap();
 557            let response = connection.request::<R::DapRequest>(args).await?;
 558            request.response_from_dap(response)
 559        })
 560    }
 561}
 562
 563impl SessionState {
 564    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 565    where
 566        <R::DapRequest as dap::requests::Request>::Response: 'static,
 567        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 568    {
 569        match self {
 570            SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
 571            SessionState::Building(_) => Task::ready(Err(anyhow!(
 572                "no adapter running to send request: {request:?}"
 573            ))),
 574        }
 575    }
 576
 577    /// Did this debug session stop at least once?
 578    pub(crate) fn has_ever_stopped(&self) -> bool {
 579        match self {
 580            SessionState::Building(_) => false,
 581            SessionState::Running(running_mode) => running_mode.has_ever_stopped,
 582        }
 583    }
 584
 585    fn stopped(&mut self) {
 586        if let SessionState::Running(running) = self {
 587            running.has_ever_stopped = true;
 588        }
 589    }
 590}
 591
 592#[derive(Default)]
 593struct ThreadStates {
 594    global_state: Option<ThreadStatus>,
 595    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 596}
 597
 598impl ThreadStates {
 599    fn stop_all_threads(&mut self) {
 600        self.global_state = Some(ThreadStatus::Stopped);
 601        self.known_thread_states.clear();
 602    }
 603
 604    fn exit_all_threads(&mut self) {
 605        self.global_state = Some(ThreadStatus::Exited);
 606        self.known_thread_states.clear();
 607    }
 608
 609    fn continue_all_threads(&mut self) {
 610        self.global_state = Some(ThreadStatus::Running);
 611        self.known_thread_states.clear();
 612    }
 613
 614    fn stop_thread(&mut self, thread_id: ThreadId) {
 615        self.known_thread_states
 616            .insert(thread_id, ThreadStatus::Stopped);
 617    }
 618
 619    fn continue_thread(&mut self, thread_id: ThreadId) {
 620        self.known_thread_states
 621            .insert(thread_id, ThreadStatus::Running);
 622    }
 623
 624    fn process_step(&mut self, thread_id: ThreadId) {
 625        self.known_thread_states
 626            .insert(thread_id, ThreadStatus::Stepping);
 627    }
 628
 629    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 630        self.thread_state(thread_id)
 631            .unwrap_or(ThreadStatus::Running)
 632    }
 633
 634    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 635        self.known_thread_states
 636            .get(&thread_id)
 637            .copied()
 638            .or(self.global_state)
 639    }
 640
 641    fn exit_thread(&mut self, thread_id: ThreadId) {
 642        self.known_thread_states
 643            .insert(thread_id, ThreadStatus::Exited);
 644    }
 645
 646    fn any_stopped_thread(&self) -> bool {
 647        self.global_state
 648            .is_some_and(|state| state == ThreadStatus::Stopped)
 649            || self
 650                .known_thread_states
 651                .values()
 652                .any(|status| *status == ThreadStatus::Stopped)
 653    }
 654}
 655const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 656
 657type IsEnabled = bool;
 658
 659#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 660pub struct OutputToken(pub usize);
 661/// Represents a current state of a single debug adapter and provides ways to mutate it.
 662pub struct Session {
 663    pub mode: SessionState,
 664    id: SessionId,
 665    label: Option<SharedString>,
 666    adapter: DebugAdapterName,
 667    pub(super) capabilities: Capabilities,
 668    child_session_ids: HashSet<SessionId>,
 669    parent_session: Option<Entity<Session>>,
 670    modules: Vec<dap::Module>,
 671    loaded_sources: Vec<dap::Source>,
 672    output_token: OutputToken,
 673    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 674    threads: IndexMap<ThreadId, Thread>,
 675    thread_states: ThreadStates,
 676    watchers: HashMap<SharedString, Watcher>,
 677    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 678    stack_frames: IndexMap<StackFrameId, StackFrame>,
 679    locations: HashMap<u64, dap::LocationsResponse>,
 680    is_session_terminated: bool,
 681    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 682    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 683    ignore_breakpoints: bool,
 684    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 685    background_tasks: Vec<Task<()>>,
 686    restart_task: Option<Task<()>>,
 687    task_context: TaskContext,
 688    quirks: SessionQuirks,
 689}
 690
 691trait CacheableCommand: Any + Send + Sync {
 692    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 693    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 694    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 695}
 696
 697impl<T> CacheableCommand for T
 698where
 699    T: LocalDapCommand + PartialEq + Eq + Hash,
 700{
 701    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 702        (rhs as &dyn Any)
 703            .downcast_ref::<Self>()
 704            .map_or(false, |rhs| self == rhs)
 705    }
 706
 707    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 708        T::hash(self, &mut hasher);
 709    }
 710
 711    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 712        self
 713    }
 714}
 715
 716pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 717
 718impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 719    fn from(request: T) -> Self {
 720        Self(Arc::new(request))
 721    }
 722}
 723
 724impl PartialEq for RequestSlot {
 725    fn eq(&self, other: &Self) -> bool {
 726        self.0.dyn_eq(other.0.as_ref())
 727    }
 728}
 729
 730impl Eq for RequestSlot {}
 731
 732impl Hash for RequestSlot {
 733    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 734        self.0.dyn_hash(state);
 735        (&*self.0 as &dyn Any).type_id().hash(state)
 736    }
 737}
 738
 739#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 740pub struct CompletionsQuery {
 741    pub query: String,
 742    pub column: u64,
 743    pub line: Option<u64>,
 744    pub frame_id: Option<u64>,
 745}
 746
 747impl CompletionsQuery {
 748    pub fn new(
 749        buffer: &language::Buffer,
 750        cursor_position: language::Anchor,
 751        frame_id: Option<u64>,
 752    ) -> Self {
 753        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 754        Self {
 755            query: buffer.text(),
 756            column: column as u64,
 757            frame_id,
 758            line: Some(row as u64),
 759        }
 760    }
 761}
 762
 763#[derive(Debug)]
 764pub enum SessionEvent {
 765    Modules,
 766    LoadedSources,
 767    Stopped(Option<ThreadId>),
 768    StackTrace,
 769    Variables,
 770    Watchers,
 771    Threads,
 772    InvalidateInlineValue,
 773    CapabilitiesLoaded,
 774    RunInTerminal {
 775        request: RunInTerminalRequestArguments,
 776        sender: mpsc::Sender<Result<u32>>,
 777    },
 778    ConsoleOutput,
 779}
 780
 781#[derive(Clone, Debug, PartialEq, Eq)]
 782pub enum SessionStateEvent {
 783    Running,
 784    Shutdown,
 785    Restart,
 786    SpawnChildSession {
 787        request: StartDebuggingRequestArguments,
 788    },
 789}
 790
 791impl EventEmitter<SessionEvent> for Session {}
 792impl EventEmitter<SessionStateEvent> for Session {}
 793
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// The remote side only sends a breakpoint update when the breakpoint was created by that peer.
// The BreakpointStore notifies the session about breakpoint changes.
 797impl Session {
 798    pub(crate) fn new(
 799        breakpoint_store: Entity<BreakpointStore>,
 800        session_id: SessionId,
 801        parent_session: Option<Entity<Session>>,
 802        label: Option<SharedString>,
 803        adapter: DebugAdapterName,
 804        task_context: TaskContext,
 805        quirks: SessionQuirks,
 806        cx: &mut App,
 807    ) -> Entity<Self> {
 808        cx.new::<Self>(|cx| {
 809            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 810                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 811                    if let Some(local) = (!this.ignore_breakpoints)
 812                        .then(|| this.as_running_mut())
 813                        .flatten()
 814                    {
 815                        local
 816                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 817                            .detach();
 818                    };
 819                }
 820                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 821                    if let Some(local) = (!this.ignore_breakpoints)
 822                        .then(|| this.as_running_mut())
 823                        .flatten()
 824                    {
 825                        local.unset_breakpoints_from_paths(paths, cx).detach();
 826                    }
 827                }
 828                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 829            })
 830            .detach();
 831
 832            let this = Self {
 833                mode: SessionState::Building(None),
 834                id: session_id,
 835                child_session_ids: HashSet::default(),
 836                parent_session,
 837                capabilities: Capabilities::default(),
 838                watchers: HashMap::default(),
 839                variables: Default::default(),
 840                stack_frames: Default::default(),
 841                thread_states: ThreadStates::default(),
 842                output_token: OutputToken(0),
 843                output: circular_buffer::CircularBuffer::boxed(),
 844                requests: HashMap::default(),
 845                modules: Vec::default(),
 846                loaded_sources: Vec::default(),
 847                threads: IndexMap::default(),
 848                background_tasks: Vec::default(),
 849                restart_task: None,
 850                locations: Default::default(),
 851                is_session_terminated: false,
 852                ignore_breakpoints: false,
 853                breakpoint_store,
 854                exception_breakpoints: Default::default(),
 855                label,
 856                adapter,
 857                task_context,
 858                quirks,
 859            };
 860
 861            this
 862        })
 863    }
 864
        /// Returns a reference to the task context stored on this session.
 865    pub fn task_context(&self) -> &TaskContext {
 866        &self.task_context
 867    }
 868
 869    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 870        match &self.mode {
 871            SessionState::Building(_) => None,
 872            SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
 873        }
 874    }
 875
        /// Boots the debug adapter for this session and drives it through initialization.
        ///
        /// A background task is installed that consumes messages sent by the adapter:
        /// the first `Initialized` event resolves a one-shot channel, every other event
        /// is forwarded to `handle_dap_event`, and `StartDebugging` / `RunInTerminal`
        /// reverse requests are dispatched to their handlers.
        ///
        /// The returned task creates the `RunningMode`, switches the session to
        /// `SessionState::Running`, sends the initialize request and then runs the
        /// initialize sequence. If that sequence fails, the launch configuration is
        /// written to the session console before the error is returned.
 876    pub fn boot(
 877        &mut self,
 878        binary: DebugAdapterBinary,
 879        worktree: Entity<Worktree>,
 880        dap_store: WeakEntity<DapStore>,
 881        cx: &mut Context<Self>,
 882    ) -> Task<Result<()>> {
 883        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 884        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 885
            // Message pump: runs until the channel closes or updating the session fails.
 886        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 887            let mut initialized_tx = Some(initialized_tx);
 888            while let Some(message) = message_rx.next().await {
 889                if let Message::Event(event) = message {
 890                    if let Events::Initialized(_) = *event {
                            // The sender is consumed on first use, so only the first
                            // `Initialized` event is signalled.
 891                        if let Some(tx) = initialized_tx.take() {
 892                            tx.send(()).ok();
 893                        }
 894                    } else {
 895                        let Ok(_) = this.update(cx, |session, cx| {
 896                            session.handle_dap_event(event, cx);
 897                        }) else {
                                // Updating the weak session handle failed (presumably the
                                // session was dropped); stop pumping.
 898                            break;
 899                        };
 900                    }
 901                } else if let Message::Request(request) = message {
                        // Reverse requests other than these two are ignored.
 902                    let Ok(_) = this.update(cx, |this, cx| {
 903                        if request.command == StartDebugging::COMMAND {
 904                            this.handle_start_debugging_request(request, cx)
 905                                .detach_and_log_err(cx);
 906                        } else if request.command == RunInTerminal::COMMAND {
 907                            this.handle_run_in_terminal_request(request, cx)
 908                                .detach_and_log_err(cx);
 909                        }
 910                    }) else {
 911                        break;
 912                    };
 913                }
 914            }
 915        })];
            // Replaces whatever background tasks were stored before.
 916        self.background_tasks = background_tasks;
 917        let id = self.id;
 918        let parent_session = self.parent_session.clone();
 919
 920        cx.spawn(async move |this, cx| {
 921            let mode = RunningMode::new(
 922                id,
 923                parent_session,
 924                worktree.downgrade(),
 925                binary.clone(),
 926                message_tx,
 927                cx,
 928            )
 929            .await?;
 930            this.update(cx, |this, cx| {
 931                match &mut this.mode {
                        // A pending build task is detached (its errors are logged)
                        // before the state is replaced.
 932                    SessionState::Building(task) if task.is_some() => {
 933                        task.take().unwrap().detach_and_log_err(cx);
 934                    }
 935                    _ => {
 936                        debug_assert!(
 937                            this.parent_session.is_some(),
 938                            "Booting a root debug session without a boot task"
 939                        );
 940                    }
 941                };
 942                this.mode = SessionState::Running(mode);
 943                cx.emit(SessionStateEvent::Running);
 944            })?;
 945
 946            this.update(cx, |session, cx| session.request_initialize(cx))?
 947                .await?;
 948
 949            let result = this
 950                .update(cx, |session, cx| {
 951                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 952                })?
 953                .await;
 954
                // On failure, surface the configuration that was used in the console.
 955            if result.is_err() {
 956                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 957
 958                console
 959                    .send(format!(
 960                        "Tried to launch debugger with: {}",
 961                        serde_json::to_string_pretty(&binary.request_args.configuration)
 962                            .unwrap_or_default(),
 963                    ))
 964                    .await
 965                    .ok();
 966            }
 967
 968            result
 969        })
 970    }
 971
        /// Returns the id of this session.
 972    pub fn session_id(&self) -> SessionId {
 973        self.id
 974    }
 975
        /// Returns a copy of the set of child session ids recorded on this session.
 976    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 977        self.child_session_ids.clone()
 978    }
 979
        /// Records `session_id` as a child of this session.
 980    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 981        self.child_session_ids.insert(session_id);
 982    }
 983
        /// Removes `session_id` from the recorded child session ids, if present.
 984    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 985        self.child_session_ids.remove(&session_id);
 986    }
 987
 988    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 989        self.parent_session
 990            .as_ref()
 991            .map(|session| session.read(cx).id)
 992    }
 993
        /// Returns the parent session entity, or `None` when this session has no parent.
 994    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 995        self.parent_session.as_ref()
 996    }
 997
 998    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
 999        let Some(client) = self.adapter_client() else {
1000            return Task::ready(());
1001        };
1002
1003        let supports_terminate = self
1004            .capabilities
1005            .support_terminate_debuggee
1006            .unwrap_or(false);
1007
1008        cx.background_spawn(async move {
1009            if supports_terminate {
1010                client
1011                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
1012                        restart: Some(false),
1013                    })
1014                    .await
1015                    .ok();
1016            } else {
1017                client
1018                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1019                        restart: Some(false),
1020                        terminate_debuggee: Some(true),
1021                        suspend_debuggee: Some(false),
1022                    })
1023                    .await
1024                    .ok();
1025            }
1026        })
1027    }
1028
        /// Returns the capabilities currently stored for this session.
1029    pub fn capabilities(&self) -> &Capabilities {
1030        &self.capabilities
1031    }
1032
1033    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1034        match &self.mode {
1035            SessionState::Building(_) => None,
1036            SessionState::Running(running_mode) => Some(&running_mode.binary),
1037        }
1038    }
1039
        /// Returns a clone of the debug adapter name this session was created with.
1040    pub fn adapter(&self) -> DebugAdapterName {
1041        self.adapter.clone()
1042    }
1043
        /// Returns a clone of the session label, if one is set.
1044    pub fn label(&self) -> Option<SharedString> {
1045        self.label.clone()
1046    }
1047
        /// Returns the value of the session's `is_session_terminated` flag.
1048    pub fn is_terminated(&self) -> bool {
1049        self.is_session_terminated
1050    }
1051
1052    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1053        let (tx, mut rx) = mpsc::unbounded();
1054
1055        cx.spawn(async move |this, cx| {
1056            while let Some(output) = rx.next().await {
1057                this.update(cx, |this, _| {
1058                    let event = dap::OutputEvent {
1059                        category: None,
1060                        output,
1061                        group: None,
1062                        variables_reference: None,
1063                        source: None,
1064                        line: None,
1065                        column: None,
1066                        data: None,
1067                        location_reference: None,
1068                    };
1069                    this.push_output(event);
1070                })?;
1071            }
1072            anyhow::Ok(())
1073        })
1074        .detach();
1075
1076        return tx;
1077    }
1078
1079    pub fn is_started(&self) -> bool {
1080        match &self.mode {
1081            SessionState::Building(_) => false,
1082            SessionState::Running(running) => running.is_started,
1083        }
1084    }
1085
1086    pub fn is_building(&self) -> bool {
1087        matches!(self.mode, SessionState::Building(_))
1088    }
1089
1090    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1091        match &mut self.mode {
1092            SessionState::Running(local_mode) => Some(local_mode),
1093            SessionState::Building(_) => None,
1094        }
1095    }
1096
1097    pub fn as_running(&self) -> Option<&RunningMode> {
1098        match &self.mode {
1099            SessionState::Running(local_mode) => Some(local_mode),
1100            SessionState::Building(_) => None,
1101        }
1102    }
1103
1104    fn handle_start_debugging_request(
1105        &mut self,
1106        request: dap::messages::Request,
1107        cx: &mut Context<Self>,
1108    ) -> Task<Result<()>> {
1109        let request_seq = request.seq;
1110
1111        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1112            .arguments
1113            .as_ref()
1114            .map(|value| serde_json::from_value(value.clone()));
1115
1116        let mut success = true;
1117        if let Some(Ok(request)) = launch_request {
1118            cx.emit(SessionStateEvent::SpawnChildSession { request });
1119        } else {
1120            log::error!(
1121                "Failed to parse launch request arguments: {:?}",
1122                request.arguments
1123            );
1124            success = false;
1125        }
1126
1127        cx.spawn(async move |this, cx| {
1128            this.update(cx, |this, cx| {
1129                this.respond_to_client(
1130                    request_seq,
1131                    success,
1132                    StartDebugging::COMMAND.to_string(),
1133                    None,
1134                    cx,
1135                )
1136            })?
1137            .await
1138        })
1139    }
1140
1141    fn handle_run_in_terminal_request(
1142        &mut self,
1143        request: dap::messages::Request,
1144        cx: &mut Context<Self>,
1145    ) -> Task<Result<()>> {
1146        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1147            request.arguments.unwrap_or_default(),
1148        ) {
1149            Ok(args) => args,
1150            Err(error) => {
1151                return cx.spawn(async move |session, cx| {
1152                    let error = serde_json::to_value(dap::ErrorResponse {
1153                        error: Some(dap::Message {
1154                            id: request.seq,
1155                            format: error.to_string(),
1156                            variables: None,
1157                            send_telemetry: None,
1158                            show_user: None,
1159                            url: None,
1160                            url_label: None,
1161                        }),
1162                    })
1163                    .ok();
1164
1165                    session
1166                        .update(cx, |this, cx| {
1167                            this.respond_to_client(
1168                                request.seq,
1169                                false,
1170                                StartDebugging::COMMAND.to_string(),
1171                                error,
1172                                cx,
1173                            )
1174                        })?
1175                        .await?;
1176
1177                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1178                });
1179            }
1180        };
1181
1182        let seq = request.seq;
1183
1184        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1185        cx.emit(SessionEvent::RunInTerminal {
1186            request: request_args,
1187            sender: tx,
1188        });
1189        cx.notify();
1190
1191        cx.spawn(async move |session, cx| {
1192            let result = util::maybe!(async move {
1193                rx.next().await.ok_or_else(|| {
1194                    anyhow!("failed to receive response from spawn terminal".to_string())
1195                })?
1196            })
1197            .await;
1198            let (success, body) = match result {
1199                Ok(pid) => (
1200                    true,
1201                    serde_json::to_value(dap::RunInTerminalResponse {
1202                        process_id: None,
1203                        shell_process_id: Some(pid as u64),
1204                    })
1205                    .ok(),
1206                ),
1207                Err(error) => (
1208                    false,
1209                    serde_json::to_value(dap::ErrorResponse {
1210                        error: Some(dap::Message {
1211                            id: seq,
1212                            format: error.to_string(),
1213                            variables: None,
1214                            send_telemetry: None,
1215                            show_user: None,
1216                            url: None,
1217                            url_label: None,
1218                        }),
1219                    })
1220                    .ok(),
1221                ),
1222            };
1223
1224            session
1225                .update(cx, |session, cx| {
1226                    session.respond_to_client(
1227                        seq,
1228                        success,
1229                        RunInTerminal::COMMAND.to_string(),
1230                        body,
1231                        cx,
1232                    )
1233                })?
1234                .await
1235        })
1236    }
1237
        /// Sends the `Initialize` request and stores the returned capabilities.
        ///
        /// Returns an error immediately if the session is not in the running state.
        /// When the request fails and `reconnect_for_ssh` yields a reconnect task,
        /// that task is awaited and the request is sent again; when no reconnect
        /// task is available the original error is returned. On success the
        /// capabilities are stored on the session and
        /// `SessionEvent::CapabilitiesLoaded` is emitted.
1238    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1239        let adapter_id = self.adapter().to_string();
1240        let request = Initialize { adapter_id };
1241
1242        let SessionState::Running(running) = &self.mode else {
1243            return Task::ready(Err(anyhow!(
1244                "Cannot send initialize request, task still building"
1245            )));
1246        };
1247        let mut response = running.request(request.clone());
1248
1249        cx.spawn(async move |this, cx| {
                // Loops until the request succeeds or no further retry is possible.
1250            loop {
1251                let capabilities = response.await;
1252                match capabilities {
1253                    Err(e) => {
                            // No reconnect task (or the session update failed): give up
                            // with the request error.
1254                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1255                            this.as_running()
1256                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1257                        }) else {
1258                            return Err(e);
1259                        };
1260                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1261                        reconnect.await?;
1262
                            // Re-issue the same initialize request after reconnecting.
1263                        let Ok(Some(r)) = this.update(cx, |this, _| {
1264                            this.as_running()
1265                                .map(|running| running.request(request.clone()))
1266                        }) else {
1267                            return Err(e);
1268                        };
1269                        response = r
1270                    }
1271                    Ok(capabilities) => {
1272                        this.update(cx, |session, cx| {
1273                            session.capabilities = capabilities;
1274
1275                            cx.emit(SessionEvent::CapabilitiesLoaded);
1276                        })?;
1277                        return Ok(());
1278                    }
1279                }
1280            }
1281        })
1282    }
1283
1284    pub(super) fn initialize_sequence(
1285        &mut self,
1286        initialize_rx: oneshot::Receiver<()>,
1287        dap_store: WeakEntity<DapStore>,
1288        cx: &mut Context<Self>,
1289    ) -> Task<Result<()>> {
1290        match &self.mode {
1291            SessionState::Running(local_mode) => {
1292                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1293            }
1294            SessionState::Building(_) => {
1295                Task::ready(Err(anyhow!("cannot initialize, still building")))
1296            }
1297        }
1298    }
1299
1300    pub fn run_to_position(
1301        &mut self,
1302        breakpoint: SourceBreakpoint,
1303        active_thread_id: ThreadId,
1304        cx: &mut Context<Self>,
1305    ) {
1306        match &mut self.mode {
1307            SessionState::Running(local_mode) => {
1308                if !matches!(
1309                    self.thread_states.thread_state(active_thread_id),
1310                    Some(ThreadStatus::Stopped)
1311                ) {
1312                    return;
1313                };
1314                let path = breakpoint.path.clone();
1315                local_mode.tmp_breakpoint = Some(breakpoint);
1316                let task = local_mode.send_breakpoints_from_path(
1317                    path,
1318                    BreakpointUpdatedReason::Toggled,
1319                    &self.breakpoint_store,
1320                    cx,
1321                );
1322
1323                cx.spawn(async move |this, cx| {
1324                    task.await;
1325                    this.update(cx, |this, cx| {
1326                        this.continue_thread(active_thread_id, cx);
1327                    })
1328                })
1329                .detach();
1330            }
1331            SessionState::Building(_) => {}
1332        }
1333    }
1334
1335    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1336        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1337    }
1338
1339    pub fn output(
1340        &self,
1341        since: OutputToken,
1342    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1343        if self.output_token.0 == 0 {
1344            return (self.output.range(0..0), OutputToken(0));
1345        };
1346
1347        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1348
1349        let clamped_events_since = events_since.clamp(0, self.output.len());
1350        (
1351            self.output
1352                .range(self.output.len() - clamped_events_since..),
1353            self.output_token,
1354        )
1355    }
1356
1357    pub fn respond_to_client(
1358        &self,
1359        request_seq: u64,
1360        success: bool,
1361        command: String,
1362        body: Option<serde_json::Value>,
1363        cx: &mut Context<Self>,
1364    ) -> Task<Result<()>> {
1365        let Some(local_session) = self.as_running() else {
1366            unreachable!("Cannot respond to remote client");
1367        };
1368        let client = local_session.client.clone();
1369
1370        cx.background_spawn(async move {
1371            client
1372                .send_message(Message::Response(Response {
1373                    body,
1374                    success,
1375                    command,
1376                    seq: request_seq + 1,
1377                    request_seq,
1378                    message: None,
1379                }))
1380                .await
1381        })
1382    }
1383
        /// Updates session state in response to a `Stopped` event.
        ///
        /// If a temporary breakpoint was pending on the running mode, it is taken
        /// and the breakpoints for its path are re-sent. Thread states and cached
        /// stack-trace requests are updated for the affected thread(s), generic
        /// cached requests are invalidated, the known threads and variables are
        /// cleared, and `SessionEvent::Stopped` plus
        /// `SessionEvent::InvalidateInlineValue` are emitted.
1384    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1385        self.mode.stopped();
1386        // todo(debugger): Find a clean way to get around the clone
1387        let breakpoint_store = self.breakpoint_store.clone();
            // `take()` clears the temporary breakpoint; the block below only runs
            // when one was set.
1388        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1389            let breakpoint = local.tmp_breakpoint.take()?;
1390            let path = breakpoint.path.clone();
1391            Some((local, path))
1392        }) {
1393            local
1394                .send_breakpoints_from_path(
1395                    path,
1396                    BreakpointUpdatedReason::Toggled,
1397                    &breakpoint_store,
1398                    cx,
1399                )
1400                .detach();
1401        };
1402
            // An event without a thread id is treated like "all threads stopped".
1403        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1404            self.thread_states.stop_all_threads();
1405            self.invalidate_command_type::<StackTraceCommand>();
1406        }
1407
1408        // Even if we stopped all threads we still need to insert the thread_id
1409        // to our own data
1410        if let Some(thread_id) = event.thread_id {
1411            self.thread_states.stop_thread(ThreadId(thread_id));
1412
1413            self.invalidate_state(
1414                &StackTraceCommand {
1415                    thread_id,
1416                    start_frame: None,
1417                    levels: None,
1418                }
1419                .into(),
1420            );
1421        }
1422
1423        self.invalidate_generic();
1424        self.threads.clear();
1425        self.variables.clear();
            // The thread id is omitted from the event when the adapter asked to
            // preserve focus.
1426        cx.emit(SessionEvent::Stopped(
1427            event
1428                .thread_id
1429                .map(Into::into)
1430                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1431        ));
1432        cx.emit(SessionEvent::InvalidateInlineValue);
1433        cx.notify();
1434    }
1435
        /// Applies an event received from the debug adapter to the session state.
        ///
        /// `Initialized` is not expected here (it trips a debug assertion).
        /// `Memory`, `Process`, the `Progress*` events, `Invalidated` and `Other`
        /// are currently ignored.
1436    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1437        match *event {
1438            Events::Initialized(_) => {
1439                debug_assert!(
1440                    false,
1441                    "Initialized event should have been handled in LocalMode"
1442                );
1443            }
1444            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1445            Events::Continued(event) => {
1446                if event.all_threads_continued.unwrap_or_default() {
1447                    self.thread_states.continue_all_threads();
1448                    self.breakpoint_store.update(cx, |store, cx| {
1449                        store.remove_active_position(Some(self.session_id()), cx)
1450                    });
1451                } else {
1452                    self.thread_states
1453                        .continue_thread(ThreadId(event.thread_id));
1454                }
1455                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1456                self.invalidate_generic();
1457            }
1458            Events::Exited(_event) => {
1459                self.clear_active_debug_line(cx);
1460            }
1461            Events::Terminated(_) => {
1462                self.shutdown(cx).detach();
1463            }
1464            Events::Thread(event) => {
1465                let thread_id = ThreadId(event.thread_id);
1466
1467                match event.reason {
1468                    dap::ThreadEventReason::Started => {
1469                        self.thread_states.continue_thread(thread_id);
1470                    }
1471                    dap::ThreadEventReason::Exited => {
1472                        self.thread_states.exit_thread(thread_id);
1473                    }
1474                    reason => {
1475                        log::error!("Unhandled thread event reason {:?}", reason);
1476                    }
1477                }
                    // Drop the cached threads request so the list is fetched again.
1478                self.invalidate_state(&ThreadsCommand.into());
1479                cx.notify();
1480            }
1481            Events::Output(event) => {
                    // Telemetry output is not recorded in the session output.
1482                if event
1483                    .category
1484                    .as_ref()
1485                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1486                {
1487                    return;
1488                }
1489
1490                self.push_output(event);
1491                cx.notify();
1492            }
1493            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1494                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1495            }),
1496            Events::Module(event) => {
1497                match event.reason {
1498                    dap::ModuleEventReason::New => {
1499                        self.modules.push(event.module);
1500                    }
1501                    dap::ModuleEventReason::Changed => {
                            // A change for a module id that is not known is ignored.
1502                        if let Some(module) = self
1503                            .modules
1504                            .iter_mut()
1505                            .find(|other| event.module.id == other.id)
1506                        {
1507                            *module = event.module;
1508                        }
1509                    }
1510                    dap::ModuleEventReason::Removed => {
1511                        self.modules.retain(|other| event.module.id != other.id);
1512                    }
1513                }
1514
1515                // todo(debugger): We should only send the invalidate command to downstream clients.
1516                // self.invalidate_state(&ModulesCommand.into());
1517            }
1518            Events::LoadedSource(_) => {
1519                self.invalidate_state(&LoadedSourcesCommand.into());
1520            }
1521            Events::Capabilities(event) => {
1522                self.capabilities = self.capabilities.merge(event.capabilities);
1523
1524                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1525                let recent_filters = self
1526                    .capabilities
1527                    .exception_breakpoint_filters
1528                    .iter()
1529                    .flatten()
1530                    .map(|filter| (filter.filter.clone(), filter.clone()))
1531                    .collect::<BTreeMap<_, _>>();
                    // Add filters that are not tracked yet, using their advertised
                    // default; entries that already exist are left untouched.
1532                for filter in recent_filters.values() {
1533                    let default = filter.default.unwrap_or_default();
1534                    self.exception_breakpoints
1535                        .entry(filter.filter.clone())
1536                        .or_insert_with(|| (filter.clone(), default));
1537                }
                    // Remove the ones that no longer exist.
1538                self.exception_breakpoints
1539                    .retain(|k, _| recent_filters.contains_key(k));
1540                if self.is_started() {
1541                    self.send_exception_breakpoints(cx);
1542                }
1543
1545                cx.notify();
1546            }
1547            Events::Memory(_) => {}
1548            Events::Process(_) => {}
1549            Events::ProgressEnd(_) => {}
1550            Events::ProgressStart(_) => {}
1551            Events::ProgressUpdate(_) => {}
1552            Events::Invalidated(_) => {}
1553            Events::Other(_) => {}
1554        }
1555    }
1556
1557    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
        ///
        /// Only commands whose `CACHEABLE` constant is true may be used; this is
        /// checked at compile time. Nothing is sent when the session is terminated,
        /// or when no thread is stopped and the command is not a `ThreadsCommand`.
        /// `process_result` is invoked with the outcome of a newly sent request;
        /// a request that already has an entry in the cache is not sent again.
1558    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1559        &mut self,
1560        request: T,
1561        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1562        cx: &mut Context<Self>,
1563    ) {
1564        const {
1565            assert!(
1566                T::CACHEABLE,
1567                "Only requests marked as cacheable should invoke `fetch`"
1568            );
1569        }
1570
            // `&&` binds tighter than `||`: bail out when (no thread is stopped and
            // this is not a threads request) or when the session is terminated.
1571        if !self.thread_states.any_stopped_thread()
1572            && request.type_id() != TypeId::of::<ThreadsCommand>()
1573            || self.is_session_terminated
1574        {
1575            return;
1576        }
1577
            // Cached requests are grouped per command type.
1578        let request_map = self
1579            .requests
1580            .entry(std::any::TypeId::of::<T>())
1581            .or_default();
1582
            // Only a vacant slot triggers a new request.
1583        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1584            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1585
1586            let task = Self::request_inner::<Arc<T>>(
1587                &self.capabilities,
1588                &self.mode,
1589                command,
1590                |this, result, cx| {
1591                    process_result(this, result, cx);
1592                    None
1593                },
1594                cx,
1595            );
1596            let task = cx
1597                .background_executor()
1598                .spawn(async move {
1599                    let _ = task.await?;
1600                    Some(())
1601                })
1602                .shared();
1603
1604            vacant.insert(task);
1605            cx.notify();
1606        }
1607    }
1608
1609    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1610        capabilities: &Capabilities,
1611        mode: &SessionState,
1612        request: T,
1613        process_result: impl FnOnce(
1614            &mut Self,
1615            Result<T::Response>,
1616            &mut Context<Self>,
1617        ) -> Option<T::Response>
1618        + 'static,
1619        cx: &mut Context<Self>,
1620    ) -> Task<Option<T::Response>> {
1621        if !T::is_supported(&capabilities) {
1622            log::warn!(
1623                "Attempted to send a DAP request that isn't supported: {:?}",
1624                request
1625            );
1626            let error = Err(anyhow::Error::msg(
1627                "Couldn't complete request because it's not supported",
1628            ));
1629            return cx.spawn(async move |this, cx| {
1630                this.update(cx, |this, cx| process_result(this, error, cx))
1631                    .ok()
1632                    .flatten()
1633            });
1634        }
1635
1636        let request = mode.request_dap(request);
1637        cx.spawn(async move |this, cx| {
1638            let result = request.await;
1639            this.update(cx, |this, cx| process_result(this, result, cx))
1640                .ok()
1641                .flatten()
1642        })
1643    }
1644
        /// Sends `request` using this session's capabilities and mode.
        ///
        /// Thin wrapper around `request_inner`; unlike `fetch`, it does not
        /// consult or populate the request cache.
1645    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
1646        &self,
1647        request: T,
1648        process_result: impl FnOnce(
1649            &mut Self,
1650            Result<T::Response>,
1651            &mut Context<Self>,
1652        ) -> Option<T::Response>
1653        + 'static,
1654        cx: &mut Context<Self>,
1655    ) -> Task<Option<T::Response>> {
1656        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1657    }
1658
        /// Drops every cached request of the given command type.
1659    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
1660        self.requests.remove(&std::any::TypeId::of::<Command>());
1661    }
1662
        /// Drops the cached modules, loaded-sources and threads requests.
1663    fn invalidate_generic(&mut self) {
1664        self.invalidate_command_type::<ModulesCommand>();
1665        self.invalidate_command_type::<LoadedSourcesCommand>();
1666        self.invalidate_command_type::<ThreadsCommand>();
1667    }
1668
        /// Drops the single cached request identified by `key`, if present.
        ///
        /// The per-type map is looked up via the type id of the value wrapped by
        /// `key`; other cached requests of the same type are kept.
1669    fn invalidate_state(&mut self, key: &RequestSlot) {
1670        self.requests
1671            .entry((&*key.0 as &dyn Any).type_id())
1672            .and_modify(|request_map| {
1673                request_map.remove(&key);
1674            });
1675    }
1676
1677    fn push_output(&mut self, event: OutputEvent) {
1678        self.output.push_back(event);
1679        self.output_token.0 += 1;
1680    }
1681
1682    pub fn any_stopped_thread(&self) -> bool {
1683        self.thread_states.any_stopped_thread()
1684    }
1685
1686    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1687        self.thread_states.thread_status(thread_id)
1688    }
1689
1690    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1691        self.fetch(
1692            dap_command::ThreadsCommand,
1693            |this, result, cx| {
1694                let Some(result) = result.log_err() else {
1695                    return;
1696                };
1697
1698                this.threads = result
1699                    .into_iter()
1700                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1701                    .collect();
1702
1703                this.invalidate_command_type::<StackTraceCommand>();
1704                cx.emit(SessionEvent::Threads);
1705                cx.notify();
1706            },
1707            cx,
1708        );
1709
1710        self.threads
1711            .values()
1712            .map(|thread| {
1713                (
1714                    thread.dap.clone(),
1715                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1716                )
1717            })
1718            .collect()
1719    }
1720
1721    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1722        self.fetch(
1723            dap_command::ModulesCommand,
1724            |this, result, cx| {
1725                let Some(result) = result.log_err() else {
1726                    return;
1727                };
1728
1729                this.modules = result;
1730                cx.emit(SessionEvent::Modules);
1731                cx.notify();
1732            },
1733            cx,
1734        );
1735
1736        &self.modules
1737    }
1738
1739    pub fn ignore_breakpoints(&self) -> bool {
1740        self.ignore_breakpoints
1741    }
1742
1743    pub fn toggle_ignore_breakpoints(
1744        &mut self,
1745        cx: &mut App,
1746    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1747        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1748    }
1749
1750    pub(crate) fn set_ignore_breakpoints(
1751        &mut self,
1752        ignore: bool,
1753        cx: &mut App,
1754    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1755        if self.ignore_breakpoints == ignore {
1756            return Task::ready(HashMap::default());
1757        }
1758
1759        self.ignore_breakpoints = ignore;
1760
1761        if let Some(local) = self.as_running() {
1762            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1763        } else {
1764            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1765            unimplemented!()
1766        }
1767    }
1768
1769    pub fn exception_breakpoints(
1770        &self,
1771    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1772        self.exception_breakpoints.values()
1773    }
1774
1775    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1776        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1777            *is_enabled = !*is_enabled;
1778            self.send_exception_breakpoints(cx);
1779        }
1780    }
1781
1782    fn send_exception_breakpoints(&mut self, cx: &App) {
1783        if let Some(local) = self.as_running() {
1784            let exception_filters = self
1785                .exception_breakpoints
1786                .values()
1787                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1788                .collect();
1789
1790            let supports_exception_filters = self
1791                .capabilities
1792                .supports_exception_filter_options
1793                .unwrap_or_default();
1794            local
1795                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1796                .detach_and_log_err(cx);
1797        } else {
1798            debug_assert!(false, "Not implemented");
1799        }
1800    }
1801
1802    pub fn breakpoints_enabled(&self) -> bool {
1803        self.ignore_breakpoints
1804    }
1805
1806    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1807        self.fetch(
1808            dap_command::LoadedSourcesCommand,
1809            |this, result, cx| {
1810                let Some(result) = result.log_err() else {
1811                    return;
1812                };
1813                this.loaded_sources = result;
1814                cx.emit(SessionEvent::LoadedSources);
1815                cx.notify();
1816            },
1817            cx,
1818        );
1819
1820        &self.loaded_sources
1821    }
1822
1823    fn fallback_to_manual_restart(
1824        &mut self,
1825        res: Result<()>,
1826        cx: &mut Context<Self>,
1827    ) -> Option<()> {
1828        if res.log_err().is_none() {
1829            cx.emit(SessionStateEvent::Restart);
1830            return None;
1831        }
1832        Some(())
1833    }
1834
1835    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1836        res.log_err()?;
1837        Some(())
1838    }
1839
1840    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
1841        thread_id: ThreadId,
1842    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1843    {
1844        move |this, response, cx| match response.log_err() {
1845            Some(response) => {
1846                this.breakpoint_store.update(cx, |store, cx| {
1847                    store.remove_active_position(Some(this.session_id()), cx)
1848                });
1849                Some(response)
1850            }
1851            None => {
1852                this.thread_states.stop_thread(thread_id);
1853                cx.notify();
1854                None
1855            }
1856        }
1857    }
1858
1859    fn clear_active_debug_line_response(
1860        &mut self,
1861        response: Result<()>,
1862        cx: &mut Context<Session>,
1863    ) -> Option<()> {
1864        response.log_err()?;
1865        self.clear_active_debug_line(cx);
1866        Some(())
1867    }
1868
1869    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1870        self.breakpoint_store.update(cx, |store, cx| {
1871            store.remove_active_position(Some(self.id), cx)
1872        });
1873    }
1874
1875    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1876        self.request(
1877            PauseCommand {
1878                thread_id: thread_id.0,
1879            },
1880            Self::empty_response,
1881            cx,
1882        )
1883        .detach();
1884    }
1885
1886    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1887        self.request(
1888            RestartStackFrameCommand { stack_frame_id },
1889            Self::empty_response,
1890            cx,
1891        )
1892        .detach();
1893    }
1894
1895    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1896        if self.restart_task.is_some() || self.as_running().is_none() {
1897            return;
1898        }
1899
1900        let supports_dap_restart =
1901            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
1902
1903        self.restart_task = Some(cx.spawn(async move |this, cx| {
1904            let _ = this.update(cx, |session, cx| {
1905                if supports_dap_restart {
1906                    session
1907                        .request(
1908                            RestartCommand {
1909                                raw: args.unwrap_or(Value::Null),
1910                            },
1911                            Self::fallback_to_manual_restart,
1912                            cx,
1913                        )
1914                        .detach();
1915                } else {
1916                    cx.emit(SessionStateEvent::Restart);
1917                }
1918            });
1919        }));
1920    }
1921
1922    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1923        if self.is_session_terminated {
1924            return Task::ready(());
1925        }
1926
1927        self.is_session_terminated = true;
1928        self.thread_states.exit_all_threads();
1929        cx.notify();
1930
1931        let task = match &mut self.mode {
1932            SessionState::Running(_) => {
1933                if self
1934                    .capabilities
1935                    .supports_terminate_request
1936                    .unwrap_or_default()
1937                {
1938                    self.request(
1939                        TerminateCommand {
1940                            restart: Some(false),
1941                        },
1942                        Self::clear_active_debug_line_response,
1943                        cx,
1944                    )
1945                } else {
1946                    self.request(
1947                        DisconnectCommand {
1948                            restart: Some(false),
1949                            terminate_debuggee: Some(true),
1950                            suspend_debuggee: Some(false),
1951                        },
1952                        Self::clear_active_debug_line_response,
1953                        cx,
1954                    )
1955                }
1956            }
1957            SessionState::Building(build_task) => {
1958                build_task.take();
1959                Task::ready(Some(()))
1960            }
1961        };
1962
1963        cx.emit(SessionStateEvent::Shutdown);
1964
1965        cx.spawn(async move |this, cx| {
1966            task.await;
1967            let _ = this.update(cx, |this, _| {
1968                if let Some(adapter_client) = this.adapter_client() {
1969                    adapter_client.kill();
1970                }
1971            });
1972        })
1973    }
1974
1975    pub fn completions(
1976        &mut self,
1977        query: CompletionsQuery,
1978        cx: &mut Context<Self>,
1979    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1980        let task = self.request(query, |_, result, _| result.log_err(), cx);
1981
1982        cx.background_executor().spawn(async move {
1983            anyhow::Ok(
1984                task.await
1985                    .map(|response| response.targets)
1986                    .context("failed to fetch completions")?,
1987            )
1988        })
1989    }
1990
1991    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1992        let supports_single_thread_execution_requests =
1993            self.capabilities.supports_single_thread_execution_requests;
1994        self.thread_states.continue_thread(thread_id);
1995        self.request(
1996            ContinueCommand {
1997                args: ContinueArguments {
1998                    thread_id: thread_id.0,
1999                    single_thread: supports_single_thread_execution_requests,
2000                },
2001            },
2002            Self::on_step_response::<ContinueCommand>(thread_id),
2003            cx,
2004        )
2005        .detach();
2006    }
2007
2008    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2009        match self.mode {
2010            SessionState::Running(ref local) => Some(local.client.clone()),
2011            SessionState::Building(_) => None,
2012        }
2013    }
2014
2015    pub fn has_ever_stopped(&self) -> bool {
2016        self.mode.has_ever_stopped()
2017    }
2018    pub fn step_over(
2019        &mut self,
2020        thread_id: ThreadId,
2021        granularity: SteppingGranularity,
2022        cx: &mut Context<Self>,
2023    ) {
2024        let supports_single_thread_execution_requests =
2025            self.capabilities.supports_single_thread_execution_requests;
2026        let supports_stepping_granularity = self
2027            .capabilities
2028            .supports_stepping_granularity
2029            .unwrap_or_default();
2030
2031        let command = NextCommand {
2032            inner: StepCommand {
2033                thread_id: thread_id.0,
2034                granularity: supports_stepping_granularity.then(|| granularity),
2035                single_thread: supports_single_thread_execution_requests,
2036            },
2037        };
2038
2039        self.thread_states.process_step(thread_id);
2040        self.request(
2041            command,
2042            Self::on_step_response::<NextCommand>(thread_id),
2043            cx,
2044        )
2045        .detach();
2046    }
2047
2048    pub fn step_in(
2049        &mut self,
2050        thread_id: ThreadId,
2051        granularity: SteppingGranularity,
2052        cx: &mut Context<Self>,
2053    ) {
2054        let supports_single_thread_execution_requests =
2055            self.capabilities.supports_single_thread_execution_requests;
2056        let supports_stepping_granularity = self
2057            .capabilities
2058            .supports_stepping_granularity
2059            .unwrap_or_default();
2060
2061        let command = StepInCommand {
2062            inner: StepCommand {
2063                thread_id: thread_id.0,
2064                granularity: supports_stepping_granularity.then(|| granularity),
2065                single_thread: supports_single_thread_execution_requests,
2066            },
2067        };
2068
2069        self.thread_states.process_step(thread_id);
2070        self.request(
2071            command,
2072            Self::on_step_response::<StepInCommand>(thread_id),
2073            cx,
2074        )
2075        .detach();
2076    }
2077
2078    pub fn step_out(
2079        &mut self,
2080        thread_id: ThreadId,
2081        granularity: SteppingGranularity,
2082        cx: &mut Context<Self>,
2083    ) {
2084        let supports_single_thread_execution_requests =
2085            self.capabilities.supports_single_thread_execution_requests;
2086        let supports_stepping_granularity = self
2087            .capabilities
2088            .supports_stepping_granularity
2089            .unwrap_or_default();
2090
2091        let command = StepOutCommand {
2092            inner: StepCommand {
2093                thread_id: thread_id.0,
2094                granularity: supports_stepping_granularity.then(|| granularity),
2095                single_thread: supports_single_thread_execution_requests,
2096            },
2097        };
2098
2099        self.thread_states.process_step(thread_id);
2100        self.request(
2101            command,
2102            Self::on_step_response::<StepOutCommand>(thread_id),
2103            cx,
2104        )
2105        .detach();
2106    }
2107
2108    pub fn step_back(
2109        &mut self,
2110        thread_id: ThreadId,
2111        granularity: SteppingGranularity,
2112        cx: &mut Context<Self>,
2113    ) {
2114        let supports_single_thread_execution_requests =
2115            self.capabilities.supports_single_thread_execution_requests;
2116        let supports_stepping_granularity = self
2117            .capabilities
2118            .supports_stepping_granularity
2119            .unwrap_or_default();
2120
2121        let command = StepBackCommand {
2122            inner: StepCommand {
2123                thread_id: thread_id.0,
2124                granularity: supports_stepping_granularity.then(|| granularity),
2125                single_thread: supports_single_thread_execution_requests,
2126            },
2127        };
2128
2129        self.thread_states.process_step(thread_id);
2130
2131        self.request(
2132            command,
2133            Self::on_step_response::<StepBackCommand>(thread_id),
2134            cx,
2135        )
2136        .detach();
2137    }
2138
2139    pub fn stack_frames(
2140        &mut self,
2141        thread_id: ThreadId,
2142        cx: &mut Context<Self>,
2143    ) -> Result<Vec<StackFrame>> {
2144        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2145            && self.requests.contains_key(&ThreadsCommand.type_id())
2146            && self.threads.contains_key(&thread_id)
2147        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2148        // We could still be using an old thread id and have sent a new thread's request
2149        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2150        // But it very well could cause a minor bug in the future that is hard to track down
2151        {
2152            self.fetch(
2153                super::dap_command::StackTraceCommand {
2154                    thread_id: thread_id.0,
2155                    start_frame: None,
2156                    levels: None,
2157                },
2158                move |this, stack_frames, cx| {
2159                    let entry =
2160                        this.threads
2161                            .entry(thread_id)
2162                            .and_modify(|thread| match &stack_frames {
2163                                Ok(stack_frames) => {
2164                                    thread.stack_frames = stack_frames
2165                                        .iter()
2166                                        .cloned()
2167                                        .map(StackFrame::from)
2168                                        .collect();
2169                                    thread.stack_frames_error = None;
2170                                }
2171                                Err(error) => {
2172                                    thread.stack_frames.clear();
2173                                    thread.stack_frames_error = Some(error.cloned());
2174                                }
2175                            });
2176                    debug_assert!(
2177                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2178                        "Sent request for thread_id that doesn't exist"
2179                    );
2180                    if let Ok(stack_frames) = stack_frames {
2181                        this.stack_frames.extend(
2182                            stack_frames
2183                                .into_iter()
2184                                .filter(|frame| {
2185                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2186                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2187                                    !(frame.id == 0
2188                                        && frame.line == 0
2189                                        && frame.column == 0
2190                                        && frame.presentation_hint
2191                                            == Some(StackFramePresentationHint::Label))
2192                                })
2193                                .map(|frame| (frame.id, StackFrame::from(frame))),
2194                        );
2195                    }
2196
2197                    this.invalidate_command_type::<ScopesCommand>();
2198                    this.invalidate_command_type::<VariablesCommand>();
2199
2200                    cx.emit(SessionEvent::StackTrace);
2201                },
2202                cx,
2203            );
2204        }
2205
2206        match self.threads.get(&thread_id) {
2207            Some(thread) => {
2208                if let Some(error) = &thread.stack_frames_error {
2209                    Err(error.cloned())
2210                } else {
2211                    Ok(thread.stack_frames.clone())
2212                }
2213            }
2214            None => Ok(Vec::new()),
2215        }
2216    }
2217
2218    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2219        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2220            && self
2221                .requests
2222                .contains_key(&TypeId::of::<StackTraceCommand>())
2223        {
2224            self.fetch(
2225                ScopesCommand { stack_frame_id },
2226                move |this, scopes, cx| {
2227                    let Some(scopes) = scopes.log_err() else {
2228                        return
2229                    };
2230
2231                    for scope in scopes.iter() {
2232                        this.variables(scope.variables_reference, cx);
2233                    }
2234
2235                    let entry = this
2236                        .stack_frames
2237                        .entry(stack_frame_id)
2238                        .and_modify(|stack_frame| {
2239                            stack_frame.scopes = scopes;
2240                        });
2241
2242                    cx.emit(SessionEvent::Variables);
2243
2244                    debug_assert!(
2245                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2246                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2247                    );
2248                },
2249                cx,
2250            );
2251        }
2252
2253        self.stack_frames
2254            .get(&stack_frame_id)
2255            .map(|frame| frame.scopes.as_slice())
2256            .unwrap_or_default()
2257    }
2258
2259    pub fn variables_by_stack_frame_id(
2260        &self,
2261        stack_frame_id: StackFrameId,
2262        globals: bool,
2263        locals: bool,
2264    ) -> Vec<dap::Variable> {
2265        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2266            return Vec::new();
2267        };
2268
2269        stack_frame
2270            .scopes
2271            .iter()
2272            .filter(|scope| {
2273                (scope.name.to_lowercase().contains("local") && locals)
2274                    || (scope.name.to_lowercase().contains("global") && globals)
2275            })
2276            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2277            .flatten()
2278            .cloned()
2279            .collect()
2280    }
2281
2282    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2283        &self.watchers
2284    }
2285
2286    pub fn add_watcher(
2287        &mut self,
2288        expression: SharedString,
2289        frame_id: u64,
2290        cx: &mut Context<Self>,
2291    ) -> Task<Result<()>> {
2292        let request = self.mode.request_dap(EvaluateCommand {
2293            expression: expression.to_string(),
2294            context: Some(EvaluateArgumentsContext::Watch),
2295            frame_id: Some(frame_id),
2296            source: None,
2297        });
2298
2299        cx.spawn(async move |this, cx| {
2300            let response = request.await?;
2301
2302            this.update(cx, |session, cx| {
2303                session.watchers.insert(
2304                    expression.clone(),
2305                    Watcher {
2306                        expression,
2307                        value: response.result.into(),
2308                        variables_reference: response.variables_reference,
2309                        presentation_hint: response.presentation_hint,
2310                    },
2311                );
2312                cx.emit(SessionEvent::Watchers);
2313            })
2314        })
2315    }
2316
2317    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2318        let watches = self.watchers.clone();
2319        for (_, watch) in watches.into_iter() {
2320            self.add_watcher(watch.expression.clone(), frame_id, cx)
2321                .detach();
2322        }
2323    }
2324
2325    pub fn remove_watcher(&mut self, expression: SharedString) {
2326        self.watchers.remove(&expression);
2327    }
2328
2329    pub fn variables(
2330        &mut self,
2331        variables_reference: VariableReference,
2332        cx: &mut Context<Self>,
2333    ) -> Vec<dap::Variable> {
2334        let command = VariablesCommand {
2335            variables_reference,
2336            filter: None,
2337            start: None,
2338            count: None,
2339            format: None,
2340        };
2341
2342        self.fetch(
2343            command,
2344            move |this, variables, cx| {
2345                let Some(variables) = variables.log_err() else {
2346                    return;
2347                };
2348
2349                this.variables.insert(variables_reference, variables);
2350
2351                cx.emit(SessionEvent::Variables);
2352                cx.emit(SessionEvent::InvalidateInlineValue);
2353            },
2354            cx,
2355        );
2356
2357        self.variables
2358            .get(&variables_reference)
2359            .cloned()
2360            .unwrap_or_default()
2361    }
2362
2363    pub fn set_variable_value(
2364        &mut self,
2365        stack_frame_id: u64,
2366        variables_reference: u64,
2367        name: String,
2368        value: String,
2369        cx: &mut Context<Self>,
2370    ) {
2371        if self.capabilities.supports_set_variable.unwrap_or_default() {
2372            self.request(
2373                SetVariableValueCommand {
2374                    name,
2375                    value,
2376                    variables_reference,
2377                },
2378                move |this, response, cx| {
2379                    let response = response.log_err()?;
2380                    this.invalidate_command_type::<VariablesCommand>();
2381                    this.refresh_watchers(stack_frame_id, cx);
2382                    cx.emit(SessionEvent::Variables);
2383                    Some(response)
2384                },
2385                cx,
2386            )
2387            .detach();
2388        }
2389    }
2390
2391    pub fn evaluate(
2392        &mut self,
2393        expression: String,
2394        context: Option<EvaluateArgumentsContext>,
2395        frame_id: Option<u64>,
2396        source: Option<Source>,
2397        cx: &mut Context<Self>,
2398    ) -> Task<()> {
2399        let event = dap::OutputEvent {
2400            category: None,
2401            output: format!("> {expression}"),
2402            group: None,
2403            variables_reference: None,
2404            source: None,
2405            line: None,
2406            column: None,
2407            data: None,
2408            location_reference: None,
2409        };
2410        self.push_output(event);
2411        let request = self.mode.request_dap(EvaluateCommand {
2412            expression,
2413            context,
2414            frame_id,
2415            source,
2416        });
2417        cx.spawn(async move |this, cx| {
2418            let response = request.await;
2419            this.update(cx, |this, cx| {
2420                match response {
2421                    Ok(response) => {
2422                        let event = dap::OutputEvent {
2423                            category: None,
2424                            output: format!("< {}", &response.result),
2425                            group: None,
2426                            variables_reference: Some(response.variables_reference),
2427                            source: None,
2428                            line: None,
2429                            column: None,
2430                            data: None,
2431                            location_reference: None,
2432                        };
2433                        this.push_output(event);
2434                    }
2435                    Err(e) => {
2436                        let event = dap::OutputEvent {
2437                            category: None,
2438                            output: format!("{}", e),
2439                            group: None,
2440                            variables_reference: None,
2441                            source: None,
2442                            line: None,
2443                            column: None,
2444                            data: None,
2445                            location_reference: None,
2446                        };
2447                        this.push_output(event);
2448                    }
2449                };
2450                cx.notify();
2451            })
2452            .ok();
2453        })
2454    }
2455
2456    pub fn location(
2457        &mut self,
2458        reference: u64,
2459        cx: &mut Context<Self>,
2460    ) -> Option<dap::LocationsResponse> {
2461        self.fetch(
2462            LocationsCommand { reference },
2463            move |this, response, _| {
2464                let Some(response) = response.log_err() else {
2465                    return;
2466                };
2467                this.locations.insert(reference, response);
2468            },
2469            cx,
2470        );
2471        self.locations.get(&reference).cloned()
2472    }
2473
2474    pub fn is_attached(&self) -> bool {
2475        let SessionState::Running(local_mode) = &self.mode else {
2476            return false;
2477        };
2478        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2479    }
2480
2481    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2482        let command = DisconnectCommand {
2483            restart: Some(false),
2484            terminate_debuggee: Some(false),
2485            suspend_debuggee: Some(false),
2486        };
2487
2488        self.request(command, Self::empty_response, cx).detach()
2489    }
2490
2491    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2492        if self
2493            .capabilities
2494            .supports_terminate_threads_request
2495            .unwrap_or_default()
2496        {
2497            self.request(
2498                TerminateThreadsCommand {
2499                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2500                },
2501                Self::clear_active_debug_line_response,
2502                cx,
2503            )
2504            .detach();
2505        } else {
2506            self.shutdown(cx).detach();
2507        }
2508    }
2509
2510    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2511        self.thread_states.thread_state(thread_id)
2512    }
2513
2514    pub fn quirks(&self) -> SessionQuirks {
2515        self.quirks
2516    }
2517}