session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DisconnectCommand, EvaluateCommand,
   8    Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand, ModulesCommand,
   9    NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand, ScopesCommand,
  10    SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand, StepBackCommand,
  11    StepCommand, StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand,
  12    ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapStore;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap};
  17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  18use dap::messages::Response;
  19use dap::requests::{Request, RunInTerminal, StartDebugging};
  20use dap::{
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  28    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  29    StartDebuggingRequestArgumentsRequest, VariablePresentationHint,
  30};
  31use futures::SinkExt;
  32use futures::channel::mpsc::UnboundedSender;
  33use futures::channel::{mpsc, oneshot};
  34use futures::{FutureExt, future::Shared};
  35use gpui::{
  36    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  37    Task, WeakEntity,
  38};
  39
  40use rpc::ErrorExt;
  41use serde_json::Value;
  42use smol::stream::StreamExt;
  43use std::any::TypeId;
  44use std::collections::BTreeMap;
  45use std::u64;
  46use std::{
  47    any::Any,
  48    collections::hash_map::Entry,
  49    hash::{Hash, Hasher},
  50    path::Path,
  51    sync::Arc,
  52};
  53use task::TaskContext;
  54use text::{PointUtf16, ToPointUtf16};
  55use util::ResultExt;
  56use worktree::Worktree;
  57
  58#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  59#[repr(transparent)]
  60pub struct ThreadId(pub u64);
  61
  62impl ThreadId {
  63    pub const MIN: ThreadId = ThreadId(u64::MIN);
  64    pub const MAX: ThreadId = ThreadId(u64::MAX);
  65}
  66
  67impl From<u64> for ThreadId {
  68    fn from(id: u64) -> Self {
  69        Self(id)
  70    }
  71}
  72
  73#[derive(Clone, Debug)]
  74pub struct StackFrame {
  75    pub dap: dap::StackFrame,
  76    pub scopes: Vec<dap::Scope>,
  77}
  78
  79impl From<dap::StackFrame> for StackFrame {
  80    fn from(stack_frame: dap::StackFrame) -> Self {
  81        Self {
  82            scopes: vec![],
  83            dap: stack_frame,
  84        }
  85    }
  86}
  87
  88#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  89pub enum ThreadStatus {
  90    #[default]
  91    Running,
  92    Stopped,
  93    Stepping,
  94    Exited,
  95    Ended,
  96}
  97
  98impl ThreadStatus {
  99    pub fn label(&self) -> &'static str {
 100        match self {
 101            ThreadStatus::Running => "Running",
 102            ThreadStatus::Stopped => "Stopped",
 103            ThreadStatus::Stepping => "Stepping",
 104            ThreadStatus::Exited => "Exited",
 105            ThreadStatus::Ended => "Ended",
 106        }
 107    }
 108}
 109
 110#[derive(Debug)]
 111pub struct Thread {
 112    dap: dap::Thread,
 113    stack_frames: Vec<StackFrame>,
 114    stack_frames_error: Option<anyhow::Error>,
 115    _has_stopped: bool,
 116}
 117
 118impl From<dap::Thread> for Thread {
 119    fn from(dap: dap::Thread) -> Self {
 120        Self {
 121            dap,
 122            stack_frames: Default::default(),
 123            stack_frames_error: None,
 124            _has_stopped: false,
 125        }
 126    }
 127}
 128
/// A watched expression together with the value recorded for it.
#[derive(Debug, Clone, PartialEq)]
pub struct Watcher {
    /// The expression text being watched.
    pub expression: SharedString,
    /// The value stored for `expression`.
    pub value: SharedString,
    /// NOTE(review): presumably the DAP `variablesReference` of the evaluated result — confirm.
    pub variables_reference: u64,
    /// Optional presentation hint attached to the value.
    pub presentation_hint: Option<VariablePresentationHint>,
}
 136
/// Lifecycle phase of a session with respect to its debug adapter.
pub enum Mode {
    /// No adapter is running yet; `request_dap` fails immediately in this mode.
    Building,
    /// An adapter client exists; requests are forwarded to the contained [`RunningMode`].
    Running(RunningMode),
}
 141
