session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::DebugAdapterBinary;
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    client::{DebugAdapterClient, SessionId},
  21    messages::{Events, Message},
  22};
  23use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
  24use futures::channel::oneshot;
  25use futures::{FutureExt, future::Shared};
  26use gpui::{
  27    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  28    Task, WeakEntity,
  29};
  30
  31use serde_json::{Value, json};
  32use smol::stream::StreamExt;
  33use std::any::TypeId;
  34use std::collections::BTreeMap;
  35use std::u64;
  36use std::{
  37    any::Any,
  38    collections::hash_map::Entry,
  39    hash::{Hash, Hasher},
  40    path::Path,
  41    sync::Arc,
  42};
  43use task::DebugTaskDefinition;
  44use text::{PointUtf16, ToPointUtf16};
  45use util::{ResultExt, merge_json_value_into};
  46use worktree::Worktree;
  47
  48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  49#[repr(transparent)]
  50pub struct ThreadId(pub u64);
  51
  52impl ThreadId {
  53    pub const MIN: ThreadId = ThreadId(u64::MIN);
  54    pub const MAX: ThreadId = ThreadId(u64::MAX);
  55}
  56
  57impl From<u64> for ThreadId {
  58    fn from(id: u64) -> Self {
  59        Self(id)
  60    }
  61}
  62
  63#[derive(Clone, Debug)]
  64pub struct StackFrame {
  65    pub dap: dap::StackFrame,
  66    pub scopes: Vec<dap::Scope>,
  67}
  68
  69impl From<dap::StackFrame> for StackFrame {
  70    fn from(stack_frame: dap::StackFrame) -> Self {
  71        Self {
  72            scopes: vec![],
  73            dap: stack_frame,
  74        }
  75    }
  76}
  77
  78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  79pub enum ThreadStatus {
  80    #[default]
  81    Running,
  82    Stopped,
  83    Stepping,
  84    Exited,
  85    Ended,
  86}
  87
  88impl ThreadStatus {
  89    pub fn label(&self) -> &'static str {
  90        match self {
  91            ThreadStatus::Running => "Running",
  92            ThreadStatus::Stopped => "Stopped",
  93            ThreadStatus::Stepping => "Stepping",
  94            ThreadStatus::Exited => "Exited",
  95            ThreadStatus::Ended => "Ended",
  96        }
  97    }
  98}
  99
 100#[derive(Debug)]
 101pub struct Thread {
 102    dap: dap::Thread,
 103    stack_frame_ids: IndexSet<StackFrameId>,
 104    _has_stopped: bool,
 105}
 106
 107impl From<dap::Thread> for Thread {
 108    fn from(dap: dap::Thread) -> Self {
 109        Self {
 110            dap,
 111            stack_frame_ids: Default::default(),
 112            _has_stopped: false,
 113        }
 114    }
 115}
 116
 117enum Mode {
 118    Building,
 119    Running(LocalMode),
 120}
 121
 122#[derive(Clone)]
 123pub struct LocalMode {
 124    client: Arc<DebugAdapterClient>,
 125    binary: DebugAdapterBinary,
 126    root_binary: Option<Arc<DebugAdapterBinary>>,
 127    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 128    tmp_breakpoint: Option<SourceBreakpoint>,
 129    worktree: WeakEntity<Worktree>,
 130}
 131
 132fn client_source(abs_path: &Path) -> dap::Source {
 133    dap::Source {
 134        name: abs_path
 135            .file_name()
 136            .map(|filename| filename.to_string_lossy().to_string()),
 137        path: Some(abs_path.to_string_lossy().to_string()),
 138        source_reference: None,
 139        presentation_hint: None,
 140        origin: None,
 141        sources: None,
 142        adapter_data: None,
 143        checksums: None,
 144    }
 145}
 146
 147impl LocalMode {
 148    async fn new(
 149        session_id: SessionId,
 150        parent_session: Option<Entity<Session>>,
 151        worktree: WeakEntity<Worktree>,
 152        breakpoint_store: Entity<BreakpointStore>,
 153        binary: DebugAdapterBinary,
 154        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 155        cx: AsyncApp,
 156    ) -> Result<Self> {
 157        let message_handler = Box::new(move |message| {
 158            messages_tx.unbounded_send(message).ok();
 159        });
 160
 161        let root_binary = if let Some(parent_session) = parent_session.as_ref() {
 162            Some(parent_session.read_with(&cx, |session, _| session.root_binary().clone())?)
 163        } else {
 164            None
 165        };
 166
 167        let client = Arc::new(
 168            if let Some(client) = parent_session
 169                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 170                .flatten()
 171            {
 172                client
 173                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 174                    .await?
 175            } else {
 176                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 177                    .await
 178                    .with_context(|| "Failed to start communication with debug adapter")?
 179            },
 180        );
 181
 182        Ok(Self {
 183            client,
 184            breakpoint_store,
 185            worktree,
 186            tmp_breakpoint: None,
 187            root_binary,
 188            binary,
 189        })
 190    }
 191
 192    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 193        &self.worktree
 194    }
 195
 196    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 197        let tasks: Vec<_> = paths
 198            .into_iter()
 199            .map(|path| {
 200                self.request(
 201                    dap_command::SetBreakpoints {
 202                        source: client_source(path),
 203                        source_modified: None,
 204                        breakpoints: vec![],
 205                    },
 206                    cx.background_executor().clone(),
 207                )
 208            })
 209            .collect();
 210
 211        cx.background_spawn(async move {
 212            futures::future::join_all(tasks)
 213                .await
 214                .iter()
 215                .for_each(|res| match res {
 216                    Ok(_) => {}
 217                    Err(err) => {
 218                        log::warn!("Set breakpoints request failed: {}", err);
 219                    }
 220                });
 221        })
 222    }
 223
 224    fn send_breakpoints_from_path(
 225        &self,
 226        abs_path: Arc<Path>,
 227        reason: BreakpointUpdatedReason,
 228        cx: &mut App,
 229    ) -> Task<()> {
 230        let breakpoints = self
 231            .breakpoint_store
 232            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 233            .into_iter()
 234            .filter(|bp| bp.state.is_enabled())
 235            .chain(self.tmp_breakpoint.clone())
 236            .map(Into::into)
 237            .collect();
 238
 239        let task = self.request(
 240            dap_command::SetBreakpoints {
 241                source: client_source(&abs_path),
 242                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 243                breakpoints,
 244            },
 245            cx.background_executor().clone(),
 246        );
 247
 248        cx.background_spawn(async move {
 249            match task.await {
 250                Ok(_) => {}
 251                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 252            }
 253        })
 254    }
 255
 256    fn send_exception_breakpoints(
 257        &self,
 258        filters: Vec<ExceptionBreakpointsFilter>,
 259        supports_filter_options: bool,
 260        cx: &App,
 261    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 262        let arg = if supports_filter_options {
 263            SetExceptionBreakpoints::WithOptions {
 264                filters: filters
 265                    .into_iter()
 266                    .map(|filter| ExceptionFilterOptions {
 267                        filter_id: filter.filter,
 268                        condition: None,
 269                        mode: None,
 270                    })
 271                    .collect(),
 272            }
 273        } else {
 274            SetExceptionBreakpoints::Plain {
 275                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 276            }
 277        };
 278        self.request(arg, cx.background_executor().clone())
 279    }
 280
 281    fn send_source_breakpoints(
 282        &self,
 283        ignore_breakpoints: bool,
 284        cx: &App,
 285    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 286        let mut breakpoint_tasks = Vec::new();
 287        let breakpoints = self
 288            .breakpoint_store
 289            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 290
 291        for (path, breakpoints) in breakpoints {
 292            let breakpoints = if ignore_breakpoints {
 293                vec![]
 294            } else {
 295                breakpoints
 296                    .into_iter()
 297                    .filter(|bp| bp.state.is_enabled())
 298                    .map(Into::into)
 299                    .collect()
 300            };
 301
 302            breakpoint_tasks.push(
 303                self.request(
 304                    dap_command::SetBreakpoints {
 305                        source: client_source(&path),
 306                        source_modified: Some(false),
 307                        breakpoints,
 308                    },
 309                    cx.background_executor().clone(),
 310                )
 311                .map(|result| result.map_err(|e| (path, e))),
 312            );
 313        }
 314
 315        cx.background_spawn(async move {
 316            futures::future::join_all(breakpoint_tasks)
 317                .await
 318                .into_iter()
 319                .filter_map(Result::err)
 320                .collect::<HashMap<_, _>>()
 321        })
 322    }
 323
 324    fn initialize_sequence(
 325        &self,
 326        capabilities: &Capabilities,
 327        definition: &DebugTaskDefinition,
 328        initialized_rx: oneshot::Receiver<()>,
 329        dap_store: WeakEntity<DapStore>,
 330        cx: &App,
 331    ) -> Task<Result<()>> {
 332        let mut raw = self.binary.request_args.clone();
 333
 334        merge_json_value_into(
 335            definition.initialize_args.clone().unwrap_or(json!({})),
 336            &mut raw.configuration,
 337        );
 338
 339        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 340        let launch = match raw.request {
 341            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
 342                Launch {
 343                    raw: raw.configuration,
 344                },
 345                cx.background_executor().clone(),
 346            ),
 347            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
 348                Attach {
 349                    raw: raw.configuration,
 350                },
 351                cx.background_executor().clone(),
 352            ),
 353        };
 354
 355        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 356        let exception_filters = capabilities
 357            .exception_breakpoint_filters
 358            .as_ref()
 359            .map(|exception_filters| {
 360                exception_filters
 361                    .iter()
 362                    .filter(|filter| filter.default == Some(true))
 363                    .cloned()
 364                    .collect::<Vec<_>>()
 365            })
 366            .unwrap_or_default();
 367        let supports_exception_filters = capabilities
 368            .supports_exception_filter_options
 369            .unwrap_or_default();
 370        let this = self.clone();
 371        let worktree = self.worktree().clone();
 372        let configuration_sequence = cx.spawn({
 373            async move |cx| {
 374                initialized_rx.await?;
 375                let errors_by_path = cx
 376                    .update(|cx| this.send_source_breakpoints(false, cx))?
 377                    .await;
 378
 379                dap_store.update(cx, |_, cx| {
 380                    let Some(worktree) = worktree.upgrade() else {
 381                        return;
 382                    };
 383
 384                    for (path, error) in &errors_by_path {
 385                        log::error!("failed to set breakpoints for {path:?}: {error}");
 386                    }
 387
 388                    if let Some(failed_path) = errors_by_path.keys().next() {
 389                        let failed_path = failed_path
 390                            .strip_prefix(worktree.read(cx).abs_path())
 391                            .unwrap_or(failed_path)
 392                            .display();
 393                        let message = format!(
 394                            "Failed to set breakpoints for {failed_path}{}",
 395                            match errors_by_path.len() {
 396                                0 => unreachable!(),
 397                                1 => "".into(),
 398                                2 => " and 1 other path".into(),
 399                                n => format!(" and {} other paths", n - 1),
 400                            }
 401                        );
 402                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 403                    }
 404                })?;
 405
 406                cx.update(|cx| {
 407                    this.send_exception_breakpoints(
 408                        exception_filters,
 409                        supports_exception_filters,
 410                        cx,
 411                    )
 412                })?
 413                .await
 414                .ok();
 415                let ret = if configuration_done_supported {
 416                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 417                } else {
 418                    Task::ready(Ok(()))
 419                }
 420                .await;
 421                ret
 422            }
 423        });
 424
 425        cx.background_spawn(async move {
 426            futures::future::try_join(launch, configuration_sequence).await?;
 427            Ok(())
 428        })
 429    }
 430
 431    fn request<R: LocalDapCommand>(
 432        &self,
 433        request: R,
 434        executor: BackgroundExecutor,
 435    ) -> Task<Result<R::Response>>
 436    where
 437        <R::DapRequest as dap::requests::Request>::Response: 'static,
 438        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 439    {
 440        let request = Arc::new(request);
 441
 442        let request_clone = request.clone();
 443        let connection = self.client.clone();
 444        let request_task = executor.spawn(async move {
 445            let args = request_clone.to_dap();
 446            connection.request::<R::DapRequest>(args).await
 447        });
 448
 449        executor.spawn(async move {
 450            let response = request.response_from_dap(request_task.await?);
 451            response
 452        })
 453    }
 454}
 455
 456impl Mode {
 457    fn request_dap<R: DapCommand>(
 458        &self,
 459        request: R,
 460        cx: &mut Context<Session>,
 461    ) -> Task<Result<R::Response>>
 462    where
 463        <R::DapRequest as dap::requests::Request>::Response: 'static,
 464        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 465    {
 466        match self {
 467            Mode::Running(debug_adapter_client) => {
 468                debug_adapter_client.request(request, cx.background_executor().clone())
 469            }
 470            Mode::Building => Task::ready(Err(anyhow!(
 471                "no adapter running to send request: {:?}",
 472                request
 473            ))),
 474        }
 475    }
 476}
 477
 478#[derive(Default)]
 479struct ThreadStates {
 480    global_state: Option<ThreadStatus>,
 481    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 482}
 483
 484impl ThreadStates {
 485    fn stop_all_threads(&mut self) {
 486        self.global_state = Some(ThreadStatus::Stopped);
 487        self.known_thread_states.clear();
 488    }
 489
 490    fn exit_all_threads(&mut self) {
 491        self.global_state = Some(ThreadStatus::Exited);
 492        self.known_thread_states.clear();
 493    }
 494
 495    fn continue_all_threads(&mut self) {
 496        self.global_state = Some(ThreadStatus::Running);
 497        self.known_thread_states.clear();
 498    }
 499
 500    fn stop_thread(&mut self, thread_id: ThreadId) {
 501        self.known_thread_states
 502            .insert(thread_id, ThreadStatus::Stopped);
 503    }
 504
 505    fn continue_thread(&mut self, thread_id: ThreadId) {
 506        self.known_thread_states
 507            .insert(thread_id, ThreadStatus::Running);
 508    }
 509
 510    fn process_step(&mut self, thread_id: ThreadId) {
 511        self.known_thread_states
 512            .insert(thread_id, ThreadStatus::Stepping);
 513    }
 514
 515    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 516        self.thread_state(thread_id)
 517            .unwrap_or(ThreadStatus::Running)
 518    }
 519
 520    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 521        self.known_thread_states
 522            .get(&thread_id)
 523            .copied()
 524            .or(self.global_state)
 525    }
 526
 527    fn exit_thread(&mut self, thread_id: ThreadId) {
 528        self.known_thread_states
 529            .insert(thread_id, ThreadStatus::Exited);
 530    }
 531
 532    fn any_stopped_thread(&self) -> bool {
 533        self.global_state
 534            .is_some_and(|state| state == ThreadStatus::Stopped)
 535            || self
 536                .known_thread_states
 537                .values()
 538                .any(|status| *status == ThreadStatus::Stopped)
 539    }
 540}
 541const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 542
 543type IsEnabled = bool;
 544
 545#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 546pub struct OutputToken(pub usize);
 547/// Represents a current state of a single debug adapter and provides ways to mutate it.
 548pub struct Session {
 549    mode: Mode,
 550    definition: DebugTaskDefinition,
 551    pub(super) capabilities: Capabilities,
 552    id: SessionId,
 553    child_session_ids: HashSet<SessionId>,
 554    parent_session: Option<Entity<Session>>,
 555    ignore_breakpoints: bool,
 556    modules: Vec<dap::Module>,
 557    loaded_sources: Vec<dap::Source>,
 558    output_token: OutputToken,
 559    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 560    threads: IndexMap<ThreadId, Thread>,
 561    thread_states: ThreadStates,
 562    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 563    stack_frames: IndexMap<StackFrameId, StackFrame>,
 564    locations: HashMap<u64, dap::LocationsResponse>,
 565    is_session_terminated: bool,
 566    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 567    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 568    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 569    background_tasks: Vec<Task<()>>,
 570}
 571
 572trait CacheableCommand: Any + Send + Sync {
 573    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 574    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 575    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 576}
 577
 578impl<T> CacheableCommand for T
 579where
 580    T: DapCommand + PartialEq + Eq + Hash,
 581{
 582    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 583        (rhs as &dyn Any)
 584            .downcast_ref::<Self>()
 585            .map_or(false, |rhs| self == rhs)
 586    }
 587
 588    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 589        T::hash(self, &mut hasher);
 590    }
 591
 592    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 593        self
 594    }
 595}
 596
 597pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 598
 599impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 600    fn from(request: T) -> Self {
 601        Self(Arc::new(request))
 602    }
 603}
 604
 605impl PartialEq for RequestSlot {
 606    fn eq(&self, other: &Self) -> bool {
 607        self.0.dyn_eq(other.0.as_ref())
 608    }
 609}
 610
 611impl Eq for RequestSlot {}
 612
 613impl Hash for RequestSlot {
 614    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 615        self.0.dyn_hash(state);
 616        (&*self.0 as &dyn Any).type_id().hash(state)
 617    }
 618}
 619
 620#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 621pub struct CompletionsQuery {
 622    pub query: String,
 623    pub column: u64,
 624    pub line: Option<u64>,
 625    pub frame_id: Option<u64>,
 626}
 627
 628impl CompletionsQuery {
 629    pub fn new(
 630        buffer: &language::Buffer,
 631        cursor_position: language::Anchor,
 632        frame_id: Option<u64>,
 633    ) -> Self {
 634        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 635        Self {
 636            query: buffer.text(),
 637            column: column as u64,
 638            frame_id,
 639            line: Some(row as u64),
 640        }
 641    }
 642}
 643
 644pub enum SessionEvent {
 645    Modules,
 646    LoadedSources,
 647    Stopped(Option<ThreadId>),
 648    StackTrace,
 649    Variables,
 650    Threads,
 651    CapabilitiesLoaded,
 652}
 653
 654#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 655pub enum SessionStateEvent {
 656    Running,
 657    Shutdown,
 658    Restart,
 659}
 660
 661impl EventEmitter<SessionEvent> for Session {}
 662impl EventEmitter<SessionStateEvent> for Session {}
 663
 664// local session will send breakpoint updates to DAP for all new breakpoints
 665// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 666// BreakpointStore notifies session on breakpoint changes
 667impl Session {
 668    pub(crate) fn new(
 669        breakpoint_store: Entity<BreakpointStore>,
 670        session_id: SessionId,
 671        parent_session: Option<Entity<Session>>,
 672        template: DebugTaskDefinition,
 673        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 674        cx: &mut App,
 675    ) -> Entity<Self> {
 676        cx.new::<Self>(|cx| {
 677            cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
 678                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 679                    if let Some(local) = (!this.ignore_breakpoints)
 680                        .then(|| this.as_local_mut())
 681                        .flatten()
 682                    {
 683                        local
 684                            .send_breakpoints_from_path(path.clone(), *reason, cx)
 685                            .detach();
 686                    };
 687                }
 688                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 689                    if let Some(local) = (!this.ignore_breakpoints)
 690                        .then(|| this.as_local_mut())
 691                        .flatten()
 692                    {
 693                        local.unset_breakpoints_from_paths(paths, cx).detach();
 694                    }
 695                }
 696                BreakpointStoreEvent::ActiveDebugLineChanged => {}
 697            })
 698            .detach();
 699
 700            let this = Self {
 701                mode: Mode::Building,
 702                id: session_id,
 703                child_session_ids: HashSet::default(),
 704                parent_session,
 705                capabilities: Capabilities::default(),
 706                ignore_breakpoints: false,
 707                variables: Default::default(),
 708                stack_frames: Default::default(),
 709                thread_states: ThreadStates::default(),
 710                output_token: OutputToken(0),
 711                output: circular_buffer::CircularBuffer::boxed(),
 712                requests: HashMap::default(),
 713                modules: Vec::default(),
 714                loaded_sources: Vec::default(),
 715                threads: IndexMap::default(),
 716                background_tasks: Vec::default(),
 717                locations: Default::default(),
 718                is_session_terminated: false,
 719                exception_breakpoints: Default::default(),
 720                definition: template,
 721                start_debugging_requests_tx,
 722            };
 723
 724            this
 725        })
 726    }
 727
 728    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 729        match &self.mode {
 730            Mode::Building => None,
 731            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 732        }
 733    }
 734
 735    pub fn boot(
 736        &mut self,
 737        binary: DebugAdapterBinary,
 738        worktree: Entity<Worktree>,
 739        breakpoint_store: Entity<BreakpointStore>,
 740        dap_store: WeakEntity<DapStore>,
 741        cx: &mut Context<Self>,
 742    ) -> Task<Result<()>> {
 743        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 744        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 745        let session_id = self.session_id();
 746
 747        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 748            let mut initialized_tx = Some(initialized_tx);
 749            while let Some(message) = message_rx.next().await {
 750                if let Message::Event(event) = message {
 751                    if let Events::Initialized(_) = *event {
 752                        if let Some(tx) = initialized_tx.take() {
 753                            tx.send(()).ok();
 754                        }
 755                    } else {
 756                        let Ok(_) = this.update(cx, |session, cx| {
 757                            session.handle_dap_event(event, cx);
 758                        }) else {
 759                            break;
 760                        };
 761                    }
 762                } else {
 763                    let Ok(Ok(_)) = this.update(cx, |this, _| {
 764                        this.start_debugging_requests_tx
 765                            .unbounded_send((session_id, message))
 766                    }) else {
 767                        break;
 768                    };
 769                }
 770            }
 771        })];
 772        self.background_tasks = background_tasks;
 773        let id = self.id;
 774        let parent_session = self.parent_session.clone();
 775
 776        cx.spawn(async move |this, cx| {
 777            let mode = LocalMode::new(
 778                id,
 779                parent_session,
 780                worktree.downgrade(),
 781                breakpoint_store.clone(),
 782                binary,
 783                message_tx,
 784                cx.clone(),
 785            )
 786            .await?;
 787            this.update(cx, |this, cx| {
 788                this.mode = Mode::Running(mode);
 789                cx.emit(SessionStateEvent::Running);
 790            })?;
 791
 792            this.update(cx, |session, cx| session.request_initialize(cx))?
 793                .await?;
 794
 795            this.update(cx, |session, cx| {
 796                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 797            })?
 798            .await
 799        })
 800    }
 801
 802    pub fn session_id(&self) -> SessionId {
 803        self.id
 804    }
 805
 806    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 807        self.child_session_ids.clone()
 808    }
 809
 810    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 811        self.child_session_ids.insert(session_id);
 812    }
 813
 814    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 815        self.child_session_ids.remove(&session_id);
 816    }
 817
 818    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 819        self.parent_session
 820            .as_ref()
 821            .map(|session| session.read(cx).id)
 822    }
 823
 824    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 825        self.parent_session.as_ref()
 826    }
 827
 828    pub fn capabilities(&self) -> &Capabilities {
 829        &self.capabilities
 830    }
 831
 832    pub(crate) fn root_binary(&self) -> Arc<DebugAdapterBinary> {
 833        match &self.mode {
 834            Mode::Building => {
 835                // todo(debugger): Implement root_binary for building mode
 836                unimplemented!()
 837            }
 838            Mode::Running(running) => running
 839                .root_binary
 840                .clone()
 841                .unwrap_or_else(|| Arc::new(running.binary.clone())),
 842        }
 843    }
 844
 845    pub fn binary(&self) -> &DebugAdapterBinary {
 846        let Mode::Running(local_mode) = &self.mode else {
 847            panic!("Session is not local");
 848        };
 849        &local_mode.binary
 850    }
 851
 852    pub fn adapter_name(&self) -> SharedString {
 853        self.definition.adapter.clone().into()
 854    }
 855
 856    pub fn label(&self) -> String {
 857        self.definition.label.clone()
 858    }
 859
 860    pub fn definition(&self) -> DebugTaskDefinition {
 861        self.definition.clone()
 862    }
 863
 864    pub fn is_terminated(&self) -> bool {
 865        self.is_session_terminated
 866    }
 867
 868    pub fn is_local(&self) -> bool {
 869        matches!(self.mode, Mode::Running(_))
 870    }
 871
 872    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 873        match &mut self.mode {
 874            Mode::Running(local_mode) => Some(local_mode),
 875            Mode::Building => None,
 876        }
 877    }
 878
 879    pub fn as_local(&self) -> Option<&LocalMode> {
 880        match &self.mode {
 881            Mode::Running(local_mode) => Some(local_mode),
 882            Mode::Building => None,
 883        }
 884    }
 885
 886    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 887        let adapter_id = self.definition.adapter.clone();
 888        let request = Initialize { adapter_id };
 889        match &self.mode {
 890            Mode::Running(local_mode) => {
 891                let capabilities = local_mode.request(request, cx.background_executor().clone());
 892
 893                cx.spawn(async move |this, cx| {
 894                    let capabilities = capabilities.await?;
 895                    this.update(cx, |session, cx| {
 896                        session.capabilities = capabilities;
 897                        let filters = session
 898                            .capabilities
 899                            .exception_breakpoint_filters
 900                            .clone()
 901                            .unwrap_or_default();
 902                        for filter in filters {
 903                            let default = filter.default.unwrap_or_default();
 904                            session
 905                                .exception_breakpoints
 906                                .entry(filter.filter.clone())
 907                                .or_insert_with(|| (filter, default));
 908                        }
 909                        cx.emit(SessionEvent::CapabilitiesLoaded);
 910                    })?;
 911                    Ok(())
 912                })
 913            }
 914            Mode::Building => Task::ready(Err(anyhow!(
 915                "Cannot send initialize request, task still building"
 916            ))),
 917        }
 918    }
 919
 920    pub(super) fn initialize_sequence(
 921        &mut self,
 922        initialize_rx: oneshot::Receiver<()>,
 923        dap_store: WeakEntity<DapStore>,
 924        cx: &mut Context<Self>,
 925    ) -> Task<Result<()>> {
 926        match &self.mode {
 927            Mode::Running(local_mode) => local_mode.initialize_sequence(
 928                &self.capabilities,
 929                &self.definition,
 930                initialize_rx,
 931                dap_store,
 932                cx,
 933            ),
 934            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
 935        }
 936    }
 937
 938    pub fn run_to_position(
 939        &mut self,
 940        breakpoint: SourceBreakpoint,
 941        active_thread_id: ThreadId,
 942        cx: &mut Context<Self>,
 943    ) {
 944        match &mut self.mode {
 945            Mode::Running(local_mode) => {
 946                if !matches!(
 947                    self.thread_states.thread_state(active_thread_id),
 948                    Some(ThreadStatus::Stopped)
 949                ) {
 950                    return;
 951                };
 952                let path = breakpoint.path.clone();
 953                local_mode.tmp_breakpoint = Some(breakpoint);
 954                let task = local_mode.send_breakpoints_from_path(
 955                    path,
 956                    BreakpointUpdatedReason::Toggled,
 957                    cx,
 958                );
 959
 960                cx.spawn(async move |this, cx| {
 961                    task.await;
 962                    this.update(cx, |this, cx| {
 963                        this.continue_thread(active_thread_id, cx);
 964                    })
 965                })
 966                .detach();
 967            }
 968            Mode::Building => {}
 969        }
 970    }
 971
 972    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
 973        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
 974    }
 975
 976    pub fn output(
 977        &self,
 978        since: OutputToken,
 979    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 980        if self.output_token.0 == 0 {
 981            return (self.output.range(0..0), OutputToken(0));
 982        };
 983
 984        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 985
 986        let clamped_events_since = events_since.clamp(0, self.output.len());
 987        (
 988            self.output
 989                .range(self.output.len() - clamped_events_since..),
 990            self.output_token,
 991        )
 992    }
 993
 994    pub fn respond_to_client(
 995        &self,
 996        request_seq: u64,
 997        success: bool,
 998        command: String,
 999        body: Option<serde_json::Value>,
1000        cx: &mut Context<Self>,
1001    ) -> Task<Result<()>> {
1002        let Some(local_session) = self.as_local() else {
1003            unreachable!("Cannot respond to remote client");
1004        };
1005        let client = local_session.client.clone();
1006
1007        cx.background_spawn(async move {
1008            client
1009                .send_message(Message::Response(Response {
1010                    body,
1011                    success,
1012                    command,
1013                    seq: request_seq + 1,
1014                    request_seq,
1015                    message: None,
1016                }))
1017                .await
1018        })
1019    }
1020
1021    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1022        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1023            let breakpoint = local.tmp_breakpoint.take()?;
1024            let path = breakpoint.path.clone();
1025            Some((local, path))
1026        }) {
1027            local
1028                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1029                .detach();
1030        };
1031
1032        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1033            self.thread_states.stop_all_threads();
1034
1035            self.invalidate_command_type::<StackTraceCommand>();
1036        }
1037
1038        // Event if we stopped all threads we still need to insert the thread_id
1039        // to our own data
1040        if let Some(thread_id) = event.thread_id {
1041            self.thread_states.stop_thread(ThreadId(thread_id));
1042
1043            self.invalidate_state(
1044                &StackTraceCommand {
1045                    thread_id,
1046                    start_frame: None,
1047                    levels: None,
1048                }
1049                .into(),
1050            );
1051        }
1052
1053        self.invalidate_generic();
1054        self.threads.clear();
1055        self.variables.clear();
1056        cx.emit(SessionEvent::Stopped(
1057            event
1058                .thread_id
1059                .map(Into::into)
1060                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1061        ));
1062        cx.notify();
1063    }
1064
1065    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1066        match *event {
1067            Events::Initialized(_) => {
1068                debug_assert!(
1069                    false,
1070                    "Initialized event should have been handled in LocalMode"
1071                );
1072            }
1073            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1074            Events::Continued(event) => {
1075                if event.all_threads_continued.unwrap_or_default() {
1076                    self.thread_states.continue_all_threads();
1077                } else {
1078                    self.thread_states
1079                        .continue_thread(ThreadId(event.thread_id));
1080                }
1081                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1082                self.invalidate_generic();
1083            }
1084            Events::Exited(_event) => {
1085                self.clear_active_debug_line(cx);
1086            }
1087            Events::Terminated(_) => {
1088                self.is_session_terminated = true;
1089                self.clear_active_debug_line(cx);
1090            }
1091            Events::Thread(event) => {
1092                let thread_id = ThreadId(event.thread_id);
1093
1094                match event.reason {
1095                    dap::ThreadEventReason::Started => {
1096                        self.thread_states.continue_thread(thread_id);
1097                    }
1098                    dap::ThreadEventReason::Exited => {
1099                        self.thread_states.exit_thread(thread_id);
1100                    }
1101                    reason => {
1102                        log::error!("Unhandled thread event reason {:?}", reason);
1103                    }
1104                }
1105                self.invalidate_state(&ThreadsCommand.into());
1106                cx.notify();
1107            }
1108            Events::Output(event) => {
1109                if event
1110                    .category
1111                    .as_ref()
1112                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1113                {
1114                    return;
1115                }
1116
1117                self.output.push_back(event);
1118                self.output_token.0 += 1;
1119                cx.notify();
1120            }
1121            Events::Breakpoint(_) => {}
1122            Events::Module(event) => {
1123                match event.reason {
1124                    dap::ModuleEventReason::New => {
1125                        self.modules.push(event.module);
1126                    }
1127                    dap::ModuleEventReason::Changed => {
1128                        if let Some(module) = self
1129                            .modules
1130                            .iter_mut()
1131                            .find(|other| event.module.id == other.id)
1132                        {
1133                            *module = event.module;
1134                        }
1135                    }
1136                    dap::ModuleEventReason::Removed => {
1137                        self.modules.retain(|other| event.module.id != other.id);
1138                    }
1139                }
1140
1141                // todo(debugger): We should only send the invalidate command to downstream clients.
1142                // self.invalidate_state(&ModulesCommand.into());
1143            }
1144            Events::LoadedSource(_) => {
1145                self.invalidate_state(&LoadedSourcesCommand.into());
1146            }
1147            Events::Capabilities(event) => {
1148                self.capabilities = self.capabilities.merge(event.capabilities);
1149                cx.notify();
1150            }
1151            Events::Memory(_) => {}
1152            Events::Process(_) => {}
1153            Events::ProgressEnd(_) => {}
1154            Events::ProgressStart(_) => {}
1155            Events::ProgressUpdate(_) => {}
1156            Events::Invalidated(_) => {}
1157            Events::Other(_) => {}
1158        }
1159    }
1160
1161    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1162    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1163        &mut self,
1164        request: T,
1165        process_result: impl FnOnce(
1166            &mut Self,
1167            Result<T::Response>,
1168            &mut Context<Self>,
1169        ) -> Option<T::Response>
1170        + 'static,
1171        cx: &mut Context<Self>,
1172    ) {
1173        const {
1174            assert!(
1175                T::CACHEABLE,
1176                "Only requests marked as cacheable should invoke `fetch`"
1177            );
1178        }
1179
1180        if !self.thread_states.any_stopped_thread()
1181            && request.type_id() != TypeId::of::<ThreadsCommand>()
1182            || self.is_session_terminated
1183        {
1184            return;
1185        }
1186
1187        let request_map = self
1188            .requests
1189            .entry(std::any::TypeId::of::<T>())
1190            .or_default();
1191
1192        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1193            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1194
1195            let task = Self::request_inner::<Arc<T>>(
1196                &self.capabilities,
1197                &self.mode,
1198                command,
1199                process_result,
1200                cx,
1201            );
1202            let task = cx
1203                .background_executor()
1204                .spawn(async move {
1205                    let _ = task.await?;
1206                    Some(())
1207                })
1208                .shared();
1209
1210            vacant.insert(task);
1211            cx.notify();
1212        }
1213    }
1214
1215    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1216        capabilities: &Capabilities,
1217        mode: &Mode,
1218        request: T,
1219        process_result: impl FnOnce(
1220            &mut Self,
1221            Result<T::Response>,
1222            &mut Context<Self>,
1223        ) -> Option<T::Response>
1224        + 'static,
1225        cx: &mut Context<Self>,
1226    ) -> Task<Option<T::Response>> {
1227        if !T::is_supported(&capabilities) {
1228            log::warn!(
1229                "Attempted to send a DAP request that isn't supported: {:?}",
1230                request
1231            );
1232            let error = Err(anyhow::Error::msg(
1233                "Couldn't complete request because it's not supported",
1234            ));
1235            return cx.spawn(async move |this, cx| {
1236                this.update(cx, |this, cx| process_result(this, error, cx))
1237                    .log_err()
1238                    .flatten()
1239            });
1240        }
1241
1242        let request = mode.request_dap(request, cx);
1243        cx.spawn(async move |this, cx| {
1244            let result = request.await;
1245            this.update(cx, |this, cx| process_result(this, result, cx))
1246                .log_err()
1247                .flatten()
1248        })
1249    }
1250
1251    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1252        &self,
1253        request: T,
1254        process_result: impl FnOnce(
1255            &mut Self,
1256            Result<T::Response>,
1257            &mut Context<Self>,
1258        ) -> Option<T::Response>
1259        + 'static,
1260        cx: &mut Context<Self>,
1261    ) -> Task<Option<T::Response>> {
1262        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1263    }
1264
1265    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1266        self.requests.remove(&std::any::TypeId::of::<Command>());
1267    }
1268
1269    fn invalidate_generic(&mut self) {
1270        self.invalidate_command_type::<ModulesCommand>();
1271        self.invalidate_command_type::<LoadedSourcesCommand>();
1272        self.invalidate_command_type::<ThreadsCommand>();
1273    }
1274
1275    fn invalidate_state(&mut self, key: &RequestSlot) {
1276        self.requests
1277            .entry((&*key.0 as &dyn Any).type_id())
1278            .and_modify(|request_map| {
1279                request_map.remove(&key);
1280            });
1281    }
1282
1283    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1284        self.thread_states.thread_status(thread_id)
1285    }
1286
1287    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1288        self.fetch(
1289            dap_command::ThreadsCommand,
1290            |this, result, cx| {
1291                let result = result.log_err()?;
1292
1293                this.threads = result
1294                    .iter()
1295                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1296                    .collect();
1297
1298                this.invalidate_command_type::<StackTraceCommand>();
1299                cx.emit(SessionEvent::Threads);
1300                cx.notify();
1301
1302                Some(result)
1303            },
1304            cx,
1305        );
1306
1307        self.threads
1308            .values()
1309            .map(|thread| {
1310                (
1311                    thread.dap.clone(),
1312                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1313                )
1314            })
1315            .collect()
1316    }
1317
1318    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1319        self.fetch(
1320            dap_command::ModulesCommand,
1321            |this, result, cx| {
1322                let result = result.log_err()?;
1323
1324                this.modules = result.iter().cloned().collect();
1325                cx.emit(SessionEvent::Modules);
1326                cx.notify();
1327
1328                Some(result)
1329            },
1330            cx,
1331        );
1332
1333        &self.modules
1334    }
1335
1336    pub fn ignore_breakpoints(&self) -> bool {
1337        self.ignore_breakpoints
1338    }
1339
1340    pub fn toggle_ignore_breakpoints(
1341        &mut self,
1342        cx: &mut App,
1343    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1344        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1345    }
1346
1347    pub(crate) fn set_ignore_breakpoints(
1348        &mut self,
1349        ignore: bool,
1350        cx: &mut App,
1351    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1352        if self.ignore_breakpoints == ignore {
1353            return Task::ready(HashMap::default());
1354        }
1355
1356        self.ignore_breakpoints = ignore;
1357
1358        if let Some(local) = self.as_local() {
1359            local.send_source_breakpoints(ignore, cx)
1360        } else {
1361            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1362            unimplemented!()
1363        }
1364    }
1365
1366    pub fn exception_breakpoints(
1367        &self,
1368    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1369        self.exception_breakpoints.values()
1370    }
1371
1372    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1373        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1374            *is_enabled = !*is_enabled;
1375            self.send_exception_breakpoints(cx);
1376        }
1377    }
1378
1379    fn send_exception_breakpoints(&mut self, cx: &App) {
1380        if let Some(local) = self.as_local() {
1381            let exception_filters = self
1382                .exception_breakpoints
1383                .values()
1384                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1385                .collect();
1386
1387            let supports_exception_filters = self
1388                .capabilities
1389                .supports_exception_filter_options
1390                .unwrap_or_default();
1391            local
1392                .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1393                .detach_and_log_err(cx);
1394        } else {
1395            debug_assert!(false, "Not implemented");
1396        }
1397    }
1398
1399    pub fn breakpoints_enabled(&self) -> bool {
1400        self.ignore_breakpoints
1401    }
1402
1403    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1404        self.fetch(
1405            dap_command::LoadedSourcesCommand,
1406            |this, result, cx| {
1407                let result = result.log_err()?;
1408                this.loaded_sources = result.iter().cloned().collect();
1409                cx.emit(SessionEvent::LoadedSources);
1410                cx.notify();
1411                Some(result)
1412            },
1413            cx,
1414        );
1415
1416        &self.loaded_sources
1417    }
1418
1419    fn fallback_to_manual_restart(
1420        &mut self,
1421        res: Result<()>,
1422        cx: &mut Context<Self>,
1423    ) -> Option<()> {
1424        if res.log_err().is_none() {
1425            cx.emit(SessionStateEvent::Restart);
1426            return None;
1427        }
1428        Some(())
1429    }
1430
1431    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1432        res.log_err()?;
1433        Some(())
1434    }
1435
1436    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1437        thread_id: ThreadId,
1438    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1439    {
1440        move |this, response, cx| match response.log_err() {
1441            Some(response) => Some(response),
1442            None => {
1443                this.thread_states.stop_thread(thread_id);
1444                cx.notify();
1445                None
1446            }
1447        }
1448    }
1449
1450    fn clear_active_debug_line_response(
1451        &mut self,
1452        response: Result<()>,
1453        cx: &mut Context<Session>,
1454    ) -> Option<()> {
1455        response.log_err()?;
1456        self.clear_active_debug_line(cx);
1457        Some(())
1458    }
1459
1460    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1461        self.as_local()
1462            .expect("Message handler will only run in local mode")
1463            .breakpoint_store
1464            .update(cx, |store, cx| {
1465                store.remove_active_position(Some(self.id), cx)
1466            });
1467    }
1468
1469    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1470        self.request(
1471            PauseCommand {
1472                thread_id: thread_id.0,
1473            },
1474            Self::empty_response,
1475            cx,
1476        )
1477        .detach();
1478    }
1479
1480    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1481        self.request(
1482            RestartStackFrameCommand { stack_frame_id },
1483            Self::empty_response,
1484            cx,
1485        )
1486        .detach();
1487    }
1488
1489    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1490        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1491            self.request(
1492                RestartCommand {
1493                    raw: args.unwrap_or(Value::Null),
1494                },
1495                Self::fallback_to_manual_restart,
1496                cx,
1497            )
1498            .detach();
1499        } else {
1500            cx.emit(SessionStateEvent::Restart);
1501        }
1502    }
1503
1504    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1505        self.is_session_terminated = true;
1506        self.thread_states.exit_all_threads();
1507        cx.notify();
1508
1509        let task = if self
1510            .capabilities
1511            .supports_terminate_request
1512            .unwrap_or_default()
1513        {
1514            self.request(
1515                TerminateCommand {
1516                    restart: Some(false),
1517                },
1518                Self::clear_active_debug_line_response,
1519                cx,
1520            )
1521        } else {
1522            self.request(
1523                DisconnectCommand {
1524                    restart: Some(false),
1525                    terminate_debuggee: Some(true),
1526                    suspend_debuggee: Some(false),
1527                },
1528                Self::clear_active_debug_line_response,
1529                cx,
1530            )
1531        };
1532
1533        cx.emit(SessionStateEvent::Shutdown);
1534
1535        let debug_client = self.adapter_client();
1536
1537        cx.background_spawn(async move {
1538            let _ = task.await;
1539
1540            if let Some(client) = debug_client {
1541                client.shutdown().await.log_err();
1542            }
1543        })
1544    }
1545
1546    pub fn completions(
1547        &mut self,
1548        query: CompletionsQuery,
1549        cx: &mut Context<Self>,
1550    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1551        let task = self.request(query, |_, result, _| result.log_err(), cx);
1552
1553        cx.background_executor().spawn(async move {
1554            anyhow::Ok(
1555                task.await
1556                    .map(|response| response.targets)
1557                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1558            )
1559        })
1560    }
1561
1562    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1563        self.thread_states.continue_thread(thread_id);
1564        self.request(
1565            ContinueCommand {
1566                args: ContinueArguments {
1567                    thread_id: thread_id.0,
1568                    single_thread: Some(true),
1569                },
1570            },
1571            Self::on_step_response::<ContinueCommand>(thread_id),
1572            cx,
1573        )
1574        .detach();
1575    }
1576
1577    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1578        match self.mode {
1579            Mode::Running(ref local) => Some(local.client.clone()),
1580            Mode::Building => None,
1581        }
1582    }
1583
1584    pub fn step_over(
1585        &mut self,
1586        thread_id: ThreadId,
1587        granularity: SteppingGranularity,
1588        cx: &mut Context<Self>,
1589    ) {
1590        let supports_single_thread_execution_requests =
1591            self.capabilities.supports_single_thread_execution_requests;
1592        let supports_stepping_granularity = self
1593            .capabilities
1594            .supports_stepping_granularity
1595            .unwrap_or_default();
1596
1597        let command = NextCommand {
1598            inner: StepCommand {
1599                thread_id: thread_id.0,
1600                granularity: supports_stepping_granularity.then(|| granularity),
1601                single_thread: supports_single_thread_execution_requests,
1602            },
1603        };
1604
1605        self.thread_states.process_step(thread_id);
1606        self.request(
1607            command,
1608            Self::on_step_response::<NextCommand>(thread_id),
1609            cx,
1610        )
1611        .detach();
1612    }
1613
1614    pub fn step_in(
1615        &mut self,
1616        thread_id: ThreadId,
1617        granularity: SteppingGranularity,
1618        cx: &mut Context<Self>,
1619    ) {
1620        let supports_single_thread_execution_requests =
1621            self.capabilities.supports_single_thread_execution_requests;
1622        let supports_stepping_granularity = self
1623            .capabilities
1624            .supports_stepping_granularity
1625            .unwrap_or_default();
1626
1627        let command = StepInCommand {
1628            inner: StepCommand {
1629                thread_id: thread_id.0,
1630                granularity: supports_stepping_granularity.then(|| granularity),
1631                single_thread: supports_single_thread_execution_requests,
1632            },
1633        };
1634
1635        self.thread_states.process_step(thread_id);
1636        self.request(
1637            command,
1638            Self::on_step_response::<StepInCommand>(thread_id),
1639            cx,
1640        )
1641        .detach();
1642    }
1643
1644    pub fn step_out(
1645        &mut self,
1646        thread_id: ThreadId,
1647        granularity: SteppingGranularity,
1648        cx: &mut Context<Self>,
1649    ) {
1650        let supports_single_thread_execution_requests =
1651            self.capabilities.supports_single_thread_execution_requests;
1652        let supports_stepping_granularity = self
1653            .capabilities
1654            .supports_stepping_granularity
1655            .unwrap_or_default();
1656
1657        let command = StepOutCommand {
1658            inner: StepCommand {
1659                thread_id: thread_id.0,
1660                granularity: supports_stepping_granularity.then(|| granularity),
1661                single_thread: supports_single_thread_execution_requests,
1662            },
1663        };
1664
1665        self.thread_states.process_step(thread_id);
1666        self.request(
1667            command,
1668            Self::on_step_response::<StepOutCommand>(thread_id),
1669            cx,
1670        )
1671        .detach();
1672    }
1673
1674    pub fn step_back(
1675        &mut self,
1676        thread_id: ThreadId,
1677        granularity: SteppingGranularity,
1678        cx: &mut Context<Self>,
1679    ) {
1680        let supports_single_thread_execution_requests =
1681            self.capabilities.supports_single_thread_execution_requests;
1682        let supports_stepping_granularity = self
1683            .capabilities
1684            .supports_stepping_granularity
1685            .unwrap_or_default();
1686
1687        let command = StepBackCommand {
1688            inner: StepCommand {
1689                thread_id: thread_id.0,
1690                granularity: supports_stepping_granularity.then(|| granularity),
1691                single_thread: supports_single_thread_execution_requests,
1692            },
1693        };
1694
1695        self.thread_states.process_step(thread_id);
1696
1697        self.request(
1698            command,
1699            Self::on_step_response::<StepBackCommand>(thread_id),
1700            cx,
1701        )
1702        .detach();
1703    }
1704
1705    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1706        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1707            && self.requests.contains_key(&ThreadsCommand.type_id())
1708            && self.threads.contains_key(&thread_id)
1709        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1710        // We could still be using an old thread id and have sent a new thread's request
1711        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1712        // But it very well could cause a minor bug in the future that is hard to track down
1713        {
1714            self.fetch(
1715                super::dap_command::StackTraceCommand {
1716                    thread_id: thread_id.0,
1717                    start_frame: None,
1718                    levels: None,
1719                },
1720                move |this, stack_frames, cx| {
1721                    let stack_frames = stack_frames.log_err()?;
1722
1723                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1724                        thread.stack_frame_ids =
1725                            stack_frames.iter().map(|frame| frame.id).collect();
1726                    });
1727                    debug_assert!(
1728                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1729                        "Sent request for thread_id that doesn't exist"
1730                    );
1731
1732                    this.stack_frames.extend(
1733                        stack_frames
1734                            .iter()
1735                            .cloned()
1736                            .map(|frame| (frame.id, StackFrame::from(frame))),
1737                    );
1738
1739                    this.invalidate_command_type::<ScopesCommand>();
1740                    this.invalidate_command_type::<VariablesCommand>();
1741
1742                    cx.emit(SessionEvent::StackTrace);
1743                    cx.notify();
1744                    Some(stack_frames)
1745                },
1746                cx,
1747            );
1748        }
1749
1750        self.threads
1751            .get(&thread_id)
1752            .map(|thread| {
1753                thread
1754                    .stack_frame_ids
1755                    .iter()
1756                    .filter_map(|id| self.stack_frames.get(id))
1757                    .cloned()
1758                    .collect()
1759            })
1760            .unwrap_or_default()
1761    }
1762
1763    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1764        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1765            && self
1766                .requests
1767                .contains_key(&TypeId::of::<StackTraceCommand>())
1768        {
1769            self.fetch(
1770                ScopesCommand { stack_frame_id },
1771                move |this, scopes, cx| {
1772                    let scopes = scopes.log_err()?;
1773
1774                    for scope in scopes .iter(){
1775                        this.variables(scope.variables_reference, cx);
1776                    }
1777
1778                    let entry = this
1779                        .stack_frames
1780                        .entry(stack_frame_id)
1781                        .and_modify(|stack_frame| {
1782                            stack_frame.scopes = scopes.clone();
1783                        });
1784
1785                    cx.emit(SessionEvent::Variables);
1786
1787                    debug_assert!(
1788                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1789                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1790                    );
1791
1792                    Some(scopes)
1793                },
1794                cx,
1795            );
1796        }
1797
1798        self.stack_frames
1799            .get(&stack_frame_id)
1800            .map(|frame| frame.scopes.as_slice())
1801            .unwrap_or_default()
1802    }
1803
1804    pub fn variables(
1805        &mut self,
1806        variables_reference: VariableReference,
1807        cx: &mut Context<Self>,
1808    ) -> Vec<dap::Variable> {
1809        let command = VariablesCommand {
1810            variables_reference,
1811            filter: None,
1812            start: None,
1813            count: None,
1814            format: None,
1815        };
1816
1817        self.fetch(
1818            command,
1819            move |this, variables, cx| {
1820                let variables = variables.log_err()?;
1821                this.variables
1822                    .insert(variables_reference, variables.clone());
1823
1824                cx.emit(SessionEvent::Variables);
1825                Some(variables)
1826            },
1827            cx,
1828        );
1829
1830        self.variables
1831            .get(&variables_reference)
1832            .cloned()
1833            .unwrap_or_default()
1834    }
1835
1836    pub fn set_variable_value(
1837        &mut self,
1838        variables_reference: u64,
1839        name: String,
1840        value: String,
1841        cx: &mut Context<Self>,
1842    ) {
1843        if self.capabilities.supports_set_variable.unwrap_or_default() {
1844            self.request(
1845                SetVariableValueCommand {
1846                    name,
1847                    value,
1848                    variables_reference,
1849                },
1850                move |this, response, cx| {
1851                    let response = response.log_err()?;
1852                    this.invalidate_command_type::<VariablesCommand>();
1853                    cx.notify();
1854                    Some(response)
1855                },
1856                cx,
1857            )
1858            .detach()
1859        }
1860    }
1861
1862    pub fn evaluate(
1863        &mut self,
1864        expression: String,
1865        context: Option<EvaluateArgumentsContext>,
1866        frame_id: Option<u64>,
1867        source: Option<Source>,
1868        cx: &mut Context<Self>,
1869    ) {
1870        self.request(
1871            EvaluateCommand {
1872                expression,
1873                context,
1874                frame_id,
1875                source,
1876            },
1877            |this, response, cx| {
1878                let response = response.log_err()?;
1879                this.output_token.0 += 1;
1880                this.output.push_back(dap::OutputEvent {
1881                    category: None,
1882                    output: response.result.clone(),
1883                    group: None,
1884                    variables_reference: Some(response.variables_reference),
1885                    source: None,
1886                    line: None,
1887                    column: None,
1888                    data: None,
1889                    location_reference: None,
1890                });
1891
1892                this.invalidate_command_type::<ScopesCommand>();
1893                cx.notify();
1894                Some(response)
1895            },
1896            cx,
1897        )
1898        .detach();
1899    }
1900
1901    pub fn location(
1902        &mut self,
1903        reference: u64,
1904        cx: &mut Context<Self>,
1905    ) -> Option<dap::LocationsResponse> {
1906        self.fetch(
1907            LocationsCommand { reference },
1908            move |this, response, _| {
1909                let response = response.log_err()?;
1910                this.locations.insert(reference, response.clone());
1911                Some(response)
1912            },
1913            cx,
1914        );
1915        self.locations.get(&reference).cloned()
1916    }
1917    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1918        let command = DisconnectCommand {
1919            restart: Some(false),
1920            terminate_debuggee: Some(true),
1921            suspend_debuggee: Some(false),
1922        };
1923
1924        self.request(command, Self::empty_response, cx).detach()
1925    }
1926
1927    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1928        if self
1929            .capabilities
1930            .supports_terminate_threads_request
1931            .unwrap_or_default()
1932        {
1933            self.request(
1934                TerminateThreadsCommand {
1935                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1936                },
1937                Self::clear_active_debug_line_response,
1938                cx,
1939            )
1940            .detach();
1941        } else {
1942            self.shutdown(cx).detach();
1943        }
1944    }
1945}