session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::DebugAdapterBinary;
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    client::{DebugAdapterClient, SessionId},
  21    messages::{Events, Message},
  22};
  23use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
  24use futures::channel::oneshot;
  25use futures::{FutureExt, future::Shared};
  26use gpui::{
  27    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  28    Task, WeakEntity,
  29};
  30
  31use serde_json::{Value, json};
  32use smol::stream::StreamExt;
  33use std::any::TypeId;
  34use std::collections::BTreeMap;
  35use std::u64;
  36use std::{
  37    any::Any,
  38    collections::hash_map::Entry,
  39    hash::{Hash, Hasher},
  40    path::Path,
  41    sync::Arc,
  42};
  43use task::DebugTaskDefinition;
  44use text::{PointUtf16, ToPointUtf16};
  45use util::{ResultExt, merge_json_value_into};
  46use worktree::Worktree;
  47
  48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  49#[repr(transparent)]
  50pub struct ThreadId(pub u64);
  51
  52impl ThreadId {
  53    pub const MIN: ThreadId = ThreadId(u64::MIN);
  54    pub const MAX: ThreadId = ThreadId(u64::MAX);
  55}
  56
  57impl From<u64> for ThreadId {
  58    fn from(id: u64) -> Self {
  59        Self(id)
  60    }
  61}
  62
  63#[derive(Clone, Debug)]
  64pub struct StackFrame {
  65    pub dap: dap::StackFrame,
  66    pub scopes: Vec<dap::Scope>,
  67}
  68
  69impl From<dap::StackFrame> for StackFrame {
  70    fn from(stack_frame: dap::StackFrame) -> Self {
  71        Self {
  72            scopes: vec![],
  73            dap: stack_frame,
  74        }
  75    }
  76}
  77
  78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  79pub enum ThreadStatus {
  80    #[default]
  81    Running,
  82    Stopped,
  83    Stepping,
  84    Exited,
  85    Ended,
  86}
  87
  88impl ThreadStatus {
  89    pub fn label(&self) -> &'static str {
  90        match self {
  91            ThreadStatus::Running => "Running",
  92            ThreadStatus::Stopped => "Stopped",
  93            ThreadStatus::Stepping => "Stepping",
  94            ThreadStatus::Exited => "Exited",
  95            ThreadStatus::Ended => "Ended",
  96        }
  97    }
  98}
  99
 100#[derive(Debug)]
 101pub struct Thread {
 102    dap: dap::Thread,
 103    stack_frame_ids: IndexSet<StackFrameId>,
 104    _has_stopped: bool,
 105}
 106
 107impl From<dap::Thread> for Thread {
 108    fn from(dap: dap::Thread) -> Self {
 109        Self {
 110            dap,
 111            stack_frame_ids: Default::default(),
 112            _has_stopped: false,
 113        }
 114    }
 115}
 116
 117enum Mode {
 118    Building,
 119    Running(LocalMode),
 120}
 121
 122#[derive(Clone)]
 123pub struct LocalMode {
 124    client: Arc<DebugAdapterClient>,
 125    binary: DebugAdapterBinary,
 126    root_binary: Option<Arc<DebugAdapterBinary>>,
 127    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 128    tmp_breakpoint: Option<SourceBreakpoint>,
 129    worktree: WeakEntity<Worktree>,
 130}
 131
 132fn client_source(abs_path: &Path) -> dap::Source {
 133    dap::Source {
 134        name: abs_path
 135            .file_name()
 136            .map(|filename| filename.to_string_lossy().to_string()),
 137        path: Some(abs_path.to_string_lossy().to_string()),
 138        source_reference: None,
 139        presentation_hint: None,
 140        origin: None,
 141        sources: None,
 142        adapter_data: None,
 143        checksums: None,
 144    }
 145}
 146
 147impl LocalMode {
 148    async fn new(
 149        session_id: SessionId,
 150        parent_session: Option<Entity<Session>>,
 151        worktree: WeakEntity<Worktree>,
 152        breakpoint_store: Entity<BreakpointStore>,
 153        binary: DebugAdapterBinary,
 154        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 155        cx: AsyncApp,
 156    ) -> Result<Self> {
 157        let message_handler = Box::new(move |message| {
 158            messages_tx.unbounded_send(message).ok();
 159        });
 160
 161        let root_binary = if let Some(parent_session) = parent_session.as_ref() {
 162            Some(parent_session.read_with(&cx, |session, _| session.root_binary().clone())?)
 163        } else {
 164            None
 165        };
 166
 167        let client = Arc::new(
 168            if let Some(client) = parent_session
 169                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 170                .flatten()
 171            {
 172                client
 173                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 174                    .await?
 175            } else {
 176                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 177                    .await
 178                    .with_context(|| "Failed to start communication with debug adapter")?
 179            },
 180        );
 181
 182        Ok(Self {
 183            client,
 184            breakpoint_store,
 185            worktree,
 186            tmp_breakpoint: None,
 187            root_binary,
 188            binary,
 189        })
 190    }
 191
 192    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 193        &self.worktree
 194    }
 195
 196    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 197        let tasks: Vec<_> = paths
 198            .into_iter()
 199            .map(|path| {
 200                self.request(
 201                    dap_command::SetBreakpoints {
 202                        source: client_source(path),
 203                        source_modified: None,
 204                        breakpoints: vec![],
 205                    },
 206                    cx.background_executor().clone(),
 207                )
 208            })
 209            .collect();
 210
 211        cx.background_spawn(async move {
 212            futures::future::join_all(tasks)
 213                .await
 214                .iter()
 215                .for_each(|res| match res {
 216                    Ok(_) => {}
 217                    Err(err) => {
 218                        log::warn!("Set breakpoints request failed: {}", err);
 219                    }
 220                });
 221        })
 222    }
 223
 224    fn send_breakpoints_from_path(
 225        &self,
 226        abs_path: Arc<Path>,
 227        reason: BreakpointUpdatedReason,
 228        cx: &mut App,
 229    ) -> Task<()> {
 230        let breakpoints = self
 231            .breakpoint_store
 232            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 233            .into_iter()
 234            .filter(|bp| bp.state.is_enabled())
 235            .chain(self.tmp_breakpoint.clone())
 236            .map(Into::into)
 237            .collect();
 238
 239        let task = self.request(
 240            dap_command::SetBreakpoints {
 241                source: client_source(&abs_path),
 242                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 243                breakpoints,
 244            },
 245            cx.background_executor().clone(),
 246        );
 247
 248        cx.background_spawn(async move {
 249            match task.await {
 250                Ok(_) => {}
 251                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 252            }
 253        })
 254    }
 255
 256    fn send_exception_breakpoints(
 257        &self,
 258        filters: Vec<ExceptionBreakpointsFilter>,
 259        supports_filter_options: bool,
 260        cx: &App,
 261    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 262        let arg = if supports_filter_options {
 263            SetExceptionBreakpoints::WithOptions {
 264                filters: filters
 265                    .into_iter()
 266                    .map(|filter| ExceptionFilterOptions {
 267                        filter_id: filter.filter,
 268                        condition: None,
 269                        mode: None,
 270                    })
 271                    .collect(),
 272            }
 273        } else {
 274            SetExceptionBreakpoints::Plain {
 275                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 276            }
 277        };
 278        self.request(arg, cx.background_executor().clone())
 279    }
 280
 281    fn send_source_breakpoints(
 282        &self,
 283        ignore_breakpoints: bool,
 284        cx: &App,
 285    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 286        let mut breakpoint_tasks = Vec::new();
 287        let breakpoints = self
 288            .breakpoint_store
 289            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 290
 291        for (path, breakpoints) in breakpoints {
 292            let breakpoints = if ignore_breakpoints {
 293                vec![]
 294            } else {
 295                breakpoints
 296                    .into_iter()
 297                    .filter(|bp| bp.state.is_enabled())
 298                    .map(Into::into)
 299                    .collect()
 300            };
 301
 302            breakpoint_tasks.push(
 303                self.request(
 304                    dap_command::SetBreakpoints {
 305                        source: client_source(&path),
 306                        source_modified: Some(false),
 307                        breakpoints,
 308                    },
 309                    cx.background_executor().clone(),
 310                )
 311                .map(|result| result.map_err(|e| (path, e))),
 312            );
 313        }
 314
 315        cx.background_spawn(async move {
 316            futures::future::join_all(breakpoint_tasks)
 317                .await
 318                .into_iter()
 319                .filter_map(Result::err)
 320                .collect::<HashMap<_, _>>()
 321        })
 322    }
 323
 324    fn initialize_sequence(
 325        &self,
 326        capabilities: &Capabilities,
 327        definition: &DebugTaskDefinition,
 328        initialized_rx: oneshot::Receiver<()>,
 329        dap_store: WeakEntity<DapStore>,
 330        cx: &App,
 331    ) -> Task<Result<()>> {
 332        let mut raw = self.binary.request_args.clone();
 333
 334        merge_json_value_into(
 335            definition.initialize_args.clone().unwrap_or(json!({})),
 336            &mut raw.configuration,
 337        );
 338
 339        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 340        let launch = match raw.request {
 341            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
 342                Launch {
 343                    raw: raw.configuration,
 344                },
 345                cx.background_executor().clone(),
 346            ),
 347            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
 348                Attach {
 349                    raw: raw.configuration,
 350                },
 351                cx.background_executor().clone(),
 352            ),
 353        };
 354
 355        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 356        let exception_filters = capabilities
 357            .exception_breakpoint_filters
 358            .as_ref()
 359            .map(|exception_filters| {
 360                exception_filters
 361                    .iter()
 362                    .filter(|filter| filter.default == Some(true))
 363                    .cloned()
 364                    .collect::<Vec<_>>()
 365            })
 366            .unwrap_or_default();
 367        let supports_exception_filters = capabilities
 368            .supports_exception_filter_options
 369            .unwrap_or_default();
 370        let this = self.clone();
 371        let worktree = self.worktree().clone();
 372        let configuration_sequence = cx.spawn({
 373            async move |cx| {
 374                initialized_rx.await?;
 375                let errors_by_path = cx
 376                    .update(|cx| this.send_source_breakpoints(false, cx))?
 377                    .await;
 378
 379                dap_store.update(cx, |_, cx| {
 380                    let Some(worktree) = worktree.upgrade() else {
 381                        return;
 382                    };
 383
 384                    for (path, error) in &errors_by_path {
 385                        log::error!("failed to set breakpoints for {path:?}: {error}");
 386                    }
 387
 388                    if let Some(failed_path) = errors_by_path.keys().next() {
 389                        let failed_path = failed_path
 390                            .strip_prefix(worktree.read(cx).abs_path())
 391                            .unwrap_or(failed_path)
 392                            .display();
 393                        let message = format!(
 394                            "Failed to set breakpoints for {failed_path}{}",
 395                            match errors_by_path.len() {
 396                                0 => unreachable!(),
 397                                1 => "".into(),
 398                                2 => " and 1 other path".into(),
 399                                n => format!(" and {} other paths", n - 1),
 400                            }
 401                        );
 402                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 403                    }
 404                })?;
 405
 406                cx.update(|cx| {
 407                    this.send_exception_breakpoints(
 408                        exception_filters,
 409                        supports_exception_filters,
 410                        cx,
 411                    )
 412                })?
 413                .await
 414                .ok();
 415                let ret = if configuration_done_supported {
 416                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 417                } else {
 418                    Task::ready(Ok(()))
 419                }
 420                .await;
 421                ret
 422            }
 423        });
 424
 425        cx.background_spawn(async move {
 426            futures::future::try_join(launch, configuration_sequence).await?;
 427            Ok(())
 428        })
 429    }
 430
 431    fn request<R: LocalDapCommand>(
 432        &self,
 433        request: R,
 434        executor: BackgroundExecutor,
 435    ) -> Task<Result<R::Response>>
 436    where
 437        <R::DapRequest as dap::requests::Request>::Response: 'static,
 438        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 439    {
 440        let request = Arc::new(request);
 441
 442        let request_clone = request.clone();
 443        let connection = self.client.clone();
 444        let request_task = executor.spawn(async move {
 445            let args = request_clone.to_dap();
 446            connection.request::<R::DapRequest>(args).await
 447        });
 448
 449        executor.spawn(async move {
 450            let response = request.response_from_dap(request_task.await?);
 451            response
 452        })
 453    }
 454}
 455
 456impl Mode {
 457    fn request_dap<R: DapCommand>(
 458        &self,
 459        request: R,
 460        cx: &mut Context<Session>,
 461    ) -> Task<Result<R::Response>>
 462    where
 463        <R::DapRequest as dap::requests::Request>::Response: 'static,
 464        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 465    {
 466        match self {
 467            Mode::Running(debug_adapter_client) => {
 468                debug_adapter_client.request(request, cx.background_executor().clone())
 469            }
 470            Mode::Building => Task::ready(Err(anyhow!(
 471                "no adapter running to send request: {:?}",
 472                request
 473            ))),
 474        }
 475    }
 476}
 477
 478#[derive(Default)]
 479struct ThreadStates {
 480    global_state: Option<ThreadStatus>,
 481    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 482}
 483
 484impl ThreadStates {
 485    fn stop_all_threads(&mut self) {
 486        self.global_state = Some(ThreadStatus::Stopped);
 487        self.known_thread_states.clear();
 488    }
 489
 490    fn exit_all_threads(&mut self) {
 491        self.global_state = Some(ThreadStatus::Exited);
 492        self.known_thread_states.clear();
 493    }
 494
 495    fn continue_all_threads(&mut self) {
 496        self.global_state = Some(ThreadStatus::Running);
 497        self.known_thread_states.clear();
 498    }
 499
 500    fn stop_thread(&mut self, thread_id: ThreadId) {
 501        self.known_thread_states
 502            .insert(thread_id, ThreadStatus::Stopped);
 503    }
 504
 505    fn continue_thread(&mut self, thread_id: ThreadId) {
 506        self.known_thread_states
 507            .insert(thread_id, ThreadStatus::Running);
 508    }
 509
 510    fn process_step(&mut self, thread_id: ThreadId) {
 511        self.known_thread_states
 512            .insert(thread_id, ThreadStatus::Stepping);
 513    }
 514
 515    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 516        self.thread_state(thread_id)
 517            .unwrap_or(ThreadStatus::Running)
 518    }
 519
 520    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 521        self.known_thread_states
 522            .get(&thread_id)
 523            .copied()
 524            .or(self.global_state)
 525    }
 526
 527    fn exit_thread(&mut self, thread_id: ThreadId) {
 528        self.known_thread_states
 529            .insert(thread_id, ThreadStatus::Exited);
 530    }
 531
 532    fn any_stopped_thread(&self) -> bool {
 533        self.global_state
 534            .is_some_and(|state| state == ThreadStatus::Stopped)
 535            || self
 536                .known_thread_states
 537                .values()
 538                .any(|status| *status == ThreadStatus::Stopped)
 539    }
 540}
 541const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 542
 543type IsEnabled = bool;
 544
 545#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 546pub struct OutputToken(pub usize);
 547/// Represents a current state of a single debug adapter and provides ways to mutate it.
 548pub struct Session {
 549    mode: Mode,
 550    definition: DebugTaskDefinition,
 551    pub(super) capabilities: Capabilities,
 552    id: SessionId,
 553    child_session_ids: HashSet<SessionId>,
 554    parent_session: Option<Entity<Session>>,
 555    ignore_breakpoints: bool,
 556    modules: Vec<dap::Module>,
 557    loaded_sources: Vec<dap::Source>,
 558    output_token: OutputToken,
 559    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 560    threads: IndexMap<ThreadId, Thread>,
 561    thread_states: ThreadStates,
 562    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 563    stack_frames: IndexMap<StackFrameId, StackFrame>,
 564    locations: HashMap<u64, dap::LocationsResponse>,
 565    is_session_terminated: bool,
 566    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 567    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 568    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 569    background_tasks: Vec<Task<()>>,
 570}
 571
 572trait CacheableCommand: Any + Send + Sync {
 573    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 574    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 575    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 576}
 577
 578impl<T> CacheableCommand for T
 579where
 580    T: DapCommand + PartialEq + Eq + Hash,
 581{
 582    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 583        (rhs as &dyn Any)
 584            .downcast_ref::<Self>()
 585            .map_or(false, |rhs| self == rhs)
 586    }
 587
 588    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 589        T::hash(self, &mut hasher);
 590    }
 591
 592    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 593        self
 594    }
 595}
 596
 597pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 598
 599impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 600    fn from(request: T) -> Self {
 601        Self(Arc::new(request))
 602    }
 603}
 604
 605impl PartialEq for RequestSlot {
 606    fn eq(&self, other: &Self) -> bool {
 607        self.0.dyn_eq(other.0.as_ref())
 608    }
 609}
 610
 611impl Eq for RequestSlot {}
 612
 613impl Hash for RequestSlot {
 614    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 615        self.0.dyn_hash(state);
 616        (&*self.0 as &dyn Any).type_id().hash(state)
 617    }
 618}
 619
 620#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 621pub struct CompletionsQuery {
 622    pub query: String,
 623    pub column: u64,
 624    pub line: Option<u64>,
 625    pub frame_id: Option<u64>,
 626}
 627
 628impl CompletionsQuery {
 629    pub fn new(
 630        buffer: &language::Buffer,
 631        cursor_position: language::Anchor,
 632        frame_id: Option<u64>,
 633    ) -> Self {
 634        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 635        Self {
 636            query: buffer.text(),
 637            column: column as u64,
 638            frame_id,
 639            line: Some(row as u64),
 640        }
 641    }
 642}
 643
 644#[derive(Debug)]
 645pub enum SessionEvent {
 646    Modules,
 647    LoadedSources,
 648    Stopped(Option<ThreadId>),
 649    StackTrace,
 650    Variables,
 651    Threads,
 652    CapabilitiesLoaded,
 653}
 654
 655#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 656pub enum SessionStateEvent {
 657    Running,
 658    Shutdown,
 659    Restart,
 660}
 661
 662impl EventEmitter<SessionEvent> for Session {}
 663impl EventEmitter<SessionStateEvent> for Session {}
 664
 665// local session will send breakpoint updates to DAP for all new breakpoints
 666// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 667// BreakpointStore notifies session on breakpoint changes
 668impl Session {
 669    pub(crate) fn new(
 670        breakpoint_store: Entity<BreakpointStore>,
 671        session_id: SessionId,
 672        parent_session: Option<Entity<Session>>,
 673        template: DebugTaskDefinition,
 674        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 675        cx: &mut App,
 676    ) -> Entity<Self> {
 677        cx.new::<Self>(|cx| {
 678            cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
 679                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 680                    if let Some(local) = (!this.ignore_breakpoints)
 681                        .then(|| this.as_local_mut())
 682                        .flatten()
 683                    {
 684                        local
 685                            .send_breakpoints_from_path(path.clone(), *reason, cx)
 686                            .detach();
 687                    };
 688                }
 689                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 690                    if let Some(local) = (!this.ignore_breakpoints)
 691                        .then(|| this.as_local_mut())
 692                        .flatten()
 693                    {
 694                        local.unset_breakpoints_from_paths(paths, cx).detach();
 695                    }
 696                }
 697                BreakpointStoreEvent::ActiveDebugLineChanged => {}
 698            })
 699            .detach();
 700
 701            let this = Self {
 702                mode: Mode::Building,
 703                id: session_id,
 704                child_session_ids: HashSet::default(),
 705                parent_session,
 706                capabilities: Capabilities::default(),
 707                ignore_breakpoints: false,
 708                variables: Default::default(),
 709                stack_frames: Default::default(),
 710                thread_states: ThreadStates::default(),
 711                output_token: OutputToken(0),
 712                output: circular_buffer::CircularBuffer::boxed(),
 713                requests: HashMap::default(),
 714                modules: Vec::default(),
 715                loaded_sources: Vec::default(),
 716                threads: IndexMap::default(),
 717                background_tasks: Vec::default(),
 718                locations: Default::default(),
 719                is_session_terminated: false,
 720                exception_breakpoints: Default::default(),
 721                definition: template,
 722                start_debugging_requests_tx,
 723            };
 724
 725            this
 726        })
 727    }
 728
 729    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 730        match &self.mode {
 731            Mode::Building => None,
 732            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 733        }
 734    }
 735
 736    pub fn boot(
 737        &mut self,
 738        binary: DebugAdapterBinary,
 739        worktree: Entity<Worktree>,
 740        breakpoint_store: Entity<BreakpointStore>,
 741        dap_store: WeakEntity<DapStore>,
 742        cx: &mut Context<Self>,
 743    ) -> Task<Result<()>> {
 744        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 745        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 746        let session_id = self.session_id();
 747
 748        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 749            let mut initialized_tx = Some(initialized_tx);
 750            while let Some(message) = message_rx.next().await {
 751                if let Message::Event(event) = message {
 752                    if let Events::Initialized(_) = *event {
 753                        if let Some(tx) = initialized_tx.take() {
 754                            tx.send(()).ok();
 755                        }
 756                    } else {
 757                        let Ok(_) = this.update(cx, |session, cx| {
 758                            session.handle_dap_event(event, cx);
 759                        }) else {
 760                            break;
 761                        };
 762                    }
 763                } else {
 764                    let Ok(Ok(_)) = this.update(cx, |this, _| {
 765                        this.start_debugging_requests_tx
 766                            .unbounded_send((session_id, message))
 767                    }) else {
 768                        break;
 769                    };
 770                }
 771            }
 772        })];
 773        self.background_tasks = background_tasks;
 774        let id = self.id;
 775        let parent_session = self.parent_session.clone();
 776
 777        cx.spawn(async move |this, cx| {
 778            let mode = LocalMode::new(
 779                id,
 780                parent_session,
 781                worktree.downgrade(),
 782                breakpoint_store.clone(),
 783                binary,
 784                message_tx,
 785                cx.clone(),
 786            )
 787            .await?;
 788            this.update(cx, |this, cx| {
 789                this.mode = Mode::Running(mode);
 790                cx.emit(SessionStateEvent::Running);
 791            })?;
 792
 793            this.update(cx, |session, cx| session.request_initialize(cx))?
 794                .await?;
 795
 796            this.update(cx, |session, cx| {
 797                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 798            })?
 799            .await
 800        })
 801    }
 802
 803    pub fn session_id(&self) -> SessionId {
 804        self.id
 805    }
 806
 807    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 808        self.child_session_ids.clone()
 809    }
 810
 811    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 812        self.child_session_ids.insert(session_id);
 813    }
 814
 815    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 816        self.child_session_ids.remove(&session_id);
 817    }
 818
 819    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 820        self.parent_session
 821            .as_ref()
 822            .map(|session| session.read(cx).id)
 823    }
 824
 825    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 826        self.parent_session.as_ref()
 827    }
 828
 829    pub fn capabilities(&self) -> &Capabilities {
 830        &self.capabilities
 831    }
 832
 833    pub(crate) fn root_binary(&self) -> Arc<DebugAdapterBinary> {
 834        match &self.mode {
 835            Mode::Building => {
 836                // todo(debugger): Implement root_binary for building mode
 837                unimplemented!()
 838            }
 839            Mode::Running(running) => running
 840                .root_binary
 841                .clone()
 842                .unwrap_or_else(|| Arc::new(running.binary.clone())),
 843        }
 844    }
 845
 846    pub fn binary(&self) -> &DebugAdapterBinary {
 847        let Mode::Running(local_mode) = &self.mode else {
 848            panic!("Session is not local");
 849        };
 850        &local_mode.binary
 851    }
 852
 853    pub fn adapter_name(&self) -> SharedString {
 854        self.definition.adapter.clone().into()
 855    }
 856
 857    pub fn label(&self) -> String {
 858        self.definition.label.clone()
 859    }
 860
 861    pub fn definition(&self) -> DebugTaskDefinition {
 862        self.definition.clone()
 863    }
 864
 865    pub fn is_terminated(&self) -> bool {
 866        self.is_session_terminated
 867    }
 868
 869    pub fn is_local(&self) -> bool {
 870        matches!(self.mode, Mode::Running(_))
 871    }
 872
 873    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 874        match &mut self.mode {
 875            Mode::Running(local_mode) => Some(local_mode),
 876            Mode::Building => None,
 877        }
 878    }
 879
 880    pub fn as_local(&self) -> Option<&LocalMode> {
 881        match &self.mode {
 882            Mode::Running(local_mode) => Some(local_mode),
 883            Mode::Building => None,
 884        }
 885    }
 886
 887    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 888        let adapter_id = self.definition.adapter.clone();
 889        let request = Initialize { adapter_id };
 890        match &self.mode {
 891            Mode::Running(local_mode) => {
 892                let capabilities = local_mode.request(request, cx.background_executor().clone());
 893
 894                cx.spawn(async move |this, cx| {
 895                    let capabilities = capabilities.await?;
 896                    this.update(cx, |session, cx| {
 897                        session.capabilities = capabilities;
 898                        let filters = session
 899                            .capabilities
 900                            .exception_breakpoint_filters
 901                            .clone()
 902                            .unwrap_or_default();
 903                        for filter in filters {
 904                            let default = filter.default.unwrap_or_default();
 905                            session
 906                                .exception_breakpoints
 907                                .entry(filter.filter.clone())
 908                                .or_insert_with(|| (filter, default));
 909                        }
 910                        cx.emit(SessionEvent::CapabilitiesLoaded);
 911                    })?;
 912                    Ok(())
 913                })
 914            }
 915            Mode::Building => Task::ready(Err(anyhow!(
 916                "Cannot send initialize request, task still building"
 917            ))),
 918        }
 919    }
 920
 921    pub(super) fn initialize_sequence(
 922        &mut self,
 923        initialize_rx: oneshot::Receiver<()>,
 924        dap_store: WeakEntity<DapStore>,
 925        cx: &mut Context<Self>,
 926    ) -> Task<Result<()>> {
 927        match &self.mode {
 928            Mode::Running(local_mode) => local_mode.initialize_sequence(
 929                &self.capabilities,
 930                &self.definition,
 931                initialize_rx,
 932                dap_store,
 933                cx,
 934            ),
 935            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
 936        }
 937    }
 938
 939    pub fn run_to_position(
 940        &mut self,
 941        breakpoint: SourceBreakpoint,
 942        active_thread_id: ThreadId,
 943        cx: &mut Context<Self>,
 944    ) {
 945        match &mut self.mode {
 946            Mode::Running(local_mode) => {
 947                if !matches!(
 948                    self.thread_states.thread_state(active_thread_id),
 949                    Some(ThreadStatus::Stopped)
 950                ) {
 951                    return;
 952                };
 953                let path = breakpoint.path.clone();
 954                local_mode.tmp_breakpoint = Some(breakpoint);
 955                let task = local_mode.send_breakpoints_from_path(
 956                    path,
 957                    BreakpointUpdatedReason::Toggled,
 958                    cx,
 959                );
 960
 961                cx.spawn(async move |this, cx| {
 962                    task.await;
 963                    this.update(cx, |this, cx| {
 964                        this.continue_thread(active_thread_id, cx);
 965                    })
 966                })
 967                .detach();
 968            }
 969            Mode::Building => {}
 970        }
 971    }
 972
 973    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
 974        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
 975    }
 976
 977    pub fn output(
 978        &self,
 979        since: OutputToken,
 980    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 981        if self.output_token.0 == 0 {
 982            return (self.output.range(0..0), OutputToken(0));
 983        };
 984
 985        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 986
 987        let clamped_events_since = events_since.clamp(0, self.output.len());
 988        (
 989            self.output
 990                .range(self.output.len() - clamped_events_since..),
 991            self.output_token,
 992        )
 993    }
 994
 995    pub fn respond_to_client(
 996        &self,
 997        request_seq: u64,
 998        success: bool,
 999        command: String,
1000        body: Option<serde_json::Value>,
1001        cx: &mut Context<Self>,
1002    ) -> Task<Result<()>> {
1003        let Some(local_session) = self.as_local() else {
1004            unreachable!("Cannot respond to remote client");
1005        };
1006        let client = local_session.client.clone();
1007
1008        cx.background_spawn(async move {
1009            client
1010                .send_message(Message::Response(Response {
1011                    body,
1012                    success,
1013                    command,
1014                    seq: request_seq + 1,
1015                    request_seq,
1016                    message: None,
1017                }))
1018                .await
1019        })
1020    }
1021
1022    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1023        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1024            let breakpoint = local.tmp_breakpoint.take()?;
1025            let path = breakpoint.path.clone();
1026            Some((local, path))
1027        }) {
1028            local
1029                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1030                .detach();
1031        };
1032
1033        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1034            self.thread_states.stop_all_threads();
1035
1036            self.invalidate_command_type::<StackTraceCommand>();
1037        }
1038
1039        // Event if we stopped all threads we still need to insert the thread_id
1040        // to our own data
1041        if let Some(thread_id) = event.thread_id {
1042            self.thread_states.stop_thread(ThreadId(thread_id));
1043
1044            self.invalidate_state(
1045                &StackTraceCommand {
1046                    thread_id,
1047                    start_frame: None,
1048                    levels: None,
1049                }
1050                .into(),
1051            );
1052        }
1053
1054        self.invalidate_generic();
1055        self.threads.clear();
1056        self.variables.clear();
1057        cx.emit(SessionEvent::Stopped(
1058            event
1059                .thread_id
1060                .map(Into::into)
1061                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1062        ));
1063        cx.notify();
1064    }
1065
1066    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1067        match *event {
1068            Events::Initialized(_) => {
1069                debug_assert!(
1070                    false,
1071                    "Initialized event should have been handled in LocalMode"
1072                );
1073            }
1074            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1075            Events::Continued(event) => {
1076                if event.all_threads_continued.unwrap_or_default() {
1077                    self.thread_states.continue_all_threads();
1078                } else {
1079                    self.thread_states
1080                        .continue_thread(ThreadId(event.thread_id));
1081                }
1082                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1083                self.invalidate_generic();
1084            }
1085            Events::Exited(_event) => {
1086                self.clear_active_debug_line(cx);
1087            }
1088            Events::Terminated(_) => {
1089                self.is_session_terminated = true;
1090                self.clear_active_debug_line(cx);
1091            }
1092            Events::Thread(event) => {
1093                let thread_id = ThreadId(event.thread_id);
1094
1095                match event.reason {
1096                    dap::ThreadEventReason::Started => {
1097                        self.thread_states.continue_thread(thread_id);
1098                    }
1099                    dap::ThreadEventReason::Exited => {
1100                        self.thread_states.exit_thread(thread_id);
1101                    }
1102                    reason => {
1103                        log::error!("Unhandled thread event reason {:?}", reason);
1104                    }
1105                }
1106                self.invalidate_state(&ThreadsCommand.into());
1107                cx.notify();
1108            }
1109            Events::Output(event) => {
1110                if event
1111                    .category
1112                    .as_ref()
1113                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1114                {
1115                    return;
1116                }
1117
1118                self.output.push_back(event);
1119                self.output_token.0 += 1;
1120                cx.notify();
1121            }
1122            Events::Breakpoint(_) => {}
1123            Events::Module(event) => {
1124                match event.reason {
1125                    dap::ModuleEventReason::New => {
1126                        self.modules.push(event.module);
1127                    }
1128                    dap::ModuleEventReason::Changed => {
1129                        if let Some(module) = self
1130                            .modules
1131                            .iter_mut()
1132                            .find(|other| event.module.id == other.id)
1133                        {
1134                            *module = event.module;
1135                        }
1136                    }
1137                    dap::ModuleEventReason::Removed => {
1138                        self.modules.retain(|other| event.module.id != other.id);
1139                    }
1140                }
1141
1142                // todo(debugger): We should only send the invalidate command to downstream clients.
1143                // self.invalidate_state(&ModulesCommand.into());
1144            }
1145            Events::LoadedSource(_) => {
1146                self.invalidate_state(&LoadedSourcesCommand.into());
1147            }
1148            Events::Capabilities(event) => {
1149                self.capabilities = self.capabilities.merge(event.capabilities);
1150                cx.notify();
1151            }
1152            Events::Memory(_) => {}
1153            Events::Process(_) => {}
1154            Events::ProgressEnd(_) => {}
1155            Events::ProgressStart(_) => {}
1156            Events::ProgressUpdate(_) => {}
1157            Events::Invalidated(_) => {}
1158            Events::Other(_) => {}
1159        }
1160    }
1161
1162    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1163    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1164        &mut self,
1165        request: T,
1166        process_result: impl FnOnce(
1167            &mut Self,
1168            Result<T::Response>,
1169            &mut Context<Self>,
1170        ) -> Option<T::Response>
1171        + 'static,
1172        cx: &mut Context<Self>,
1173    ) {
1174        const {
1175            assert!(
1176                T::CACHEABLE,
1177                "Only requests marked as cacheable should invoke `fetch`"
1178            );
1179        }
1180
1181        if !self.thread_states.any_stopped_thread()
1182            && request.type_id() != TypeId::of::<ThreadsCommand>()
1183            || self.is_session_terminated
1184        {
1185            return;
1186        }
1187
1188        let request_map = self
1189            .requests
1190            .entry(std::any::TypeId::of::<T>())
1191            .or_default();
1192
1193        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1194            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1195
1196            let task = Self::request_inner::<Arc<T>>(
1197                &self.capabilities,
1198                &self.mode,
1199                command,
1200                process_result,
1201                cx,
1202            );
1203            let task = cx
1204                .background_executor()
1205                .spawn(async move {
1206                    let _ = task.await?;
1207                    Some(())
1208                })
1209                .shared();
1210
1211            vacant.insert(task);
1212            cx.notify();
1213        }
1214    }
1215
1216    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1217        capabilities: &Capabilities,
1218        mode: &Mode,
1219        request: T,
1220        process_result: impl FnOnce(
1221            &mut Self,
1222            Result<T::Response>,
1223            &mut Context<Self>,
1224        ) -> Option<T::Response>
1225        + 'static,
1226        cx: &mut Context<Self>,
1227    ) -> Task<Option<T::Response>> {
1228        if !T::is_supported(&capabilities) {
1229            log::warn!(
1230                "Attempted to send a DAP request that isn't supported: {:?}",
1231                request
1232            );
1233            let error = Err(anyhow::Error::msg(
1234                "Couldn't complete request because it's not supported",
1235            ));
1236            return cx.spawn(async move |this, cx| {
1237                this.update(cx, |this, cx| process_result(this, error, cx))
1238                    .log_err()
1239                    .flatten()
1240            });
1241        }
1242
1243        let request = mode.request_dap(request, cx);
1244        cx.spawn(async move |this, cx| {
1245            let result = request.await;
1246            this.update(cx, |this, cx| process_result(this, result, cx))
1247                .log_err()
1248                .flatten()
1249        })
1250    }
1251
1252    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1253        &self,
1254        request: T,
1255        process_result: impl FnOnce(
1256            &mut Self,
1257            Result<T::Response>,
1258            &mut Context<Self>,
1259        ) -> Option<T::Response>
1260        + 'static,
1261        cx: &mut Context<Self>,
1262    ) -> Task<Option<T::Response>> {
1263        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1264    }
1265
1266    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1267        self.requests.remove(&std::any::TypeId::of::<Command>());
1268    }
1269
1270    fn invalidate_generic(&mut self) {
1271        self.invalidate_command_type::<ModulesCommand>();
1272        self.invalidate_command_type::<LoadedSourcesCommand>();
1273        self.invalidate_command_type::<ThreadsCommand>();
1274    }
1275
1276    fn invalidate_state(&mut self, key: &RequestSlot) {
1277        self.requests
1278            .entry((&*key.0 as &dyn Any).type_id())
1279            .and_modify(|request_map| {
1280                request_map.remove(&key);
1281            });
1282    }
1283
1284    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1285        self.thread_states.thread_status(thread_id)
1286    }
1287
1288    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1289        self.fetch(
1290            dap_command::ThreadsCommand,
1291            |this, result, cx| {
1292                let result = result.log_err()?;
1293
1294                this.threads = result
1295                    .iter()
1296                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1297                    .collect();
1298
1299                this.invalidate_command_type::<StackTraceCommand>();
1300                cx.emit(SessionEvent::Threads);
1301                cx.notify();
1302
1303                Some(result)
1304            },
1305            cx,
1306        );
1307
1308        self.threads
1309            .values()
1310            .map(|thread| {
1311                (
1312                    thread.dap.clone(),
1313                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1314                )
1315            })
1316            .collect()
1317    }
1318
1319    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1320        self.fetch(
1321            dap_command::ModulesCommand,
1322            |this, result, cx| {
1323                let result = result.log_err()?;
1324
1325                this.modules = result.iter().cloned().collect();
1326                cx.emit(SessionEvent::Modules);
1327                cx.notify();
1328
1329                Some(result)
1330            },
1331            cx,
1332        );
1333
1334        &self.modules
1335    }
1336
1337    pub fn ignore_breakpoints(&self) -> bool {
1338        self.ignore_breakpoints
1339    }
1340
1341    pub fn toggle_ignore_breakpoints(
1342        &mut self,
1343        cx: &mut App,
1344    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1345        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1346    }
1347
1348    pub(crate) fn set_ignore_breakpoints(
1349        &mut self,
1350        ignore: bool,
1351        cx: &mut App,
1352    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1353        if self.ignore_breakpoints == ignore {
1354            return Task::ready(HashMap::default());
1355        }
1356
1357        self.ignore_breakpoints = ignore;
1358
1359        if let Some(local) = self.as_local() {
1360            local.send_source_breakpoints(ignore, cx)
1361        } else {
1362            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1363            unimplemented!()
1364        }
1365    }
1366
1367    pub fn exception_breakpoints(
1368        &self,
1369    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1370        self.exception_breakpoints.values()
1371    }
1372
1373    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1374        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1375            *is_enabled = !*is_enabled;
1376            self.send_exception_breakpoints(cx);
1377        }
1378    }
1379
1380    fn send_exception_breakpoints(&mut self, cx: &App) {
1381        if let Some(local) = self.as_local() {
1382            let exception_filters = self
1383                .exception_breakpoints
1384                .values()
1385                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1386                .collect();
1387
1388            let supports_exception_filters = self
1389                .capabilities
1390                .supports_exception_filter_options
1391                .unwrap_or_default();
1392            local
1393                .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1394                .detach_and_log_err(cx);
1395        } else {
1396            debug_assert!(false, "Not implemented");
1397        }
1398    }
1399
1400    pub fn breakpoints_enabled(&self) -> bool {
1401        self.ignore_breakpoints
1402    }
1403
1404    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1405        self.fetch(
1406            dap_command::LoadedSourcesCommand,
1407            |this, result, cx| {
1408                let result = result.log_err()?;
1409                this.loaded_sources = result.iter().cloned().collect();
1410                cx.emit(SessionEvent::LoadedSources);
1411                cx.notify();
1412                Some(result)
1413            },
1414            cx,
1415        );
1416
1417        &self.loaded_sources
1418    }
1419
1420    fn fallback_to_manual_restart(
1421        &mut self,
1422        res: Result<()>,
1423        cx: &mut Context<Self>,
1424    ) -> Option<()> {
1425        if res.log_err().is_none() {
1426            cx.emit(SessionStateEvent::Restart);
1427            return None;
1428        }
1429        Some(())
1430    }
1431
1432    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1433        res.log_err()?;
1434        Some(())
1435    }
1436
1437    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1438        thread_id: ThreadId,
1439    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1440    {
1441        move |this, response, cx| match response.log_err() {
1442            Some(response) => Some(response),
1443            None => {
1444                this.thread_states.stop_thread(thread_id);
1445                cx.notify();
1446                None
1447            }
1448        }
1449    }
1450
1451    fn clear_active_debug_line_response(
1452        &mut self,
1453        response: Result<()>,
1454        cx: &mut Context<Session>,
1455    ) -> Option<()> {
1456        response.log_err()?;
1457        self.clear_active_debug_line(cx);
1458        Some(())
1459    }
1460
1461    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1462        self.as_local()
1463            .expect("Message handler will only run in local mode")
1464            .breakpoint_store
1465            .update(cx, |store, cx| {
1466                store.remove_active_position(Some(self.id), cx)
1467            });
1468    }
1469
1470    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1471        self.request(
1472            PauseCommand {
1473                thread_id: thread_id.0,
1474            },
1475            Self::empty_response,
1476            cx,
1477        )
1478        .detach();
1479    }
1480
1481    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1482        self.request(
1483            RestartStackFrameCommand { stack_frame_id },
1484            Self::empty_response,
1485            cx,
1486        )
1487        .detach();
1488    }
1489
1490    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1491        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1492            self.request(
1493                RestartCommand {
1494                    raw: args.unwrap_or(Value::Null),
1495                },
1496                Self::fallback_to_manual_restart,
1497                cx,
1498            )
1499            .detach();
1500        } else {
1501            cx.emit(SessionStateEvent::Restart);
1502        }
1503    }
1504
1505    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1506        self.is_session_terminated = true;
1507        self.thread_states.exit_all_threads();
1508        cx.notify();
1509
1510        let task = if self
1511            .capabilities
1512            .supports_terminate_request
1513            .unwrap_or_default()
1514        {
1515            self.request(
1516                TerminateCommand {
1517                    restart: Some(false),
1518                },
1519                Self::clear_active_debug_line_response,
1520                cx,
1521            )
1522        } else {
1523            self.request(
1524                DisconnectCommand {
1525                    restart: Some(false),
1526                    terminate_debuggee: Some(true),
1527                    suspend_debuggee: Some(false),
1528                },
1529                Self::clear_active_debug_line_response,
1530                cx,
1531            )
1532        };
1533
1534        cx.emit(SessionStateEvent::Shutdown);
1535
1536        let debug_client = self.adapter_client();
1537
1538        cx.background_spawn(async move {
1539            let _ = task.await;
1540
1541            if let Some(client) = debug_client {
1542                client.shutdown().await.log_err();
1543            }
1544        })
1545    }
1546
1547    pub fn completions(
1548        &mut self,
1549        query: CompletionsQuery,
1550        cx: &mut Context<Self>,
1551    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1552        let task = self.request(query, |_, result, _| result.log_err(), cx);
1553
1554        cx.background_executor().spawn(async move {
1555            anyhow::Ok(
1556                task.await
1557                    .map(|response| response.targets)
1558                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1559            )
1560        })
1561    }
1562
1563    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1564        self.thread_states.continue_thread(thread_id);
1565        self.request(
1566            ContinueCommand {
1567                args: ContinueArguments {
1568                    thread_id: thread_id.0,
1569                    single_thread: Some(true),
1570                },
1571            },
1572            Self::on_step_response::<ContinueCommand>(thread_id),
1573            cx,
1574        )
1575        .detach();
1576    }
1577
1578    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1579        match self.mode {
1580            Mode::Running(ref local) => Some(local.client.clone()),
1581            Mode::Building => None,
1582        }
1583    }
1584
1585    pub fn step_over(
1586        &mut self,
1587        thread_id: ThreadId,
1588        granularity: SteppingGranularity,
1589        cx: &mut Context<Self>,
1590    ) {
1591        let supports_single_thread_execution_requests =
1592            self.capabilities.supports_single_thread_execution_requests;
1593        let supports_stepping_granularity = self
1594            .capabilities
1595            .supports_stepping_granularity
1596            .unwrap_or_default();
1597
1598        let command = NextCommand {
1599            inner: StepCommand {
1600                thread_id: thread_id.0,
1601                granularity: supports_stepping_granularity.then(|| granularity),
1602                single_thread: supports_single_thread_execution_requests,
1603            },
1604        };
1605
1606        self.thread_states.process_step(thread_id);
1607        self.request(
1608            command,
1609            Self::on_step_response::<NextCommand>(thread_id),
1610            cx,
1611        )
1612        .detach();
1613    }
1614
1615    pub fn step_in(
1616        &mut self,
1617        thread_id: ThreadId,
1618        granularity: SteppingGranularity,
1619        cx: &mut Context<Self>,
1620    ) {
1621        let supports_single_thread_execution_requests =
1622            self.capabilities.supports_single_thread_execution_requests;
1623        let supports_stepping_granularity = self
1624            .capabilities
1625            .supports_stepping_granularity
1626            .unwrap_or_default();
1627
1628        let command = StepInCommand {
1629            inner: StepCommand {
1630                thread_id: thread_id.0,
1631                granularity: supports_stepping_granularity.then(|| granularity),
1632                single_thread: supports_single_thread_execution_requests,
1633            },
1634        };
1635
1636        self.thread_states.process_step(thread_id);
1637        self.request(
1638            command,
1639            Self::on_step_response::<StepInCommand>(thread_id),
1640            cx,
1641        )
1642        .detach();
1643    }
1644
1645    pub fn step_out(
1646        &mut self,
1647        thread_id: ThreadId,
1648        granularity: SteppingGranularity,
1649        cx: &mut Context<Self>,
1650    ) {
1651        let supports_single_thread_execution_requests =
1652            self.capabilities.supports_single_thread_execution_requests;
1653        let supports_stepping_granularity = self
1654            .capabilities
1655            .supports_stepping_granularity
1656            .unwrap_or_default();
1657
1658        let command = StepOutCommand {
1659            inner: StepCommand {
1660                thread_id: thread_id.0,
1661                granularity: supports_stepping_granularity.then(|| granularity),
1662                single_thread: supports_single_thread_execution_requests,
1663            },
1664        };
1665
1666        self.thread_states.process_step(thread_id);
1667        self.request(
1668            command,
1669            Self::on_step_response::<StepOutCommand>(thread_id),
1670            cx,
1671        )
1672        .detach();
1673    }
1674
1675    pub fn step_back(
1676        &mut self,
1677        thread_id: ThreadId,
1678        granularity: SteppingGranularity,
1679        cx: &mut Context<Self>,
1680    ) {
1681        let supports_single_thread_execution_requests =
1682            self.capabilities.supports_single_thread_execution_requests;
1683        let supports_stepping_granularity = self
1684            .capabilities
1685            .supports_stepping_granularity
1686            .unwrap_or_default();
1687
1688        let command = StepBackCommand {
1689            inner: StepCommand {
1690                thread_id: thread_id.0,
1691                granularity: supports_stepping_granularity.then(|| granularity),
1692                single_thread: supports_single_thread_execution_requests,
1693            },
1694        };
1695
1696        self.thread_states.process_step(thread_id);
1697
1698        self.request(
1699            command,
1700            Self::on_step_response::<StepBackCommand>(thread_id),
1701            cx,
1702        )
1703        .detach();
1704    }
1705
1706    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1707        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1708            && self.requests.contains_key(&ThreadsCommand.type_id())
1709            && self.threads.contains_key(&thread_id)
1710        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1711        // We could still be using an old thread id and have sent a new thread's request
1712        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1713        // But it very well could cause a minor bug in the future that is hard to track down
1714        {
1715            self.fetch(
1716                super::dap_command::StackTraceCommand {
1717                    thread_id: thread_id.0,
1718                    start_frame: None,
1719                    levels: None,
1720                },
1721                move |this, stack_frames, cx| {
1722                    let stack_frames = stack_frames.log_err()?;
1723
1724                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1725                        thread.stack_frame_ids =
1726                            stack_frames.iter().map(|frame| frame.id).collect();
1727                    });
1728                    debug_assert!(
1729                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1730                        "Sent request for thread_id that doesn't exist"
1731                    );
1732
1733                    this.stack_frames.extend(
1734                        stack_frames
1735                            .iter()
1736                            .cloned()
1737                            .map(|frame| (frame.id, StackFrame::from(frame))),
1738                    );
1739
1740                    this.invalidate_command_type::<ScopesCommand>();
1741                    this.invalidate_command_type::<VariablesCommand>();
1742
1743                    cx.emit(SessionEvent::StackTrace);
1744                    cx.notify();
1745                    Some(stack_frames)
1746                },
1747                cx,
1748            );
1749        }
1750
1751        self.threads
1752            .get(&thread_id)
1753            .map(|thread| {
1754                thread
1755                    .stack_frame_ids
1756                    .iter()
1757                    .filter_map(|id| self.stack_frames.get(id))
1758                    .cloned()
1759                    .collect()
1760            })
1761            .unwrap_or_default()
1762    }
1763
1764    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1765        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1766            && self
1767                .requests
1768                .contains_key(&TypeId::of::<StackTraceCommand>())
1769        {
1770            self.fetch(
1771                ScopesCommand { stack_frame_id },
1772                move |this, scopes, cx| {
1773                    let scopes = scopes.log_err()?;
1774
1775                    for scope in scopes .iter(){
1776                        this.variables(scope.variables_reference, cx);
1777                    }
1778
1779                    let entry = this
1780                        .stack_frames
1781                        .entry(stack_frame_id)
1782                        .and_modify(|stack_frame| {
1783                            stack_frame.scopes = scopes.clone();
1784                        });
1785
1786                    cx.emit(SessionEvent::Variables);
1787
1788                    debug_assert!(
1789                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1790                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1791                    );
1792
1793                    Some(scopes)
1794                },
1795                cx,
1796            );
1797        }
1798
1799        self.stack_frames
1800            .get(&stack_frame_id)
1801            .map(|frame| frame.scopes.as_slice())
1802            .unwrap_or_default()
1803    }
1804
1805    pub fn variables(
1806        &mut self,
1807        variables_reference: VariableReference,
1808        cx: &mut Context<Self>,
1809    ) -> Vec<dap::Variable> {
1810        let command = VariablesCommand {
1811            variables_reference,
1812            filter: None,
1813            start: None,
1814            count: None,
1815            format: None,
1816        };
1817
1818        self.fetch(
1819            command,
1820            move |this, variables, cx| {
1821                let variables = variables.log_err()?;
1822                this.variables
1823                    .insert(variables_reference, variables.clone());
1824
1825                cx.emit(SessionEvent::Variables);
1826                Some(variables)
1827            },
1828            cx,
1829        );
1830
1831        self.variables
1832            .get(&variables_reference)
1833            .cloned()
1834            .unwrap_or_default()
1835    }
1836
1837    pub fn set_variable_value(
1838        &mut self,
1839        variables_reference: u64,
1840        name: String,
1841        value: String,
1842        cx: &mut Context<Self>,
1843    ) {
1844        if self.capabilities.supports_set_variable.unwrap_or_default() {
1845            self.request(
1846                SetVariableValueCommand {
1847                    name,
1848                    value,
1849                    variables_reference,
1850                },
1851                move |this, response, cx| {
1852                    let response = response.log_err()?;
1853                    this.invalidate_command_type::<VariablesCommand>();
1854                    cx.notify();
1855                    Some(response)
1856                },
1857                cx,
1858            )
1859            .detach()
1860        }
1861    }
1862
1863    pub fn evaluate(
1864        &mut self,
1865        expression: String,
1866        context: Option<EvaluateArgumentsContext>,
1867        frame_id: Option<u64>,
1868        source: Option<Source>,
1869        cx: &mut Context<Self>,
1870    ) {
1871        self.request(
1872            EvaluateCommand {
1873                expression,
1874                context,
1875                frame_id,
1876                source,
1877            },
1878            |this, response, cx| {
1879                let response = response.log_err()?;
1880                this.output_token.0 += 1;
1881                this.output.push_back(dap::OutputEvent {
1882                    category: None,
1883                    output: response.result.clone(),
1884                    group: None,
1885                    variables_reference: Some(response.variables_reference),
1886                    source: None,
1887                    line: None,
1888                    column: None,
1889                    data: None,
1890                    location_reference: None,
1891                });
1892
1893                this.invalidate_command_type::<ScopesCommand>();
1894                cx.notify();
1895                Some(response)
1896            },
1897            cx,
1898        )
1899        .detach();
1900    }
1901
1902    pub fn location(
1903        &mut self,
1904        reference: u64,
1905        cx: &mut Context<Self>,
1906    ) -> Option<dap::LocationsResponse> {
1907        self.fetch(
1908            LocationsCommand { reference },
1909            move |this, response, _| {
1910                let response = response.log_err()?;
1911                this.locations.insert(reference, response.clone());
1912                Some(response)
1913            },
1914            cx,
1915        );
1916        self.locations.get(&reference).cloned()
1917    }
1918    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1919        let command = DisconnectCommand {
1920            restart: Some(false),
1921            terminate_debuggee: Some(true),
1922            suspend_debuggee: Some(false),
1923        };
1924
1925        self.request(command, Self::empty_response, cx).detach()
1926    }
1927
1928    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1929        if self
1930            .capabilities
1931            .supports_terminate_threads_request
1932            .unwrap_or_default()
1933        {
1934            self.request(
1935                TerminateThreadsCommand {
1936                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1937                },
1938                Self::clear_active_debug_line_response,
1939                cx,
1940            )
1941            .detach();
1942        } else {
1943            self.shutdown(cx).detach();
1944        }
1945    }
1946}