/// State held while a debug adapter client exists for the session.
#[derive(Clone)]
pub struct RunningMode {
    /// Client used to send requests to the debug adapter.
    client: Arc<DebugAdapterClient>,
    /// Adapter binary description; its `request_args` select launch vs. attach during initialization.
    binary: DebugAdapterBinary,
    /// Extra breakpoint that is appended to the breakpoints sent for its own path.
    tmp_breakpoint: Option<SourceBreakpoint>,
    /// Worktree associated with this session (held weakly).
    worktree: WeakEntity<Worktree>,
    /// Executor on which request futures are spawned.
    executor: BackgroundExecutor,
    /// Set to `true` once the initialization sequence task has finished (successfully or not).
    is_started: bool,
    /// Set to `true` by `Mode::stopped`; reported by `Mode::has_ever_stopped`.
    has_ever_stopped: bool,
    /// Sender that message handlers forward adapter messages into.
    messages_tx: UnboundedSender<Message>,
}
 153
 154fn client_source(abs_path: &Path) -> dap::Source {
 155    dap::Source {
 156        name: abs_path
 157            .file_name()
 158            .map(|filename| filename.to_string_lossy().to_string()),
 159        path: Some(abs_path.to_string_lossy().to_string()),
 160        source_reference: None,
 161        presentation_hint: None,
 162        origin: None,
 163        sources: None,
 164        adapter_data: None,
 165        checksums: None,
 166    }
 167}
 168
 169impl RunningMode {
 170    async fn new(
 171        session_id: SessionId,
 172        parent_session: Option<Entity<Session>>,
 173        worktree: WeakEntity<Worktree>,
 174        binary: DebugAdapterBinary,
 175        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 176        cx: &mut AsyncApp,
 177    ) -> Result<Self> {
 178        let message_handler = Box::new({
 179            let messages_tx = messages_tx.clone();
 180            move |message| {
 181                messages_tx.unbounded_send(message).ok();
 182            }
 183        });
 184
 185        let client = if let Some(client) = parent_session
 186            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 187            .flatten()
 188        {
 189            client
 190                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 191                .await?
 192        } else {
 193            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 194        };
 195
 196        Ok(Self {
 197            client: Arc::new(client),
 198            worktree,
 199            tmp_breakpoint: None,
 200            binary,
 201            executor: cx.background_executor().clone(),
 202            is_started: false,
 203            has_ever_stopped: false,
 204            messages_tx,
 205        })
 206    }
 207
    /// Returns the weak handle to the worktree stored for this running session.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 211
 212    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 213        let tasks: Vec<_> = paths
 214            .into_iter()
 215            .map(|path| {
 216                self.request(dap_command::SetBreakpoints {
 217                    source: client_source(path),
 218                    source_modified: None,
 219                    breakpoints: vec![],
 220                })
 221            })
 222            .collect();
 223
 224        cx.background_spawn(async move {
 225            futures::future::join_all(tasks)
 226                .await
 227                .iter()
 228                .for_each(|res| match res {
 229                    Ok(_) => {}
 230                    Err(err) => {
 231                        log::warn!("Set breakpoints request failed: {}", err);
 232                    }
 233                });
 234        })
 235    }
 236
 237    fn send_breakpoints_from_path(
 238        &self,
 239        abs_path: Arc<Path>,
 240        reason: BreakpointUpdatedReason,
 241        breakpoint_store: &Entity<BreakpointStore>,
 242        cx: &mut App,
 243    ) -> Task<()> {
 244        let breakpoints =
 245            breakpoint_store
 246                .read(cx)
 247                .source_breakpoints_from_path(&abs_path, cx)
 248                .into_iter()
 249                .filter(|bp| bp.state.is_enabled())
 250                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 251                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 252                }))
 253                .map(Into::into)
 254                .collect();
 255
 256        let raw_breakpoints = breakpoint_store
 257            .read(cx)
 258            .breakpoints_from_path(&abs_path)
 259            .into_iter()
 260            .filter(|bp| bp.bp.state.is_enabled())
 261            .collect::<Vec<_>>();
 262
 263        let task = self.request(dap_command::SetBreakpoints {
 264            source: client_source(&abs_path),
 265            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 266            breakpoints,
 267        });
 268        let session_id = self.client.id();
 269        let breakpoint_store = breakpoint_store.downgrade();
 270        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 271            Ok(breakpoints) => {
 272                let breakpoints =
 273                    breakpoints
 274                        .into_iter()
 275                        .zip(raw_breakpoints)
 276                        .filter_map(|(dap_bp, zed_bp)| {
 277                            Some((
 278                                zed_bp,
 279                                BreakpointSessionState {
 280                                    id: dap_bp.id?,
 281                                    verified: dap_bp.verified,
 282                                },
 283                            ))
 284                        });
 285                breakpoint_store
 286                    .update(cx, |this, _| {
 287                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 288                    })
 289                    .ok();
 290            }
 291            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 292        })
 293    }
 294
 295    fn send_exception_breakpoints(
 296        &self,
 297        filters: Vec<ExceptionBreakpointsFilter>,
 298        supports_filter_options: bool,
 299    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 300        let arg = if supports_filter_options {
 301            SetExceptionBreakpoints::WithOptions {
 302                filters: filters
 303                    .into_iter()
 304                    .map(|filter| ExceptionFilterOptions {
 305                        filter_id: filter.filter,
 306                        condition: None,
 307                        mode: None,
 308                    })
 309                    .collect(),
 310            }
 311        } else {
 312            SetExceptionBreakpoints::Plain {
 313                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 314            }
 315        };
 316        self.request(arg)
 317    }
 318
    /// Sends the breakpoints of every path known to the breakpoint store to the adapter.
    ///
    /// One `SetBreakpoints` request is issued per path. When `ignore_breakpoints` is `true`
    /// each request carries an empty breakpoint list; otherwise it carries the path's enabled
    /// breakpoints. Successful responses are paired positionally with the store's enabled
    /// breakpoints for that path and reported via `mark_breakpoints_verified`.
    ///
    /// Returns a task resolving to a map from path to error for every request that failed;
    /// the map is empty when all requests succeeded.
    fn send_source_breakpoints(
        &self,
        ignore_breakpoints: bool,
        breakpoint_store: &Entity<BreakpointStore>,
        cx: &App,
    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
        let mut breakpoint_tasks = Vec::new();
        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
        // Both collections are keyed by path and are expected to cover the same paths.
        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
        let session_id = self.client.id();
        for (path, breakpoints) in breakpoints {
            let breakpoints = if ignore_breakpoints {
                vec![]
            } else {
                breakpoints
                    .into_iter()
                    .filter(|bp| bp.state.is_enabled())
                    .map(Into::into)
                    .collect()
            };

            // Store-side breakpoints for this path, filtered the same way as the ones sent,
            // so they can be zipped with the adapter's response below.
            let raw_breakpoints = raw_breakpoints
                .remove(&path)
                .unwrap_or_default()
                .into_iter()
                .filter(|bp| bp.bp.state.is_enabled());
            let error_path = path.clone();
            let send_request = self
                .request(dap_command::SetBreakpoints {
                    source: client_source(&path),
                    source_modified: Some(false),
                    breakpoints,
                })
                // Tag a failure with its path so the caller can tell which file it belongs to.
                .map(|result| result.map_err(move |e| (error_path, e)));

            let task = cx.spawn({
                let breakpoint_store = breakpoint_store.downgrade();
                async move |cx| {
                    let breakpoints = cx.background_spawn(send_request).await?;

                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
                        |(dap_bp, zed_bp)| {
                            Some((
                                zed_bp,
                                BreakpointSessionState {
                                    // Responses without an id are skipped.
                                    id: dap_bp.id?,
                                    verified: dap_bp.verified,
                                },
                            ))
                        },
                    );
                    breakpoint_store
                        .update(cx, |this, _| {
                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
                        })
                        .ok();

                    Ok(())
                }
            });
            breakpoint_tasks.push(task);
        }

        // Wait for every per-path task and keep only the failures.
        cx.background_spawn(async move {
            futures::future::join_all(breakpoint_tasks)
                .await
                .into_iter()
                .filter_map(Result::err)
                .collect::<HashMap<_, _>>()
        })
    }
 391
    /// Runs the launch/attach request together with the configuration sequence.
    ///
    /// The launch or attach request (chosen from the binary's `request_args`) is issued
    /// right away. In parallel, once `initialized_rx` fires, the configuration sequence:
    /// 1. sends all source breakpoints and emits a `DapStoreEvent::Notification` if any
    ///    path failed,
    /// 2. sends exception breakpoints when the adapter declared filters or does not support
    ///    `configurationDone`,
    /// 3. sends `ConfigurationDone` when the adapter supports it.
    ///
    /// When both have finished (or one has failed), `is_started` is set on the running mode
    /// and the combined result is returned.
    ///
    /// # Errors
    /// Fails if the launch/attach request fails, if the session or DAP store entity can no
    /// longer be accessed, if `initialized_rx` is cancelled, or if `ConfigurationDone` fails.
    /// Exception-breakpoint request failures are ignored.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        dap_store: WeakEntity<DapStore>,
        cx: &mut Context<Session>,
    ) -> Task<Result<()>> {
        let raw = self.binary.request_args.clone();

        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = match raw.request {
            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
                raw: raw.configuration,
            }),
            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
                raw: raw.configuration,
            }),
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
        // From spec (on initialization sequence):
        // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
        //
        // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
        let should_send_exception_breakpoints = capabilities
            .exception_breakpoint_filters
            .as_ref()
            .map_or(false, |filters| !filters.is_empty())
            || !configuration_done_supported;
        let supports_exception_filters = capabilities
            .supports_exception_filter_options
            .unwrap_or_default();
        let this = self.clone();
        let worktree = self.worktree().clone();
        let mut filters = capabilities
            .exception_breakpoint_filters
            .clone()
            .unwrap_or_default();
        let configuration_sequence = cx.spawn({
            async move |session, cx| {
                let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
                let (breakpoint_store, adapter_defaults) =
                    dap_store.read_with(cx, |dap_store, _| {
                        (
                            dap_store.breakpoint_store().clone(),
                            dap_store.adapter_options(&adapter_name),
                        )
                    })?;
                // Configuration requests are only sent after the adapter's `initialized` event.
                initialized_rx.await?;
                let errors_by_path = cx
                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
                    .await;

                dap_store.update(cx, |_, cx| {
                    let Some(worktree) = worktree.upgrade() else {
                        return;
                    };

                    for (path, error) in &errors_by_path {
                        log::error!("failed to set breakpoints for {path:?}: {error}");
                    }

                    // Surface a single notification naming one failed path and counting the rest.
                    if let Some(failed_path) = errors_by_path.keys().next() {
                        let failed_path = failed_path
                            .strip_prefix(worktree.read(cx).abs_path())
                            .unwrap_or(failed_path)
                            .display();
                        let message = format!(
                            "Failed to set breakpoints for {failed_path}{}",
                            match errors_by_path.len() {
                                // At least one key exists in this branch.
                                0 => unreachable!(),
                                1 => "".into(),
                                2 => " and 1 other path".into(),
                                n => format!(" and {} other paths", n - 1),
                            }
                        );
                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
                    }
                })?;

                if should_send_exception_breakpoints {
                    _ = session.update(cx, |this, _| {
                        // Keep only enabled filters for the request, while recording every
                        // filter (with its enabled flag) on the session if not already present.
                        filters.retain(|filter| {
                            let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
                                defaults
                                    .exception_breakpoints
                                    .get(&filter.filter)
                                    .map(|options| options.enabled)
                                    .unwrap_or_else(|| filter.default.unwrap_or_default())
                            } else {
                                filter.default.unwrap_or_default()
                            };
                            this.exception_breakpoints
                                .entry(filter.filter.clone())
                                .or_insert_with(|| (filter.clone(), is_enabled));
                            is_enabled
                        });
                    });

                    // A failed exception-breakpoint request does not abort initialization.
                    this.send_exception_breakpoints(filters, supports_exception_filters)
                        .await
                        .ok();
                }

                let ret = if configuration_done_supported {
                    this.request(ConfigurationDone {})
                } else {
                    Task::ready(Ok(()))
                }
                .await;
                ret
            }
        });

        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));

        cx.spawn(async move |this, cx| {
            let result = task.await;

            // Mark the session as started regardless of whether initialization succeeded.
            this.update(cx, |this, cx| {
                if let Some(this) = this.as_running_mut() {
                    this.is_started = true;
                    cx.notify();
                }
            })
            .ok();

            result?;
            anyhow::Ok(())
        })
    }
 523
 524    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 525        let client = self.client.clone();
 526        let messages_tx = self.messages_tx.clone();
 527        let message_handler = Box::new(move |message| {
 528            messages_tx.unbounded_send(message).ok();
 529        });
 530        if client.should_reconnect_for_ssh() {
 531            Some(cx.spawn(async move |cx| {
 532                client.connect(message_handler, cx).await?;
 533                anyhow::Ok(())
 534            }))
 535        } else {
 536            None
 537        }
 538    }
 539
 540    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 541    where
 542        <R::DapRequest as dap::requests::Request>::Response: 'static,
 543        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 544    {
 545        let request = Arc::new(request);
 546
 547        let request_clone = request.clone();
 548        let connection = self.client.clone();
 549        self.executor.spawn(async move {
 550            let args = request_clone.to_dap();
 551            let response = connection.request::<R::DapRequest>(args).await?;
 552            request.response_from_dap(response)
 553        })
 554    }
 555}
 556
 557impl Mode {
 558    pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 559    where
 560        <R::DapRequest as dap::requests::Request>::Response: 'static,
 561        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 562    {
 563        match self {
 564            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 565            Mode::Building => Task::ready(Err(anyhow!(
 566                "no adapter running to send request: {request:?}"
 567            ))),
 568        }
 569    }
 570
 571    /// Did this debug session stop at least once?
 572    pub(crate) fn has_ever_stopped(&self) -> bool {
 573        match self {
 574            Mode::Building => false,
 575            Mode::Running(running_mode) => running_mode.has_ever_stopped,
 576        }
 577    }
 578
 579    fn stopped(&mut self) {
 580        if let Mode::Running(running) = self {
 581            running.has_ever_stopped = true;
 582        }
 583    }
 584}
 585
 586#[derive(Default)]
 587struct ThreadStates {
 588    global_state: Option<ThreadStatus>,
 589    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 590}
 591
 592impl ThreadStates {
 593    fn stop_all_threads(&mut self) {
 594        self.global_state = Some(ThreadStatus::Stopped);
 595        self.known_thread_states.clear();
 596    }
 597
 598    fn exit_all_threads(&mut self) {
 599        self.global_state = Some(ThreadStatus::Exited);
 600        self.known_thread_states.clear();
 601    }
 602
 603    fn continue_all_threads(&mut self) {
 604        self.global_state = Some(ThreadStatus::Running);
 605        self.known_thread_states.clear();
 606    }
 607
 608    fn stop_thread(&mut self, thread_id: ThreadId) {
 609        self.known_thread_states
 610            .insert(thread_id, ThreadStatus::Stopped);
 611    }
 612
 613    fn continue_thread(&mut self, thread_id: ThreadId) {
 614        self.known_thread_states
 615            .insert(thread_id, ThreadStatus::Running);
 616    }
 617
 618    fn process_step(&mut self, thread_id: ThreadId) {
 619        self.known_thread_states
 620            .insert(thread_id, ThreadStatus::Stepping);
 621    }
 622
 623    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 624        self.thread_state(thread_id)
 625            .unwrap_or(ThreadStatus::Running)
 626    }
 627
 628    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 629        self.known_thread_states
 630            .get(&thread_id)
 631            .copied()
 632            .or(self.global_state)
 633    }
 634
 635    fn exit_thread(&mut self, thread_id: ThreadId) {
 636        self.known_thread_states
 637            .insert(thread_id, ThreadStatus::Exited);
 638    }
 639
 640    fn any_stopped_thread(&self) -> bool {
 641        self.global_state
 642            .is_some_and(|state| state == ThreadStatus::Stopped)
 643            || self
 644                .known_thread_states
 645                .values()
 646                .any(|status| *status == ThreadStatus::Stopped)
 647    }
 648}
