session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapterBinary, DebugTaskDefinition};
  16use dap::messages::Response;
  17use dap::requests::{Request, RunInTerminal, StartDebugging};
  18use dap::{
  19    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  20    SteppingGranularity, StoppedEvent, VariableReference,
  21    client::{DebugAdapterClient, SessionId},
  22    messages::{Events, Message},
  23};
  24use dap::{
  25    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory,
  26    RunInTerminalRequestArguments, StartDebuggingRequestArguments,
  27};
  28use futures::channel::{mpsc, oneshot};
  29use futures::{FutureExt, future::Shared};
  30use gpui::{
  31    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  32    Task, WeakEntity,
  33};
  34
  35use serde_json::{Value, json};
  36use smol::stream::StreamExt;
  37use std::any::TypeId;
  38use std::collections::BTreeMap;
  39use std::u64;
  40use std::{
  41    any::Any,
  42    collections::hash_map::Entry,
  43    hash::{Hash, Hasher},
  44    path::Path,
  45    sync::Arc,
  46};
  47use text::{PointUtf16, ToPointUtf16};
  48use util::{ResultExt, merge_json_value_into};
  49use worktree::Worktree;
  50
  51#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  52#[repr(transparent)]
  53pub struct ThreadId(pub u64);
  54
  55impl ThreadId {
  56    pub const MIN: ThreadId = ThreadId(u64::MIN);
  57    pub const MAX: ThreadId = ThreadId(u64::MAX);
  58}
  59
  60impl From<u64> for ThreadId {
  61    fn from(id: u64) -> Self {
  62        Self(id)
  63    }
  64}
  65
  66#[derive(Clone, Debug)]
  67pub struct StackFrame {
  68    pub dap: dap::StackFrame,
  69    pub scopes: Vec<dap::Scope>,
  70}
  71
  72impl From<dap::StackFrame> for StackFrame {
  73    fn from(stack_frame: dap::StackFrame) -> Self {
  74        Self {
  75            scopes: vec![],
  76            dap: stack_frame,
  77        }
  78    }
  79}
  80
  81#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  82pub enum ThreadStatus {
  83    #[default]
  84    Running,
  85    Stopped,
  86    Stepping,
  87    Exited,
  88    Ended,
  89}
  90
  91impl ThreadStatus {
  92    pub fn label(&self) -> &'static str {
  93        match self {
  94            ThreadStatus::Running => "Running",
  95            ThreadStatus::Stopped => "Stopped",
  96            ThreadStatus::Stepping => "Stepping",
  97            ThreadStatus::Exited => "Exited",
  98            ThreadStatus::Ended => "Ended",
  99        }
 100    }
 101}
 102
 103#[derive(Debug)]
 104pub struct Thread {
 105    dap: dap::Thread,
 106    stack_frame_ids: IndexSet<StackFrameId>,
 107    _has_stopped: bool,
 108}
 109
 110impl From<dap::Thread> for Thread {
 111    fn from(dap: dap::Thread) -> Self {
 112        Self {
 113            dap,
 114            stack_frame_ids: Default::default(),
 115            _has_stopped: false,
 116        }
 117    }
 118}
 119
 120pub enum Mode {
 121    Building,
 122    Running(LocalMode),
 123}
 124
 125#[derive(Clone)]
 126pub struct LocalMode {
 127    client: Arc<DebugAdapterClient>,
 128    binary: DebugAdapterBinary,
 129    tmp_breakpoint: Option<SourceBreakpoint>,
 130    worktree: WeakEntity<Worktree>,
 131    executor: BackgroundExecutor,
 132}
 133
 134fn client_source(abs_path: &Path) -> dap::Source {
 135    dap::Source {
 136        name: abs_path
 137            .file_name()
 138            .map(|filename| filename.to_string_lossy().to_string()),
 139        path: Some(abs_path.to_string_lossy().to_string()),
 140        source_reference: None,
 141        presentation_hint: None,
 142        origin: None,
 143        sources: None,
 144        adapter_data: None,
 145        checksums: None,
 146    }
 147}
 148
 149impl LocalMode {
 150    async fn new(
 151        session_id: SessionId,
 152        parent_session: Option<Entity<Session>>,
 153        worktree: WeakEntity<Worktree>,
 154        binary: DebugAdapterBinary,
 155        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 156        cx: AsyncApp,
 157    ) -> Result<Self> {
 158        let message_handler = Box::new(move |message| {
 159            messages_tx.unbounded_send(message).ok();
 160        });
 161
 162        let client = Arc::new(
 163            if let Some(client) = parent_session
 164                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 165                .flatten()
 166            {
 167                client
 168                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 169                    .await?
 170            } else {
 171                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 172                    .await
 173                    .with_context(|| "Failed to start communication with debug adapter")?
 174            },
 175        );
 176
 177        Ok(Self {
 178            client,
 179            worktree,
 180            tmp_breakpoint: None,
 181            binary,
 182            executor: cx.background_executor().clone(),
 183        })
 184    }
 185
 186    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 187        &self.worktree
 188    }
 189
 190    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 191        let tasks: Vec<_> = paths
 192            .into_iter()
 193            .map(|path| {
 194                self.request(dap_command::SetBreakpoints {
 195                    source: client_source(path),
 196                    source_modified: None,
 197                    breakpoints: vec![],
 198                })
 199            })
 200            .collect();
 201
 202        cx.background_spawn(async move {
 203            futures::future::join_all(tasks)
 204                .await
 205                .iter()
 206                .for_each(|res| match res {
 207                    Ok(_) => {}
 208                    Err(err) => {
 209                        log::warn!("Set breakpoints request failed: {}", err);
 210                    }
 211                });
 212        })
 213    }
 214
 215    fn send_breakpoints_from_path(
 216        &self,
 217        abs_path: Arc<Path>,
 218        reason: BreakpointUpdatedReason,
 219        breakpoint_store: &Entity<BreakpointStore>,
 220        cx: &mut App,
 221    ) -> Task<()> {
 222        let breakpoints = breakpoint_store
 223            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 224            .into_iter()
 225            .filter(|bp| bp.state.is_enabled())
 226            .chain(self.tmp_breakpoint.clone())
 227            .map(Into::into)
 228            .collect();
 229
 230        let task = self.request(dap_command::SetBreakpoints {
 231            source: client_source(&abs_path),
 232            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 233            breakpoints,
 234        });
 235
 236        cx.background_spawn(async move {
 237            match task.await {
 238                Ok(_) => {}
 239                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 240            }
 241        })
 242    }
 243
 244    fn send_exception_breakpoints(
 245        &self,
 246        filters: Vec<ExceptionBreakpointsFilter>,
 247        supports_filter_options: bool,
 248    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 249        let arg = if supports_filter_options {
 250            SetExceptionBreakpoints::WithOptions {
 251                filters: filters
 252                    .into_iter()
 253                    .map(|filter| ExceptionFilterOptions {
 254                        filter_id: filter.filter,
 255                        condition: None,
 256                        mode: None,
 257                    })
 258                    .collect(),
 259            }
 260        } else {
 261            SetExceptionBreakpoints::Plain {
 262                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 263            }
 264        };
 265        self.request(arg)
 266    }
 267
 268    fn send_source_breakpoints(
 269        &self,
 270        ignore_breakpoints: bool,
 271        breakpoint_store: &Entity<BreakpointStore>,
 272        cx: &App,
 273    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 274        let mut breakpoint_tasks = Vec::new();
 275        let breakpoints = breakpoint_store.read_with(cx, |store, cx| store.all_breakpoints(cx));
 276
 277        for (path, breakpoints) in breakpoints {
 278            let breakpoints = if ignore_breakpoints {
 279                vec![]
 280            } else {
 281                breakpoints
 282                    .into_iter()
 283                    .filter(|bp| bp.state.is_enabled())
 284                    .map(Into::into)
 285                    .collect()
 286            };
 287
 288            breakpoint_tasks.push(
 289                self.request(dap_command::SetBreakpoints {
 290                    source: client_source(&path),
 291                    source_modified: Some(false),
 292                    breakpoints,
 293                })
 294                .map(|result| result.map_err(|e| (path, e))),
 295            );
 296        }
 297
 298        cx.background_spawn(async move {
 299            futures::future::join_all(breakpoint_tasks)
 300                .await
 301                .into_iter()
 302                .filter_map(Result::err)
 303                .collect::<HashMap<_, _>>()
 304        })
 305    }
 306
 307    fn initialize_sequence(
 308        &self,
 309        capabilities: &Capabilities,
 310        definition: &DebugTaskDefinition,
 311        initialized_rx: oneshot::Receiver<()>,
 312        dap_store: WeakEntity<DapStore>,
 313        breakpoint_store: Entity<BreakpointStore>,
 314        cx: &App,
 315    ) -> Task<Result<()>> {
 316        let mut raw = self.binary.request_args.clone();
 317
 318        merge_json_value_into(
 319            definition.initialize_args.clone().unwrap_or(json!({})),
 320            &mut raw.configuration,
 321        );
 322
 323        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 324        let launch = match raw.request {
 325            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 326                raw: raw.configuration,
 327            }),
 328            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 329                raw: raw.configuration,
 330            }),
 331        };
 332
 333        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 334        let exception_filters = capabilities
 335            .exception_breakpoint_filters
 336            .as_ref()
 337            .map(|exception_filters| {
 338                exception_filters
 339                    .iter()
 340                    .filter(|filter| filter.default == Some(true))
 341                    .cloned()
 342                    .collect::<Vec<_>>()
 343            })
 344            .unwrap_or_default();
 345        let supports_exception_filters = capabilities
 346            .supports_exception_filter_options
 347            .unwrap_or_default();
 348        let this = self.clone();
 349        let worktree = self.worktree().clone();
 350        let configuration_sequence = cx.spawn({
 351            async move |cx| {
 352                initialized_rx.await?;
 353                let errors_by_path = cx
 354                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 355                    .await;
 356
 357                dap_store.update(cx, |_, cx| {
 358                    let Some(worktree) = worktree.upgrade() else {
 359                        return;
 360                    };
 361
 362                    for (path, error) in &errors_by_path {
 363                        log::error!("failed to set breakpoints for {path:?}: {error}");
 364                    }
 365
 366                    if let Some(failed_path) = errors_by_path.keys().next() {
 367                        let failed_path = failed_path
 368                            .strip_prefix(worktree.read(cx).abs_path())
 369                            .unwrap_or(failed_path)
 370                            .display();
 371                        let message = format!(
 372                            "Failed to set breakpoints for {failed_path}{}",
 373                            match errors_by_path.len() {
 374                                0 => unreachable!(),
 375                                1 => "".into(),
 376                                2 => " and 1 other path".into(),
 377                                n => format!(" and {} other paths", n - 1),
 378                            }
 379                        );
 380                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 381                    }
 382                })?;
 383
 384                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
 385                    .await
 386                    .ok();
 387                let ret = if configuration_done_supported {
 388                    this.request(ConfigurationDone {})
 389                } else {
 390                    Task::ready(Ok(()))
 391                }
 392                .await;
 393                ret
 394            }
 395        });
 396
 397        cx.background_spawn(async move {
 398            futures::future::try_join(launch, configuration_sequence).await?;
 399            Ok(())
 400        })
 401    }
 402
 403    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 404    where
 405        <R::DapRequest as dap::requests::Request>::Response: 'static,
 406        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 407    {
 408        let request = Arc::new(request);
 409
 410        let request_clone = request.clone();
 411        let connection = self.client.clone();
 412        self.executor.spawn(async move {
 413            let args = request_clone.to_dap();
 414            let response = connection.request::<R::DapRequest>(args).await?;
 415            request.response_from_dap(response)
 416        })
 417    }
 418}
 419
 420impl Mode {
 421    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 422    where
 423        <R::DapRequest as dap::requests::Request>::Response: 'static,
 424        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 425    {
 426        match self {
 427            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 428            Mode::Building => Task::ready(Err(anyhow!(
 429                "no adapter running to send request: {:?}",
 430                request
 431            ))),
 432        }
 433    }
 434}
 435
 436#[derive(Default)]
 437struct ThreadStates {
 438    global_state: Option<ThreadStatus>,
 439    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 440}
 441
 442impl ThreadStates {
 443    fn stop_all_threads(&mut self) {
 444        self.global_state = Some(ThreadStatus::Stopped);
 445        self.known_thread_states.clear();
 446    }
 447
 448    fn exit_all_threads(&mut self) {
 449        self.global_state = Some(ThreadStatus::Exited);
 450        self.known_thread_states.clear();
 451    }
 452
 453    fn continue_all_threads(&mut self) {
 454        self.global_state = Some(ThreadStatus::Running);
 455        self.known_thread_states.clear();
 456    }
 457
 458    fn stop_thread(&mut self, thread_id: ThreadId) {
 459        self.known_thread_states
 460            .insert(thread_id, ThreadStatus::Stopped);
 461    }
 462
 463    fn continue_thread(&mut self, thread_id: ThreadId) {
 464        self.known_thread_states
 465            .insert(thread_id, ThreadStatus::Running);
 466    }
 467
 468    fn process_step(&mut self, thread_id: ThreadId) {
 469        self.known_thread_states
 470            .insert(thread_id, ThreadStatus::Stepping);
 471    }
 472
 473    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 474        self.thread_state(thread_id)
 475            .unwrap_or(ThreadStatus::Running)
 476    }
 477
 478    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 479        self.known_thread_states
 480            .get(&thread_id)
 481            .copied()
 482            .or(self.global_state)
 483    }
 484
 485    fn exit_thread(&mut self, thread_id: ThreadId) {
 486        self.known_thread_states
 487            .insert(thread_id, ThreadStatus::Exited);
 488    }
 489
 490    fn any_stopped_thread(&self) -> bool {
 491        self.global_state
 492            .is_some_and(|state| state == ThreadStatus::Stopped)
 493            || self
 494                .known_thread_states
 495                .values()
 496                .any(|status| *status == ThreadStatus::Stopped)
 497    }
 498}
 499const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 500
 501type IsEnabled = bool;
 502
 503#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 504pub struct OutputToken(pub usize);
 505/// Represents a current state of a single debug adapter and provides ways to mutate it.
 506pub struct Session {
 507    pub mode: Mode,
 508    definition: DebugTaskDefinition,
 509    pub(super) capabilities: Capabilities,
 510    id: SessionId,
 511    child_session_ids: HashSet<SessionId>,
 512    parent_session: Option<Entity<Session>>,
 513    modules: Vec<dap::Module>,
 514    loaded_sources: Vec<dap::Source>,
 515    output_token: OutputToken,
 516    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 517    threads: IndexMap<ThreadId, Thread>,
 518    thread_states: ThreadStates,
 519    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 520    stack_frames: IndexMap<StackFrameId, StackFrame>,
 521    locations: HashMap<u64, dap::LocationsResponse>,
 522    is_session_terminated: bool,
 523    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 524    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 525    ignore_breakpoints: bool,
 526    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 527    background_tasks: Vec<Task<()>>,
 528}
 529
 530trait CacheableCommand: Any + Send + Sync {
 531    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 532    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 533    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 534}
 535
 536impl<T> CacheableCommand for T
 537where
 538    T: DapCommand + PartialEq + Eq + Hash,
 539{
 540    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 541        (rhs as &dyn Any)
 542            .downcast_ref::<Self>()
 543            .map_or(false, |rhs| self == rhs)
 544    }
 545
 546    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 547        T::hash(self, &mut hasher);
 548    }
 549
 550    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 551        self
 552    }
 553}
 554
 555pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 556
 557impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 558    fn from(request: T) -> Self {
 559        Self(Arc::new(request))
 560    }
 561}
 562
 563impl PartialEq for RequestSlot {
 564    fn eq(&self, other: &Self) -> bool {
 565        self.0.dyn_eq(other.0.as_ref())
 566    }
 567}
 568
 569impl Eq for RequestSlot {}
 570
 571impl Hash for RequestSlot {
 572    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 573        self.0.dyn_hash(state);
 574        (&*self.0 as &dyn Any).type_id().hash(state)
 575    }
 576}
 577
 578#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 579pub struct CompletionsQuery {
 580    pub query: String,
 581    pub column: u64,
 582    pub line: Option<u64>,
 583    pub frame_id: Option<u64>,
 584}
 585
 586impl CompletionsQuery {
 587    pub fn new(
 588        buffer: &language::Buffer,
 589        cursor_position: language::Anchor,
 590        frame_id: Option<u64>,
 591    ) -> Self {
 592        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 593        Self {
 594            query: buffer.text(),
 595            column: column as u64,
 596            frame_id,
 597            line: Some(row as u64),
 598        }
 599    }
 600}
 601
 602#[derive(Debug)]
 603pub enum SessionEvent {
 604    Modules,
 605    LoadedSources,
 606    Stopped(Option<ThreadId>),
 607    StackTrace,
 608    Variables,
 609    Threads,
 610    InvalidateInlineValue,
 611    CapabilitiesLoaded,
 612    RunInTerminal {
 613        request: RunInTerminalRequestArguments,
 614        sender: mpsc::Sender<Result<u32>>,
 615    },
 616}
 617
 618#[derive(Clone, Debug, PartialEq, Eq)]
 619pub enum SessionStateEvent {
 620    Running,
 621    Shutdown,
 622    Restart,
 623    SpawnChildSession {
 624        request: StartDebuggingRequestArguments,
 625    },
 626}
 627
 628impl EventEmitter<SessionEvent> for Session {}
 629impl EventEmitter<SessionStateEvent> for Session {}
 630
 631// local session will send breakpoint updates to DAP for all new breakpoints
 632// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 633// BreakpointStore notifies session on breakpoint changes
 634impl Session {
 635    pub(crate) fn new(
 636        breakpoint_store: Entity<BreakpointStore>,
 637        session_id: SessionId,
 638        parent_session: Option<Entity<Session>>,
 639        template: DebugTaskDefinition,
 640        cx: &mut App,
 641    ) -> Entity<Self> {
 642        cx.new::<Self>(|cx| {
 643            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 644                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 645                    if let Some(local) = (!this.ignore_breakpoints)
 646                        .then(|| this.as_local_mut())
 647                        .flatten()
 648                    {
 649                        local
 650                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 651                            .detach();
 652                    };
 653                }
 654                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 655                    if let Some(local) = (!this.ignore_breakpoints)
 656                        .then(|| this.as_local_mut())
 657                        .flatten()
 658                    {
 659                        local.unset_breakpoints_from_paths(paths, cx).detach();
 660                    }
 661                }
 662                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 663            })
 664            .detach();
 665            cx.on_app_quit(Self::on_app_quit).detach();
 666
 667            let this = Self {
 668                mode: Mode::Building,
 669                id: session_id,
 670                child_session_ids: HashSet::default(),
 671                parent_session,
 672                capabilities: Capabilities::default(),
 673                variables: Default::default(),
 674                stack_frames: Default::default(),
 675                thread_states: ThreadStates::default(),
 676                output_token: OutputToken(0),
 677                output: circular_buffer::CircularBuffer::boxed(),
 678                requests: HashMap::default(),
 679                modules: Vec::default(),
 680                loaded_sources: Vec::default(),
 681                threads: IndexMap::default(),
 682                background_tasks: Vec::default(),
 683                locations: Default::default(),
 684                is_session_terminated: false,
 685                ignore_breakpoints: false,
 686                breakpoint_store,
 687                exception_breakpoints: Default::default(),
 688                definition: template,
 689            };
 690
 691            this
 692        })
 693    }
 694
 695    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 696        match &self.mode {
 697            Mode::Building => None,
 698            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 699        }
 700    }
 701
 702    pub fn boot(
 703        &mut self,
 704        binary: DebugAdapterBinary,
 705        worktree: Entity<Worktree>,
 706        dap_store: WeakEntity<DapStore>,
 707        cx: &mut Context<Self>,
 708    ) -> Task<Result<()>> {
 709        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 710        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 711
 712        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 713            let mut initialized_tx = Some(initialized_tx);
 714            while let Some(message) = message_rx.next().await {
 715                if let Message::Event(event) = message {
 716                    if let Events::Initialized(_) = *event {
 717                        if let Some(tx) = initialized_tx.take() {
 718                            tx.send(()).ok();
 719                        }
 720                    } else {
 721                        let Ok(_) = this.update(cx, |session, cx| {
 722                            session.handle_dap_event(event, cx);
 723                        }) else {
 724                            break;
 725                        };
 726                    }
 727                } else if let Message::Request(request) = message {
 728                    let Ok(_) = this.update(cx, |this, cx| {
 729                        if request.command == StartDebugging::COMMAND {
 730                            this.handle_start_debugging_request(request, cx)
 731                                .detach_and_log_err(cx);
 732                        } else if request.command == RunInTerminal::COMMAND {
 733                            this.handle_run_in_terminal_request(request, cx)
 734                                .detach_and_log_err(cx);
 735                        }
 736                    }) else {
 737                        break;
 738                    };
 739                }
 740            }
 741        })];
 742        self.background_tasks = background_tasks;
 743        let id = self.id;
 744        let parent_session = self.parent_session.clone();
 745
 746        cx.spawn(async move |this, cx| {
 747            let mode = LocalMode::new(
 748                id,
 749                parent_session,
 750                worktree.downgrade(),
 751                binary,
 752                message_tx,
 753                cx.clone(),
 754            )
 755            .await?;
 756            this.update(cx, |this, cx| {
 757                this.mode = Mode::Running(mode);
 758                cx.emit(SessionStateEvent::Running);
 759            })?;
 760
 761            this.update(cx, |session, cx| session.request_initialize(cx))?
 762                .await?;
 763
 764            this.update(cx, |session, cx| {
 765                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 766            })?
 767            .await
 768        })
 769    }
 770
 771    pub fn session_id(&self) -> SessionId {
 772        self.id
 773    }
 774
 775    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 776        self.child_session_ids.clone()
 777    }
 778
 779    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 780        self.child_session_ids.insert(session_id);
 781    }
 782
 783    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 784        self.child_session_ids.remove(&session_id);
 785    }
 786
 787    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 788        self.parent_session
 789            .as_ref()
 790            .map(|session| session.read(cx).id)
 791    }
 792
 793    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 794        self.parent_session.as_ref()
 795    }
 796
 797    pub fn capabilities(&self) -> &Capabilities {
 798        &self.capabilities
 799    }
 800
 801    pub fn binary(&self) -> &DebugAdapterBinary {
 802        let Mode::Running(local_mode) = &self.mode else {
 803            panic!("Session is not local");
 804        };
 805        &local_mode.binary
 806    }
 807
 808    pub fn adapter_name(&self) -> SharedString {
 809        self.definition.adapter.clone()
 810    }
 811
 812    pub fn label(&self) -> SharedString {
 813        self.definition.label.clone()
 814    }
 815
 816    pub fn definition(&self) -> DebugTaskDefinition {
 817        self.definition.clone()
 818    }
 819
 820    pub fn is_terminated(&self) -> bool {
 821        self.is_session_terminated
 822    }
 823
 824    pub fn is_local(&self) -> bool {
 825        matches!(self.mode, Mode::Running(_))
 826    }
 827
 828    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 829        match &mut self.mode {
 830            Mode::Running(local_mode) => Some(local_mode),
 831            Mode::Building => None,
 832        }
 833    }
 834
 835    pub fn as_local(&self) -> Option<&LocalMode> {
 836        match &self.mode {
 837            Mode::Running(local_mode) => Some(local_mode),
 838            Mode::Building => None,
 839        }
 840    }
 841
 842    fn handle_start_debugging_request(
 843        &mut self,
 844        request: dap::messages::Request,
 845        cx: &mut Context<Self>,
 846    ) -> Task<Result<()>> {
 847        let request_seq = request.seq;
 848
 849        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
 850            .arguments
 851            .as_ref()
 852            .map(|value| serde_json::from_value(value.clone()));
 853
 854        let mut success = true;
 855        if let Some(Ok(request)) = launch_request {
 856            cx.emit(SessionStateEvent::SpawnChildSession { request });
 857        } else {
 858            log::error!(
 859                "Failed to parse launch request arguments: {:?}",
 860                request.arguments
 861            );
 862            success = false;
 863        }
 864
 865        cx.spawn(async move |this, cx| {
 866            this.update(cx, |this, cx| {
 867                this.respond_to_client(
 868                    request_seq,
 869                    success,
 870                    StartDebugging::COMMAND.to_string(),
 871                    None,
 872                    cx,
 873                )
 874            })?
 875            .await
 876        })
 877    }
 878
 879    fn handle_run_in_terminal_request(
 880        &mut self,
 881        request: dap::messages::Request,
 882        cx: &mut Context<Self>,
 883    ) -> Task<Result<()>> {
 884        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
 885            request.arguments.unwrap_or_default(),
 886        )
 887        .expect("To parse StartDebuggingRequestArguments");
 888
 889        let seq = request.seq;
 890
 891        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
 892        cx.emit(SessionEvent::RunInTerminal {
 893            request: request_args,
 894            sender: tx,
 895        });
 896        cx.notify();
 897
 898        cx.spawn(async move |session, cx| {
 899            let result = util::maybe!(async move {
 900                rx.next().await.ok_or_else(|| {
 901                    anyhow!("failed to receive response from spawn terminal".to_string())
 902                })?
 903            })
 904            .await;
 905            let (success, body) = match result {
 906                Ok(pid) => (
 907                    true,
 908                    serde_json::to_value(dap::RunInTerminalResponse {
 909                        process_id: None,
 910                        shell_process_id: Some(pid as u64),
 911                    })
 912                    .ok(),
 913                ),
 914                Err(error) => (
 915                    false,
 916                    serde_json::to_value(dap::ErrorResponse {
 917                        error: Some(dap::Message {
 918                            id: seq,
 919                            format: error.to_string(),
 920                            variables: None,
 921                            send_telemetry: None,
 922                            show_user: None,
 923                            url: None,
 924                            url_label: None,
 925                        }),
 926                    })
 927                    .ok(),
 928                ),
 929            };
 930
 931            session
 932                .update(cx, |session, cx| {
 933                    session.respond_to_client(
 934                        seq,
 935                        success,
 936                        RunInTerminal::COMMAND.to_string(),
 937                        body,
 938                        cx,
 939                    )
 940                })?
 941                .await
 942        })
 943    }
 944
 945    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 946        let adapter_id = String::from(self.definition.adapter.clone());
 947        let request = Initialize { adapter_id };
 948        match &self.mode {
 949            Mode::Running(local_mode) => {
 950                let capabilities = local_mode.request(request);
 951
 952                cx.spawn(async move |this, cx| {
 953                    let capabilities = capabilities.await?;
 954                    this.update(cx, |session, cx| {
 955                        session.capabilities = capabilities;
 956                        let filters = session
 957                            .capabilities
 958                            .exception_breakpoint_filters
 959                            .clone()
 960                            .unwrap_or_default();
 961                        for filter in filters {
 962                            let default = filter.default.unwrap_or_default();
 963                            session
 964                                .exception_breakpoints
 965                                .entry(filter.filter.clone())
 966                                .or_insert_with(|| (filter, default));
 967                        }
 968                        cx.emit(SessionEvent::CapabilitiesLoaded);
 969                    })?;
 970                    Ok(())
 971                })
 972            }
 973            Mode::Building => Task::ready(Err(anyhow!(
 974                "Cannot send initialize request, task still building"
 975            ))),
 976        }
 977    }
 978
 979    pub(super) fn initialize_sequence(
 980        &mut self,
 981        initialize_rx: oneshot::Receiver<()>,
 982        dap_store: WeakEntity<DapStore>,
 983        cx: &mut Context<Self>,
 984    ) -> Task<Result<()>> {
 985        match &self.mode {
 986            Mode::Running(local_mode) => local_mode.initialize_sequence(
 987                &self.capabilities,
 988                &self.definition,
 989                initialize_rx,
 990                dap_store,
 991                self.breakpoint_store.clone(),
 992                cx,
 993            ),
 994            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
 995        }
 996    }
 997
 998    pub fn run_to_position(
 999        &mut self,
1000        breakpoint: SourceBreakpoint,
1001        active_thread_id: ThreadId,
1002        cx: &mut Context<Self>,
1003    ) {
1004        match &mut self.mode {
1005            Mode::Running(local_mode) => {
1006                if !matches!(
1007                    self.thread_states.thread_state(active_thread_id),
1008                    Some(ThreadStatus::Stopped)
1009                ) {
1010                    return;
1011                };
1012                let path = breakpoint.path.clone();
1013                local_mode.tmp_breakpoint = Some(breakpoint);
1014                let task = local_mode.send_breakpoints_from_path(
1015                    path,
1016                    BreakpointUpdatedReason::Toggled,
1017                    &self.breakpoint_store,
1018                    cx,
1019                );
1020
1021                cx.spawn(async move |this, cx| {
1022                    task.await;
1023                    this.update(cx, |this, cx| {
1024                        this.continue_thread(active_thread_id, cx);
1025                    })
1026                })
1027                .detach();
1028            }
1029            Mode::Building => {}
1030        }
1031    }
1032
1033    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1034        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1035    }
1036
1037    pub fn output(
1038        &self,
1039        since: OutputToken,
1040    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1041        if self.output_token.0 == 0 {
1042            return (self.output.range(0..0), OutputToken(0));
1043        };
1044
1045        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1046
1047        let clamped_events_since = events_since.clamp(0, self.output.len());
1048        (
1049            self.output
1050                .range(self.output.len() - clamped_events_since..),
1051            self.output_token,
1052        )
1053    }
1054
1055    pub fn respond_to_client(
1056        &self,
1057        request_seq: u64,
1058        success: bool,
1059        command: String,
1060        body: Option<serde_json::Value>,
1061        cx: &mut Context<Self>,
1062    ) -> Task<Result<()>> {
1063        let Some(local_session) = self.as_local() else {
1064            unreachable!("Cannot respond to remote client");
1065        };
1066        let client = local_session.client.clone();
1067
1068        cx.background_spawn(async move {
1069            client
1070                .send_message(Message::Response(Response {
1071                    body,
1072                    success,
1073                    command,
1074                    seq: request_seq + 1,
1075                    request_seq,
1076                    message: None,
1077                }))
1078                .await
1079        })
1080    }
1081
1082    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1083        // todo(debugger): Find a clean way to get around the clone
1084        let breakpoint_store = self.breakpoint_store.clone();
1085        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1086            let breakpoint = local.tmp_breakpoint.take()?;
1087            let path = breakpoint.path.clone();
1088            Some((local, path))
1089        }) {
1090            local
1091                .send_breakpoints_from_path(
1092                    path,
1093                    BreakpointUpdatedReason::Toggled,
1094                    &breakpoint_store,
1095                    cx,
1096                )
1097                .detach();
1098        };
1099
1100        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1101            self.thread_states.stop_all_threads();
1102
1103            self.invalidate_command_type::<StackTraceCommand>();
1104        }
1105
1106        // Event if we stopped all threads we still need to insert the thread_id
1107        // to our own data
1108        if let Some(thread_id) = event.thread_id {
1109            self.thread_states.stop_thread(ThreadId(thread_id));
1110
1111            self.invalidate_state(
1112                &StackTraceCommand {
1113                    thread_id,
1114                    start_frame: None,
1115                    levels: None,
1116                }
1117                .into(),
1118            );
1119        }
1120
1121        self.invalidate_generic();
1122        self.threads.clear();
1123        self.variables.clear();
1124        cx.emit(SessionEvent::Stopped(
1125            event
1126                .thread_id
1127                .map(Into::into)
1128                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1129        ));
1130        cx.emit(SessionEvent::InvalidateInlineValue);
1131        cx.notify();
1132    }
1133
1134    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1135        match *event {
1136            Events::Initialized(_) => {
1137                debug_assert!(
1138                    false,
1139                    "Initialized event should have been handled in LocalMode"
1140                );
1141            }
1142            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1143            Events::Continued(event) => {
1144                if event.all_threads_continued.unwrap_or_default() {
1145                    self.thread_states.continue_all_threads();
1146                    self.breakpoint_store.update(cx, |store, cx| {
1147                        store.remove_active_position(Some(self.session_id()), cx)
1148                    });
1149                } else {
1150                    self.thread_states
1151                        .continue_thread(ThreadId(event.thread_id));
1152                }
1153                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1154                self.invalidate_generic();
1155            }
1156            Events::Exited(_event) => {
1157                self.clear_active_debug_line(cx);
1158            }
1159            Events::Terminated(_) => {
1160                self.is_session_terminated = true;
1161                self.clear_active_debug_line(cx);
1162            }
1163            Events::Thread(event) => {
1164                let thread_id = ThreadId(event.thread_id);
1165
1166                match event.reason {
1167                    dap::ThreadEventReason::Started => {
1168                        self.thread_states.continue_thread(thread_id);
1169                    }
1170                    dap::ThreadEventReason::Exited => {
1171                        self.thread_states.exit_thread(thread_id);
1172                    }
1173                    reason => {
1174                        log::error!("Unhandled thread event reason {:?}", reason);
1175                    }
1176                }
1177                self.invalidate_state(&ThreadsCommand.into());
1178                cx.notify();
1179            }
1180            Events::Output(event) => {
1181                if event
1182                    .category
1183                    .as_ref()
1184                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1185                {
1186                    return;
1187                }
1188
1189                self.output.push_back(event);
1190                self.output_token.0 += 1;
1191                cx.notify();
1192            }
1193            Events::Breakpoint(_) => {}
1194            Events::Module(event) => {
1195                match event.reason {
1196                    dap::ModuleEventReason::New => {
1197                        self.modules.push(event.module);
1198                    }
1199                    dap::ModuleEventReason::Changed => {
1200                        if let Some(module) = self
1201                            .modules
1202                            .iter_mut()
1203                            .find(|other| event.module.id == other.id)
1204                        {
1205                            *module = event.module;
1206                        }
1207                    }
1208                    dap::ModuleEventReason::Removed => {
1209                        self.modules.retain(|other| event.module.id != other.id);
1210                    }
1211                }
1212
1213                // todo(debugger): We should only send the invalidate command to downstream clients.
1214                // self.invalidate_state(&ModulesCommand.into());
1215            }
1216            Events::LoadedSource(_) => {
1217                self.invalidate_state(&LoadedSourcesCommand.into());
1218            }
1219            Events::Capabilities(event) => {
1220                self.capabilities = self.capabilities.merge(event.capabilities);
1221                cx.notify();
1222            }
1223            Events::Memory(_) => {}
1224            Events::Process(_) => {}
1225            Events::ProgressEnd(_) => {}
1226            Events::ProgressStart(_) => {}
1227            Events::ProgressUpdate(_) => {}
1228            Events::Invalidated(_) => {}
1229            Events::Other(_) => {}
1230        }
1231    }
1232
1233    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1234    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1235        &mut self,
1236        request: T,
1237        process_result: impl FnOnce(
1238            &mut Self,
1239            Result<T::Response>,
1240            &mut Context<Self>,
1241        ) -> Option<T::Response>
1242        + 'static,
1243        cx: &mut Context<Self>,
1244    ) {
1245        const {
1246            assert!(
1247                T::CACHEABLE,
1248                "Only requests marked as cacheable should invoke `fetch`"
1249            );
1250        }
1251
1252        if !self.thread_states.any_stopped_thread()
1253            && request.type_id() != TypeId::of::<ThreadsCommand>()
1254            || self.is_session_terminated
1255        {
1256            return;
1257        }
1258
1259        let request_map = self
1260            .requests
1261            .entry(std::any::TypeId::of::<T>())
1262            .or_default();
1263
1264        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1265            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1266
1267            let task = Self::request_inner::<Arc<T>>(
1268                &self.capabilities,
1269                &self.mode,
1270                command,
1271                process_result,
1272                cx,
1273            );
1274            let task = cx
1275                .background_executor()
1276                .spawn(async move {
1277                    let _ = task.await?;
1278                    Some(())
1279                })
1280                .shared();
1281
1282            vacant.insert(task);
1283            cx.notify();
1284        }
1285    }
1286
1287    pub async fn request2<T: DapCommand + PartialEq + Eq + Hash>(
1288        &self,
1289        request: T,
1290    ) -> Result<T::Response> {
1291        if !T::is_supported(&self.capabilities) {
1292            anyhow::bail!("DAP request {:?} is not supported", request);
1293        }
1294
1295        self.mode.request_dap(request).await
1296    }
1297
1298    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1299        capabilities: &Capabilities,
1300        mode: &Mode,
1301        request: T,
1302        process_result: impl FnOnce(
1303            &mut Self,
1304            Result<T::Response>,
1305            &mut Context<Self>,
1306        ) -> Option<T::Response>
1307        + 'static,
1308        cx: &mut Context<Self>,
1309    ) -> Task<Option<T::Response>> {
1310        if !T::is_supported(&capabilities) {
1311            log::warn!(
1312                "Attempted to send a DAP request that isn't supported: {:?}",
1313                request
1314            );
1315            let error = Err(anyhow::Error::msg(
1316                "Couldn't complete request because it's not supported",
1317            ));
1318            return cx.spawn(async move |this, cx| {
1319                this.update(cx, |this, cx| process_result(this, error, cx))
1320                    .log_err()
1321                    .flatten()
1322            });
1323        }
1324
1325        let request = mode.request_dap(request);
1326        cx.spawn(async move |this, cx| {
1327            let result = request.await;
1328            this.update(cx, |this, cx| process_result(this, result, cx))
1329                .log_err()
1330                .flatten()
1331        })
1332    }
1333
1334    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1335        &self,
1336        request: T,
1337        process_result: impl FnOnce(
1338            &mut Self,
1339            Result<T::Response>,
1340            &mut Context<Self>,
1341        ) -> Option<T::Response>
1342        + 'static,
1343        cx: &mut Context<Self>,
1344    ) -> Task<Option<T::Response>> {
1345        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1346    }
1347
1348    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1349        self.requests.remove(&std::any::TypeId::of::<Command>());
1350    }
1351
1352    fn invalidate_generic(&mut self) {
1353        self.invalidate_command_type::<ModulesCommand>();
1354        self.invalidate_command_type::<LoadedSourcesCommand>();
1355        self.invalidate_command_type::<ThreadsCommand>();
1356    }
1357
1358    fn invalidate_state(&mut self, key: &RequestSlot) {
1359        self.requests
1360            .entry((&*key.0 as &dyn Any).type_id())
1361            .and_modify(|request_map| {
1362                request_map.remove(&key);
1363            });
1364    }
1365
1366    pub fn any_stopped_thread(&self) -> bool {
1367        self.thread_states.any_stopped_thread()
1368    }
1369
1370    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1371        self.thread_states.thread_status(thread_id)
1372    }
1373
1374    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1375        self.fetch(
1376            dap_command::ThreadsCommand,
1377            |this, result, cx| {
1378                let result = result.log_err()?;
1379
1380                this.threads = result
1381                    .iter()
1382                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1383                    .collect();
1384
1385                this.invalidate_command_type::<StackTraceCommand>();
1386                cx.emit(SessionEvent::Threads);
1387                cx.notify();
1388
1389                Some(result)
1390            },
1391            cx,
1392        );
1393
1394        self.threads
1395            .values()
1396            .map(|thread| {
1397                (
1398                    thread.dap.clone(),
1399                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1400                )
1401            })
1402            .collect()
1403    }
1404
1405    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1406        self.fetch(
1407            dap_command::ModulesCommand,
1408            |this, result, cx| {
1409                let result = result.log_err()?;
1410
1411                this.modules = result.iter().cloned().collect();
1412                cx.emit(SessionEvent::Modules);
1413                cx.notify();
1414
1415                Some(result)
1416            },
1417            cx,
1418        );
1419
1420        &self.modules
1421    }
1422
1423    pub fn ignore_breakpoints(&self) -> bool {
1424        self.ignore_breakpoints
1425    }
1426
1427    pub fn toggle_ignore_breakpoints(
1428        &mut self,
1429        cx: &mut App,
1430    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1431        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1432    }
1433
1434    pub(crate) fn set_ignore_breakpoints(
1435        &mut self,
1436        ignore: bool,
1437        cx: &mut App,
1438    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1439        if self.ignore_breakpoints == ignore {
1440            return Task::ready(HashMap::default());
1441        }
1442
1443        self.ignore_breakpoints = ignore;
1444
1445        if let Some(local) = self.as_local() {
1446            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1447        } else {
1448            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1449            unimplemented!()
1450        }
1451    }
1452
1453    pub fn exception_breakpoints(
1454        &self,
1455    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1456        self.exception_breakpoints.values()
1457    }
1458
1459    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1460        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1461            *is_enabled = !*is_enabled;
1462            self.send_exception_breakpoints(cx);
1463        }
1464    }
1465
1466    fn send_exception_breakpoints(&mut self, cx: &App) {
1467        if let Some(local) = self.as_local() {
1468            let exception_filters = self
1469                .exception_breakpoints
1470                .values()
1471                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1472                .collect();
1473
1474            let supports_exception_filters = self
1475                .capabilities
1476                .supports_exception_filter_options
1477                .unwrap_or_default();
1478            local
1479                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1480                .detach_and_log_err(cx);
1481        } else {
1482            debug_assert!(false, "Not implemented");
1483        }
1484    }
1485
1486    pub fn breakpoints_enabled(&self) -> bool {
1487        self.ignore_breakpoints
1488    }
1489
1490    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1491        self.fetch(
1492            dap_command::LoadedSourcesCommand,
1493            |this, result, cx| {
1494                let result = result.log_err()?;
1495                this.loaded_sources = result.iter().cloned().collect();
1496                cx.emit(SessionEvent::LoadedSources);
1497                cx.notify();
1498                Some(result)
1499            },
1500            cx,
1501        );
1502
1503        &self.loaded_sources
1504    }
1505
1506    fn fallback_to_manual_restart(
1507        &mut self,
1508        res: Result<()>,
1509        cx: &mut Context<Self>,
1510    ) -> Option<()> {
1511        if res.log_err().is_none() {
1512            cx.emit(SessionStateEvent::Restart);
1513            return None;
1514        }
1515        Some(())
1516    }
1517
1518    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1519        res.log_err()?;
1520        Some(())
1521    }
1522
1523    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1524        thread_id: ThreadId,
1525    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1526    {
1527        move |this, response, cx| match response.log_err() {
1528            Some(response) => {
1529                this.breakpoint_store.update(cx, |store, cx| {
1530                    store.remove_active_position(Some(this.session_id()), cx)
1531                });
1532                Some(response)
1533            }
1534            None => {
1535                this.thread_states.stop_thread(thread_id);
1536                cx.notify();
1537                None
1538            }
1539        }
1540    }
1541
1542    fn clear_active_debug_line_response(
1543        &mut self,
1544        response: Result<()>,
1545        cx: &mut Context<Session>,
1546    ) -> Option<()> {
1547        response.log_err()?;
1548        self.clear_active_debug_line(cx);
1549        Some(())
1550    }
1551
1552    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1553        self.breakpoint_store.update(cx, |store, cx| {
1554            store.remove_active_position(Some(self.id), cx)
1555        });
1556    }
1557
1558    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1559        self.request(
1560            PauseCommand {
1561                thread_id: thread_id.0,
1562            },
1563            Self::empty_response,
1564            cx,
1565        )
1566        .detach();
1567    }
1568
1569    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1570        self.request(
1571            RestartStackFrameCommand { stack_frame_id },
1572            Self::empty_response,
1573            cx,
1574        )
1575        .detach();
1576    }
1577
1578    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1579        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1580            self.request(
1581                RestartCommand {
1582                    raw: args.unwrap_or(Value::Null),
1583                },
1584                Self::fallback_to_manual_restart,
1585                cx,
1586            )
1587            .detach();
1588        } else {
1589            cx.emit(SessionStateEvent::Restart);
1590        }
1591    }
1592
1593    fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1594        let debug_adapter = self.adapter_client();
1595
1596        cx.background_spawn(async move {
1597            if let Some(client) = debug_adapter {
1598                client.shutdown().await.log_err();
1599            }
1600        })
1601    }
1602
1603    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1604        self.is_session_terminated = true;
1605        self.thread_states.exit_all_threads();
1606        cx.notify();
1607
1608        let task = if self
1609            .capabilities
1610            .supports_terminate_request
1611            .unwrap_or_default()
1612        {
1613            self.request(
1614                TerminateCommand {
1615                    restart: Some(false),
1616                },
1617                Self::clear_active_debug_line_response,
1618                cx,
1619            )
1620        } else {
1621            self.request(
1622                DisconnectCommand {
1623                    restart: Some(false),
1624                    terminate_debuggee: Some(true),
1625                    suspend_debuggee: Some(false),
1626                },
1627                Self::clear_active_debug_line_response,
1628                cx,
1629            )
1630        };
1631
1632        cx.emit(SessionStateEvent::Shutdown);
1633
1634        let debug_client = self.adapter_client();
1635
1636        cx.background_spawn(async move {
1637            let _ = task.await;
1638
1639            if let Some(client) = debug_client {
1640                client.shutdown().await.log_err();
1641            }
1642        })
1643    }
1644
1645    pub fn completions(
1646        &mut self,
1647        query: CompletionsQuery,
1648        cx: &mut Context<Self>,
1649    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1650        let task = self.request(query, |_, result, _| result.log_err(), cx);
1651
1652        cx.background_executor().spawn(async move {
1653            anyhow::Ok(
1654                task.await
1655                    .map(|response| response.targets)
1656                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1657            )
1658        })
1659    }
1660
1661    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1662        self.thread_states.continue_thread(thread_id);
1663        self.request(
1664            ContinueCommand {
1665                args: ContinueArguments {
1666                    thread_id: thread_id.0,
1667                    single_thread: Some(true),
1668                },
1669            },
1670            Self::on_step_response::<ContinueCommand>(thread_id),
1671            cx,
1672        )
1673        .detach();
1674    }
1675
1676    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1677        match self.mode {
1678            Mode::Running(ref local) => Some(local.client.clone()),
1679            Mode::Building => None,
1680        }
1681    }
1682
1683    pub fn step_over(
1684        &mut self,
1685        thread_id: ThreadId,
1686        granularity: SteppingGranularity,
1687        cx: &mut Context<Self>,
1688    ) {
1689        let supports_single_thread_execution_requests =
1690            self.capabilities.supports_single_thread_execution_requests;
1691        let supports_stepping_granularity = self
1692            .capabilities
1693            .supports_stepping_granularity
1694            .unwrap_or_default();
1695
1696        let command = NextCommand {
1697            inner: StepCommand {
1698                thread_id: thread_id.0,
1699                granularity: supports_stepping_granularity.then(|| granularity),
1700                single_thread: supports_single_thread_execution_requests,
1701            },
1702        };
1703
1704        self.thread_states.process_step(thread_id);
1705        self.request(
1706            command,
1707            Self::on_step_response::<NextCommand>(thread_id),
1708            cx,
1709        )
1710        .detach();
1711    }
1712
1713    pub fn step_in(
1714        &mut self,
1715        thread_id: ThreadId,
1716        granularity: SteppingGranularity,
1717        cx: &mut Context<Self>,
1718    ) {
1719        let supports_single_thread_execution_requests =
1720            self.capabilities.supports_single_thread_execution_requests;
1721        let supports_stepping_granularity = self
1722            .capabilities
1723            .supports_stepping_granularity
1724            .unwrap_or_default();
1725
1726        let command = StepInCommand {
1727            inner: StepCommand {
1728                thread_id: thread_id.0,
1729                granularity: supports_stepping_granularity.then(|| granularity),
1730                single_thread: supports_single_thread_execution_requests,
1731            },
1732        };
1733
1734        self.thread_states.process_step(thread_id);
1735        self.request(
1736            command,
1737            Self::on_step_response::<StepInCommand>(thread_id),
1738            cx,
1739        )
1740        .detach();
1741    }
1742
1743    pub fn step_out(
1744        &mut self,
1745        thread_id: ThreadId,
1746        granularity: SteppingGranularity,
1747        cx: &mut Context<Self>,
1748    ) {
1749        let supports_single_thread_execution_requests =
1750            self.capabilities.supports_single_thread_execution_requests;
1751        let supports_stepping_granularity = self
1752            .capabilities
1753            .supports_stepping_granularity
1754            .unwrap_or_default();
1755
1756        let command = StepOutCommand {
1757            inner: StepCommand {
1758                thread_id: thread_id.0,
1759                granularity: supports_stepping_granularity.then(|| granularity),
1760                single_thread: supports_single_thread_execution_requests,
1761            },
1762        };
1763
1764        self.thread_states.process_step(thread_id);
1765        self.request(
1766            command,
1767            Self::on_step_response::<StepOutCommand>(thread_id),
1768            cx,
1769        )
1770        .detach();
1771    }
1772
1773    pub fn step_back(
1774        &mut self,
1775        thread_id: ThreadId,
1776        granularity: SteppingGranularity,
1777        cx: &mut Context<Self>,
1778    ) {
1779        let supports_single_thread_execution_requests =
1780            self.capabilities.supports_single_thread_execution_requests;
1781        let supports_stepping_granularity = self
1782            .capabilities
1783            .supports_stepping_granularity
1784            .unwrap_or_default();
1785
1786        let command = StepBackCommand {
1787            inner: StepCommand {
1788                thread_id: thread_id.0,
1789                granularity: supports_stepping_granularity.then(|| granularity),
1790                single_thread: supports_single_thread_execution_requests,
1791            },
1792        };
1793
1794        self.thread_states.process_step(thread_id);
1795
1796        self.request(
1797            command,
1798            Self::on_step_response::<StepBackCommand>(thread_id),
1799            cx,
1800        )
1801        .detach();
1802    }
1803
1804    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1805        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1806            && self.requests.contains_key(&ThreadsCommand.type_id())
1807            && self.threads.contains_key(&thread_id)
1808        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1809        // We could still be using an old thread id and have sent a new thread's request
1810        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1811        // But it very well could cause a minor bug in the future that is hard to track down
1812        {
1813            self.fetch(
1814                super::dap_command::StackTraceCommand {
1815                    thread_id: thread_id.0,
1816                    start_frame: None,
1817                    levels: None,
1818                },
1819                move |this, stack_frames, cx| {
1820                    let stack_frames = stack_frames.log_err()?;
1821
1822                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1823                        thread.stack_frame_ids =
1824                            stack_frames.iter().map(|frame| frame.id).collect();
1825                    });
1826                    debug_assert!(
1827                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1828                        "Sent request for thread_id that doesn't exist"
1829                    );
1830
1831                    this.stack_frames.extend(
1832                        stack_frames
1833                            .iter()
1834                            .cloned()
1835                            .map(|frame| (frame.id, StackFrame::from(frame))),
1836                    );
1837
1838                    this.invalidate_command_type::<ScopesCommand>();
1839                    this.invalidate_command_type::<VariablesCommand>();
1840
1841                    cx.emit(SessionEvent::StackTrace);
1842                    cx.notify();
1843                    Some(stack_frames)
1844                },
1845                cx,
1846            );
1847        }
1848
1849        self.threads
1850            .get(&thread_id)
1851            .map(|thread| {
1852                thread
1853                    .stack_frame_ids
1854                    .iter()
1855                    .filter_map(|id| self.stack_frames.get(id))
1856                    .cloned()
1857                    .collect()
1858            })
1859            .unwrap_or_default()
1860    }
1861
1862    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1863        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1864            && self
1865                .requests
1866                .contains_key(&TypeId::of::<StackTraceCommand>())
1867        {
1868            self.fetch(
1869                ScopesCommand { stack_frame_id },
1870                move |this, scopes, cx| {
1871                    let scopes = scopes.log_err()?;
1872
1873                    for scope in scopes .iter(){
1874                        this.variables(scope.variables_reference, cx);
1875                    }
1876
1877                    let entry = this
1878                        .stack_frames
1879                        .entry(stack_frame_id)
1880                        .and_modify(|stack_frame| {
1881                            stack_frame.scopes = scopes.clone();
1882                        });
1883
1884                    cx.emit(SessionEvent::Variables);
1885
1886                    debug_assert!(
1887                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1888                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1889                    );
1890
1891                    Some(scopes)
1892                },
1893                cx,
1894            );
1895        }
1896
1897        self.stack_frames
1898            .get(&stack_frame_id)
1899            .map(|frame| frame.scopes.as_slice())
1900            .unwrap_or_default()
1901    }
1902
1903    pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
1904        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
1905            return Vec::new();
1906        };
1907
1908        stack_frame
1909            .scopes
1910            .iter()
1911            .filter_map(|scope| self.variables.get(&scope.variables_reference))
1912            .flatten()
1913            .cloned()
1914            .collect()
1915    }
1916
1917    pub fn variables(
1918        &mut self,
1919        variables_reference: VariableReference,
1920        cx: &mut Context<Self>,
1921    ) -> Vec<dap::Variable> {
1922        let command = VariablesCommand {
1923            variables_reference,
1924            filter: None,
1925            start: None,
1926            count: None,
1927            format: None,
1928        };
1929
1930        self.fetch(
1931            command,
1932            move |this, variables, cx| {
1933                let variables = variables.log_err()?;
1934                this.variables
1935                    .insert(variables_reference, variables.clone());
1936
1937                cx.emit(SessionEvent::Variables);
1938                cx.emit(SessionEvent::InvalidateInlineValue);
1939                Some(variables)
1940            },
1941            cx,
1942        );
1943
1944        self.variables
1945            .get(&variables_reference)
1946            .cloned()
1947            .unwrap_or_default()
1948    }
1949
1950    pub fn set_variable_value(
1951        &mut self,
1952        variables_reference: u64,
1953        name: String,
1954        value: String,
1955        cx: &mut Context<Self>,
1956    ) {
1957        if self.capabilities.supports_set_variable.unwrap_or_default() {
1958            self.request(
1959                SetVariableValueCommand {
1960                    name,
1961                    value,
1962                    variables_reference,
1963                },
1964                move |this, response, cx| {
1965                    let response = response.log_err()?;
1966                    this.invalidate_command_type::<VariablesCommand>();
1967                    cx.notify();
1968                    Some(response)
1969                },
1970                cx,
1971            )
1972            .detach()
1973        }
1974    }
1975
1976    pub fn evaluate(
1977        &mut self,
1978        expression: String,
1979        context: Option<EvaluateArgumentsContext>,
1980        frame_id: Option<u64>,
1981        source: Option<Source>,
1982        cx: &mut Context<Self>,
1983    ) -> Task<()> {
1984        let request = self.mode.request_dap(EvaluateCommand {
1985            expression,
1986            context,
1987            frame_id,
1988            source,
1989        });
1990        cx.spawn(async move |this, cx| {
1991            let response = request.await;
1992            this.update(cx, |this, cx| {
1993                match response {
1994                    Ok(response) => {
1995                        this.output_token.0 += 1;
1996                        this.output.push_back(dap::OutputEvent {
1997                            category: None,
1998                            output: format!("< {}", &response.result),
1999                            group: None,
2000                            variables_reference: Some(response.variables_reference),
2001                            source: None,
2002                            line: None,
2003                            column: None,
2004                            data: None,
2005                            location_reference: None,
2006                        });
2007                    }
2008                    Err(e) => {
2009                        this.output_token.0 += 1;
2010                        this.output.push_back(dap::OutputEvent {
2011                            category: None,
2012                            output: format!("{}", e),
2013                            group: None,
2014                            variables_reference: None,
2015                            source: None,
2016                            line: None,
2017                            column: None,
2018                            data: None,
2019                            location_reference: None,
2020                        });
2021                    }
2022                };
2023                this.invalidate_command_type::<ScopesCommand>();
2024                cx.notify();
2025            })
2026            .ok();
2027        })
2028    }
2029
2030    pub fn location(
2031        &mut self,
2032        reference: u64,
2033        cx: &mut Context<Self>,
2034    ) -> Option<dap::LocationsResponse> {
2035        self.fetch(
2036            LocationsCommand { reference },
2037            move |this, response, _| {
2038                let response = response.log_err()?;
2039                this.locations.insert(reference, response.clone());
2040                Some(response)
2041            },
2042            cx,
2043        );
2044        self.locations.get(&reference).cloned()
2045    }
2046
2047    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2048        let command = DisconnectCommand {
2049            restart: Some(false),
2050            terminate_debuggee: Some(true),
2051            suspend_debuggee: Some(false),
2052        };
2053
2054        self.request(command, Self::empty_response, cx).detach()
2055    }
2056
2057    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2058        if self
2059            .capabilities
2060            .supports_terminate_threads_request
2061            .unwrap_or_default()
2062        {
2063            self.request(
2064                TerminateThreadsCommand {
2065                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2066                },
2067                Self::clear_active_debug_line_response,
2068                cx,
2069            )
2070            .detach();
2071        } else {
2072            self.shutdown(cx).detach();
2073        }
2074    }
2075}