session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::DebugAdapterBinary;
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    client::{DebugAdapterClient, SessionId},
  21    messages::{Events, Message},
  22};
  23use dap::{
  24    EvaluateResponse, ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory,
  25};
  26use futures::channel::oneshot;
  27use futures::{FutureExt, future::Shared};
  28use gpui::{
  29    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  30    Task, WeakEntity,
  31};
  32
  33use serde_json::{Value, json};
  34use smol::stream::StreamExt;
  35use std::any::TypeId;
  36use std::collections::BTreeMap;
  37use std::u64;
  38use std::{
  39    any::Any,
  40    collections::hash_map::Entry,
  41    hash::{Hash, Hasher},
  42    path::Path,
  43    sync::Arc,
  44};
  45use task::DebugTaskDefinition;
  46use text::{PointUtf16, ToPointUtf16};
  47use util::{ResultExt, merge_json_value_into};
  48use worktree::Worktree;
  49
  50#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  51#[repr(transparent)]
  52pub struct ThreadId(pub u64);
  53
  54impl ThreadId {
  55    pub const MIN: ThreadId = ThreadId(u64::MIN);
  56    pub const MAX: ThreadId = ThreadId(u64::MAX);
  57}
  58
  59impl From<u64> for ThreadId {
  60    fn from(id: u64) -> Self {
  61        Self(id)
  62    }
  63}
  64
/// A `dap::StackFrame` paired with the scopes known for it.
#[derive(Clone, Debug)]
pub struct StackFrame {
    /// The frame as described by the `dap` crate type.
    pub dap: dap::StackFrame,
    /// Scopes attached to this frame; empty when the value is built via `From<dap::StackFrame>`.
    pub scopes: Vec<dap::Scope>,
}
  70
  71impl From<dap::StackFrame> for StackFrame {
  72    fn from(stack_frame: dap::StackFrame) -> Self {
  73        Self {
  74            scopes: vec![],
  75            dap: stack_frame,
  76        }
  77    }
  78}
  79
  80#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  81pub enum ThreadStatus {
  82    #[default]
  83    Running,
  84    Stopped,
  85    Stepping,
  86    Exited,
  87    Ended,
  88}
  89
  90impl ThreadStatus {
  91    pub fn label(&self) -> &'static str {
  92        match self {
  93            ThreadStatus::Running => "Running",
  94            ThreadStatus::Stopped => "Stopped",
  95            ThreadStatus::Stepping => "Stepping",
  96            ThreadStatus::Exited => "Exited",
  97            ThreadStatus::Ended => "Ended",
  98        }
  99    }
 100}
 101
/// A `dap::Thread` together with the stack frame ids recorded for it.
#[derive(Debug)]
pub struct Thread {
    /// The thread as described by the `dap` crate type.
    dap: dap::Thread,
    /// Ordered set of stack frame ids associated with this thread.
    // NOTE(review): presumably these index into `Session::stack_frames` — confirm.
    stack_frame_ids: IndexSet<StackFrameId>,
    /// Initialised to `false` by the `From<dap::Thread>` impl.
    // NOTE(review): the leading underscore suggests this is not read yet — confirm.
    _has_stopped: bool,
}
 108
 109impl From<dap::Thread> for Thread {
 110    fn from(dap: dap::Thread) -> Self {
 111        Self {
 112            dap,
 113            stack_frame_ids: Default::default(),
 114            _has_stopped: false,
 115        }
 116    }
 117}
 118
/// Connection state of a session.
enum Mode {
    /// No adapter client exists yet; DAP requests fail with an error in this state.
    Building,
    /// An adapter client is available through the contained `LocalMode`.
    Running(LocalMode),
}
 123