/// Capacity of the circular buffer in which a session keeps adapter output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Whether an exception breakpoint filter is enabled.
type IsEnabled = bool;

/// Ordered token stored by a session alongside its output buffer.
/// NOTE(review): presumably counts output events received so far — confirm against its users.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents the current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether an adapter is running yet; starts out as `Mode::Building`.
    pub mode: Mode,
    /// Identifier of this session.
    id: SessionId,
    /// Label given to the session at construction.
    label: SharedString,
    /// Name of the debug adapter used by this session.
    adapter: DebugAdapterName,
    /// Capabilities of the adapter; default-initialized until populated.
    pub(super) capabilities: Capabilities,
    /// Ids of sessions spawned as children of this one.
    child_session_ids: HashSet<SessionId>,
    /// Parent session, if this session is a child.
    parent_session: Option<Entity<Session>>,
    /// Modules stored for this session.
    modules: Vec<dap::Module>,
    /// Loaded sources stored for this session.
    loaded_sources: Vec<dap::Source>,
    /// Token associated with `output`; starts at `OutputToken(0)`.
    output_token: OutputToken,
    /// Output events, bounded to `MAX_TRACKED_OUTPUT_EVENTS` entries.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    /// Threads keyed by id.
    threads: IndexMap<ThreadId, Thread>,
    /// Global and per-thread status bookkeeping.
    thread_states: ThreadStates,
    /// Watchers keyed by their expression.
    /// NOTE(review): key is presumably `Watcher::expression` — confirm.
    watchers: HashMap<SharedString, Watcher>,
    /// Variables keyed by the variable reference they belong to.
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    /// Stack frames keyed by frame id.
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    /// Location responses keyed by a `u64`.
    /// NOTE(review): presumably the location reference — confirm.
    locations: HashMap<u64, dap::LocationsResponse>,
    /// Starts out `false`.
    /// NOTE(review): presumably set once the debug session has terminated — confirm.
    is_session_terminated: bool,
    /// Shared request tasks, grouped by command `TypeId` and keyed by the command itself.
    /// NOTE(review): looks like a cache of in-flight/completed requests — confirm.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Store whose breakpoint events this session subscribes to.
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// When `true`, breakpoint store updates are not forwarded to the adapter.
    ignore_breakpoints: bool,
    /// Exception breakpoint filters keyed by filter id, each with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    /// Tasks owned by the session.
    /// NOTE(review): presumably kept so they are not dropped while the session lives — confirm.
    background_tasks: Vec<Task<()>>,
    /// Task of an ongoing restart, if any.
    /// NOTE(review): inferred from the name — confirm.
    restart_task: Option<Task<()>>,
    /// Task context supplied when the session was created.
    task_context: TaskContext,
}
 683
/// Type-erased view of a DAP command that can still be compared and hashed.
///
/// Implemented for every `LocalDapCommand` that is `PartialEq + Eq + Hash`, which lets
/// commands of different concrete types share one map key type ([`RequestSlot`]).
trait CacheableCommand: Any + Send + Sync {
    /// Returns `true` when `rhs` has the same concrete type as `self` and compares equal.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds the command's `Hash` implementation into `hasher`.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the command into a type-erased `Arc<dyn Any + Send + Sync>`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 689
 690impl<T> CacheableCommand for T
 691where
 692    T: LocalDapCommand + PartialEq + Eq + Hash,
 693{
 694    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 695        (rhs as &dyn Any)
 696            .downcast_ref::<Self>()
 697            .map_or(false, |rhs| self == rhs)
 698    }
 699
 700    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 701        T::hash(self, &mut hasher);
 702    }
 703
 704    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 705        self
 706    }
 707}
 708
 709pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 710
 711impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 712    fn from(request: T) -> Self {
 713        Self(Arc::new(request))
 714    }
 715}
 716
 717impl PartialEq for RequestSlot {
 718    fn eq(&self, other: &Self) -> bool {
 719        self.0.dyn_eq(other.0.as_ref())
 720    }
 721}
 722
 723impl Eq for RequestSlot {}
 724
 725impl Hash for RequestSlot {
 726    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 727        self.0.dyn_hash(state);
 728        (&*self.0 as &dyn Any).type_id().hash(state)
 729    }
 730}
 731
 732#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 733pub struct CompletionsQuery {
 734    pub query: String,
 735    pub column: u64,
 736    pub line: Option<u64>,
 737    pub frame_id: Option<u64>,
 738}
 739
 740impl CompletionsQuery {
 741    pub fn new(
 742        buffer: &language::Buffer,
 743        cursor_position: language::Anchor,
 744        frame_id: Option<u64>,
 745    ) -> Self {
 746        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 747        Self {
 748            query: buffer.text(),
 749            column: column as u64,
 750            frame_id,
 751            line: Some(row as u64),
 752        }
 753    }
 754}
 755
/// Events emitted by a [`Session`] about its data.
/// NOTE(review): most variants appear to signal that the correspondingly named piece of
/// session state changed — confirm against the emit sites.
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id.
    /// NOTE(review): presumably the thread that stopped — confirm.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Watchers,
    Threads,
    InvalidateInlineValue,
    CapabilitiesLoaded,
    /// Carries the arguments of a run-in-terminal request plus a channel for the outcome.
    RunInTerminal {
        /// Arguments of the request.
        request: RunInTerminalRequestArguments,
        /// Channel on which the handler reports a `u32` result or an error.
        /// NOTE(review): the `u32` is presumably a process id — confirm.
        sender: mpsc::Sender<Result<u32>>,
    },
    ConsoleOutput,
}
 773
/// Lifecycle events emitted by a [`Session`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    Running,
    Shutdown,
    Restart,
    /// Carries the start-debugging arguments for a child session to be spawned.
    SpawnChildSession {
        /// Arguments describing the child session.
        request: StartDebuggingRequestArguments,
    },
}
 783
