session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
  11    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  12    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapStore;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap};
  17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  18use dap::messages::Response;
  19use dap::requests::{Request, RunInTerminal, StartDebugging};
  20use dap::{
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  28    RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
  29    StartDebuggingRequestArgumentsRequest, VariablePresentationHint,
  30};
  31use futures::SinkExt;
  32use futures::channel::mpsc::UnboundedSender;
  33use futures::channel::{mpsc, oneshot};
  34use futures::{FutureExt, future::Shared};
  35use gpui::{
  36    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  37    Task, WeakEntity,
  38};
  39
  40use rpc::ErrorExt;
  41use serde_json::Value;
  42use smol::stream::StreamExt;
  43use std::any::TypeId;
  44use std::collections::BTreeMap;
  45use std::u64;
  46use std::{
  47    any::Any,
  48    collections::hash_map::Entry,
  49    hash::{Hash, Hasher},
  50    path::Path,
  51    sync::Arc,
  52};
  53use task::TaskContext;
  54use text::{PointUtf16, ToPointUtf16};
  55use util::ResultExt;
  56use worktree::Worktree;
  57
  58#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  59#[repr(transparent)]
  60pub struct ThreadId(pub u64);
  61
  62impl ThreadId {
  63    pub const MIN: ThreadId = ThreadId(u64::MIN);
  64    pub const MAX: ThreadId = ThreadId(u64::MAX);
  65}
  66
  67impl From<u64> for ThreadId {
  68    fn from(id: u64) -> Self {
  69        Self(id)
  70    }
  71}
  72
  73#[derive(Clone, Debug)]
  74pub struct StackFrame {
  75    pub dap: dap::StackFrame,
  76    pub scopes: Vec<dap::Scope>,
  77}
  78
  79impl From<dap::StackFrame> for StackFrame {
  80    fn from(stack_frame: dap::StackFrame) -> Self {
  81        Self {
  82            scopes: vec![],
  83            dap: stack_frame,
  84        }
  85    }
  86}
  87
  88#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  89pub enum ThreadStatus {
  90    #[default]
  91    Running,
  92    Stopped,
  93    Stepping,
  94    Exited,
  95    Ended,
  96}
  97
  98impl ThreadStatus {
  99    pub fn label(&self) -> &'static str {
 100        match self {
 101            ThreadStatus::Running => "Running",
 102            ThreadStatus::Stopped => "Stopped",
 103            ThreadStatus::Stepping => "Stepping",
 104            ThreadStatus::Exited => "Exited",
 105            ThreadStatus::Ended => "Ended",
 106        }
 107    }
 108}
 109
 110#[derive(Debug)]
 111pub struct Thread {
 112    dap: dap::Thread,
 113    stack_frames: Vec<StackFrame>,
 114    stack_frames_error: Option<anyhow::Error>,
 115    _has_stopped: bool,
 116}
 117
 118impl From<dap::Thread> for Thread {
 119    fn from(dap: dap::Thread) -> Self {
 120        Self {
 121            dap,
 122            stack_frames: Default::default(),
 123            stack_frames_error: None,
 124            _has_stopped: false,
 125        }
 126    }
 127}
 128
 129#[derive(Debug, Clone, PartialEq)]
 130pub struct Watcher {
 131    pub expression: SharedString,
 132    pub value: SharedString,
 133    pub variables_reference: u64,
 134    pub presentation_hint: Option<VariablePresentationHint>,
 135}
 136
 137pub enum Mode {
 138    Building,
 139    Running(RunningMode),
 140}
 141
 142#[derive(Clone)]
 143pub struct RunningMode {
 144    client: Arc<DebugAdapterClient>,
 145    binary: DebugAdapterBinary,
 146    tmp_breakpoint: Option<SourceBreakpoint>,
 147    worktree: WeakEntity<Worktree>,
 148    executor: BackgroundExecutor,
 149    is_started: bool,
 150    has_ever_stopped: bool,
 151    messages_tx: UnboundedSender<Message>,
 152}
 153
 154fn client_source(abs_path: &Path) -> dap::Source {
 155    dap::Source {
 156        name: abs_path
 157            .file_name()
 158            .map(|filename| filename.to_string_lossy().to_string()),
 159        path: Some(abs_path.to_string_lossy().to_string()),
 160        source_reference: None,
 161        presentation_hint: None,
 162        origin: None,
 163        sources: None,
 164        adapter_data: None,
 165        checksums: None,
 166    }
 167}
 168
 169impl RunningMode {
 170    async fn new(
 171        session_id: SessionId,
 172        parent_session: Option<Entity<Session>>,
 173        worktree: WeakEntity<Worktree>,
 174        binary: DebugAdapterBinary,
 175        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 176        cx: &mut AsyncApp,
 177    ) -> Result<Self> {
 178        let message_handler = Box::new({
 179            let messages_tx = messages_tx.clone();
 180            move |message| {
 181                messages_tx.unbounded_send(message).ok();
 182            }
 183        });
 184
 185        let client = if let Some(client) = parent_session
 186            .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 187            .flatten()
 188        {
 189            client
 190                .create_child_connection(session_id, binary.clone(), message_handler, cx)
 191                .await?
 192        } else {
 193            DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
 194        };
 195
 196        Ok(Self {
 197            client: Arc::new(client),
 198            worktree,
 199            tmp_breakpoint: None,
 200            binary,
 201            executor: cx.background_executor().clone(),
 202            is_started: false,
 203            has_ever_stopped: false,
 204            messages_tx,
 205        })
 206    }
 207
 208    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 209        &self.worktree
 210    }
 211
 212    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 213        let tasks: Vec<_> = paths
 214            .into_iter()
 215            .map(|path| {
 216                self.request(dap_command::SetBreakpoints {
 217                    source: client_source(path),
 218                    source_modified: None,
 219                    breakpoints: vec![],
 220                })
 221            })
 222            .collect();
 223
 224        cx.background_spawn(async move {
 225            futures::future::join_all(tasks)
 226                .await
 227                .iter()
 228                .for_each(|res| match res {
 229                    Ok(_) => {}
 230                    Err(err) => {
 231                        log::warn!("Set breakpoints request failed: {}", err);
 232                    }
 233                });
 234        })
 235    }
 236
 237    fn send_breakpoints_from_path(
 238        &self,
 239        abs_path: Arc<Path>,
 240        reason: BreakpointUpdatedReason,
 241        breakpoint_store: &Entity<BreakpointStore>,
 242        cx: &mut App,
 243    ) -> Task<()> {
 244        let breakpoints =
 245            breakpoint_store
 246                .read(cx)
 247                .source_breakpoints_from_path(&abs_path, cx)
 248                .into_iter()
 249                .filter(|bp| bp.state.is_enabled())
 250                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 251                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 252                }))
 253                .map(Into::into)
 254                .collect();
 255
 256        let raw_breakpoints = breakpoint_store
 257            .read(cx)
 258            .breakpoints_from_path(&abs_path)
 259            .into_iter()
 260            .filter(|bp| bp.bp.state.is_enabled())
 261            .collect::<Vec<_>>();
 262
 263        let task = self.request(dap_command::SetBreakpoints {
 264            source: client_source(&abs_path),
 265            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 266            breakpoints,
 267        });
 268        let session_id = self.client.id();
 269        let breakpoint_store = breakpoint_store.downgrade();
 270        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 271            Ok(breakpoints) => {
 272                let breakpoints =
 273                    breakpoints
 274                        .into_iter()
 275                        .zip(raw_breakpoints)
 276                        .filter_map(|(dap_bp, zed_bp)| {
 277                            Some((
 278                                zed_bp,
 279                                BreakpointSessionState {
 280                                    id: dap_bp.id?,
 281                                    verified: dap_bp.verified,
 282                                },
 283                            ))
 284                        });
 285                breakpoint_store
 286                    .update(cx, |this, _| {
 287                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 288                    })
 289                    .ok();
 290            }
 291            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 292        })
 293    }
 294
 295    fn send_exception_breakpoints(
 296        &self,
 297        filters: Vec<ExceptionBreakpointsFilter>,
 298        supports_filter_options: bool,
 299    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 300        let arg = if supports_filter_options {
 301            SetExceptionBreakpoints::WithOptions {
 302                filters: filters
 303                    .into_iter()
 304                    .map(|filter| ExceptionFilterOptions {
 305                        filter_id: filter.filter,
 306                        condition: None,
 307                        mode: None,
 308                    })
 309                    .collect(),
 310            }
 311        } else {
 312            SetExceptionBreakpoints::Plain {
 313                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 314            }
 315        };
 316        self.request(arg)
 317    }
 318
 319    fn send_source_breakpoints(
 320        &self,
 321        ignore_breakpoints: bool,
 322        breakpoint_store: &Entity<BreakpointStore>,
 323        cx: &App,
 324    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 325        let mut breakpoint_tasks = Vec::new();
 326        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 327        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 328        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 329        let session_id = self.client.id();
 330        for (path, breakpoints) in breakpoints {
 331            let breakpoints = if ignore_breakpoints {
 332                vec![]
 333            } else {
 334                breakpoints
 335                    .into_iter()
 336                    .filter(|bp| bp.state.is_enabled())
 337                    .map(Into::into)
 338                    .collect()
 339            };
 340
 341            let raw_breakpoints = raw_breakpoints
 342                .remove(&path)
 343                .unwrap_or_default()
 344                .into_iter()
 345                .filter(|bp| bp.bp.state.is_enabled());
 346            let error_path = path.clone();
 347            let send_request = self
 348                .request(dap_command::SetBreakpoints {
 349                    source: client_source(&path),
 350                    source_modified: Some(false),
 351                    breakpoints,
 352                })
 353                .map(|result| result.map_err(move |e| (error_path, e)));
 354
 355            let task = cx.spawn({
 356                let breakpoint_store = breakpoint_store.downgrade();
 357                async move |cx| {
 358                    let breakpoints = cx.background_spawn(send_request).await?;
 359
 360                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 361                        |(dap_bp, zed_bp)| {
 362                            Some((
 363                                zed_bp,
 364                                BreakpointSessionState {
 365                                    id: dap_bp.id?,
 366                                    verified: dap_bp.verified,
 367                                },
 368                            ))
 369                        },
 370                    );
 371                    breakpoint_store
 372                        .update(cx, |this, _| {
 373                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 374                        })
 375                        .ok();
 376
 377                    Ok(())
 378                }
 379            });
 380            breakpoint_tasks.push(task);
 381        }
 382
 383        cx.background_spawn(async move {
 384            futures::future::join_all(breakpoint_tasks)
 385                .await
 386                .into_iter()
 387                .filter_map(Result::err)
 388                .collect::<HashMap<_, _>>()
 389        })
 390    }
 391
 392    fn initialize_sequence(
 393        &self,
 394        capabilities: &Capabilities,
 395        initialized_rx: oneshot::Receiver<()>,
 396        dap_store: WeakEntity<DapStore>,
 397        cx: &mut Context<Session>,
 398    ) -> Task<Result<()>> {
 399        let raw = self.binary.request_args.clone();
 400
 401        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 402        let launch = match raw.request {
 403            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 404                raw: raw.configuration,
 405            }),
 406            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 407                raw: raw.configuration,
 408            }),
 409        };
 410
 411        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 412        let exception_filters = capabilities
 413            .exception_breakpoint_filters
 414            .as_ref()
 415            .map(|exception_filters| {
 416                exception_filters
 417                    .iter()
 418                    .filter(|filter| filter.default == Some(true))
 419                    .cloned()
 420                    .collect::<Vec<_>>()
 421            })
 422            .unwrap_or_default();
 423        let supports_exception_filters = capabilities
 424            .supports_exception_filter_options
 425            .unwrap_or_default();
 426        let this = self.clone();
 427        let worktree = self.worktree().clone();
 428        let configuration_sequence = cx.spawn({
 429            async move |_, cx| {
 430                let breakpoint_store =
 431                    dap_store.read_with(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
 432                initialized_rx.await?;
 433                let errors_by_path = cx
 434                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 435                    .await;
 436
 437                dap_store.update(cx, |_, cx| {
 438                    let Some(worktree) = worktree.upgrade() else {
 439                        return;
 440                    };
 441
 442                    for (path, error) in &errors_by_path {
 443                        log::error!("failed to set breakpoints for {path:?}: {error}");
 444                    }
 445
 446                    if let Some(failed_path) = errors_by_path.keys().next() {
 447                        let failed_path = failed_path
 448                            .strip_prefix(worktree.read(cx).abs_path())
 449                            .unwrap_or(failed_path)
 450                            .display();
 451                        let message = format!(
 452                            "Failed to set breakpoints for {failed_path}{}",
 453                            match errors_by_path.len() {
 454                                0 => unreachable!(),
 455                                1 => "".into(),
 456                                2 => " and 1 other path".into(),
 457                                n => format!(" and {} other paths", n - 1),
 458                            }
 459                        );
 460                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 461                    }
 462                })?;
 463
 464                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
 465                    .await
 466                    .ok();
 467                let ret = if configuration_done_supported {
 468                    this.request(ConfigurationDone {})
 469                } else {
 470                    Task::ready(Ok(()))
 471                }
 472                .await;
 473                ret
 474            }
 475        });
 476
 477        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 478
 479        cx.spawn(async move |this, cx| {
 480            let result = task.await;
 481
 482            this.update(cx, |this, cx| {
 483                if let Some(this) = this.as_running_mut() {
 484                    this.is_started = true;
 485                    cx.notify();
 486                }
 487            })
 488            .ok();
 489
 490            result?;
 491            anyhow::Ok(())
 492        })
 493    }
 494
 495    fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
 496        let client = self.client.clone();
 497        let messages_tx = self.messages_tx.clone();
 498        let message_handler = Box::new(move |message| {
 499            messages_tx.unbounded_send(message).ok();
 500        });
 501        if client.should_reconnect_for_ssh() {
 502            Some(cx.spawn(async move |cx| {
 503                client.connect(message_handler, cx).await?;
 504                anyhow::Ok(())
 505            }))
 506        } else {
 507            None
 508        }
 509    }
 510
 511    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 512    where
 513        <R::DapRequest as dap::requests::Request>::Response: 'static,
 514        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 515    {
 516        let request = Arc::new(request);
 517
 518        let request_clone = request.clone();
 519        let connection = self.client.clone();
 520        self.executor.spawn(async move {
 521            let args = request_clone.to_dap();
 522            let response = connection.request::<R::DapRequest>(args).await?;
 523            request.response_from_dap(response)
 524        })
 525    }
 526}
 527
 528impl Mode {
 529    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 530    where
 531        <R::DapRequest as dap::requests::Request>::Response: 'static,
 532        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 533    {
 534        match self {
 535            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 536            Mode::Building => Task::ready(Err(anyhow!(
 537                "no adapter running to send request: {request:?}"
 538            ))),
 539        }
 540    }
 541
 542    /// Did this debug session stop at least once?
 543    pub(crate) fn has_ever_stopped(&self) -> bool {
 544        match self {
 545            Mode::Building => false,
 546            Mode::Running(running_mode) => running_mode.has_ever_stopped,
 547        }
 548    }
 549
 550    fn stopped(&mut self) {
 551        if let Mode::Running(running) = self {
 552            running.has_ever_stopped = true;
 553        }
 554    }
 555}
 556
 557#[derive(Default)]
 558struct ThreadStates {
 559    global_state: Option<ThreadStatus>,
 560    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 561}
 562
 563impl ThreadStates {
 564    fn stop_all_threads(&mut self) {
 565        self.global_state = Some(ThreadStatus::Stopped);
 566        self.known_thread_states.clear();
 567    }
 568
 569    fn exit_all_threads(&mut self) {
 570        self.global_state = Some(ThreadStatus::Exited);
 571        self.known_thread_states.clear();
 572    }
 573
 574    fn continue_all_threads(&mut self) {
 575        self.global_state = Some(ThreadStatus::Running);
 576        self.known_thread_states.clear();
 577    }
 578
 579    fn stop_thread(&mut self, thread_id: ThreadId) {
 580        self.known_thread_states
 581            .insert(thread_id, ThreadStatus::Stopped);
 582    }
 583
 584    fn continue_thread(&mut self, thread_id: ThreadId) {
 585        self.known_thread_states
 586            .insert(thread_id, ThreadStatus::Running);
 587    }
 588
 589    fn process_step(&mut self, thread_id: ThreadId) {
 590        self.known_thread_states
 591            .insert(thread_id, ThreadStatus::Stepping);
 592    }
 593
 594    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 595        self.thread_state(thread_id)
 596            .unwrap_or(ThreadStatus::Running)
 597    }
 598
 599    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 600        self.known_thread_states
 601            .get(&thread_id)
 602            .copied()
 603            .or(self.global_state)
 604    }
 605
 606    fn exit_thread(&mut self, thread_id: ThreadId) {
 607        self.known_thread_states
 608            .insert(thread_id, ThreadStatus::Exited);
 609    }
 610
 611    fn any_stopped_thread(&self) -> bool {
 612        self.global_state
 613            .is_some_and(|state| state == ThreadStatus::Stopped)
 614            || self
 615                .known_thread_states
 616                .values()
 617                .any(|status| *status == ThreadStatus::Stopped)
 618    }
 619}
 620const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 621
 622type IsEnabled = bool;
 623
 624#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 625pub struct OutputToken(pub usize);
 626/// Represents a current state of a single debug adapter and provides ways to mutate it.
 627pub struct Session {
 628    pub mode: Mode,
 629    id: SessionId,
 630    label: SharedString,
 631    adapter: DebugAdapterName,
 632    pub(super) capabilities: Capabilities,
 633    child_session_ids: HashSet<SessionId>,
 634    parent_session: Option<Entity<Session>>,
 635    modules: Vec<dap::Module>,
 636    loaded_sources: Vec<dap::Source>,
 637    output_token: OutputToken,
 638    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 639    threads: IndexMap<ThreadId, Thread>,
 640    thread_states: ThreadStates,
 641    watchers: HashMap<SharedString, Watcher>,
 642    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 643    stack_frames: IndexMap<StackFrameId, StackFrame>,
 644    locations: HashMap<u64, dap::LocationsResponse>,
 645    is_session_terminated: bool,
 646    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 647    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 648    ignore_breakpoints: bool,
 649    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 650    background_tasks: Vec<Task<()>>,
 651    task_context: TaskContext,
 652}
 653
 654trait CacheableCommand: Any + Send + Sync {
 655    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 656    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 657    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 658}
 659
 660impl<T> CacheableCommand for T
 661where
 662    T: DapCommand + PartialEq + Eq + Hash,
 663{
 664    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 665        (rhs as &dyn Any)
 666            .downcast_ref::<Self>()
 667            .map_or(false, |rhs| self == rhs)
 668    }
 669
 670    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 671        T::hash(self, &mut hasher);
 672    }
 673
 674    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 675        self
 676    }
 677}
 678
 679pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 680
 681impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 682    fn from(request: T) -> Self {
 683        Self(Arc::new(request))
 684    }
 685}
 686
 687impl PartialEq for RequestSlot {
 688    fn eq(&self, other: &Self) -> bool {
 689        self.0.dyn_eq(other.0.as_ref())
 690    }
 691}
 692
 693impl Eq for RequestSlot {}
 694
 695impl Hash for RequestSlot {
 696    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 697        self.0.dyn_hash(state);
 698        (&*self.0 as &dyn Any).type_id().hash(state)
 699    }
 700}
 701
 702#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 703pub struct CompletionsQuery {
 704    pub query: String,
 705    pub column: u64,
 706    pub line: Option<u64>,
 707    pub frame_id: Option<u64>,
 708}
 709
 710impl CompletionsQuery {
 711    pub fn new(
 712        buffer: &language::Buffer,
 713        cursor_position: language::Anchor,
 714        frame_id: Option<u64>,
 715    ) -> Self {
 716        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 717        Self {
 718            query: buffer.text(),
 719            column: column as u64,
 720            frame_id,
 721            line: Some(row as u64),
 722        }
 723    }
 724}
 725
 726#[derive(Debug)]
 727pub enum SessionEvent {
 728    Modules,
 729    LoadedSources,
 730    Stopped(Option<ThreadId>),
 731    StackTrace,
 732    Variables,
 733    Watchers,
 734    Threads,
 735    InvalidateInlineValue,
 736    CapabilitiesLoaded,
 737    RunInTerminal {
 738        request: RunInTerminalRequestArguments,
 739        sender: mpsc::Sender<Result<u32>>,
 740    },
 741    ConsoleOutput,
 742}
 743
 744#[derive(Clone, Debug, PartialEq, Eq)]
 745pub enum SessionStateEvent {
 746    Running,
 747    Shutdown,
 748    Restart,
 749    SpawnChildSession {
 750        request: StartDebuggingRequestArguments,
 751    },
 752}
 753
 754impl EventEmitter<SessionEvent> for Session {}
 755impl EventEmitter<SessionStateEvent> for Session {}
 756
 757// local session will send breakpoint updates to DAP for all new breakpoints
 758// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 759// BreakpointStore notifies session on breakpoint changes
 760impl Session {
 761    pub(crate) fn new(
 762        breakpoint_store: Entity<BreakpointStore>,
 763        session_id: SessionId,
 764        parent_session: Option<Entity<Session>>,
 765        label: SharedString,
 766        adapter: DebugAdapterName,
 767        task_context: TaskContext,
 768        cx: &mut App,
 769    ) -> Entity<Self> {
 770        cx.new::<Self>(|cx| {
 771            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 772                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 773                    if let Some(local) = (!this.ignore_breakpoints)
 774                        .then(|| this.as_running_mut())
 775                        .flatten()
 776                    {
 777                        local
 778                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 779                            .detach();
 780                    };
 781                }
 782                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 783                    if let Some(local) = (!this.ignore_breakpoints)
 784                        .then(|| this.as_running_mut())
 785                        .flatten()
 786                    {
 787                        local.unset_breakpoints_from_paths(paths, cx).detach();
 788                    }
 789                }
 790                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 791            })
 792            .detach();
 793            // cx.on_app_quit(Self::on_app_quit).detach();
 794
 795            let this = Self {
 796                mode: Mode::Building,
 797                id: session_id,
 798                child_session_ids: HashSet::default(),
 799                parent_session,
 800                capabilities: Capabilities::default(),
 801                watchers: HashMap::default(),
 802                variables: Default::default(),
 803                stack_frames: Default::default(),
 804                thread_states: ThreadStates::default(),
 805                output_token: OutputToken(0),
 806                output: circular_buffer::CircularBuffer::boxed(),
 807                requests: HashMap::default(),
 808                modules: Vec::default(),
 809                loaded_sources: Vec::default(),
 810                threads: IndexMap::default(),
 811                background_tasks: Vec::default(),
 812                locations: Default::default(),
 813                is_session_terminated: false,
 814                ignore_breakpoints: false,
 815                breakpoint_store,
 816                exception_breakpoints: Default::default(),
 817                label,
 818                adapter,
 819                task_context,
 820            };
 821
 822            this
 823        })
 824    }
 825
 826    pub fn task_context(&self) -> &TaskContext {
 827        &self.task_context
 828    }
 829
 830    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 831        match &self.mode {
 832            Mode::Building => None,
 833            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 834        }
 835    }
 836
 837    pub fn boot(
 838        &mut self,
 839        binary: DebugAdapterBinary,
 840        worktree: Entity<Worktree>,
 841        dap_store: WeakEntity<DapStore>,
 842        cx: &mut Context<Self>,
 843    ) -> Task<Result<()>> {
 844        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 845        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 846
 847        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 848            let mut initialized_tx = Some(initialized_tx);
 849            while let Some(message) = message_rx.next().await {
 850                if let Message::Event(event) = message {
 851                    if let Events::Initialized(_) = *event {
 852                        if let Some(tx) = initialized_tx.take() {
 853                            tx.send(()).ok();
 854                        }
 855                    } else {
 856                        let Ok(_) = this.update(cx, |session, cx| {
 857                            session.handle_dap_event(event, cx);
 858                        }) else {
 859                            break;
 860                        };
 861                    }
 862                } else if let Message::Request(request) = message {
 863                    let Ok(_) = this.update(cx, |this, cx| {
 864                        if request.command == StartDebugging::COMMAND {
 865                            this.handle_start_debugging_request(request, cx)
 866                                .detach_and_log_err(cx);
 867                        } else if request.command == RunInTerminal::COMMAND {
 868                            this.handle_run_in_terminal_request(request, cx)
 869                                .detach_and_log_err(cx);
 870                        }
 871                    }) else {
 872                        break;
 873                    };
 874                }
 875            }
 876        })];
 877        self.background_tasks = background_tasks;
 878        let id = self.id;
 879        let parent_session = self.parent_session.clone();
 880
 881        cx.spawn(async move |this, cx| {
 882            let mode = RunningMode::new(
 883                id,
 884                parent_session,
 885                worktree.downgrade(),
 886                binary.clone(),
 887                message_tx,
 888                cx,
 889            )
 890            .await?;
 891            this.update(cx, |this, cx| {
 892                this.mode = Mode::Running(mode);
 893                cx.emit(SessionStateEvent::Running);
 894            })?;
 895
 896            this.update(cx, |session, cx| session.request_initialize(cx))?
 897                .await?;
 898
 899            let result = this
 900                .update(cx, |session, cx| {
 901                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 902                })?
 903                .await;
 904
 905            if result.is_err() {
 906                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 907
 908                console
 909                    .send(format!(
 910                        "Tried to launch debugger with: {}",
 911                        serde_json::to_string_pretty(&binary.request_args.configuration)
 912                            .unwrap_or_default(),
 913                    ))
 914                    .await
 915                    .ok();
 916            }
 917
 918            result
 919        })
 920    }
 921
 922    pub fn session_id(&self) -> SessionId {
 923        self.id
 924    }
 925
 926    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 927        self.child_session_ids.clone()
 928    }
 929
 930    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 931        self.child_session_ids.insert(session_id);
 932    }
 933
 934    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 935        self.child_session_ids.remove(&session_id);
 936    }
 937
 938    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 939        self.parent_session
 940            .as_ref()
 941            .map(|session| session.read(cx).id)
 942    }
 943
 944    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 945        self.parent_session.as_ref()
 946    }
 947
 948    pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
 949        let Some(client) = self.adapter_client() else {
 950            return Task::ready(());
 951        };
 952
 953        let supports_terminate = self
 954            .capabilities
 955            .support_terminate_debuggee
 956            .unwrap_or(false);
 957
 958        cx.background_spawn(async move {
 959            if supports_terminate {
 960                client
 961                    .request::<dap::requests::Terminate>(dap::TerminateArguments {
 962                        restart: Some(false),
 963                    })
 964                    .await
 965                    .ok();
 966            } else {
 967                client
 968                    .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
 969                        restart: Some(false),
 970                        terminate_debuggee: Some(true),
 971                        suspend_debuggee: Some(false),
 972                    })
 973                    .await
 974                    .ok();
 975            }
 976        })
 977    }
 978
 979    pub fn capabilities(&self) -> &Capabilities {
 980        &self.capabilities
 981    }
 982
 983    pub fn binary(&self) -> Option<&DebugAdapterBinary> {
 984        match &self.mode {
 985            Mode::Building => None,
 986            Mode::Running(running_mode) => Some(&running_mode.binary),
 987        }
 988    }
 989
 990    pub fn adapter(&self) -> DebugAdapterName {
 991        self.adapter.clone()
 992    }
 993
 994    pub fn label(&self) -> SharedString {
 995        self.label.clone()
 996    }
 997
 998    pub fn is_terminated(&self) -> bool {
 999        self.is_session_terminated
1000    }
1001
1002    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1003        let (tx, mut rx) = mpsc::unbounded();
1004
1005        cx.spawn(async move |this, cx| {
1006            while let Some(output) = rx.next().await {
1007                this.update(cx, |this, cx| {
1008                    let event = dap::OutputEvent {
1009                        category: None,
1010                        output,
1011                        group: None,
1012                        variables_reference: None,
1013                        source: None,
1014                        line: None,
1015                        column: None,
1016                        data: None,
1017                        location_reference: None,
1018                    };
1019                    this.push_output(event, cx);
1020                })?;
1021            }
1022            anyhow::Ok(())
1023        })
1024        .detach();
1025
1026        return tx;
1027    }
1028
1029    pub fn is_started(&self) -> bool {
1030        match &self.mode {
1031            Mode::Building => false,
1032            Mode::Running(running) => running.is_started,
1033        }
1034    }
1035
1036    pub fn is_building(&self) -> bool {
1037        matches!(self.mode, Mode::Building)
1038    }
1039
1040    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1041        match &mut self.mode {
1042            Mode::Running(local_mode) => Some(local_mode),
1043            Mode::Building => None,
1044        }
1045    }
1046
1047    pub fn as_running(&self) -> Option<&RunningMode> {
1048        match &self.mode {
1049            Mode::Running(local_mode) => Some(local_mode),
1050            Mode::Building => None,
1051        }
1052    }
1053
1054    fn handle_start_debugging_request(
1055        &mut self,
1056        request: dap::messages::Request,
1057        cx: &mut Context<Self>,
1058    ) -> Task<Result<()>> {
1059        let request_seq = request.seq;
1060
1061        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1062            .arguments
1063            .as_ref()
1064            .map(|value| serde_json::from_value(value.clone()));
1065
1066        let mut success = true;
1067        if let Some(Ok(request)) = launch_request {
1068            cx.emit(SessionStateEvent::SpawnChildSession { request });
1069        } else {
1070            log::error!(
1071                "Failed to parse launch request arguments: {:?}",
1072                request.arguments
1073            );
1074            success = false;
1075        }
1076
1077        cx.spawn(async move |this, cx| {
1078            this.update(cx, |this, cx| {
1079                this.respond_to_client(
1080                    request_seq,
1081                    success,
1082                    StartDebugging::COMMAND.to_string(),
1083                    None,
1084                    cx,
1085                )
1086            })?
1087            .await
1088        })
1089    }
1090
1091    fn handle_run_in_terminal_request(
1092        &mut self,
1093        request: dap::messages::Request,
1094        cx: &mut Context<Self>,
1095    ) -> Task<Result<()>> {
1096        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1097            request.arguments.unwrap_or_default(),
1098        ) {
1099            Ok(args) => args,
1100            Err(error) => {
1101                return cx.spawn(async move |session, cx| {
1102                    let error = serde_json::to_value(dap::ErrorResponse {
1103                        error: Some(dap::Message {
1104                            id: request.seq,
1105                            format: error.to_string(),
1106                            variables: None,
1107                            send_telemetry: None,
1108                            show_user: None,
1109                            url: None,
1110                            url_label: None,
1111                        }),
1112                    })
1113                    .ok();
1114
1115                    session
1116                        .update(cx, |this, cx| {
1117                            this.respond_to_client(
1118                                request.seq,
1119                                false,
1120                                StartDebugging::COMMAND.to_string(),
1121                                error,
1122                                cx,
1123                            )
1124                        })?
1125                        .await?;
1126
1127                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1128                });
1129            }
1130        };
1131
1132        let seq = request.seq;
1133
1134        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1135        cx.emit(SessionEvent::RunInTerminal {
1136            request: request_args,
1137            sender: tx,
1138        });
1139        cx.notify();
1140
1141        cx.spawn(async move |session, cx| {
1142            let result = util::maybe!(async move {
1143                rx.next().await.ok_or_else(|| {
1144                    anyhow!("failed to receive response from spawn terminal".to_string())
1145                })?
1146            })
1147            .await;
1148            let (success, body) = match result {
1149                Ok(pid) => (
1150                    true,
1151                    serde_json::to_value(dap::RunInTerminalResponse {
1152                        process_id: None,
1153                        shell_process_id: Some(pid as u64),
1154                    })
1155                    .ok(),
1156                ),
1157                Err(error) => (
1158                    false,
1159                    serde_json::to_value(dap::ErrorResponse {
1160                        error: Some(dap::Message {
1161                            id: seq,
1162                            format: error.to_string(),
1163                            variables: None,
1164                            send_telemetry: None,
1165                            show_user: None,
1166                            url: None,
1167                            url_label: None,
1168                        }),
1169                    })
1170                    .ok(),
1171                ),
1172            };
1173
1174            session
1175                .update(cx, |session, cx| {
1176                    session.respond_to_client(
1177                        seq,
1178                        success,
1179                        RunInTerminal::COMMAND.to_string(),
1180                        body,
1181                        cx,
1182                    )
1183                })?
1184                .await
1185        })
1186    }
1187
1188    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1189        let adapter_id = self.adapter().to_string();
1190        let request = Initialize { adapter_id };
1191
1192        let Mode::Running(running) = &self.mode else {
1193            return Task::ready(Err(anyhow!(
1194                "Cannot send initialize request, task still building"
1195            )));
1196        };
1197        let mut response = running.request(request.clone());
1198
1199        cx.spawn(async move |this, cx| {
1200            loop {
1201                let capabilities = response.await;
1202                match capabilities {
1203                    Err(e) => {
1204                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1205                            this.as_running()
1206                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1207                        }) else {
1208                            return Err(e);
1209                        };
1210                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1211                        reconnect.await?;
1212
1213                        let Ok(Some(r)) = this.update(cx, |this, _| {
1214                            this.as_running()
1215                                .map(|running| running.request(request.clone()))
1216                        }) else {
1217                            return Err(e);
1218                        };
1219                        response = r
1220                    }
1221                    Ok(capabilities) => {
1222                        this.update(cx, |session, cx| {
1223                            session.capabilities = capabilities;
1224                            let filters = session
1225                                .capabilities
1226                                .exception_breakpoint_filters
1227                                .clone()
1228                                .unwrap_or_default();
1229                            for filter in filters {
1230                                let default = filter.default.unwrap_or_default();
1231                                session
1232                                    .exception_breakpoints
1233                                    .entry(filter.filter.clone())
1234                                    .or_insert_with(|| (filter, default));
1235                            }
1236                            cx.emit(SessionEvent::CapabilitiesLoaded);
1237                        })?;
1238                        return Ok(());
1239                    }
1240                }
1241            }
1242        })
1243    }
1244
1245    pub(super) fn initialize_sequence(
1246        &mut self,
1247        initialize_rx: oneshot::Receiver<()>,
1248        dap_store: WeakEntity<DapStore>,
1249        cx: &mut Context<Self>,
1250    ) -> Task<Result<()>> {
1251        match &self.mode {
1252            Mode::Running(local_mode) => {
1253                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1254            }
1255            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1256        }
1257    }
1258
1259    pub fn run_to_position(
1260        &mut self,
1261        breakpoint: SourceBreakpoint,
1262        active_thread_id: ThreadId,
1263        cx: &mut Context<Self>,
1264    ) {
1265        match &mut self.mode {
1266            Mode::Running(local_mode) => {
1267                if !matches!(
1268                    self.thread_states.thread_state(active_thread_id),
1269                    Some(ThreadStatus::Stopped)
1270                ) {
1271                    return;
1272                };
1273                let path = breakpoint.path.clone();
1274                local_mode.tmp_breakpoint = Some(breakpoint);
1275                let task = local_mode.send_breakpoints_from_path(
1276                    path,
1277                    BreakpointUpdatedReason::Toggled,
1278                    &self.breakpoint_store,
1279                    cx,
1280                );
1281
1282                cx.spawn(async move |this, cx| {
1283                    task.await;
1284                    this.update(cx, |this, cx| {
1285                        this.continue_thread(active_thread_id, cx);
1286                    })
1287                })
1288                .detach();
1289            }
1290            Mode::Building => {}
1291        }
1292    }
1293
1294    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1295        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1296    }
1297
1298    pub fn output(
1299        &self,
1300        since: OutputToken,
1301    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1302        if self.output_token.0 == 0 {
1303            return (self.output.range(0..0), OutputToken(0));
1304        };
1305
1306        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1307
1308        let clamped_events_since = events_since.clamp(0, self.output.len());
1309        (
1310            self.output
1311                .range(self.output.len() - clamped_events_since..),
1312            self.output_token,
1313        )
1314    }
1315
1316    pub fn respond_to_client(
1317        &self,
1318        request_seq: u64,
1319        success: bool,
1320        command: String,
1321        body: Option<serde_json::Value>,
1322        cx: &mut Context<Self>,
1323    ) -> Task<Result<()>> {
1324        let Some(local_session) = self.as_running() else {
1325            unreachable!("Cannot respond to remote client");
1326        };
1327        let client = local_session.client.clone();
1328
1329        cx.background_spawn(async move {
1330            client
1331                .send_message(Message::Response(Response {
1332                    body,
1333                    success,
1334                    command,
1335                    seq: request_seq + 1,
1336                    request_seq,
1337                    message: None,
1338                }))
1339                .await
1340        })
1341    }
1342
1343    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1344        self.mode.stopped();
1345        // todo(debugger): Find a clean way to get around the clone
1346        let breakpoint_store = self.breakpoint_store.clone();
1347        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1348            let breakpoint = local.tmp_breakpoint.take()?;
1349            let path = breakpoint.path.clone();
1350            Some((local, path))
1351        }) {
1352            local
1353                .send_breakpoints_from_path(
1354                    path,
1355                    BreakpointUpdatedReason::Toggled,
1356                    &breakpoint_store,
1357                    cx,
1358                )
1359                .detach();
1360        };
1361
1362        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1363            self.thread_states.stop_all_threads();
1364            self.invalidate_command_type::<StackTraceCommand>();
1365        }
1366
1367        // Event if we stopped all threads we still need to insert the thread_id
1368        // to our own data
1369        if let Some(thread_id) = event.thread_id {
1370            self.thread_states.stop_thread(ThreadId(thread_id));
1371
1372            self.invalidate_state(
1373                &StackTraceCommand {
1374                    thread_id,
1375                    start_frame: None,
1376                    levels: None,
1377                }
1378                .into(),
1379            );
1380        }
1381
1382        self.invalidate_generic();
1383        self.threads.clear();
1384        self.variables.clear();
1385        cx.emit(SessionEvent::Stopped(
1386            event
1387                .thread_id
1388                .map(Into::into)
1389                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1390        ));
1391        cx.emit(SessionEvent::InvalidateInlineValue);
1392        cx.notify();
1393    }
1394
1395    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1396        match *event {
1397            Events::Initialized(_) => {
1398                debug_assert!(
1399                    false,
1400                    "Initialized event should have been handled in LocalMode"
1401                );
1402            }
1403            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1404            Events::Continued(event) => {
1405                if event.all_threads_continued.unwrap_or_default() {
1406                    self.thread_states.continue_all_threads();
1407                    self.breakpoint_store.update(cx, |store, cx| {
1408                        store.remove_active_position(Some(self.session_id()), cx)
1409                    });
1410                } else {
1411                    self.thread_states
1412                        .continue_thread(ThreadId(event.thread_id));
1413                }
1414                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1415                self.invalidate_generic();
1416            }
1417            Events::Exited(_event) => {
1418                self.clear_active_debug_line(cx);
1419            }
1420            Events::Terminated(_) => {
1421                self.shutdown(cx).detach();
1422            }
1423            Events::Thread(event) => {
1424                let thread_id = ThreadId(event.thread_id);
1425
1426                match event.reason {
1427                    dap::ThreadEventReason::Started => {
1428                        self.thread_states.continue_thread(thread_id);
1429                    }
1430                    dap::ThreadEventReason::Exited => {
1431                        self.thread_states.exit_thread(thread_id);
1432                    }
1433                    reason => {
1434                        log::error!("Unhandled thread event reason {:?}", reason);
1435                    }
1436                }
1437                self.invalidate_state(&ThreadsCommand.into());
1438                cx.notify();
1439            }
1440            Events::Output(event) => {
1441                if event
1442                    .category
1443                    .as_ref()
1444                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1445                {
1446                    return;
1447                }
1448
1449                self.push_output(event, cx);
1450                cx.notify();
1451            }
1452            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1453                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1454            }),
1455            Events::Module(event) => {
1456                match event.reason {
1457                    dap::ModuleEventReason::New => {
1458                        self.modules.push(event.module);
1459                    }
1460                    dap::ModuleEventReason::Changed => {
1461                        if let Some(module) = self
1462                            .modules
1463                            .iter_mut()
1464                            .find(|other| event.module.id == other.id)
1465                        {
1466                            *module = event.module;
1467                        }
1468                    }
1469                    dap::ModuleEventReason::Removed => {
1470                        self.modules.retain(|other| event.module.id != other.id);
1471                    }
1472                }
1473
1474                // todo(debugger): We should only send the invalidate command to downstream clients.
1475                // self.invalidate_state(&ModulesCommand.into());
1476            }
1477            Events::LoadedSource(_) => {
1478                self.invalidate_state(&LoadedSourcesCommand.into());
1479            }
1480            Events::Capabilities(event) => {
1481                self.capabilities = self.capabilities.merge(event.capabilities);
1482
1483                // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1484                let recent_filters = self
1485                    .capabilities
1486                    .exception_breakpoint_filters
1487                    .iter()
1488                    .flatten()
1489                    .map(|filter| (filter.filter.clone(), filter.clone()))
1490                    .collect::<BTreeMap<_, _>>();
1491                for filter in recent_filters.values() {
1492                    let default = filter.default.unwrap_or_default();
1493                    self.exception_breakpoints
1494                        .entry(filter.filter.clone())
1495                        .or_insert_with(|| (filter.clone(), default));
1496                }
1497                self.exception_breakpoints
1498                    .retain(|k, _| recent_filters.contains_key(k));
1499                if self.is_started() {
1500                    self.send_exception_breakpoints(cx);
1501                }
1502
1503                // Remove the ones that no longer exist.
1504                cx.notify();
1505            }
1506            Events::Memory(_) => {}
1507            Events::Process(_) => {}
1508            Events::ProgressEnd(_) => {}
1509            Events::ProgressStart(_) => {}
1510            Events::ProgressUpdate(_) => {}
1511            Events::Invalidated(_) => {}
1512            Events::Other(_) => {}
1513        }
1514    }
1515
1516    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1517    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1518        &mut self,
1519        request: T,
1520        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1521        cx: &mut Context<Self>,
1522    ) {
1523        const {
1524            assert!(
1525                T::CACHEABLE,
1526                "Only requests marked as cacheable should invoke `fetch`"
1527            );
1528        }
1529
1530        if !self.thread_states.any_stopped_thread()
1531            && request.type_id() != TypeId::of::<ThreadsCommand>()
1532            || self.is_session_terminated
1533        {
1534            return;
1535        }
1536
1537        let request_map = self
1538            .requests
1539            .entry(std::any::TypeId::of::<T>())
1540            .or_default();
1541
1542        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1543            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1544
1545            let task = Self::request_inner::<Arc<T>>(
1546                &self.capabilities,
1547                &self.mode,
1548                command,
1549                |this, result, cx| {
1550                    process_result(this, result, cx);
1551                    None
1552                },
1553                cx,
1554            );
1555            let task = cx
1556                .background_executor()
1557                .spawn(async move {
1558                    let _ = task.await?;
1559                    Some(())
1560                })
1561                .shared();
1562
1563            vacant.insert(task);
1564            cx.notify();
1565        }
1566    }
1567
1568    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1569        capabilities: &Capabilities,
1570        mode: &Mode,
1571        request: T,
1572        process_result: impl FnOnce(
1573            &mut Self,
1574            Result<T::Response>,
1575            &mut Context<Self>,
1576        ) -> Option<T::Response>
1577        + 'static,
1578        cx: &mut Context<Self>,
1579    ) -> Task<Option<T::Response>> {
1580        if !T::is_supported(&capabilities) {
1581            log::warn!(
1582                "Attempted to send a DAP request that isn't supported: {:?}",
1583                request
1584            );
1585            let error = Err(anyhow::Error::msg(
1586                "Couldn't complete request because it's not supported",
1587            ));
1588            return cx.spawn(async move |this, cx| {
1589                this.update(cx, |this, cx| process_result(this, error, cx))
1590                    .ok()
1591                    .flatten()
1592            });
1593        }
1594
1595        let request = mode.request_dap(request);
1596        cx.spawn(async move |this, cx| {
1597            let result = request.await;
1598            this.update(cx, |this, cx| process_result(this, result, cx))
1599                .ok()
1600                .flatten()
1601        })
1602    }
1603
1604    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1605        &self,
1606        request: T,
1607        process_result: impl FnOnce(
1608            &mut Self,
1609            Result<T::Response>,
1610            &mut Context<Self>,
1611        ) -> Option<T::Response>
1612        + 'static,
1613        cx: &mut Context<Self>,
1614    ) -> Task<Option<T::Response>> {
1615        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1616    }
1617
1618    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1619        self.requests.remove(&std::any::TypeId::of::<Command>());
1620    }
1621
1622    fn invalidate_generic(&mut self) {
1623        self.invalidate_command_type::<ModulesCommand>();
1624        self.invalidate_command_type::<LoadedSourcesCommand>();
1625        self.invalidate_command_type::<ThreadsCommand>();
1626    }
1627
1628    fn invalidate_state(&mut self, key: &RequestSlot) {
1629        self.requests
1630            .entry((&*key.0 as &dyn Any).type_id())
1631            .and_modify(|request_map| {
1632                request_map.remove(&key);
1633            });
1634    }
1635
1636    fn push_output(&mut self, event: OutputEvent, cx: &mut Context<Self>) {
1637        self.output.push_back(event);
1638        self.output_token.0 += 1;
1639        cx.emit(SessionEvent::ConsoleOutput);
1640    }
1641
1642    pub fn any_stopped_thread(&self) -> bool {
1643        self.thread_states.any_stopped_thread()
1644    }
1645
1646    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1647        self.thread_states.thread_status(thread_id)
1648    }
1649
1650    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1651        self.fetch(
1652            dap_command::ThreadsCommand,
1653            |this, result, cx| {
1654                let Some(result) = result.log_err() else {
1655                    return;
1656                };
1657
1658                this.threads = result
1659                    .into_iter()
1660                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1661                    .collect();
1662
1663                this.invalidate_command_type::<StackTraceCommand>();
1664                cx.emit(SessionEvent::Threads);
1665                cx.notify();
1666            },
1667            cx,
1668        );
1669
1670        self.threads
1671            .values()
1672            .map(|thread| {
1673                (
1674                    thread.dap.clone(),
1675                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1676                )
1677            })
1678            .collect()
1679    }
1680
1681    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1682        self.fetch(
1683            dap_command::ModulesCommand,
1684            |this, result, cx| {
1685                let Some(result) = result.log_err() else {
1686                    return;
1687                };
1688
1689                this.modules = result;
1690                cx.emit(SessionEvent::Modules);
1691                cx.notify();
1692            },
1693            cx,
1694        );
1695
1696        &self.modules
1697    }
1698
1699    pub fn ignore_breakpoints(&self) -> bool {
1700        self.ignore_breakpoints
1701    }
1702
1703    pub fn toggle_ignore_breakpoints(
1704        &mut self,
1705        cx: &mut App,
1706    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1707        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1708    }
1709
1710    pub(crate) fn set_ignore_breakpoints(
1711        &mut self,
1712        ignore: bool,
1713        cx: &mut App,
1714    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1715        if self.ignore_breakpoints == ignore {
1716            return Task::ready(HashMap::default());
1717        }
1718
1719        self.ignore_breakpoints = ignore;
1720
1721        if let Some(local) = self.as_running() {
1722            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1723        } else {
1724            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1725            unimplemented!()
1726        }
1727    }
1728
1729    pub fn exception_breakpoints(
1730        &self,
1731    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1732        self.exception_breakpoints.values()
1733    }
1734
1735    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1736        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1737            *is_enabled = !*is_enabled;
1738            self.send_exception_breakpoints(cx);
1739        }
1740    }
1741
1742    fn send_exception_breakpoints(&mut self, cx: &App) {
1743        if let Some(local) = self.as_running() {
1744            let exception_filters = self
1745                .exception_breakpoints
1746                .values()
1747                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1748                .collect();
1749
1750            let supports_exception_filters = self
1751                .capabilities
1752                .supports_exception_filter_options
1753                .unwrap_or_default();
1754            local
1755                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1756                .detach_and_log_err(cx);
1757        } else {
1758            debug_assert!(false, "Not implemented");
1759        }
1760    }
1761
1762    pub fn breakpoints_enabled(&self) -> bool {
1763        self.ignore_breakpoints
1764    }
1765
1766    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1767        self.fetch(
1768            dap_command::LoadedSourcesCommand,
1769            |this, result, cx| {
1770                let Some(result) = result.log_err() else {
1771                    return;
1772                };
1773                this.loaded_sources = result;
1774                cx.emit(SessionEvent::LoadedSources);
1775                cx.notify();
1776            },
1777            cx,
1778        );
1779
1780        &self.loaded_sources
1781    }
1782
1783    fn fallback_to_manual_restart(
1784        &mut self,
1785        res: Result<()>,
1786        cx: &mut Context<Self>,
1787    ) -> Option<()> {
1788        if res.log_err().is_none() {
1789            cx.emit(SessionStateEvent::Restart);
1790            return None;
1791        }
1792        Some(())
1793    }
1794
1795    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1796        res.log_err()?;
1797        Some(())
1798    }
1799
1800    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1801        thread_id: ThreadId,
1802    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1803    {
1804        move |this, response, cx| match response.log_err() {
1805            Some(response) => {
1806                this.breakpoint_store.update(cx, |store, cx| {
1807                    store.remove_active_position(Some(this.session_id()), cx)
1808                });
1809                Some(response)
1810            }
1811            None => {
1812                this.thread_states.stop_thread(thread_id);
1813                cx.notify();
1814                None
1815            }
1816        }
1817    }
1818
1819    fn clear_active_debug_line_response(
1820        &mut self,
1821        response: Result<()>,
1822        cx: &mut Context<Session>,
1823    ) -> Option<()> {
1824        response.log_err()?;
1825        self.clear_active_debug_line(cx);
1826        Some(())
1827    }
1828
1829    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1830        self.breakpoint_store.update(cx, |store, cx| {
1831            store.remove_active_position(Some(self.id), cx)
1832        });
1833    }
1834
1835    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1836        self.request(
1837            PauseCommand {
1838                thread_id: thread_id.0,
1839            },
1840            Self::empty_response,
1841            cx,
1842        )
1843        .detach();
1844    }
1845
1846    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1847        self.request(
1848            RestartStackFrameCommand { stack_frame_id },
1849            Self::empty_response,
1850            cx,
1851        )
1852        .detach();
1853    }
1854
1855    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1856        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1857            self.request(
1858                RestartCommand {
1859                    raw: args.unwrap_or(Value::Null),
1860                },
1861                Self::fallback_to_manual_restart,
1862                cx,
1863            )
1864            .detach();
1865        } else {
1866            cx.emit(SessionStateEvent::Restart);
1867        }
1868    }
1869
1870    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1871        if self.is_session_terminated {
1872            return Task::ready(());
1873        }
1874
1875        self.is_session_terminated = true;
1876        self.thread_states.exit_all_threads();
1877        cx.notify();
1878
1879        let task = if self
1880            .capabilities
1881            .supports_terminate_request
1882            .unwrap_or_default()
1883        {
1884            self.request(
1885                TerminateCommand {
1886                    restart: Some(false),
1887                },
1888                Self::clear_active_debug_line_response,
1889                cx,
1890            )
1891        } else {
1892            self.request(
1893                DisconnectCommand {
1894                    restart: Some(false),
1895                    terminate_debuggee: Some(true),
1896                    suspend_debuggee: Some(false),
1897                },
1898                Self::clear_active_debug_line_response,
1899                cx,
1900            )
1901        };
1902
1903        cx.emit(SessionStateEvent::Shutdown);
1904
1905        cx.spawn(async move |_, _| {
1906            task.await;
1907        })
1908    }
1909
1910    pub fn completions(
1911        &mut self,
1912        query: CompletionsQuery,
1913        cx: &mut Context<Self>,
1914    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1915        let task = self.request(query, |_, result, _| result.log_err(), cx);
1916
1917        cx.background_executor().spawn(async move {
1918            anyhow::Ok(
1919                task.await
1920                    .map(|response| response.targets)
1921                    .context("failed to fetch completions")?,
1922            )
1923        })
1924    }
1925
1926    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1927        self.thread_states.continue_thread(thread_id);
1928        self.request(
1929            ContinueCommand {
1930                args: ContinueArguments {
1931                    thread_id: thread_id.0,
1932                    single_thread: Some(true),
1933                },
1934            },
1935            Self::on_step_response::<ContinueCommand>(thread_id),
1936            cx,
1937        )
1938        .detach();
1939    }
1940
1941    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1942        match self.mode {
1943            Mode::Running(ref local) => Some(local.client.clone()),
1944            Mode::Building => None,
1945        }
1946    }
1947
1948    pub fn has_ever_stopped(&self) -> bool {
1949        self.mode.has_ever_stopped()
1950    }
1951    pub fn step_over(
1952        &mut self,
1953        thread_id: ThreadId,
1954        granularity: SteppingGranularity,
1955        cx: &mut Context<Self>,
1956    ) {
1957        let supports_single_thread_execution_requests =
1958            self.capabilities.supports_single_thread_execution_requests;
1959        let supports_stepping_granularity = self
1960            .capabilities
1961            .supports_stepping_granularity
1962            .unwrap_or_default();
1963
1964        let command = NextCommand {
1965            inner: StepCommand {
1966                thread_id: thread_id.0,
1967                granularity: supports_stepping_granularity.then(|| granularity),
1968                single_thread: supports_single_thread_execution_requests,
1969            },
1970        };
1971
1972        self.thread_states.process_step(thread_id);
1973        self.request(
1974            command,
1975            Self::on_step_response::<NextCommand>(thread_id),
1976            cx,
1977        )
1978        .detach();
1979    }
1980
1981    pub fn step_in(
1982        &mut self,
1983        thread_id: ThreadId,
1984        granularity: SteppingGranularity,
1985        cx: &mut Context<Self>,
1986    ) {
1987        let supports_single_thread_execution_requests =
1988            self.capabilities.supports_single_thread_execution_requests;
1989        let supports_stepping_granularity = self
1990            .capabilities
1991            .supports_stepping_granularity
1992            .unwrap_or_default();
1993
1994        let command = StepInCommand {
1995            inner: StepCommand {
1996                thread_id: thread_id.0,
1997                granularity: supports_stepping_granularity.then(|| granularity),
1998                single_thread: supports_single_thread_execution_requests,
1999            },
2000        };
2001
2002        self.thread_states.process_step(thread_id);
2003        self.request(
2004            command,
2005            Self::on_step_response::<StepInCommand>(thread_id),
2006            cx,
2007        )
2008        .detach();
2009    }
2010
2011    pub fn step_out(
2012        &mut self,
2013        thread_id: ThreadId,
2014        granularity: SteppingGranularity,
2015        cx: &mut Context<Self>,
2016    ) {
2017        let supports_single_thread_execution_requests =
2018            self.capabilities.supports_single_thread_execution_requests;
2019        let supports_stepping_granularity = self
2020            .capabilities
2021            .supports_stepping_granularity
2022            .unwrap_or_default();
2023
2024        let command = StepOutCommand {
2025            inner: StepCommand {
2026                thread_id: thread_id.0,
2027                granularity: supports_stepping_granularity.then(|| granularity),
2028                single_thread: supports_single_thread_execution_requests,
2029            },
2030        };
2031
2032        self.thread_states.process_step(thread_id);
2033        self.request(
2034            command,
2035            Self::on_step_response::<StepOutCommand>(thread_id),
2036            cx,
2037        )
2038        .detach();
2039    }
2040
2041    pub fn step_back(
2042        &mut self,
2043        thread_id: ThreadId,
2044        granularity: SteppingGranularity,
2045        cx: &mut Context<Self>,
2046    ) {
2047        let supports_single_thread_execution_requests =
2048            self.capabilities.supports_single_thread_execution_requests;
2049        let supports_stepping_granularity = self
2050            .capabilities
2051            .supports_stepping_granularity
2052            .unwrap_or_default();
2053
2054        let command = StepBackCommand {
2055            inner: StepCommand {
2056                thread_id: thread_id.0,
2057                granularity: supports_stepping_granularity.then(|| granularity),
2058                single_thread: supports_single_thread_execution_requests,
2059            },
2060        };
2061
2062        self.thread_states.process_step(thread_id);
2063
2064        self.request(
2065            command,
2066            Self::on_step_response::<StepBackCommand>(thread_id),
2067            cx,
2068        )
2069        .detach();
2070    }
2071
2072    pub fn stack_frames(
2073        &mut self,
2074        thread_id: ThreadId,
2075        cx: &mut Context<Self>,
2076    ) -> Result<Vec<StackFrame>> {
2077        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2078            && self.requests.contains_key(&ThreadsCommand.type_id())
2079            && self.threads.contains_key(&thread_id)
2080        // ^ todo(debugger): We need a better way to check that we're not querying stale data
2081        // We could still be using an old thread id and have sent a new thread's request
2082        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2083        // But it very well could cause a minor bug in the future that is hard to track down
2084        {
2085            self.fetch(
2086                super::dap_command::StackTraceCommand {
2087                    thread_id: thread_id.0,
2088                    start_frame: None,
2089                    levels: None,
2090                },
2091                move |this, stack_frames, cx| {
2092                    let entry =
2093                        this.threads
2094                            .entry(thread_id)
2095                            .and_modify(|thread| match &stack_frames {
2096                                Ok(stack_frames) => {
2097                                    thread.stack_frames = stack_frames
2098                                        .iter()
2099                                        .cloned()
2100                                        .map(StackFrame::from)
2101                                        .collect();
2102                                    thread.stack_frames_error = None;
2103                                }
2104                                Err(error) => {
2105                                    thread.stack_frames.clear();
2106                                    thread.stack_frames_error = Some(error.cloned());
2107                                }
2108                            });
2109                    debug_assert!(
2110                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2111                        "Sent request for thread_id that doesn't exist"
2112                    );
2113                    if let Ok(stack_frames) = stack_frames {
2114                        this.stack_frames.extend(
2115                            stack_frames
2116                                .into_iter()
2117                                .filter(|frame| {
2118                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2119                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2120                                    !(frame.id == 0
2121                                        && frame.line == 0
2122                                        && frame.column == 0
2123                                        && frame.presentation_hint
2124                                            == Some(StackFramePresentationHint::Label))
2125                                })
2126                                .map(|frame| (frame.id, StackFrame::from(frame))),
2127                        );
2128                    }
2129
2130                    this.invalidate_command_type::<ScopesCommand>();
2131                    this.invalidate_command_type::<VariablesCommand>();
2132
2133                    cx.emit(SessionEvent::StackTrace);
2134                },
2135                cx,
2136            );
2137        }
2138
2139        match self.threads.get(&thread_id) {
2140            Some(thread) => {
2141                if let Some(error) = &thread.stack_frames_error {
2142                    Err(error.cloned())
2143                } else {
2144                    Ok(thread.stack_frames.clone())
2145                }
2146            }
2147            None => Ok(Vec::new()),
2148        }
2149    }
2150
2151    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2152        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2153            && self
2154                .requests
2155                .contains_key(&TypeId::of::<StackTraceCommand>())
2156        {
2157            self.fetch(
2158                ScopesCommand { stack_frame_id },
2159                move |this, scopes, cx| {
2160                    let Some(scopes) = scopes.log_err() else {
2161                        return
2162                    };
2163
2164                    for scope in scopes.iter() {
2165                        this.variables(scope.variables_reference, cx);
2166                    }
2167
2168                    let entry = this
2169                        .stack_frames
2170                        .entry(stack_frame_id)
2171                        .and_modify(|stack_frame| {
2172                            stack_frame.scopes = scopes;
2173                        });
2174
2175                    cx.emit(SessionEvent::Variables);
2176
2177                    debug_assert!(
2178                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2179                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2180                    );
2181                },
2182                cx,
2183            );
2184        }
2185
2186        self.stack_frames
2187            .get(&stack_frame_id)
2188            .map(|frame| frame.scopes.as_slice())
2189            .unwrap_or_default()
2190    }
2191
2192    pub fn variables_by_stack_frame_id(
2193        &self,
2194        stack_frame_id: StackFrameId,
2195        globals: bool,
2196        locals: bool,
2197    ) -> Vec<dap::Variable> {
2198        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2199            return Vec::new();
2200        };
2201
2202        stack_frame
2203            .scopes
2204            .iter()
2205            .filter(|scope| {
2206                (scope.name.to_lowercase().contains("local") && locals)
2207                    || (scope.name.to_lowercase().contains("global") && globals)
2208            })
2209            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2210            .flatten()
2211            .cloned()
2212            .collect()
2213    }
2214
2215    pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2216        &self.watchers
2217    }
2218
2219    pub fn add_watcher(
2220        &mut self,
2221        expression: SharedString,
2222        frame_id: u64,
2223        cx: &mut Context<Self>,
2224    ) -> Task<Result<()>> {
2225        let request = self.mode.request_dap(EvaluateCommand {
2226            expression: expression.to_string(),
2227            context: Some(EvaluateArgumentsContext::Watch),
2228            frame_id: Some(frame_id),
2229            source: None,
2230        });
2231
2232        cx.spawn(async move |this, cx| {
2233            let response = request.await?;
2234
2235            this.update(cx, |session, cx| {
2236                session.watchers.insert(
2237                    expression.clone(),
2238                    Watcher {
2239                        expression,
2240                        value: response.result.into(),
2241                        variables_reference: response.variables_reference,
2242                        presentation_hint: response.presentation_hint,
2243                    },
2244                );
2245                cx.emit(SessionEvent::Watchers);
2246            })
2247        })
2248    }
2249
2250    pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2251        let watches = self.watchers.clone();
2252        for (_, watch) in watches.into_iter() {
2253            self.add_watcher(watch.expression.clone(), frame_id, cx)
2254                .detach();
2255        }
2256    }
2257
2258    pub fn remove_watcher(&mut self, expression: SharedString) {
2259        self.watchers.remove(&expression);
2260    }
2261
2262    pub fn variables(
2263        &mut self,
2264        variables_reference: VariableReference,
2265        cx: &mut Context<Self>,
2266    ) -> Vec<dap::Variable> {
2267        let command = VariablesCommand {
2268            variables_reference,
2269            filter: None,
2270            start: None,
2271            count: None,
2272            format: None,
2273        };
2274
2275        self.fetch(
2276            command,
2277            move |this, variables, cx| {
2278                let Some(variables) = variables.log_err() else {
2279                    return;
2280                };
2281
2282                this.variables.insert(variables_reference, variables);
2283
2284                cx.emit(SessionEvent::Variables);
2285                cx.emit(SessionEvent::InvalidateInlineValue);
2286            },
2287            cx,
2288        );
2289
2290        self.variables
2291            .get(&variables_reference)
2292            .cloned()
2293            .unwrap_or_default()
2294    }
2295
2296    pub fn set_variable_value(
2297        &mut self,
2298        stack_frame_id: u64,
2299        variables_reference: u64,
2300        name: String,
2301        value: String,
2302        cx: &mut Context<Self>,
2303    ) {
2304        if self.capabilities.supports_set_variable.unwrap_or_default() {
2305            self.request(
2306                SetVariableValueCommand {
2307                    name,
2308                    value,
2309                    variables_reference,
2310                },
2311                move |this, response, cx| {
2312                    let response = response.log_err()?;
2313                    this.invalidate_command_type::<VariablesCommand>();
2314                    this.refresh_watchers(stack_frame_id, cx);
2315                    cx.emit(SessionEvent::Variables);
2316                    Some(response)
2317                },
2318                cx,
2319            )
2320            .detach();
2321        }
2322    }
2323
2324    pub fn evaluate(
2325        &mut self,
2326        expression: String,
2327        context: Option<EvaluateArgumentsContext>,
2328        frame_id: Option<u64>,
2329        source: Option<Source>,
2330        cx: &mut Context<Self>,
2331    ) -> Task<()> {
2332        let event = dap::OutputEvent {
2333            category: None,
2334            output: format!("> {expression}"),
2335            group: None,
2336            variables_reference: None,
2337            source: None,
2338            line: None,
2339            column: None,
2340            data: None,
2341            location_reference: None,
2342        };
2343        self.push_output(event, cx);
2344        let request = self.mode.request_dap(EvaluateCommand {
2345            expression,
2346            context,
2347            frame_id,
2348            source,
2349        });
2350        cx.spawn(async move |this, cx| {
2351            let response = request.await;
2352            this.update(cx, |this, cx| {
2353                match response {
2354                    Ok(response) => {
2355                        let event = dap::OutputEvent {
2356                            category: None,
2357                            output: format!("< {}", &response.result),
2358                            group: None,
2359                            variables_reference: Some(response.variables_reference),
2360                            source: None,
2361                            line: None,
2362                            column: None,
2363                            data: None,
2364                            location_reference: None,
2365                        };
2366                        this.push_output(event, cx);
2367                    }
2368                    Err(e) => {
2369                        let event = dap::OutputEvent {
2370                            category: None,
2371                            output: format!("{}", e),
2372                            group: None,
2373                            variables_reference: None,
2374                            source: None,
2375                            line: None,
2376                            column: None,
2377                            data: None,
2378                            location_reference: None,
2379                        };
2380                        this.push_output(event, cx);
2381                    }
2382                };
2383                cx.notify();
2384            })
2385            .ok();
2386        })
2387    }
2388
2389    pub fn location(
2390        &mut self,
2391        reference: u64,
2392        cx: &mut Context<Self>,
2393    ) -> Option<dap::LocationsResponse> {
2394        self.fetch(
2395            LocationsCommand { reference },
2396            move |this, response, _| {
2397                let Some(response) = response.log_err() else {
2398                    return;
2399                };
2400                this.locations.insert(reference, response);
2401            },
2402            cx,
2403        );
2404        self.locations.get(&reference).cloned()
2405    }
2406
2407    pub fn is_attached(&self) -> bool {
2408        let Mode::Running(local_mode) = &self.mode else {
2409            return false;
2410        };
2411        local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2412    }
2413
2414    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2415        let command = DisconnectCommand {
2416            restart: Some(false),
2417            terminate_debuggee: Some(false),
2418            suspend_debuggee: Some(false),
2419        };
2420
2421        self.request(command, Self::empty_response, cx).detach()
2422    }
2423
2424    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2425        if self
2426            .capabilities
2427            .supports_terminate_threads_request
2428            .unwrap_or_default()
2429        {
2430            self.request(
2431                TerminateThreadsCommand {
2432                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2433                },
2434                Self::clear_active_debug_line_response,
2435                cx,
2436            )
2437            .detach();
2438        } else {
2439            self.shutdown(cx).detach();
2440        }
2441    }
2442
2443    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2444        self.thread_states.thread_state(thread_id)
2445    }
2446}