/// State needed to talk to a debug adapter through a `DebugAdapterClient`.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send every DAP request of this session.
    client: Arc<DebugAdapterClient>,
    /// Binary the client was started (or reconnected) with.
    binary: DebugAdapterBinary,
    /// Root binary taken from the parent session; `None` when there is no parent.
    root_binary: Option<Arc<DebugAdapterBinary>>,
    /// Store the source breakpoints are read from.
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// Extra breakpoint appended when sending the breakpoints of a single path.
    tmp_breakpoint: Option<SourceBreakpoint>,
    /// Worktree this session is associated with (weak handle).
    worktree: WeakEntity<Worktree>,
}
 133
 134fn client_source(abs_path: &Path) -> dap::Source {
 135    dap::Source {
 136        name: abs_path
 137            .file_name()
 138            .map(|filename| filename.to_string_lossy().to_string()),
 139        path: Some(abs_path.to_string_lossy().to_string()),
 140        source_reference: None,
 141        presentation_hint: None,
 142        origin: None,
 143        sources: None,
 144        adapter_data: None,
 145        checksums: None,
 146    }
 147}
 148
 149impl LocalMode {
 150    async fn new(
 151        session_id: SessionId,
 152        parent_session: Option<Entity<Session>>,
 153        worktree: WeakEntity<Worktree>,
 154        breakpoint_store: Entity<BreakpointStore>,
 155        binary: DebugAdapterBinary,
 156        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 157        cx: AsyncApp,
 158    ) -> Result<Self> {
 159        let message_handler = Box::new(move |message| {
 160            messages_tx.unbounded_send(message).ok();
 161        });
 162
 163        let root_binary = if let Some(parent_session) = parent_session.as_ref() {
 164            Some(parent_session.read_with(&cx, |session, _| session.root_binary().clone())?)
 165        } else {
 166            None
 167        };
 168
 169        let client = Arc::new(
 170            if let Some(client) = parent_session
 171                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 172                .flatten()
 173            {
 174                client
 175                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 176                    .await?
 177            } else {
 178                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 179                    .await
 180                    .with_context(|| "Failed to start communication with debug adapter")?
 181            },
 182        );
 183
 184        Ok(Self {
 185            client,
 186            breakpoint_store,
 187            worktree,
 188            tmp_breakpoint: None,
 189            root_binary,
 190            binary,
 191        })
 192    }
 193
    /// Returns the weak handle to the worktree this mode was created with.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 197
 198    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 199        let tasks: Vec<_> = paths
 200            .into_iter()
 201            .map(|path| {
 202                self.request(
 203                    dap_command::SetBreakpoints {
 204                        source: client_source(path),
 205                        source_modified: None,
 206                        breakpoints: vec![],
 207                    },
 208                    cx.background_executor().clone(),
 209                )
 210            })
 211            .collect();
 212
 213        cx.background_spawn(async move {
 214            futures::future::join_all(tasks)
 215                .await
 216                .iter()
 217                .for_each(|res| match res {
 218                    Ok(_) => {}
 219                    Err(err) => {
 220                        log::warn!("Set breakpoints request failed: {}", err);
 221                    }
 222                });
 223        })
 224    }
 225
 226    fn send_breakpoints_from_path(
 227        &self,
 228        abs_path: Arc<Path>,
 229        reason: BreakpointUpdatedReason,
 230        cx: &mut App,
 231    ) -> Task<()> {
 232        let breakpoints = self
 233            .breakpoint_store
 234            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 235            .into_iter()
 236            .filter(|bp| bp.state.is_enabled())
 237            .chain(self.tmp_breakpoint.clone())
 238            .map(Into::into)
 239            .collect();
 240
 241        let task = self.request(
 242            dap_command::SetBreakpoints {
 243                source: client_source(&abs_path),
 244                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 245                breakpoints,
 246            },
 247            cx.background_executor().clone(),
 248        );
 249
 250        cx.background_spawn(async move {
 251            match task.await {
 252                Ok(_) => {}
 253                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 254            }
 255        })
 256    }
 257
 258    fn send_exception_breakpoints(
 259        &self,
 260        filters: Vec<ExceptionBreakpointsFilter>,
 261        supports_filter_options: bool,
 262        cx: &App,
 263    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 264        let arg = if supports_filter_options {
 265            SetExceptionBreakpoints::WithOptions {
 266                filters: filters
 267                    .into_iter()
 268                    .map(|filter| ExceptionFilterOptions {
 269                        filter_id: filter.filter,
 270                        condition: None,
 271                        mode: None,
 272                    })
 273                    .collect(),
 274            }
 275        } else {
 276            SetExceptionBreakpoints::Plain {
 277                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 278            }
 279        };
 280        self.request(arg, cx.background_executor().clone())
 281    }
 282
 283    fn send_source_breakpoints(
 284        &self,
 285        ignore_breakpoints: bool,
 286        cx: &App,
 287    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 288        let mut breakpoint_tasks = Vec::new();
 289        let breakpoints = self
 290            .breakpoint_store
 291            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 292
 293        for (path, breakpoints) in breakpoints {
 294            let breakpoints = if ignore_breakpoints {
 295                vec![]
 296            } else {
 297                breakpoints
 298                    .into_iter()
 299                    .filter(|bp| bp.state.is_enabled())
 300                    .map(Into::into)
 301                    .collect()
 302            };
 303
 304            breakpoint_tasks.push(
 305                self.request(
 306                    dap_command::SetBreakpoints {
 307                        source: client_source(&path),
 308                        source_modified: Some(false),
 309                        breakpoints,
 310                    },
 311                    cx.background_executor().clone(),
 312                )
 313                .map(|result| result.map_err(|e| (path, e))),
 314            );
 315        }
 316
 317        cx.background_spawn(async move {
 318            futures::future::join_all(breakpoint_tasks)
 319                .await
 320                .into_iter()
 321                .filter_map(Result::err)
 322                .collect::<HashMap<_, _>>()
 323        })
 324    }
 325
 326    fn initialize_sequence(
 327        &self,
 328        capabilities: &Capabilities,
 329        definition: &DebugTaskDefinition,
 330        initialized_rx: oneshot::Receiver<()>,
 331        dap_store: WeakEntity<DapStore>,
 332        cx: &App,
 333    ) -> Task<Result<()>> {
 334        let mut raw = self.binary.request_args.clone();
 335
 336        merge_json_value_into(
 337            definition.initialize_args.clone().unwrap_or(json!({})),
 338            &mut raw.configuration,
 339        );
 340
 341        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 342        let launch = match raw.request {
 343            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
 344                Launch {
 345                    raw: raw.configuration,
 346                },
 347                cx.background_executor().clone(),
 348            ),
 349            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
 350                Attach {
 351                    raw: raw.configuration,
 352                },
 353                cx.background_executor().clone(),
 354            ),
 355        };
 356
 357        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 358        let exception_filters = capabilities
 359            .exception_breakpoint_filters
 360            .as_ref()
 361            .map(|exception_filters| {
 362                exception_filters
 363                    .iter()
 364                    .filter(|filter| filter.default == Some(true))
 365                    .cloned()
 366                    .collect::<Vec<_>>()
 367            })
 368            .unwrap_or_default();
 369        let supports_exception_filters = capabilities
 370            .supports_exception_filter_options
 371            .unwrap_or_default();
 372        let this = self.clone();
 373        let worktree = self.worktree().clone();
 374        let configuration_sequence = cx.spawn({
 375            async move |cx| {
 376                initialized_rx.await?;
 377                let errors_by_path = cx
 378                    .update(|cx| this.send_source_breakpoints(false, cx))?
 379                    .await;
 380
 381                dap_store.update(cx, |_, cx| {
 382                    let Some(worktree) = worktree.upgrade() else {
 383                        return;
 384                    };
 385
 386                    for (path, error) in &errors_by_path {
 387                        log::error!("failed to set breakpoints for {path:?}: {error}");
 388                    }
 389
 390                    if let Some(failed_path) = errors_by_path.keys().next() {
 391                        let failed_path = failed_path
 392                            .strip_prefix(worktree.read(cx).abs_path())
 393                            .unwrap_or(failed_path)
 394                            .display();
 395                        let message = format!(
 396                            "Failed to set breakpoints for {failed_path}{}",
 397                            match errors_by_path.len() {
 398                                0 => unreachable!(),
 399                                1 => "".into(),
 400                                2 => " and 1 other path".into(),
 401                                n => format!(" and {} other paths", n - 1),
 402                            }
 403                        );
 404                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 405                    }
 406                })?;
 407
 408                cx.update(|cx| {
 409                    this.send_exception_breakpoints(
 410                        exception_filters,
 411                        supports_exception_filters,
 412                        cx,
 413                    )
 414                })?
 415                .await
 416                .ok();
 417                let ret = if configuration_done_supported {
 418                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 419                } else {
 420                    Task::ready(Ok(()))
 421                }
 422                .await;
 423                ret
 424            }
 425        });
 426
 427        cx.background_spawn(async move {
 428            futures::future::try_join(launch, configuration_sequence).await?;
 429            Ok(())
 430        })
 431    }
 432
 433    fn request<R: LocalDapCommand>(
 434        &self,
 435        request: R,
 436        executor: BackgroundExecutor,
 437    ) -> Task<Result<R::Response>>
 438    where
 439        <R::DapRequest as dap::requests::Request>::Response: 'static,
 440        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 441    {
 442        let request = Arc::new(request);
 443
 444        let request_clone = request.clone();
 445        let connection = self.client.clone();
 446        let request_task = executor.spawn(async move {
 447            let args = request_clone.to_dap();
 448            connection.request::<R::DapRequest>(args).await
 449        });
 450
 451        executor.spawn(async move {
 452            let response = request.response_from_dap(request_task.await?);
 453            response
 454        })
 455    }
 456}
 457
 458impl Mode {
 459    fn request_dap<R: DapCommand>(
 460        &self,
 461        request: R,
 462        cx: &mut Context<Session>,
 463    ) -> Task<Result<R::Response>>
 464    where
 465        <R::DapRequest as dap::requests::Request>::Response: 'static,
 466        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 467    {
 468        match self {
 469            Mode::Running(debug_adapter_client) => {
 470                debug_adapter_client.request(request, cx.background_executor().clone())
 471            }
 472            Mode::Building => Task::ready(Err(anyhow!(
 473                "no adapter running to send request: {:?}",
 474                request
 475            ))),
 476        }
 477    }
 478}
 479