// `Session` emits both `SessionEvent` and `SessionStateEvent`.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 786
// A local session sends breakpoint updates to the debug adapter (DAP) for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The `BreakpointStore` notifies the session whenever breakpoints change.
 790impl Session {
 791    pub(crate) fn new(
 792        breakpoint_store: Entity<BreakpointStore>,
 793        session_id: SessionId,
 794        parent_session: Option<Entity<Session>>,
 795        label: SharedString,
 796        adapter: DebugAdapterName,
 797        task_context: TaskContext,
 798        cx: &mut App,
 799    ) -> Entity<Self> {
 800        cx.new::<Self>(|cx| {
 801            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 802                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 803                    if let Some(local) = (!this.ignore_breakpoints)
 804                        .then(|| this.as_running_mut())
 805                        .flatten()
 806                    {
 807                        local
 808                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 809                            .detach();
 810                    };
 811                }
 812                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 813                    if let Some(local) = (!this.ignore_breakpoints)
 814                        .then(|| this.as_running_mut())
 815                        .flatten()
 816                    {
 817                        local.unset_breakpoints_from_paths(paths, cx).detach();
 818                    }
 819                }
 820                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 821            })
 822            .detach();
 823            // cx.on_app_quit(Self::on_app_quit).detach();
 824
 825            let this = Self {
 826                mode: Mode::Building,
 827                id: session_id,
 828                child_session_ids: HashSet::default(),
 829                parent_session,
 830                capabilities: Capabilities::default(),
 831                watchers: HashMap::default(),
 832                variables: Default::default(),
 833                stack_frames: Default::default(),
 834                thread_states: ThreadStates::default(),
 835                output_token: OutputToken(0),
 836                output: circular_buffer::CircularBuffer::boxed(),
 837                requests: HashMap::default(),
 838                modules: Vec::default(),
 839                loaded_sources: Vec::default(),
 840                threads: IndexMap::default(),
 841                background_tasks: Vec::default(),
 842                restart_task: None,
 843                locations: Default::default(),
 844                is_session_terminated: false,
 845                ignore_breakpoints: false,
 846                breakpoint_store,
 847                exception_breakpoints: Default::default(),
 848                label,
 849                adapter,
 850                task_context,
 851            };
 852
 853            this
 854        })
 855    }
 856
    /// Returns the task context held by this session.
    pub fn task_context(&self) -> &TaskContext {
        &self.task_context
    }
 860
 861    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 862        match &self.mode {
 863            Mode::Building => None,
 864            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 865        }
 866    }
 867
 868    pub fn boot(
 869        &mut self,
 870        binary: DebugAdapterBinary,
 871        worktree: Entity<Worktree>,
 872        dap_store: WeakEntity<DapStore>,
 873        cx: &mut Context<Self>,
 874    ) -> Task<Result<()>> {
 875        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 876        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 877
 878        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 879            let mut initialized_tx = Some(initialized_tx);
 880            while let Some(message) = message_rx.next().await {
 881                if let Message::Event(event) = message {
 882                    if let Events::Initialized(_) = *event {
 883                        if let Some(tx) = initialized_tx.take() {
 884                            tx.send(()).ok();
 885                        }
 886                    } else {
 887                        let Ok(_) = this.update(cx, |session, cx| {
 888                            session.handle_dap_event(event, cx);
 889                        }) else {
 890                            break;
 891                        };
 892                    }
 893                } else if let Message::Request(request) = message {
 894                    let Ok(_) = this.update(cx, |this, cx| {
 895                        if request.command == StartDebugging::COMMAND {
 896                            this.handle_start_debugging_request(request, cx)
 897                                .detach_and_log_err(cx);
 898                        } else if request.command == RunInTerminal::COMMAND {
 899                            this.handle_run_in_terminal_request(request, cx)
 900                                .detach_and_log_err(cx);
 901                        }
 902                    }) else {
 903                        break;
 904                    };
 905                }
 906            }
 907        })];
 908        self.background_tasks = background_tasks;
 909        let id = self.id;
 910        let parent_session = self.parent_session.clone();
 911
 912        cx.spawn(async move |this, cx| {
 913            let mode = RunningMode::new(
 914                id,
 915                parent_session,
 916                worktree.downgrade(),
 917                binary.clone(),
 918                message_tx,
 919                cx,
 920            )
 921            .await?;
 922            this.update(cx, |this, cx| {
 923                this.mode = Mode::Running(mode);
 924                cx.emit(SessionStateEvent::Running);
 925            })?;
 926
 927            this.update(cx, |session, cx| session.request_initialize(cx))?
 928                .await?;
 929
 930            let result = this
 931                .update(cx, |session, cx| {
 932                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 933                })?
 934                .await;
 935
 936            if result.is_err() {
 937                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 938
 939                console
 940                    .send(format!(
 941                        "Tried to launch debugger with: {}",
 942                        serde_json::to_string_pretty(&binary.request_args.configuration)
 943                            .unwrap_or_default(),
 944                    ))
 945                    .await
 946                    .ok();
 947            }
 948
 949            result
 950        })
 951    }
 952
    /// Returns the identifier of this session.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 956
    /// Returns a copy of the set of child session ids recorded on this session.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 960
    /// Records `session_id` in this session's set of child session ids.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 964
    /// Removes `session_id` from this session's set of child session ids, if present.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 968
 969    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 970        self.parent_session
 971            .as_ref()
 972            .map(|session| session.read(cx).id)
 973    }
 974
    /// Returns the parent session entity, if this session has one.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
 978
 979    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
 980        let Some(client) = self.adapter_client() else {
 981            return Task::ready(());
 982        };
 983
 984        let supports_terminate = self
 985            .capabilities
 986            .support_terminate_debuggee
 987            .unwrap_or(false);
 988
 989        cx.background_spawn(async move {
 990            if supports_terminate {
 991                client
 992                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
 993                        restart: Some(false),
 994                    })
 995                    .await
 996                    .ok();
 997            } else {
 998                client
 999                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1000                        restart: Some(false),
1001                        terminate_debuggee: Some(true),
1002                        suspend_debuggee: Some(false),
1003                    })
1004                    .await
1005                    .ok();
1006            }
1007        })
1008    }
1009
    /// Returns the adapter capabilities currently stored on this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
1013
1014    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1015        match &self.mode {
1016            Mode::Building => None,
1017            Mode::Running(running_mode) => Some(&running_mode.binary),
1018        }
1019    }
1020
    /// Returns a clone of the debug adapter name stored on this session.
    pub fn adapter(&self) -> DebugAdapterName {
        self.adapter.clone()
    }
1024
    /// Returns a clone of this session's label.
    pub fn label(&self) -> SharedString {
        self.label.clone()
    }
1028
    /// Returns the current value of the session's terminated flag.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
1032
1033    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1034        let (tx, mut rx) = mpsc::unbounded();
1035
1036        cx.spawn(async move |this, cx| {
1037            while let Some(output) = rx.next().await {
1038                this.update(cx, |this, _| {
1039                    let event = dap::OutputEvent {
1040                        category: None,
1041                        output,
1042                        group: None,
1043                        variables_reference: None,
1044                        source: None,
1045                        line: None,
1046                        column: None,
1047                        data: None,
1048                        location_reference: None,
1049                    };
1050                    this.push_output(event);
1051                })?;
1052            }
1053            anyhow::Ok(())
1054        })
1055        .detach();
1056
1057        return tx;
1058    }
1059
1060    pub fn is_started(&self) -> bool {
1061        match &self.mode {
1062            Mode::Building => false,
1063            Mode::Running(running) => running.is_started,
1064        }
1065    }
1066
1067    pub fn is_building(&self) -> bool {
1068        matches!(self.mode, Mode::Building)
1069    }
1070
1071    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1072        match &mut self.mode {
1073            Mode::Running(local_mode) => Some(local_mode),
1074            Mode::Building => None,
1075        }
1076    }
1077
1078    pub fn as_running(&self) -> Option<&RunningMode> {
1079        match &self.mode {
1080            Mode::Running(local_mode) => Some(local_mode),
1081            Mode::Building => None,
1082        }
1083    }
1084
1085    fn handle_start_debugging_request(
1086        &mut self,
1087        request: dap::messages::Request,
1088        cx: &mut Context<Self>,
1089    ) -> Task<Result<()>> {
1090        let request_seq = request.seq;
1091
1092        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1093            .arguments
1094            .as_ref()
1095            .map(|value| serde_json::from_value(value.clone()));
1096
1097        let mut success = true;
1098        if let Some(Ok(request)) = launch_request {
1099            cx.emit(SessionStateEvent::SpawnChildSession { request });
1100        } else {
1101            log::error!(
1102                "Failed to parse launch request arguments: {:?}",
1103                request.arguments
1104            );
1105            success = false;
1106        }
1107
1108        cx.spawn(async move |this, cx| {
1109            this.update(cx, |this, cx| {
1110                this.respond_to_client(
1111                    request_seq,
1112                    success,
1113                    StartDebugging::COMMAND.to_string(),
1114                    None,
1115                    cx,
1116                )
1117            })?
1118            .await
1119        })
1120    }
1121
1122    fn handle_run_in_terminal_request(
1123        &mut self,
1124        request: dap::messages::Request,
1125        cx: &mut Context<Self>,
1126    ) -> Task<Result<()>> {
1127        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1128            request.arguments.unwrap_or_default(),
1129        ) {
1130            Ok(args) => args,
1131            Err(error) => {
1132                return cx.spawn(async move |session, cx| {
1133                    let error = serde_json::to_value(dap::ErrorResponse {
1134                        error: Some(dap::Message {
1135                            id: request.seq,
1136                            format: error.to_string(),
1137                            variables: None,
1138                            send_telemetry: None,
1139                            show_user: None,
1140                            url: None,
1141                            url_label: None,
1142                        }),
1143                    })
1144                    .ok();
1145
1146                    session
1147                        .update(cx, |this, cx| {
1148                            this.respond_to_client(
1149                                request.seq,
1150                                false,
1151                                StartDebugging::COMMAND.to_string(),
1152                                error,
1153                                cx,
1154                            )
1155                        })?
1156                        .await?;
1157
1158                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1159                });
1160            }
1161        };
1162
1163        let seq = request.seq;
1164
1165        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1166        cx.emit(SessionEvent::RunInTerminal {
1167            request: request_args,
1168            sender: tx,
1169        });
1170        cx.notify();
1171
1172        cx.spawn(async move |session, cx| {
1173            let result = util::maybe!(async move {
1174                rx.next().await.ok_or_else(|| {
1175                    anyhow!("failed to receive response from spawn terminal".to_string())
1176                })?
1177            })
1178            .await;
1179            let (success, body) = match result {
1180                Ok(pid) => (
1181                    true,
1182                    serde_json::to_value(dap::RunInTerminalResponse {
1183                        process_id: None,
1184                        shell_process_id: Some(pid as u64),
1185                    })
1186                    .ok(),
1187                ),
1188                Err(error) => (
1189                    false,
1190                    serde_json::to_value(dap::ErrorResponse {
1191                        error: Some(dap::Message {
1192                            id: seq,
1193                            format: error.to_string(),
1194                            variables: None,
1195                            send_telemetry: None,
1196                            show_user: None,
1197                            url: None,
1198                            url_label: None,
1199                        }),
1200                    })
1201                    .ok(),
1202                ),
1203            };
1204
1205            session
1206                .update(cx, |session, cx| {
1207                    session.respond_to_client(
1208                        seq,
1209                        success,
1210                        RunInTerminal::COMMAND.to_string(),
1211                        body,
1212                        cx,
1213                    )
1214                })?
1215                .await
1216        })
1217    }
1218
1219    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1220        let adapter_id = self.adapter().to_string();
1221        let request = Initialize { adapter_id };
1222
1223        let Mode::Running(running) = &self.mode else {
1224            return Task::ready(Err(anyhow!(
1225                "Cannot send initialize request, task still building"
1226            )));
1227        };
1228        let mut response = running.request(request.clone());
1229
1230        cx.spawn(async move |this, cx| {
1231            loop {
1232                let capabilities = response.await;
1233                match capabilities {
1234                    Err(e) => {
1235                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1236                            this.as_running()
1237                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1238                        }) else {
1239                            return Err(e);
1240                        };
1241                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1242                        reconnect.await?;
1243
1244                        let Ok(Some(r)) = this.update(cx, |this, _| {
1245                            this.as_running()
1246                                .map(|running| running.request(request.clone()))
1247                        }) else {
1248                            return Err(e);
1249                        };
1250                        response = r
1251                    }
1252                    Ok(capabilities) => {
1253                        this.update(cx, |session, cx| {
1254                            session.capabilities = capabilities;
1255
1256                            cx.emit(SessionEvent::CapabilitiesLoaded);
1257                        })?;
1258                        return Ok(());
1259                    }
1260                }
1261            }
1262        })
1263    }
1264
1265    pub(super) fn initialize_sequence(
1266        &mut self,
1267        initialize_rx: oneshot::Receiver<()>,
1268        dap_store: WeakEntity<DapStore>,
1269        cx: &mut Context<Self>,
1270    ) -> Task<Result<()>> {
1271        match &self.mode {
1272            Mode::Running(local_mode) => {
1273                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1274            }
1275            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1276        }
1277    }
1278
1279    pub fn run_to_position(
1280        &mut self,
1281        breakpoint: SourceBreakpoint,
1282        active_thread_id: ThreadId,
1283        cx: &mut Context<Self>,
1284    ) {
1285        match &mut self.mode {
1286            Mode::Running(local_mode) => {
1287                if !matches!(
1288                    self.thread_states.thread_state(active_thread_id),
1289                    Some(ThreadStatus::Stopped)
1290                ) {
1291                    return;
1292                };
1293                let path = breakpoint.path.clone();
1294                local_mode.tmp_breakpoint = Some(breakpoint);
1295                let task = local_mode.send_breakpoints_from_path(
1296                    path,
1297                    BreakpointUpdatedReason::Toggled,
1298                    &self.breakpoint_store,
1299                    cx,
1300                );
1301
1302                cx.spawn(async move |this, cx| {
1303                    task.await;
1304                    this.update(cx, |this, cx| {
1305                        this.continue_thread(active_thread_id, cx);
1306                    })
1307                })
1308                .detach();
1309            }
1310            Mode::Building => {}
1311        }
1312    }
1313
1314    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1315        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1316    }
1317
1318    pub fn output(
1319        &self,
1320        since: OutputToken,
1321    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1322        if self.output_token.0 == 0 {
1323            return (self.output.range(0..0), OutputToken(0));
1324        };
1325
1326        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1327
1328        let clamped_events_since = events_since.clamp(0, self.output.len());
1329        (
1330            self.output
1331                .range(self.output.len() - clamped_events_since..),
1332            self.output_token,
1333        )
1334    }
1335
1336    pub fn respond_to_client(
1337        &self,
1338        request_seq: u64,
1339        success: bool,
1340        command: String,
1341        body: Option<serde_json::Value>,
1342        cx: &mut Context<Self>,
1343    ) -> Task<Result<()>> {
1344        let Some(local_session) = self.as_running() else {
1345            unreachable!("Cannot respond to remote client");
1346        };
1347        let client = local_session.client.clone();
1348
1349        cx.background_spawn(async move {
1350            client
1351                .send_message(Message::Response(Response {
1352                    body,
1353                    success,
1354                    command,
1355                    seq: request_seq + 1,
1356                    request_seq,
1357                    message: None,
1358                }))
1359                .await
1360        })
1361    }
1362
1363    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1364        self.mode.stopped();
1365        // todo(debugger): Find a clean way to get around the clone
1366        let breakpoint_store = self.breakpoint_store.clone();
1367        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1368            let breakpoint = local.tmp_breakpoint.take()?;
1369            let path = breakpoint.path.clone();
1370            Some((local, path))
1371        }) {
1372            local
1373                .send_breakpoints_from_path(
1374                    path,
1375                    BreakpointUpdatedReason::Toggled,
1376                    &breakpoint_store,
1377                    cx,
1378                )
1379                .detach();
1380        };
1381
1382        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1383            self.thread_states.stop_all_threads();
1384            self.invalidate_command_type::<StackTraceCommand>();
1385        }
1386
1387        // Event if we stopped all threads we still need to insert the thread_id
1388        // to our own data
1389        if let Some(thread_id) = event.thread_id {
1390            self.thread_states.stop_thread(ThreadId(thread_id));
1391
1392            self.invalidate_state(
1393                &StackTraceCommand {
1394                    thread_id,
1395                    start_frame: None,
1396                    levels: None,
1397                }
1398                .into(),
1399            );
1400        }
1401
1402        self.invalidate_generic();
1403        self.threads.clear();
1404        self.variables.clear();
1405        cx.emit(SessionEvent::Stopped(
1406            event
1407                .thread_id
1408                .map(Into::into)
1409                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1410        ));
1411        cx.emit(SessionEvent::InvalidateInlineValue);
1412        cx.notify();
1413    }
1414
    /// Applies a single event received from the debug adapter to the session state.
    ///
    /// `Initialized` events are not expected here (see the debug assertion below).
    /// Memory, process, progress, invalidated and unrecognized events are currently ignored.
    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
        match *event {
            Events::Initialized(_) => {
                debug_assert!(
                    false,
                    "Initialized event should have been handled in LocalMode"
                );
            }
            Events::Stopped(event) => self.handle_stopped_event(event, cx),
            Events::Continued(event) => {
                if event.all_threads_continued.unwrap_or_default() {
                    self.thread_states.continue_all_threads();
                    self.breakpoint_store.update(cx, |store, cx| {
                        store.remove_active_position(Some(self.session_id()), cx)
                    });
                } else {
                    self.thread_states
                        .continue_thread(ThreadId(event.thread_id));
                }
                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
                self.invalidate_generic();
            }
            Events::Exited(_event) => {
                self.clear_active_debug_line(cx);
            }
            Events::Terminated(_) => {
                self.shutdown(cx).detach();
            }
            Events::Thread(event) => {
                let thread_id = ThreadId(event.thread_id);

                match event.reason {
                    dap::ThreadEventReason::Started => {
                        self.thread_states.continue_thread(thread_id);
                    }
                    dap::ThreadEventReason::Exited => {
                        self.thread_states.exit_thread(thread_id);
                    }
                    reason => {
                        log::error!("Unhandled thread event reason {:?}", reason);
                    }
                }
                // The cached thread list is stale after any thread event.
                self.invalidate_state(&ThreadsCommand.into());
                cx.notify();
            }
            Events::Output(event) => {
                // Telemetry output is dropped instead of being recorded.
                if event
                    .category
                    .as_ref()
                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
                {
                    return;
                }

                self.push_output(event);
                cx.notify();
            }
            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
            }),
            Events::Module(event) => {
                // Keep the local module list in sync; modules are matched by id.
                match event.reason {
                    dap::ModuleEventReason::New => {
                        self.modules.push(event.module);
                    }
                    dap::ModuleEventReason::Changed => {
                        if let Some(module) = self
                            .modules
                            .iter_mut()
                            .find(|other| event.module.id == other.id)
                        {
                            *module = event.module;
                        }
                    }
                    dap::ModuleEventReason::Removed => {
                        self.modules.retain(|other| event.module.id != other.id);
                    }
                }

                // todo(debugger): We should only send the invalidate command to downstream clients.
                // self.invalidate_state(&ModulesCommand.into());
            }
            Events::LoadedSource(_) => {
                self.invalidate_state(&LoadedSourcesCommand.into());
            }
            Events::Capabilities(event) => {
                self.capabilities = self.capabilities.merge(event.capabilities);

                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
                let recent_filters = self
                    .capabilities
                    .exception_breakpoint_filters
                    .iter()
                    .flatten()
                    .map(|filter| (filter.filter.clone(), filter.clone()))
                    .collect::<BTreeMap<_, _>>();
                // Add filters we have not seen yet, starting from their default state;
                // entries that already exist are left untouched.
                for filter in recent_filters.values() {
                    let default = filter.default.unwrap_or_default();
                    self.exception_breakpoints
                        .entry(filter.filter.clone())
                        .or_insert_with(|| (filter.clone(), default));
                }
                // Remove the ones that no longer exist.
                self.exception_breakpoints
                    .retain(|k, _| recent_filters.contains_key(k));
                if self.is_started() {
                    self.send_exception_breakpoints(cx);
                }

                cx.notify();
            }
            Events::Memory(_) => {}
            Events::Process(_) => {}
            Events::ProgressEnd(_) => {}
            Events::ProgressStart(_) => {}
            Events::ProgressUpdate(_) => {}
            Events::Invalidated(_) => {}
            Events::Other(_) => {}
        }
    }