/// Tracks thread statuses as a session-wide fallback plus per-thread overrides.
#[derive(Default)]
struct ThreadStates {
    /// Status applied to every thread that has no entry in `known_thread_states`.
    global_state: Option<ThreadStatus>,
    /// Per-thread statuses; these take precedence over `global_state`.
    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
}
 485
 486impl ThreadStates {
 487    fn stop_all_threads(&mut self) {
 488        self.global_state = Some(ThreadStatus::Stopped);
 489        self.known_thread_states.clear();
 490    }
 491
 492    fn exit_all_threads(&mut self) {
 493        self.global_state = Some(ThreadStatus::Exited);
 494        self.known_thread_states.clear();
 495    }
 496
 497    fn continue_all_threads(&mut self) {
 498        self.global_state = Some(ThreadStatus::Running);
 499        self.known_thread_states.clear();
 500    }
 501
 502    fn stop_thread(&mut self, thread_id: ThreadId) {
 503        self.known_thread_states
 504            .insert(thread_id, ThreadStatus::Stopped);
 505    }
 506
 507    fn continue_thread(&mut self, thread_id: ThreadId) {
 508        self.known_thread_states
 509            .insert(thread_id, ThreadStatus::Running);
 510    }
 511
 512    fn process_step(&mut self, thread_id: ThreadId) {
 513        self.known_thread_states
 514            .insert(thread_id, ThreadStatus::Stepping);
 515    }
 516
 517    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 518        self.thread_state(thread_id)
 519            .unwrap_or(ThreadStatus::Running)
 520    }
 521
 522    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 523        self.known_thread_states
 524            .get(&thread_id)
 525            .copied()
 526            .or(self.global_state)
 527    }
 528
 529    fn exit_thread(&mut self, thread_id: ThreadId) {
 530        self.known_thread_states
 531            .insert(thread_id, ThreadStatus::Exited);
 532    }
 533
 534    fn any_stopped_thread(&self) -> bool {
 535        self.global_state
 536            .is_some_and(|state| state == ThreadStatus::Stopped)
 537            || self
 538                .known_thread_states
 539                .values()
 540                .any(|status| *status == ThreadStatus::Stopped)
 541    }
 542}
/// Capacity of the circular buffer in which a session keeps adapter output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Flag stored next to each exception breakpoint filter.
type IsEnabled = bool;

/// Ordered counter associated with session output; starts at `0` for a new session.
// NOTE(review): presumably advanced as output events are recorded — confirm against the
// code that pushes into `Session::output`.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// `Building` until `boot` has connected to the adapter, `Running` afterwards.
    mode: Mode,
    /// Task definition this session was created from.
    definition: DebugTaskDefinition,
    /// Adapter capabilities; defaulted until the `initialize` response is stored.
    pub(super) capabilities: Capabilities,
    /// Identifier of this session.
    id: SessionId,
    /// Ids of sessions registered as children of this one.
    child_session_ids: HashSet<SessionId>,
    /// Session this one was spawned from, if any.
    parent_session: Option<Entity<Session>>,
    /// When set, breakpoint store changes are not forwarded to the adapter.
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Circular buffer of output events with capacity `MAX_TRACKED_OUTPUT_EVENTS`.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Shared request tasks, keyed first by command type and then by command value.
    // NOTE(review): presumably a cache that deduplicates identical requests — confirm.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Exception breakpoint filters keyed by filter id, each with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    /// Channel that non-event adapter messages are forwarded to, tagged with the session id.
    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
    /// Long-running tasks owned by the session; `boot` stores the message loop here.
    background_tasks: Vec<Task<()>>,
}
 573
/// Object-safe view of a command that supports equality and hashing through a trait object.
trait CacheableCommand: Any + Send + Sync {
    /// Compares `self` with another type-erased command.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds `self` into a type-erased hasher.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the shared command into a shared `Any` trait object.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 579
 580impl<T> CacheableCommand for T
 581where
 582    T: DapCommand + PartialEq + Eq + Hash,
 583{
 584    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 585        (rhs as &dyn Any)
 586            .downcast_ref::<Self>()
 587            .map_or(false, |rhs| self == rhs)
 588    }
 589
 590    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 591        T::hash(self, &mut hasher);
 592    }
 593
 594    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 595        self
 596    }
 597}
 598
 599pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 600
 601impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 602    fn from(request: T) -> Self {
 603        Self(Arc::new(request))
 604    }
 605}
 606
 607impl PartialEq for RequestSlot {
 608    fn eq(&self, other: &Self) -> bool {
 609        self.0.dyn_eq(other.0.as_ref())
 610    }
 611}
 612
 613impl Eq for RequestSlot {}
 614
 615impl Hash for RequestSlot {
 616    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 617        self.0.dyn_hash(state);
 618        (&*self.0 as &dyn Any).type_id().hash(state)
 619    }
 620}
 621
/// Parameters describing a completions lookup at a cursor position.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CompletionsQuery {
    /// Text the completion is requested for.
    pub query: String,
    /// Column of the cursor, taken from a UTF-16 point when built with `new`.
    pub column: u64,
    /// Row of the cursor, if known.
    pub line: Option<u64>,
    /// Stack frame the query is associated with, if any.
    pub frame_id: Option<u64>,
}
 629
 630impl CompletionsQuery {
 631    pub fn new(
 632        buffer: &language::Buffer,
 633        cursor_position: language::Anchor,
 634        frame_id: Option<u64>,
 635    ) -> Self {
 636        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 637        Self {
 638            query: buffer.text(),
 639            column: column as u64,
 640            frame_id,
 641            line: Some(row as u64),
 642        }
 643    }
 644}
 645
/// Events a `Session` emits about its debugging state.
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
    InvalidateInlineValue,
    /// Emitted after the capabilities from the `initialize` response have been stored.
    CapabilitiesLoaded,
}
 657
/// Lifecycle events a `Session` emits.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    /// Emitted by `boot` once the session has switched to `Mode::Running`.
    Running,
    Shutdown,
    Restart,
}