1535
1536    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1537    fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1538        &mut self,
1539        request: T,
1540        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1541        cx: &mut Context<Self>,
1542    ) {
1543        const {
1544            assert!(
1545                T::CACHEABLE,
1546                "Only requests marked as cacheable should invoke `fetch`"
1547            );
1548        }
1549
1550        if !self.thread_states.any_stopped_thread()
1551            && request.type_id() != TypeId::of::<ThreadsCommand>()
1552            || self.is_session_terminated
1553        {
1554            return;
1555        }
1556
1557        let request_map = self
1558            .requests
1559            .entry(std::any::TypeId::of::<T>())
1560            .or_default();
1561
1562        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1563            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1564
1565            let task = Self::request_inner::<Arc<T>>(
1566                &self.capabilities,
1567                &self.mode,
1568                command,
1569                |this, result, cx| {
1570                    process_result(this, result, cx);
1571                    None
1572                },
1573                cx,
1574            );
1575            let task = cx
1576                .background_executor()
1577                .spawn(async move {
1578                    let _ = task.await?;
1579                    Some(())
1580                })
1581                .shared();
1582
1583            vacant.insert(task);
1584            cx.notify();
1585        }
1586    }
1587
1588    fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1589        capabilities: &Capabilities,
1590        mode: &Mode,
1591        request: T,
1592        process_result: impl FnOnce(
1593            &mut Self,
1594            Result<T::Response>,
1595            &mut Context<Self>,
1596        ) -> Option<T::Response>
1597        + 'static,
1598        cx: &mut Context<Self>,
1599    ) -> Task<Option<T::Response>> {
1600        if !T::is_supported(&capabilities) {
1601            log::warn!(
1602                "Attempted to send a DAP request that isn't supported: {:?}",
1603                request
1604            );
1605            let error = Err(anyhow::Error::msg(
1606                "Couldn't complete request because it's not supported",
1607            ));
1608            return cx.spawn(async move |this, cx| {
1609                this.update(cx, |this, cx| process_result(this, error, cx))
1610                    .ok()
1611                    .flatten()
1612            });
1613        }
1614
1615        let request = mode.request_dap(request);
1616        cx.spawn(async move |this, cx| {
1617            let result = request.await;
1618            this.update(cx, |this, cx| process_result(this, result, cx))
1619                .ok()
1620                .flatten()
1621        })
1622    }
1623
    /// Sends `request` using this session's capabilities and mode.
    ///
    /// Thin wrapper around `request_inner`; `process_result` receives the outcome and
    /// its return value becomes the result of the returned task.
    fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(
            &mut Self,
            Result<T::Response>,
            &mut Context<Self>,
        ) -> Option<T::Response>
        + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
    }
1637
    /// Drops every tracked request of type `Command` from the request cache.
    fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
        self.requests.remove(&std::any::TypeId::of::<Command>());
    }
1641
    /// Drops the tracked modules, loaded-sources and threads requests from the request cache.
    fn invalidate_generic(&mut self) {
        self.invalidate_command_type::<ModulesCommand>();
        self.invalidate_command_type::<LoadedSourcesCommand>();
        self.invalidate_command_type::<ThreadsCommand>();
    }
1647
1648    fn invalidate_state(&mut self, key: &RequestSlot) {
1649        self.requests
1650            .entry((&*key.0 as &dyn Any).type_id())
1651            .and_modify(|request_map| {
1652                request_map.remove(&key);
1653            });
1654    }
1655
    /// Appends `event` to the output buffer and advances the output token by one.
    fn push_output(&mut self, event: OutputEvent) {
        self.output.push_back(event);
        self.output_token.0 += 1;
    }
1660
    /// Reports whether the tracked thread states contain a stopped thread
    /// (delegates to `ThreadStates::any_stopped_thread`).
    pub fn any_stopped_thread(&self) -> bool {
        self.thread_states.any_stopped_thread()
    }
1664
    /// Returns the tracked status of `thread_id` (delegates to `ThreadStates::thread_status`).
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1668
1669    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1670        self.fetch(
1671            dap_command::ThreadsCommand,
1672            |this, result, cx| {
1673                let Some(result) = result.log_err() else {
1674                    return;
1675                };
1676
1677                this.threads = result
1678                    .into_iter()
1679                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1680                    .collect();
1681
1682                this.invalidate_command_type::<StackTraceCommand>();
1683                cx.emit(SessionEvent::Threads);
1684                cx.notify();
1685            },
1686            cx,
1687        );
1688
1689        self.threads
1690            .values()
1691            .map(|thread| {
1692                (
1693                    thread.dap.clone(),
1694                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1695                )
1696            })
1697            .collect()
1698    }
1699
1700    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1701        self.fetch(
1702            dap_command::ModulesCommand,
1703            |this, result, cx| {
1704                let Some(result) = result.log_err() else {
1705                    return;
1706                };
1707
1708                this.modules = result;
1709                cx.emit(SessionEvent::Modules);
1710                cx.notify();
1711            },
1712            cx,
1713        );
1714
1715        &self.modules
1716    }
1717
1718    pub fn ignore_breakpoints(&self) -> bool {
1719        self.ignore_breakpoints
1720    }
1721
1722    pub fn toggle_ignore_breakpoints(
1723        &mut self,
1724        cx: &mut App,
1725    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1726        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1727    }
1728
1729    pub(crate) fn set_ignore_breakpoints(
1730        &mut self,
1731        ignore: bool,
1732        cx: &mut App,
1733    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1734        if self.ignore_breakpoints == ignore {
1735            return Task::ready(HashMap::default());
1736        }
1737
1738        self.ignore_breakpoints = ignore;
1739
1740        if let Some(local) = self.as_running() {
1741            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1742        } else {
1743            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1744            unimplemented!()
1745        }
1746    }
1747
1748    pub fn exception_breakpoints(
1749        &self,
1750    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1751        self.exception_breakpoints.values()
1752    }
1753
1754    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1755        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1756            *is_enabled = !*is_enabled;
1757            self.send_exception_breakpoints(cx);
1758        }
1759    }
1760
1761    fn send_exception_breakpoints(&mut self, cx: &App) {
1762        if let Some(local) = self.as_running() {
1763            let exception_filters = self
1764                .exception_breakpoints
1765                .values()
1766                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1767                .collect();
1768
1769            let supports_exception_filters = self
1770                .capabilities
1771                .supports_exception_filter_options
1772                .unwrap_or_default();
1773            local
1774                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1775                .detach_and_log_err(cx);
1776        } else {
1777            debug_assert!(false, "Not implemented");
1778        }
1779    }
1780
1781    pub fn breakpoints_enabled(&self) -> bool {
1782        self.ignore_breakpoints
1783    }
1784
1785    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1786        self.fetch(
1787            dap_command::LoadedSourcesCommand,
1788            |this, result, cx| {
1789                let Some(result) = result.log_err() else {
1790                    return;
1791                };
1792                this.loaded_sources = result;
1793                cx.emit(SessionEvent::LoadedSources);
1794                cx.notify();
1795            },
1796            cx,
1797        );
1798
1799        &self.loaded_sources
1800    }
1801
1802    fn fallback_to_manual_restart(
1803        &mut self,
1804        res: Result<()>,
1805        cx: &mut Context<Self>,
1806    ) -> Option<()> {
1807        if res.log_err().is_none() {
1808            cx.emit(SessionStateEvent::Restart);
1809            return None;
1810        }
1811        Some(())
1812    }
1813
1814    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1815        res.log_err()?;
1816        Some(())
1817    }
1818
1819    fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
1820        thread_id: ThreadId,
1821    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1822    {
1823        move |this, response, cx| match response.log_err() {
1824            Some(response) => {
1825                this.breakpoint_store.update(cx, |store, cx| {
1826                    store.remove_active_position(Some(this.session_id()), cx)
1827                });
1828                Some(response)
1829            }
1830            None => {
1831                this.thread_states.stop_thread(thread_id);
1832                cx.notify();
1833                None
1834            }
1835        }
1836    }
1837
1838    fn clear_active_debug_line_response(
1839        &mut self,
1840        response: Result<()>,
1841        cx: &mut Context<Session>,
1842    ) -> Option<()> {
1843        response.log_err()?;
1844        self.clear_active_debug_line(cx);
1845        Some(())
1846    }
1847
1848    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1849        self.breakpoint_store.update(cx, |store, cx| {
1850            store.remove_active_position(Some(self.id), cx)
1851        });
1852    }
1853
1854    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1855        self.request(
1856            PauseCommand {
1857                thread_id: thread_id.0,
1858            },
1859            Self::empty_response,
1860            cx,
1861        )
1862        .detach();
1863    }
1864
1865    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1866        self.request(
1867            RestartStackFrameCommand { stack_frame_id },
1868            Self::empty_response,
1869            cx,
1870        )
1871        .detach();
1872    }
1873
1874    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1875        if self.restart_task.is_some() || self.as_running().is_none() {
1876            return;
1877        }
1878
1879        let supports_dap_restart =
1880            self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
1881
1882        self.restart_task = Some(cx.spawn(async move |this, cx| {
1883            let _ = this.update(cx, |session, cx| {
1884                if supports_dap_restart {
1885                    session
1886                        .request(
1887                            RestartCommand {
1888                                raw: args.unwrap_or(Value::Null),
1889                            },
1890                            Self::fallback_to_manual_restart,
1891                            cx,
1892                        )
1893                        .detach();
1894                } else {
1895                    cx.emit(SessionStateEvent::Restart);
1896                }
1897            });
1898        }));
1899    }
1900
1901    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1902        if self.is_session_terminated {
1903            return Task::ready(());
1904        }
1905
1906        self.is_session_terminated = true;
1907        self.thread_states.exit_all_threads();
1908        cx.notify();
1909
1910        let task = if self
1911            .capabilities
1912            .supports_terminate_request
1913            .unwrap_or_default()
1914        {
1915            self.request(
1916                TerminateCommand {
1917                    restart: Some(false),
1918                },
1919                Self::clear_active_debug_line_response,
1920                cx,
1921            )
1922        } else {
1923            self.request(
1924                DisconnectCommand {
1925                    restart: Some(false),
1926                    terminate_debuggee: Some(true),
1927                    suspend_debuggee: Some(false),
1928                },
1929                Self::clear_active_debug_line_response,
1930                cx,
1931            )
1932        };
1933
1934        cx.emit(SessionStateEvent::Shutdown);
1935
1936        cx.spawn(async move |this, cx| {
1937            task.await;
1938            let _ = this.update(cx, |this, _| {
1939                if let Some(adapter_client) = this.adapter_client() {
1940                    adapter_client.kill();
1941                }
1942            });
1943        })
1944    }
1945
1946    pub fn completions(
1947        &mut self,
1948        query: CompletionsQuery,
1949        cx: &mut Context<Self>,
1950    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1951        let task = self.request(query, |_, result, _| result.log_err(), cx);
1952
1953        cx.background_executor().spawn(async move {
1954            anyhow::Ok(
1955                task.await
1956                    .map(|response| response.targets)
1957                    .context("failed to fetch completions")?,
1958            )
1959        })
1960    }
1961
1962    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1963        let supports_single_thread_execution_requests =
1964            self.capabilities.supports_single_thread_execution_requests;
1965        self.thread_states.continue_thread(thread_id);
1966        self.request(
1967            ContinueCommand {
1968                args: ContinueArguments {
1969                    thread_id: thread_id.0,
1970                    single_thread: supports_single_thread_execution_requests,
1971                },
1972            },
1973            Self::on_step_response::<ContinueCommand>(thread_id),
1974            cx,
1975        )
1976        .detach();
1977    }
1978
1979    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1980        match self.mode {
1981            Mode::Running(ref local) => Some(local.client.clone()),
1982            Mode::Building => None,
1983        }
1984    }
1985
1986    pub fn has_ever_stopped(&self) -> bool {
1987        self.mode.has_ever_stopped()
1988    }
1989    pub fn step_over(
1990        &mut self,
1991        thread_id: ThreadId,
1992        granularity: SteppingGranularity,
1993        cx: &mut Context<Self>,
1994    ) {
1995        let supports_single_thread_execution_requests =
1996            self.capabilities.supports_single_thread_execution_requests;
1997        let supports_stepping_granularity = self
1998            .capabilities
1999            .supports_stepping_granularity
2000            .unwrap_or_default();
2001
2002        let command = NextCommand {
2003            inner: StepCommand {
2004                thread_id: thread_id.0,
2005                granularity: supports_stepping_granularity.then(|| granularity),
2006                single_thread: supports_single_thread_execution_requests,
2007            },
2008        };
2009
2010        self.thread_states.process_step(thread_id);
2011        self.request(
2012            command,
2013            Self::on_step_response::<NextCommand>(thread_id),
2014            cx,
2015        )
2016        .detach();
2017    }
2018
2019    pub fn step_in(
2020        &mut self,
2021        thread_id: ThreadId,
2022        granularity: SteppingGranularity,
2023        cx: &mut Context<Self>,
2024    ) {
2025        let supports_single_thread_execution_requests =
2026            self.capabilities.supports_single_thread_execution_requests;
2027        let supports_stepping_granularity = self
2028            .capabilities
2029            .supports_stepping_granularity
2030            .unwrap_or_default();
2031
2032        let command = StepInCommand {
2033            inner: StepCommand {
2034                thread_id: thread_id.0,
2035                granularity: supports_stepping_granularity.then(|| granularity),
2036                single_thread: supports_single_thread_execution_requests,
2037            },
2038        };
2039
2040        self.thread_states.process_step(thread_id);
2041        self.request(
2042            command,
2043            Self::on_step_response::<StepInCommand>(thread_id),
2044            cx,
2045        )
2046        .detach();
2047    }
2048
2049    pub fn step_out(
2050        &mut self,
2051        thread_id: ThreadId,
2052        granularity: SteppingGranularity,
2053        cx: &mut Context<Self>,
2054    ) {
2055        let supports_single_thread_execution_requests =
2056            self.capabilities.supports_single_thread_execution_requests;
2057        let supports_stepping_granularity = self
2058            .capabilities
2059            .supports_stepping_granularity
2060            .unwrap_or_default();
2061
2062        let command = StepOutCommand {
2063            inner: StepCommand {
2064                thread_id: thread_id.0,
2065                granularity: supports_stepping_granularity.then(|| granularity),
2066                single_thread: supports_single_thread_execution_requests,
2067            },
2068        };
2069
2070        self.thread_states.process_step(thread_id);
2071        self.request(
2072            command,
2073            Self::on_step_response::<StepOutCommand>(thread_id),
2074            cx,
2075        )
2076        .detach();
2077    }
2078
2079    pub fn step_back(
2080        &mut self,
2081        thread_id: ThreadId,
2082        granularity: SteppingGranularity,
2083        cx: &mut Context<Self>,
2084    ) {
2085        let supports_single_thread_execution_requests =
2086            self.capabilities.supports_single_thread_execution_requests;
2087        let supports_stepping_granularity = self
2088            .capabilities
2089            .supports_stepping_granularity
2090            .unwrap_or_default();
2091
2092        let command = StepBackCommand {
2093            inner: StepCommand {
2094                thread_id: thread_id.0,
2095                granularity: supports_stepping_granularity.then(|| granularity),
2096                single_thread: supports_single_thread_execution_requests,
2097            },
2098        };
2099
2100        self.thread_states.process_step(thread_id);
2101
2102        self.request(
2103            command,
2104            Self::on_step_response::<StepBackCommand>(thread_id),
2105            cx,
2106        )
2107        .detach();
2108    }
2109
2110    pub fn stack_frames(
2111        &mut self,
2112        thread_id: ThreadId,
2113        cx: &mut Context<Self>,
2114    ) -> Result<Vec<StackFrame>> {
2115        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2116            && self.requests.contains_key(&ThreadsCommand.type_id())
2117            && self.threads.contains_key(&thread_id)
2118        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2119        // We could still be using an old thread id and have sent a new thread's request
2120        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2121        // But it very well could cause a minor bug in the future that is hard to track down
2122        {
2123            self.fetch(
2124                super::dap_command::StackTraceCommand {
2125                    thread_id: thread_id.0,
2126                    start_frame: None,
2127                    levels: None,
2128                },
2129                move |this, stack_frames, cx| {
2130                    let entry =
2131                        this.threads
2132                            .entry(thread_id)
2133                            .and_modify(|thread| match &stack_frames {
2134                                Ok(stack_frames) => {
2135                                    thread.stack_frames = stack_frames
2136                                        .iter()
2137                                        .cloned()
2138                                        .map(StackFrame::from)
2139                                        .collect();
2140                                    thread.stack_frames_error = None;
2141                                }
2142                                Err(error) => {
2143                                    thread.stack_frames.clear();
2144                                    thread.stack_frames_error = Some(error.cloned());
2145                                }
2146                            });
2147                    debug_assert!(
2148                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2149                        "Sent request for thread_id that doesn't exist"
2150                    );
2151                    if let Ok(stack_frames) = stack_frames {
2152                        this.stack_frames.extend(
2153                            stack_frames
2154                                .into_iter()
2155                                .filter(|frame| {
2156                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2157                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2158                                    !(frame.id == 0
2159                                        && frame.line == 0
2160                                        && frame.column == 0
2161                                        && frame.presentation_hint
2162                                            == Some(StackFramePresentationHint::Label))
2163                                })
2164                                .map(|frame| (frame.id, StackFrame::from(frame))),
2165                        );
2166                    }
2167
2168                    this.invalidate_command_type::<ScopesCommand>();
2169                    this.invalidate_command_type::<VariablesCommand>();
2170
2171                    cx.emit(SessionEvent::StackTrace);
2172                },
2173                cx,
2174            );
2175        }
2176
2177        match self.threads.get(&thread_id) {
2178            Some(thread) => {
2179                if let Some(error) = &thread.stack_frames_error {
2180                    Err(error.cloned())
2181                } else {
2182                    Ok(thread.stack_frames.clone())
2183                }
2184            }
2185            None => Ok(Vec::new()),
2186        }
2187    }
2188
2189    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2190        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2191            && self
2192                .requests
2193                .contains_key(&TypeId::of::<StackTraceCommand>())
2194        {
2195            self.fetch(
2196                ScopesCommand { stack_frame_id },
2197                move |this, scopes, cx| {
2198                    let Some(scopes) = scopes.log_err() else {
2199                        return
2200                    };
2201
2202                    for scope in scopes.iter() {
2203                        this.variables(scope.variables_reference, cx);
2204                    }
2205
2206                    let entry = this
2207                        .stack_frames
2208                        .entry(stack_frame_id)
2209                        .and_modify(|stack_frame| {
2210                            stack_frame.scopes = scopes;
2211                        });
2212
2213                    cx.emit(SessionEvent::Variables);
2214
2215                    debug_assert!(
2216                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2217                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2218                    );
2219                },
2220                cx,
2221            );
2222        }
2223
2224        self.stack_frames
2225            .get(&stack_frame_id)
2226            .map(|frame| frame.scopes.as_slice())
2227            .unwrap_or_default()
2228    }
2229
2230    pub fn variables_by_stack_frame_id(
2231        &self,
2232        stack_frame_id: StackFrameId,
2233        globals: bool,
2234        locals: bool,
2235    ) -> Vec<dap::Variable> {
2236        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2237            return Vec::new();
2238        };
2239
2240        stack_frame
2241            .scopes
2242            .iter()
2243            .filter(|scope| {
2244                (scope.name.to_lowercase().contains("local") && locals)
2245                    || (scope.name.to_lowercase().contains("global") && globals)
2246            })
2247            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2248            .flatten()
2249            .cloned()
2250            .collect()
2251    }
2252
2253    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2254        &self.watchers
2255    }
2256
2257    pub fn add_watcher(
2258        &mut self,
2259        expression: SharedString,
2260        frame_id: u64,
2261        cx: &mut Context<Self>,
2262    ) -> Task<Result<()>> {
2263        let request = self.mode.request_dap(EvaluateCommand {
2264            expression: expression.to_string(),
2265            context: Some(EvaluateArgumentsContext::Watch),
2266            frame_id: Some(frame_id),
2267            source: None,
2268        });
2269
2270        cx.spawn(async move |this, cx| {
2271            let response = request.await?;
2272
2273            this.update(cx, |session, cx| {
2274                session.watchers.insert(
2275                    expression.clone(),
2276                    Watcher {
2277                        expression,
2278                        value: response.result.into(),
2279                        variables_reference: response.variables_reference,
2280                        presentation_hint: response.presentation_hint,
2281                    },
2282                );
2283                cx.emit(SessionEvent::Watchers);
2284            })
2285        })
2286    }
2287
2288    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2289        let watches = self.watchers.clone();
2290        for (_, watch) in watches.into_iter() {
2291            self.add_watcher(watch.expression.clone(), frame_id, cx)
2292                .detach();
2293        }
2294    }
2295
2296    pub fn remove_watcher(&mut self, expression: SharedString) {
2297        self.watchers.remove(&expression);
2298    }
2299
2300    pub fn variables(
2301        &mut self,
2302        variables_reference: VariableReference,
2303        cx: &mut Context<Self>,
2304    ) -> Vec<dap::Variable> {
2305        let command = VariablesCommand {
2306            variables_reference,
2307            filter: None,
2308            start: None,
2309            count: None,
2310            format: None,
2311        };
2312
2313        self.fetch(
2314            command,
2315            move |this, variables, cx| {
2316                let Some(variables) = variables.log_err() else {
2317                    return;
2318                };
2319
2320                this.variables.insert(variables_reference, variables);
2321
2322                cx.emit(SessionEvent::Variables);
2323                cx.emit(SessionEvent::InvalidateInlineValue);
2324            },
2325            cx,
2326        );
2327
2328        self.variables
2329            .get(&variables_reference)
2330            .cloned()
2331            .unwrap_or_default()
2332    }
2333
2334    pub fn set_variable_value(
2335        &mut self,
2336        stack_frame_id: u64,
2337        variables_reference: u64,
2338        name: String,
2339        value: String,
2340        cx: &mut Context<Self>,
2341    ) {
2342        if self.capabilities.supports_set_variable.unwrap_or_default() {
2343            self.request(
2344                SetVariableValueCommand {
2345                    name,
2346                    value,
2347                    variables_reference,
2348                },
2349                move |this, response, cx| {
2350                    let response = response.log_err()?;
2351                    this.invalidate_command_type::<VariablesCommand>();
2352                    this.refresh_watchers(stack_frame_id, cx);
2353                    cx.emit(SessionEvent::Variables);
2354                    Some(response)
2355                },
2356                cx,
2357            )
2358            .detach();
2359        }
2360    }
2361
2362    pub fn evaluate(
2363        &mut self,
2364        expression: String,
2365        context: Option<EvaluateArgumentsContext>,
2366        frame_id: Option<u64>,
2367        source: Option<Source>,
2368        cx: &mut Context<Self>,
2369    ) -> Task<()> {
2370        let event = dap::OutputEvent {
2371            category: None,
2372            output: format!("> {expression}"),
2373            group: None,
2374            variables_reference: None,
2375            source: None,
2376            line: None,
2377            column: None,
2378            data: None,
2379            location_reference: None,
2380        };
2381        self.push_output(event);
2382        let request = self.mode.request_dap(EvaluateCommand {
2383            expression,
2384            context,
2385            frame_id,
2386            source,
2387        });
2388        cx.spawn(async move |this, cx| {
2389            let response = request.await;
2390            this.update(cx, |this, cx| {
2391                match response {
2392                    Ok(response) => {
2393                        let event = dap::OutputEvent {
2394                            category: None,
2395                            output: format!("< {}", &response.result),
2396                            group: None,
2397                            variables_reference: Some(response.variables_reference),
2398                            source: None,
2399                            line: None,
2400                            column: None,
2401                            data: None,
2402                            location_reference: None,
2403                        };
2404                        this.push_output(event);
2405                    }
2406                    Err(e) => {
2407                        let event = dap::OutputEvent {
2408                            category: None,
2409                            output: format!("{}", e),
2410                            group: None,
2411                            variables_reference: None,
2412                            source: None,
2413                            line: None,
2414                            column: None,
2415                            data: None,
2416                            location_reference: None,
2417                        };
2418                        this.push_output(event);
2419                    }
2420                };
2421                cx.notify();
2422            })
2423            .ok();
2424        })
2425    }
2426
2427    pub fn location(
2428        &mut self,
2429        reference: u64,
2430        cx: &mut Context<Self>,
2431    ) -> Option<dap::LocationsResponse> {
2432        self.fetch(
2433            LocationsCommand { reference },
2434            move |this, response, _| {
2435                let Some(response) = response.log_err() else {
2436                    return;
2437                };
2438                this.locations.insert(reference, response);
2439            },
2440            cx,
2441        );
2442        self.locations.get(&reference).cloned()
2443    }
2444
2445    pub fn is_attached(&self) -> bool {
2446        let Mode::Running(local_mode) = &self.mode else {
2447            return false;
2448        };
2449        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2450    }
2451
2452    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2453        let command = DisconnectCommand {
2454            restart: Some(false),
2455            terminate_debuggee: Some(false),
2456            suspend_debuggee: Some(false),
2457        };
2458
2459        self.request(command, Self::empty_response, cx).detach()
2460    }
2461
2462    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2463        if self
2464            .capabilities
2465            .supports_terminate_threads_request
2466            .unwrap_or_default()
2467        {
2468            self.request(
2469                TerminateThreadsCommand {
2470                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2471                },
2472                Self::clear_active_debug_line_response,
2473                cx,
2474            )
2475            .detach();
2476        } else {
2477            self.shutdown(cx).detach();
2478        }
2479    }
2480
2481    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2482        self.thread_states.thread_state(thread_id)
2483    }
2484}