// A session emits both event families.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 667
 668// local session will send breakpoint updates to DAP for all new breakpoints
 669// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 670// BreakpointStore notifies session on breakpoint changes
 671impl Session {
 672    pub(crate) fn new(
 673        breakpoint_store: Entity<BreakpointStore>,
 674        session_id: SessionId,
 675        parent_session: Option<Entity<Session>>,
 676        template: DebugTaskDefinition,
 677        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 678        cx: &mut App,
 679    ) -> Entity<Self> {
 680        cx.new::<Self>(|cx| {
 681            cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
 682                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 683                    if let Some(local) = (!this.ignore_breakpoints)
 684                        .then(|| this.as_local_mut())
 685                        .flatten()
 686                    {
 687                        local
 688                            .send_breakpoints_from_path(path.clone(), *reason, cx)
 689                            .detach();
 690                    };
 691                }
 692                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 693                    if let Some(local) = (!this.ignore_breakpoints)
 694                        .then(|| this.as_local_mut())
 695                        .flatten()
 696                    {
 697                        local.unset_breakpoints_from_paths(paths, cx).detach();
 698                    }
 699                }
 700                BreakpointStoreEvent::ActiveDebugLineChanged => {}
 701            })
 702            .detach();
 703            cx.on_app_quit(Self::on_app_quit).detach();
 704
 705            let this = Self {
 706                mode: Mode::Building,
 707                id: session_id,
 708                child_session_ids: HashSet::default(),
 709                parent_session,
 710                capabilities: Capabilities::default(),
 711                ignore_breakpoints: false,
 712                variables: Default::default(),
 713                stack_frames: Default::default(),
 714                thread_states: ThreadStates::default(),
 715                output_token: OutputToken(0),
 716                output: circular_buffer::CircularBuffer::boxed(),
 717                requests: HashMap::default(),
 718                modules: Vec::default(),
 719                loaded_sources: Vec::default(),
 720                threads: IndexMap::default(),
 721                background_tasks: Vec::default(),
 722                locations: Default::default(),
 723                is_session_terminated: false,
 724                exception_breakpoints: Default::default(),
 725                definition: template,
 726                start_debugging_requests_tx,
 727            };
 728
 729            this
 730        })
 731    }
 732
 733    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 734        match &self.mode {
 735            Mode::Building => None,
 736            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 737        }
 738    }
 739
 740    pub fn boot(
 741        &mut self,
 742        binary: DebugAdapterBinary,
 743        worktree: Entity<Worktree>,
 744        breakpoint_store: Entity<BreakpointStore>,
 745        dap_store: WeakEntity<DapStore>,
 746        cx: &mut Context<Self>,
 747    ) -> Task<Result<()>> {
 748        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 749        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 750        let session_id = self.session_id();
 751
 752        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 753            let mut initialized_tx = Some(initialized_tx);
 754            while let Some(message) = message_rx.next().await {
 755                if let Message::Event(event) = message {
 756                    if let Events::Initialized(_) = *event {
 757                        if let Some(tx) = initialized_tx.take() {
 758                            tx.send(()).ok();
 759                        }
 760                    } else {
 761                        let Ok(_) = this.update(cx, |session, cx| {
 762                            session.handle_dap_event(event, cx);
 763                        }) else {
 764                            break;
 765                        };
 766                    }
 767                } else {
 768                    let Ok(Ok(_)) = this.update(cx, |this, _| {
 769                        this.start_debugging_requests_tx
 770                            .unbounded_send((session_id, message))
 771                    }) else {
 772                        break;
 773                    };
 774                }
 775            }
 776        })];
 777        self.background_tasks = background_tasks;
 778        let id = self.id;
 779        let parent_session = self.parent_session.clone();
 780
 781        cx.spawn(async move |this, cx| {
 782            let mode = LocalMode::new(
 783                id,
 784                parent_session,
 785                worktree.downgrade(),
 786                breakpoint_store.clone(),
 787                binary,
 788                message_tx,
 789                cx.clone(),
 790            )
 791            .await?;
 792            this.update(cx, |this, cx| {
 793                this.mode = Mode::Running(mode);
 794                cx.emit(SessionStateEvent::Running);
 795            })?;
 796
 797            this.update(cx, |session, cx| session.request_initialize(cx))?
 798                .await?;
 799
 800            this.update(cx, |session, cx| {
 801                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 802            })?
 803            .await
 804        })
 805    }
 806
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 810
    /// Returns a copy of the set of registered child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 814
    /// Registers `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 818
    /// Removes `session_id` from the child set; does nothing if it was not registered.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 822
 823    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 824        self.parent_session
 825            .as_ref()
 826            .map(|session| session.read(cx).id)
 827    }
 828
    /// Returns the parent session entity, if any.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
 832
    /// Returns the stored adapter capabilities (defaults until `initialize` has completed).
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 836
 837    pub(crate) fn root_binary(&self) -> Arc<DebugAdapterBinary> {
 838        match &self.mode {
 839            Mode::Building => {
 840                // todo(debugger): Implement root_binary for building mode
 841                unimplemented!()
 842            }
 843            Mode::Running(running) => running
 844                .root_binary
 845                .clone()
 846                .unwrap_or_else(|| Arc::new(running.binary.clone())),
 847        }
 848    }
 849
 850    pub fn binary(&self) -> &DebugAdapterBinary {
 851        let Mode::Running(local_mode) = &self.mode else {
 852            panic!("Session is not local");
 853        };
 854        &local_mode.binary
 855    }
 856
    /// Returns the adapter name from the task definition as a `SharedString`.
    pub fn adapter_name(&self) -> SharedString {
        self.definition.adapter.clone().into()
    }
 860
    /// Returns a copy of the task definition's label.
    pub fn label(&self) -> String {
        self.definition.label.clone()
    }
 864
    /// Returns a copy of the task definition this session was created from.
    pub fn definition(&self) -> DebugTaskDefinition {
        self.definition.clone()
    }
 868
    /// Returns the session's terminated flag.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 872
    /// Returns `true` when the session is in `Mode::Running`, i.e. a `LocalMode` is
    /// available; a session that is still `Building` reports `false`.
    pub fn is_local(&self) -> bool {
        matches!(self.mode, Mode::Running(_))
    }
 876
 877    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 878        match &mut self.mode {
 879            Mode::Running(local_mode) => Some(local_mode),
 880            Mode::Building => None,
 881        }
 882    }
 883
 884    pub fn as_local(&self) -> Option<&LocalMode> {
 885        match &self.mode {
 886            Mode::Running(local_mode) => Some(local_mode),
 887            Mode::Building => None,
 888        }
 889    }
 890
 891    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 892        let adapter_id = self.definition.adapter.clone();
 893        let request = Initialize { adapter_id };
 894        match &self.mode {
 895            Mode::Running(local_mode) => {
 896                let capabilities = local_mode.request(request, cx.background_executor().clone());
 897
 898                cx.spawn(async move |this, cx| {
 899                    let capabilities = capabilities.await?;
 900                    this.update(cx, |session, cx| {
 901                        session.capabilities = capabilities;
 902                        let filters = session
 903                            .capabilities
 904                            .exception_breakpoint_filters
 905                            .clone()
 906                            .unwrap_or_default();
 907                        for filter in filters {
 908                            let default = filter.default.unwrap_or_default();
 909                            session
 910                                .exception_breakpoints
 911                                .entry(filter.filter.clone())
 912                                .or_insert_with(|| (filter, default));
 913                        }
 914                        cx.emit(SessionEvent::CapabilitiesLoaded);
 915                    })?;
 916                    Ok(())
 917                })
 918            }
 919            Mode::Building => Task::ready(Err(anyhow!(
 920                "Cannot send initialize request, task still building"
 921            ))),
 922        }
 923    }
 924
 925    pub(super) fn initialize_sequence(
 926        &mut self,
 927        initialize_rx: oneshot::Receiver<()>,
 928        dap_store: WeakEntity<DapStore>,
 929        cx: &mut Context<Self>,
 930    ) -> Task<Result<()>> {
 931        match &self.mode {
 932            Mode::Running(local_mode) => local_mode.initialize_sequence(
 933                &self.capabilities,
 934                &self.definition,
 935                initialize_rx,
 936                dap_store,
 937                cx,
 938            ),
 939            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
 940        }
 941    }
 942
 943    pub fn run_to_position(
 944        &mut self,
 945        breakpoint: SourceBreakpoint,
 946        active_thread_id: ThreadId,
 947        cx: &mut Context<Self>,
 948    ) {
 949        match &mut self.mode {
 950            Mode::Running(local_mode) => {
 951                if !matches!(
 952                    self.thread_states.thread_state(active_thread_id),
 953                    Some(ThreadStatus::Stopped)
 954                ) {
 955                    return;
 956                };
 957                let path = breakpoint.path.clone();
 958                local_mode.tmp_breakpoint = Some(breakpoint);
 959                let task = local_mode.send_breakpoints_from_path(
 960                    path,
 961                    BreakpointUpdatedReason::Toggled,
 962                    cx,
 963                );
 964
 965                cx.spawn(async move |this, cx| {
 966                    task.await;
 967                    this.update(cx, |this, cx| {
 968                        this.continue_thread(active_thread_id, cx);
 969                    })
 970                })
 971                .detach();
 972            }
 973            Mode::Building => {}
 974        }
 975    }
 976
 977    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
 978        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
 979    }
 980
 981    pub fn output(
 982        &self,
 983        since: OutputToken,
 984    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 985        if self.output_token.0 == 0 {
 986            return (self.output.range(0..0), OutputToken(0));
 987        };
 988
 989        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 990
 991        let clamped_events_since = events_since.clamp(0, self.output.len());
 992        (
 993            self.output
 994                .range(self.output.len() - clamped_events_since..),
 995            self.output_token,
 996        )
 997    }
 998
 999    pub fn respond_to_client(
1000        &self,
1001        request_seq: u64,
1002        success: bool,
1003        command: String,
1004        body: Option<serde_json::Value>,
1005        cx: &mut Context<Self>,
1006    ) -> Task<Result<()>> {
1007        let Some(local_session) = self.as_local() else {
1008            unreachable!("Cannot respond to remote client");
1009        };
1010        let client = local_session.client.clone();
1011
1012        cx.background_spawn(async move {
1013            client
1014                .send_message(Message::Response(Response {
1015                    body,
1016                    success,
1017                    command,
1018                    seq: request_seq + 1,
1019                    request_seq,
1020                    message: None,
1021                }))
1022                .await
1023        })
1024    }
1025
1026    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1027        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1028            let breakpoint = local.tmp_breakpoint.take()?;
1029            let path = breakpoint.path.clone();
1030            Some((local, path))
1031        }) {
1032            local
1033                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1034                .detach();
1035        };
1036
1037        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1038            self.thread_states.stop_all_threads();
1039
1040            self.invalidate_command_type::<StackTraceCommand>();
1041        }
1042
1043        // Event if we stopped all threads we still need to insert the thread_id
1044        // to our own data
1045        if let Some(thread_id) = event.thread_id {
1046            self.thread_states.stop_thread(ThreadId(thread_id));
1047
1048            self.invalidate_state(
1049                &StackTraceCommand {
1050                    thread_id,
1051                    start_frame: None,
1052                    levels: None,
1053                }
1054                .into(),
1055            );
1056        }
1057
1058        self.invalidate_generic();
1059        self.threads.clear();
1060        self.variables.clear();
1061        cx.emit(SessionEvent::Stopped(
1062            event
1063                .thread_id
1064                .map(Into::into)
1065                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1066        ));
1067        cx.emit(SessionEvent::InvalidateInlineValue);
1068        cx.notify();
1069    }
1070
1071    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1072        match *event {
1073            Events::Initialized(_) => {
1074                debug_assert!(
1075                    false,
1076                    "Initialized event should have been handled in LocalMode"
1077                );
1078            }
1079            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1080            Events::Continued(event) => {
1081                if event.all_threads_continued.unwrap_or_default() {
1082                    self.thread_states.continue_all_threads();
1083                } else {
1084                    self.thread_states
1085                        .continue_thread(ThreadId(event.thread_id));
1086                }
1087                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1088                self.invalidate_generic();
1089            }
1090            Events::Exited(_event) => {
1091                self.clear_active_debug_line(cx);
1092            }
1093            Events::Terminated(_) => {
1094                self.is_session_terminated = true;
1095                self.clear_active_debug_line(cx);
1096            }
1097            Events::Thread(event) => {
1098                let thread_id = ThreadId(event.thread_id);
1099
1100                match event.reason {
1101                    dap::ThreadEventReason::Started => {
1102                        self.thread_states.continue_thread(thread_id);
1103                    }
1104                    dap::ThreadEventReason::Exited => {
1105                        self.thread_states.exit_thread(thread_id);
1106                    }
1107                    reason => {
1108                        log::error!("Unhandled thread event reason {:?}", reason);
1109                    }
1110                }
1111                self.invalidate_state(&ThreadsCommand.into());
1112                cx.notify();
1113            }
1114            Events::Output(event) => {
1115                if event
1116                    .category
1117                    .as_ref()
1118                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1119                {
1120                    return;
1121                }
1122
1123                self.output.push_back(event);
1124                self.output_token.0 += 1;
1125                cx.notify();
1126            }
1127            Events::Breakpoint(_) => {}
1128            Events::Module(event) => {
1129                match event.reason {
1130                    dap::ModuleEventReason::New => {
1131                        self.modules.push(event.module);
1132                    }
1133                    dap::ModuleEventReason::Changed => {
1134                        if let Some(module) = self
1135                            .modules
1136                            .iter_mut()
1137                            .find(|other| event.module.id == other.id)
1138                        {
1139                            *module = event.module;
1140                        }
1141                    }
1142                    dap::ModuleEventReason::Removed => {
1143                        self.modules.retain(|other| event.module.id != other.id);
1144                    }
1145                }
1146
1147                // todo(debugger): We should only send the invalidate command to downstream clients.
1148                // self.invalidate_state(&ModulesCommand.into());
1149            }
1150            Events::LoadedSource(_) => {
1151                self.invalidate_state(&LoadedSourcesCommand.into());
1152            }
1153            Events::Capabilities(event) => {
1154                self.capabilities = self.capabilities.merge(event.capabilities);
1155                cx.notify();
1156            }
1157            Events::Memory(_) => {}
1158            Events::Process(_) => {}
1159            Events::ProgressEnd(_) => {}
1160            Events::ProgressStart(_) => {}
1161            Events::ProgressUpdate(_) => {}
1162            Events::Invalidated(_) => {}
1163            Events::Other(_) => {}
1164        }
1165    }
1166
1167    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1168    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1169        &mut self,
1170        request: T,
1171        process_result: impl FnOnce(
1172            &mut Self,
1173            Result<T::Response>,
1174            &mut Context<Self>,
1175        ) -> Option<T::Response>
1176        + 'static,
1177        cx: &mut Context<Self>,
1178    ) {
1179        const {
1180            assert!(
1181                T::CACHEABLE,
1182                "Only requests marked as cacheable should invoke `fetch`"
1183            );
1184        }
1185
1186        if !self.thread_states.any_stopped_thread()
1187            && request.type_id() != TypeId::of::<ThreadsCommand>()
1188            || self.is_session_terminated
1189        {
1190            return;
1191        }
1192
1193        let request_map = self
1194            .requests
1195            .entry(std::any::TypeId::of::<T>())
1196            .or_default();
1197
1198        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1199            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1200
1201            let task = Self::request_inner::<Arc<T>>(
1202                &self.capabilities,
1203                &self.mode,
1204                command,
1205                process_result,
1206                cx,
1207            );
1208            let task = cx
1209                .background_executor()
1210                .spawn(async move {
1211                    let _ = task.await?;
1212                    Some(())
1213                })
1214                .shared();
1215
1216            vacant.insert(task);
1217            cx.notify();
1218        }
1219    }
1220
1221    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1222        capabilities: &Capabilities,
1223        mode: &Mode,
1224        request: T,
1225        process_result: impl FnOnce(
1226            &mut Self,
1227            Result<T::Response>,
1228            &mut Context<Self>,
1229        ) -> Option<T::Response>
1230        + 'static,
1231        cx: &mut Context<Self>,
1232    ) -> Task<Option<T::Response>> {
1233        if !T::is_supported(&capabilities) {
1234            log::warn!(
1235                "Attempted to send a DAP request that isn't supported: {:?}",
1236                request
1237            );
1238            let error = Err(anyhow::Error::msg(
1239                "Couldn't complete request because it's not supported",
1240            ));
1241            return cx.spawn(async move |this, cx| {
1242                this.update(cx, |this, cx| process_result(this, error, cx))
1243                    .log_err()
1244                    .flatten()
1245            });
1246        }
1247
1248        let request = mode.request_dap(request, cx);
1249        cx.spawn(async move |this, cx| {
1250            let result = request.await;
1251            this.update(cx, |this, cx| process_result(this, result, cx))
1252                .log_err()
1253                .flatten()
1254        })
1255    }
1256
1257    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1258        &self,
1259        request: T,
1260        process_result: impl FnOnce(
1261            &mut Self,
1262            Result<T::Response>,
1263            &mut Context<Self>,
1264        ) -> Option<T::Response>
1265        + 'static,
1266        cx: &mut Context<Self>,
1267    ) -> Task<Option<T::Response>> {
1268        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1269    }
1270
1271    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1272        self.requests.remove(&std::any::TypeId::of::<Command>());
1273    }
1274
1275    fn invalidate_generic(&mut self) {
1276        self.invalidate_command_type::<ModulesCommand>();
1277        self.invalidate_command_type::<LoadedSourcesCommand>();
1278        self.invalidate_command_type::<ThreadsCommand>();
1279    }
1280
1281    fn invalidate_state(&mut self, key: &RequestSlot) {
1282        self.requests
1283            .entry((&*key.0 as &dyn Any).type_id())
1284            .and_modify(|request_map| {
1285                request_map.remove(&key);
1286            });
1287    }
1288
1289    pub fn any_stopped_thread(&self) -> bool {
1290        self.thread_states.any_stopped_thread()
1291    }
1292
1293    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1294        self.thread_states.thread_status(thread_id)
1295    }
1296
1297    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1298        self.fetch(
1299            dap_command::ThreadsCommand,
1300            |this, result, cx| {
1301                let result = result.log_err()?;
1302
1303                this.threads = result
1304                    .iter()
1305                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1306                    .collect();
1307
1308                this.invalidate_command_type::<StackTraceCommand>();
1309                cx.emit(SessionEvent::Threads);
1310                cx.notify();
1311
1312                Some(result)
1313            },
1314            cx,
1315        );
1316
1317        self.threads
1318            .values()
1319            .map(|thread| {
1320                (
1321                    thread.dap.clone(),
1322                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1323                )
1324            })
1325            .collect()
1326    }
1327
1328    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1329        self.fetch(
1330            dap_command::ModulesCommand,
1331            |this, result, cx| {
1332                let result = result.log_err()?;
1333
1334                this.modules = result.iter().cloned().collect();
1335                cx.emit(SessionEvent::Modules);
1336                cx.notify();
1337
1338                Some(result)
1339            },
1340            cx,
1341        );
1342
1343        &self.modules
1344    }
1345
1346    pub fn ignore_breakpoints(&self) -> bool {
1347        self.ignore_breakpoints
1348    }
1349
1350    pub fn toggle_ignore_breakpoints(
1351        &mut self,
1352        cx: &mut App,
1353    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1354        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1355    }
1356
1357    pub(crate) fn set_ignore_breakpoints(
1358        &mut self,
1359        ignore: bool,
1360        cx: &mut App,
1361    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1362        if self.ignore_breakpoints == ignore {
1363            return Task::ready(HashMap::default());
1364        }
1365
1366        self.ignore_breakpoints = ignore;
1367
1368        if let Some(local) = self.as_local() {
1369            local.send_source_breakpoints(ignore, cx)
1370        } else {
1371            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1372            unimplemented!()
1373        }
1374    }
1375
1376    pub fn exception_breakpoints(
1377        &self,
1378    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1379        self.exception_breakpoints.values()
1380    }
1381
1382    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1383        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1384            *is_enabled = !*is_enabled;
1385            self.send_exception_breakpoints(cx);
1386        }
1387    }
1388
1389    fn send_exception_breakpoints(&mut self, cx: &App) {
1390        if let Some(local) = self.as_local() {
1391            let exception_filters = self
1392                .exception_breakpoints
1393                .values()
1394                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1395                .collect();
1396
1397            let supports_exception_filters = self
1398                .capabilities
1399                .supports_exception_filter_options
1400                .unwrap_or_default();
1401            local
1402                .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1403                .detach_and_log_err(cx);
1404        } else {
1405            debug_assert!(false, "Not implemented");
1406        }
1407    }
1408
1409    pub fn breakpoints_enabled(&self) -> bool {
1410        self.ignore_breakpoints
1411    }
1412
1413    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1414        self.fetch(
1415            dap_command::LoadedSourcesCommand,
1416            |this, result, cx| {
1417                let result = result.log_err()?;
1418                this.loaded_sources = result.iter().cloned().collect();
1419                cx.emit(SessionEvent::LoadedSources);
1420                cx.notify();
1421                Some(result)
1422            },
1423            cx,
1424        );
1425
1426        &self.loaded_sources
1427    }
1428
1429    fn fallback_to_manual_restart(
1430        &mut self,
1431        res: Result<()>,
1432        cx: &mut Context<Self>,
1433    ) -> Option<()> {
1434        if res.log_err().is_none() {
1435            cx.emit(SessionStateEvent::Restart);
1436            return None;
1437        }
1438        Some(())
1439    }
1440
1441    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1442        res.log_err()?;
1443        Some(())
1444    }
1445
1446    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1447        thread_id: ThreadId,
1448    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1449    {
1450        move |this, response, cx| match response.log_err() {
1451            Some(response) => Some(response),
1452            None => {
1453                this.thread_states.stop_thread(thread_id);
1454                cx.notify();
1455                None
1456            }
1457        }
1458    }
1459
1460    fn clear_active_debug_line_response(
1461        &mut self,
1462        response: Result<()>,
1463        cx: &mut Context<Session>,
1464    ) -> Option<()> {
1465        response.log_err()?;
1466        self.clear_active_debug_line(cx);
1467        Some(())
1468    }
1469
1470    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1471        self.as_local()
1472            .expect("Message handler will only run in local mode")
1473            .breakpoint_store
1474            .update(cx, |store, cx| {
1475                store.remove_active_position(Some(self.id), cx)
1476            });
1477    }
1478
1479    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1480        self.request(
1481            PauseCommand {
1482                thread_id: thread_id.0,
1483            },
1484            Self::empty_response,
1485            cx,
1486        )
1487        .detach();
1488    }
1489
1490    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1491        self.request(
1492            RestartStackFrameCommand { stack_frame_id },
1493            Self::empty_response,
1494            cx,
1495        )
1496        .detach();
1497    }
1498
1499    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1500        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1501            self.request(
1502                RestartCommand {
1503                    raw: args.unwrap_or(Value::Null),
1504                },
1505                Self::fallback_to_manual_restart,
1506                cx,
1507            )
1508            .detach();
1509        } else {
1510            cx.emit(SessionStateEvent::Restart);
1511        }
1512    }
1513
1514    fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1515        let debug_adapter = self.adapter_client();
1516
1517        cx.background_spawn(async move {
1518            if let Some(client) = debug_adapter {
1519                client.shutdown().await.log_err();
1520            }
1521        })
1522    }
1523
1524    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1525        self.is_session_terminated = true;
1526        self.thread_states.exit_all_threads();
1527        cx.notify();
1528
1529        let task = if self
1530            .capabilities
1531            .supports_terminate_request
1532            .unwrap_or_default()
1533        {
1534            self.request(
1535                TerminateCommand {
1536                    restart: Some(false),
1537                },
1538                Self::clear_active_debug_line_response,
1539                cx,
1540            )
1541        } else {
1542            self.request(
1543                DisconnectCommand {
1544                    restart: Some(false),
1545                    terminate_debuggee: Some(true),
1546                    suspend_debuggee: Some(false),
1547                },
1548                Self::clear_active_debug_line_response,
1549                cx,
1550            )
1551        };
1552
1553        cx.emit(SessionStateEvent::Shutdown);
1554
1555        let debug_client = self.adapter_client();
1556
1557        cx.background_spawn(async move {
1558            let _ = task.await;
1559
1560            if let Some(client) = debug_client {
1561                client.shutdown().await.log_err();
1562            }
1563        })
1564    }
1565
1566    pub fn completions(
1567        &mut self,
1568        query: CompletionsQuery,
1569        cx: &mut Context<Self>,
1570    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1571        let task = self.request(query, |_, result, _| result.log_err(), cx);
1572
1573        cx.background_executor().spawn(async move {
1574            anyhow::Ok(
1575                task.await
1576                    .map(|response| response.targets)
1577                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1578            )
1579        })
1580    }
1581
1582    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1583        self.thread_states.continue_thread(thread_id);
1584        self.request(
1585            ContinueCommand {
1586                args: ContinueArguments {
1587                    thread_id: thread_id.0,
1588                    single_thread: Some(true),
1589                },
1590            },
1591            Self::on_step_response::<ContinueCommand>(thread_id),
1592            cx,
1593        )
1594        .detach();
1595    }
1596
1597    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1598        match self.mode {
1599            Mode::Running(ref local) => Some(local.client.clone()),
1600            Mode::Building => None,
1601        }
1602    }
1603
1604    pub fn step_over(
1605        &mut self,
1606        thread_id: ThreadId,
1607        granularity: SteppingGranularity,
1608        cx: &mut Context<Self>,
1609    ) {
1610        let supports_single_thread_execution_requests =
1611            self.capabilities.supports_single_thread_execution_requests;
1612        let supports_stepping_granularity = self
1613            .capabilities
1614            .supports_stepping_granularity
1615            .unwrap_or_default();
1616
1617        let command = NextCommand {
1618            inner: StepCommand {
1619                thread_id: thread_id.0,
1620                granularity: supports_stepping_granularity.then(|| granularity),
1621                single_thread: supports_single_thread_execution_requests,
1622            },
1623        };
1624
1625        self.thread_states.process_step(thread_id);
1626        self.request(
1627            command,
1628            Self::on_step_response::<NextCommand>(thread_id),
1629            cx,
1630        )
1631        .detach();
1632    }
1633
1634    pub fn step_in(
1635        &mut self,
1636        thread_id: ThreadId,
1637        granularity: SteppingGranularity,
1638        cx: &mut Context<Self>,
1639    ) {
1640        let supports_single_thread_execution_requests =
1641            self.capabilities.supports_single_thread_execution_requests;
1642        let supports_stepping_granularity = self
1643            .capabilities
1644            .supports_stepping_granularity
1645            .unwrap_or_default();
1646
1647        let command = StepInCommand {
1648            inner: StepCommand {
1649                thread_id: thread_id.0,
1650                granularity: supports_stepping_granularity.then(|| granularity),
1651                single_thread: supports_single_thread_execution_requests,
1652            },
1653        };
1654
1655        self.thread_states.process_step(thread_id);
1656        self.request(
1657            command,
1658            Self::on_step_response::<StepInCommand>(thread_id),
1659            cx,
1660        )
1661        .detach();
1662    }
1663
1664    pub fn step_out(
1665        &mut self,
1666        thread_id: ThreadId,
1667        granularity: SteppingGranularity,
1668        cx: &mut Context<Self>,
1669    ) {
1670        let supports_single_thread_execution_requests =
1671            self.capabilities.supports_single_thread_execution_requests;
1672        let supports_stepping_granularity = self
1673            .capabilities
1674            .supports_stepping_granularity
1675            .unwrap_or_default();
1676
1677        let command = StepOutCommand {
1678            inner: StepCommand {
1679                thread_id: thread_id.0,
1680                granularity: supports_stepping_granularity.then(|| granularity),
1681                single_thread: supports_single_thread_execution_requests,
1682            },
1683        };
1684
1685        self.thread_states.process_step(thread_id);
1686        self.request(
1687            command,
1688            Self::on_step_response::<StepOutCommand>(thread_id),
1689            cx,
1690        )
1691        .detach();
1692    }
1693
1694    pub fn step_back(
1695        &mut self,
1696        thread_id: ThreadId,
1697        granularity: SteppingGranularity,
1698        cx: &mut Context<Self>,
1699    ) {
1700        let supports_single_thread_execution_requests =
1701            self.capabilities.supports_single_thread_execution_requests;
1702        let supports_stepping_granularity = self
1703            .capabilities
1704            .supports_stepping_granularity
1705            .unwrap_or_default();
1706
1707        let command = StepBackCommand {
1708            inner: StepCommand {
1709                thread_id: thread_id.0,
1710                granularity: supports_stepping_granularity.then(|| granularity),
1711                single_thread: supports_single_thread_execution_requests,
1712            },
1713        };
1714
1715        self.thread_states.process_step(thread_id);
1716
1717        self.request(
1718            command,
1719            Self::on_step_response::<StepBackCommand>(thread_id),
1720            cx,
1721        )
1722        .detach();
1723    }
1724
1725    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1726        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1727            && self.requests.contains_key(&ThreadsCommand.type_id())
1728            && self.threads.contains_key(&thread_id)
1729        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1730        // We could still be using an old thread id and have sent a new thread's request
1731        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1732        // But it very well could cause a minor bug in the future that is hard to track down
1733        {
1734            self.fetch(
1735                super::dap_command::StackTraceCommand {
1736                    thread_id: thread_id.0,
1737                    start_frame: None,
1738                    levels: None,
1739                },
1740                move |this, stack_frames, cx| {
1741                    let stack_frames = stack_frames.log_err()?;
1742
1743                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1744                        thread.stack_frame_ids =
1745                            stack_frames.iter().map(|frame| frame.id).collect();
1746                    });
1747                    debug_assert!(
1748                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1749                        "Sent request for thread_id that doesn't exist"
1750                    );
1751
1752                    this.stack_frames.extend(
1753                        stack_frames
1754                            .iter()
1755                            .cloned()
1756                            .map(|frame| (frame.id, StackFrame::from(frame))),
1757                    );
1758
1759                    this.invalidate_command_type::<ScopesCommand>();
1760                    this.invalidate_command_type::<VariablesCommand>();
1761
1762                    cx.emit(SessionEvent::StackTrace);
1763                    cx.notify();
1764                    Some(stack_frames)
1765                },
1766                cx,
1767            );
1768        }
1769
1770        self.threads
1771            .get(&thread_id)
1772            .map(|thread| {
1773                thread
1774                    .stack_frame_ids
1775                    .iter()
1776                    .filter_map(|id| self.stack_frames.get(id))
1777                    .cloned()
1778                    .collect()
1779            })
1780            .unwrap_or_default()
1781    }
1782
1783    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1784        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1785            && self
1786                .requests
1787                .contains_key(&TypeId::of::<StackTraceCommand>())
1788        {
1789            self.fetch(
1790                ScopesCommand { stack_frame_id },
1791                move |this, scopes, cx| {
1792                    let scopes = scopes.log_err()?;
1793
1794                    for scope in scopes .iter(){
1795                        this.variables(scope.variables_reference, cx);
1796                    }
1797
1798                    let entry = this
1799                        .stack_frames
1800                        .entry(stack_frame_id)
1801                        .and_modify(|stack_frame| {
1802                            stack_frame.scopes = scopes.clone();
1803                        });
1804
1805                    cx.emit(SessionEvent::Variables);
1806
1807                    debug_assert!(
1808                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1809                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1810                    );
1811
1812                    Some(scopes)
1813                },
1814                cx,
1815            );
1816        }
1817
1818        self.stack_frames
1819            .get(&stack_frame_id)
1820            .map(|frame| frame.scopes.as_slice())
1821            .unwrap_or_default()
1822    }
1823
1824    pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
1825        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
1826            return Vec::new();
1827        };
1828
1829        stack_frame
1830            .scopes
1831            .iter()
1832            .filter_map(|scope| self.variables.get(&scope.variables_reference))
1833            .flatten()
1834            .cloned()
1835            .collect()
1836    }
1837
1838    pub fn variables(
1839        &mut self,
1840        variables_reference: VariableReference,
1841        cx: &mut Context<Self>,
1842    ) -> Vec<dap::Variable> {
1843        let command = VariablesCommand {
1844            variables_reference,
1845            filter: None,
1846            start: None,
1847            count: None,
1848            format: None,
1849        };
1850
1851        self.fetch(
1852            command,
1853            move |this, variables, cx| {
1854                let variables = variables.log_err()?;
1855                this.variables
1856                    .insert(variables_reference, variables.clone());
1857
1858                cx.emit(SessionEvent::Variables);
1859                Some(variables)
1860            },
1861            cx,
1862        );
1863
1864        self.variables
1865            .get(&variables_reference)
1866            .cloned()
1867            .unwrap_or_default()
1868    }
1869
1870    pub fn set_variable_value(
1871        &mut self,
1872        variables_reference: u64,
1873        name: String,
1874        value: String,
1875        cx: &mut Context<Self>,
1876    ) {
1877        if self.capabilities.supports_set_variable.unwrap_or_default() {
1878            self.request(
1879                SetVariableValueCommand {
1880                    name,
1881                    value,
1882                    variables_reference,
1883                },
1884                move |this, response, cx| {
1885                    let response = response.log_err()?;
1886                    this.invalidate_command_type::<VariablesCommand>();
1887                    cx.notify();
1888                    Some(response)
1889                },
1890                cx,
1891            )
1892            .detach()
1893        }
1894    }
1895
1896    pub fn evaluate(
1897        &mut self,
1898        expression: String,
1899        context: Option<EvaluateArgumentsContext>,
1900        frame_id: Option<u64>,
1901        source: Option<Source>,
1902        cx: &mut Context<Self>,
1903    ) -> Task<Option<EvaluateResponse>> {
1904        self.request(
1905            EvaluateCommand {
1906                expression,
1907                context,
1908                frame_id,
1909                source,
1910            },
1911            |this, response, cx| {
1912                let response = response.log_err()?;
1913                this.output_token.0 += 1;
1914                this.output.push_back(dap::OutputEvent {
1915                    category: None,
1916                    output: response.result.clone(),
1917                    group: None,
1918                    variables_reference: Some(response.variables_reference),
1919                    source: None,
1920                    line: None,
1921                    column: None,
1922                    data: None,
1923                    location_reference: None,
1924                });
1925
1926                this.invalidate_command_type::<ScopesCommand>();
1927                cx.notify();
1928                Some(response)
1929            },
1930            cx,
1931        )
1932    }
1933
1934    pub fn location(
1935        &mut self,
1936        reference: u64,
1937        cx: &mut Context<Self>,
1938    ) -> Option<dap::LocationsResponse> {
1939        self.fetch(
1940            LocationsCommand { reference },
1941            move |this, response, _| {
1942                let response = response.log_err()?;
1943                this.locations.insert(reference, response.clone());
1944                Some(response)
1945            },
1946            cx,
1947        );
1948        self.locations.get(&reference).cloned()
1949    }
1950
1951    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1952        let command = DisconnectCommand {
1953            restart: Some(false),
1954            terminate_debuggee: Some(true),
1955            suspend_debuggee: Some(false),
1956        };
1957
1958        self.request(command, Self::empty_response, cx).detach()
1959    }
1960
1961    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1962        if self
1963            .capabilities
1964            .supports_terminate_threads_request
1965            .unwrap_or_default()
1966        {
1967            self.request(
1968                TerminateThreadsCommand {
1969                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1970                },
1971                Self::clear_active_debug_line_response,
1972                cx,
1973            )
1974            .detach();
1975        } else {
1976            self.shutdown(cx).detach();
1977        }
1978    }
1979}