session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapterBinary, DebugTaskDefinition};
  16use dap::messages::Response;
  17use dap::requests::{Request, RunInTerminal, StartDebugging};
  18use dap::{
  19    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  20    SteppingGranularity, StoppedEvent, VariableReference,
  21    client::{DebugAdapterClient, SessionId},
  22    messages::{Events, Message},
  23};
  24use dap::{
  25    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory,
  26    RunInTerminalRequestArguments, StartDebuggingRequestArguments,
  27};
  28use futures::channel::{mpsc, oneshot};
  29use futures::{FutureExt, future::Shared};
  30use gpui::{
  31    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  32    Task, WeakEntity,
  33};
  34
  35use serde_json::{Value, json};
  36use smol::stream::StreamExt;
  37use std::any::TypeId;
  38use std::collections::BTreeMap;
  39use std::u64;
  40use std::{
  41    any::Any,
  42    collections::hash_map::Entry,
  43    hash::{Hash, Hasher},
  44    path::Path,
  45    sync::Arc,
  46};
  47use text::{PointUtf16, ToPointUtf16};
  48use util::{ResultExt, merge_json_value_into};
  49use worktree::Worktree;
  50
  51#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  52#[repr(transparent)]
  53pub struct ThreadId(pub u64);
  54
  55impl ThreadId {
  56    pub const MIN: ThreadId = ThreadId(u64::MIN);
  57    pub const MAX: ThreadId = ThreadId(u64::MAX);
  58}
  59
  60impl From<u64> for ThreadId {
  61    fn from(id: u64) -> Self {
  62        Self(id)
  63    }
  64}
  65
  66#[derive(Clone, Debug)]
  67pub struct StackFrame {
  68    pub dap: dap::StackFrame,
  69    pub scopes: Vec<dap::Scope>,
  70}
  71
  72impl From<dap::StackFrame> for StackFrame {
  73    fn from(stack_frame: dap::StackFrame) -> Self {
  74        Self {
  75            scopes: vec![],
  76            dap: stack_frame,
  77        }
  78    }
  79}
  80
  81#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  82pub enum ThreadStatus {
  83    #[default]
  84    Running,
  85    Stopped,
  86    Stepping,
  87    Exited,
  88    Ended,
  89}
  90
  91impl ThreadStatus {
  92    pub fn label(&self) -> &'static str {
  93        match self {
  94            ThreadStatus::Running => "Running",
  95            ThreadStatus::Stopped => "Stopped",
  96            ThreadStatus::Stepping => "Stepping",
  97            ThreadStatus::Exited => "Exited",
  98            ThreadStatus::Ended => "Ended",
  99        }
 100    }
 101}
 102
 103#[derive(Debug)]
 104pub struct Thread {
 105    dap: dap::Thread,
 106    stack_frame_ids: IndexSet<StackFrameId>,
 107    _has_stopped: bool,
 108}
 109
 110impl From<dap::Thread> for Thread {
 111    fn from(dap: dap::Thread) -> Self {
 112        Self {
 113            dap,
 114            stack_frame_ids: Default::default(),
 115            _has_stopped: false,
 116        }
 117    }
 118}
 119
/// Lifecycle mode of a [`Session`].
pub enum Mode {
    /// No adapter is connected yet; requests sent in this mode fail immediately.
    Building,
    /// An adapter connection exists and requests are forwarded to it.
    Running(LocalMode),
}
 124
/// State held by a session that has a live connection to a debug adapter.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used by `request` to talk to the adapter.
    client: Arc<DebugAdapterClient>,
    /// The adapter binary description; its `request_args` seed the launch/attach request.
    binary: DebugAdapterBinary,
    /// Source of the breakpoints that are sent to the adapter.
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// Extra breakpoint that `send_breakpoints_from_path` appends to the stored ones.
    tmp_breakpoint: Option<SourceBreakpoint>,
    /// Weak handle to the worktree this session is associated with.
    worktree: WeakEntity<Worktree>,
    /// Executor on which adapter requests are spawned.
    executor: BackgroundExecutor,
}
 134
 135fn client_source(abs_path: &Path) -> dap::Source {
 136    dap::Source {
 137        name: abs_path
 138            .file_name()
 139            .map(|filename| filename.to_string_lossy().to_string()),
 140        path: Some(abs_path.to_string_lossy().to_string()),
 141        source_reference: None,
 142        presentation_hint: None,
 143        origin: None,
 144        sources: None,
 145        adapter_data: None,
 146        checksums: None,
 147    }
 148}
 149
 150impl LocalMode {
 151    async fn new(
 152        session_id: SessionId,
 153        parent_session: Option<Entity<Session>>,
 154        worktree: WeakEntity<Worktree>,
 155        breakpoint_store: Entity<BreakpointStore>,
 156        binary: DebugAdapterBinary,
 157        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 158        cx: AsyncApp,
 159    ) -> Result<Self> {
 160        let message_handler = Box::new(move |message| {
 161            messages_tx.unbounded_send(message).ok();
 162        });
 163
 164        let client = Arc::new(
 165            if let Some(client) = parent_session
 166                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 167                .flatten()
 168            {
 169                client
 170                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 171                    .await?
 172            } else {
 173                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 174                    .await
 175                    .with_context(|| "Failed to start communication with debug adapter")?
 176            },
 177        );
 178
 179        Ok(Self {
 180            client,
 181            breakpoint_store,
 182            worktree,
 183            tmp_breakpoint: None,
 184            binary,
 185            executor: cx.background_executor().clone(),
 186        })
 187    }
 188
    /// Returns the weak handle to the worktree associated with this session.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 192
 193    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 194        let tasks: Vec<_> = paths
 195            .into_iter()
 196            .map(|path| {
 197                self.request(dap_command::SetBreakpoints {
 198                    source: client_source(path),
 199                    source_modified: None,
 200                    breakpoints: vec![],
 201                })
 202            })
 203            .collect();
 204
 205        cx.background_spawn(async move {
 206            futures::future::join_all(tasks)
 207                .await
 208                .iter()
 209                .for_each(|res| match res {
 210                    Ok(_) => {}
 211                    Err(err) => {
 212                        log::warn!("Set breakpoints request failed: {}", err);
 213                    }
 214                });
 215        })
 216    }
 217
 218    fn send_breakpoints_from_path(
 219        &self,
 220        abs_path: Arc<Path>,
 221        reason: BreakpointUpdatedReason,
 222        cx: &mut App,
 223    ) -> Task<()> {
 224        let breakpoints = self
 225            .breakpoint_store
 226            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 227            .into_iter()
 228            .filter(|bp| bp.state.is_enabled())
 229            .chain(self.tmp_breakpoint.clone())
 230            .map(Into::into)
 231            .collect();
 232
 233        let task = self.request(dap_command::SetBreakpoints {
 234            source: client_source(&abs_path),
 235            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 236            breakpoints,
 237        });
 238
 239        cx.background_spawn(async move {
 240            match task.await {
 241                Ok(_) => {}
 242                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 243            }
 244        })
 245    }
 246
 247    fn send_exception_breakpoints(
 248        &self,
 249        filters: Vec<ExceptionBreakpointsFilter>,
 250        supports_filter_options: bool,
 251    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 252        let arg = if supports_filter_options {
 253            SetExceptionBreakpoints::WithOptions {
 254                filters: filters
 255                    .into_iter()
 256                    .map(|filter| ExceptionFilterOptions {
 257                        filter_id: filter.filter,
 258                        condition: None,
 259                        mode: None,
 260                    })
 261                    .collect(),
 262            }
 263        } else {
 264            SetExceptionBreakpoints::Plain {
 265                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 266            }
 267        };
 268        self.request(arg)
 269    }
 270
 271    fn send_source_breakpoints(
 272        &self,
 273        ignore_breakpoints: bool,
 274        cx: &App,
 275    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 276        let mut breakpoint_tasks = Vec::new();
 277        let breakpoints = self
 278            .breakpoint_store
 279            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 280
 281        for (path, breakpoints) in breakpoints {
 282            let breakpoints = if ignore_breakpoints {
 283                vec![]
 284            } else {
 285                breakpoints
 286                    .into_iter()
 287                    .filter(|bp| bp.state.is_enabled())
 288                    .map(Into::into)
 289                    .collect()
 290            };
 291
 292            breakpoint_tasks.push(
 293                self.request(dap_command::SetBreakpoints {
 294                    source: client_source(&path),
 295                    source_modified: Some(false),
 296                    breakpoints,
 297                })
 298                .map(|result| result.map_err(|e| (path, e))),
 299            );
 300        }
 301
 302        cx.background_spawn(async move {
 303            futures::future::join_all(breakpoint_tasks)
 304                .await
 305                .into_iter()
 306                .filter_map(Result::err)
 307                .collect::<HashMap<_, _>>()
 308        })
 309    }
 310
    /// Starts the debuggee and configures the adapter once it reports being initialized.
    ///
    /// Two pieces of work are joined in the returned task:
    /// - the launch or attach request, built from the binary's request arguments with
    ///   `definition.initialize_args` merged into the configuration, and
    /// - a configuration sequence that waits on `initialized_rx`, sends source
    ///   breakpoints, reports failures through `dap_store`, sends the default exception
    ///   breakpoint filters, and finally sends `ConfigurationDone` when the adapter
    ///   supports it.
    ///
    /// The task fails if either piece fails. A failure to set exception breakpoints is
    /// ignored.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        definition: &DebugTaskDefinition,
        initialized_rx: oneshot::Receiver<()>,
        dap_store: WeakEntity<DapStore>,
        cx: &App,
    ) -> Task<Result<()>> {
        let mut raw = self.binary.request_args.clone();

        // Layer the task definition's initialize arguments over the binary's configuration.
        merge_json_value_into(
            definition.initialize_args.clone().unwrap_or(json!({})),
            &mut raw.configuration,
        );

        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = match raw.request {
            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
                raw: raw.configuration,
            }),
            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
                raw: raw.configuration,
            }),
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
        // Only the filters the adapter marks as default are sent.
        let exception_filters = capabilities
            .exception_breakpoint_filters
            .as_ref()
            .map(|exception_filters| {
                exception_filters
                    .iter()
                    .filter(|filter| filter.default == Some(true))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        let supports_exception_filters = capabilities
            .supports_exception_filter_options
            .unwrap_or_default();
        let this = self.clone();
        let worktree = self.worktree().clone();
        let configuration_sequence = cx.spawn({
            async move |cx| {
                // Nothing is configured until the receiver resolves; a dropped sender
                // ends the sequence with an error.
                initialized_rx.await?;
                let errors_by_path = cx
                    .update(|cx| this.send_source_breakpoints(false, cx))?
                    .await;

                dap_store.update(cx, |_, cx| {
                    // If the worktree is gone, failures are neither logged nor reported.
                    let Some(worktree) = worktree.upgrade() else {
                        return;
                    };

                    for (path, error) in &errors_by_path {
                        log::error!("failed to set breakpoints for {path:?}: {error}");
                    }

                    // Emit one notification naming the first failed path (relative to the
                    // worktree when possible) plus a count of the remaining ones.
                    if let Some(failed_path) = errors_by_path.keys().next() {
                        let failed_path = failed_path
                            .strip_prefix(worktree.read(cx).abs_path())
                            .unwrap_or(failed_path)
                            .display();
                        let message = format!(
                            "Failed to set breakpoints for {failed_path}{}",
                            match errors_by_path.len() {
                                // Unreachable because `keys().next()` just returned an entry.
                                0 => unreachable!(),
                                1 => "".into(),
                                2 => " and 1 other path".into(),
                                n => format!(" and {} other paths", n - 1),
                            }
                        );
                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
                    }
                })?;

                // Best effort: an error from this request is discarded.
                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
                    .await
                    .ok();
                let ret = if configuration_done_supported {
                    this.request(ConfigurationDone {})
                } else {
                    Task::ready(Ok(()))
                }
                .await;
                ret
            }
        });

        cx.background_spawn(async move {
            // Both halves must succeed; the first error from either is returned.
            futures::future::try_join(launch, configuration_sequence).await?;
            Ok(())
        })
    }
 405
 406    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 407    where
 408        <R::DapRequest as dap::requests::Request>::Response: 'static,
 409        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 410    {
 411        let request = Arc::new(request);
 412
 413        let request_clone = request.clone();
 414        let connection = self.client.clone();
 415        self.executor.spawn(async move {
 416            let args = request_clone.to_dap();
 417            let response = connection.request::<R::DapRequest>(args).await?;
 418            request.response_from_dap(response)
 419        })
 420    }
 421}
 422
 423impl Mode {
 424    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 425    where
 426        <R::DapRequest as dap::requests::Request>::Response: 'static,
 427        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 428    {
 429        match self {
 430            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 431            Mode::Building => Task::ready(Err(anyhow!(
 432                "no adapter running to send request: {:?}",
 433                request
 434            ))),
 435        }
 436    }
 437}
 438
 439#[derive(Default)]
 440struct ThreadStates {
 441    global_state: Option<ThreadStatus>,
 442    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 443}
 444
 445impl ThreadStates {
 446    fn stop_all_threads(&mut self) {
 447        self.global_state = Some(ThreadStatus::Stopped);
 448        self.known_thread_states.clear();
 449    }
 450
 451    fn exit_all_threads(&mut self) {
 452        self.global_state = Some(ThreadStatus::Exited);
 453        self.known_thread_states.clear();
 454    }
 455
 456    fn continue_all_threads(&mut self) {
 457        self.global_state = Some(ThreadStatus::Running);
 458        self.known_thread_states.clear();
 459    }
 460
 461    fn stop_thread(&mut self, thread_id: ThreadId) {
 462        self.known_thread_states
 463            .insert(thread_id, ThreadStatus::Stopped);
 464    }
 465
 466    fn continue_thread(&mut self, thread_id: ThreadId) {
 467        self.known_thread_states
 468            .insert(thread_id, ThreadStatus::Running);
 469    }
 470
 471    fn process_step(&mut self, thread_id: ThreadId) {
 472        self.known_thread_states
 473            .insert(thread_id, ThreadStatus::Stepping);
 474    }
 475
 476    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 477        self.thread_state(thread_id)
 478            .unwrap_or(ThreadStatus::Running)
 479    }
 480
 481    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 482        self.known_thread_states
 483            .get(&thread_id)
 484            .copied()
 485            .or(self.global_state)
 486    }
 487
 488    fn exit_thread(&mut self, thread_id: ThreadId) {
 489        self.known_thread_states
 490            .insert(thread_id, ThreadStatus::Exited);
 491    }
 492
 493    fn any_stopped_thread(&self) -> bool {
 494        self.global_state
 495            .is_some_and(|state| state == ThreadStatus::Stopped)
 496            || self
 497                .known_thread_states
 498                .values()
 499                .any(|status| *status == ThreadStatus::Stopped)
 500    }
 501}
/// Capacity of the circular buffer in which a `Session` keeps adapter output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Flag paired with each exception breakpoint filter in `Session::exception_breakpoints`.
type IsEnabled = bool;
 505
/// Ordered numeric token related to session output; a new `Session` starts at `OutputToken(0)`.
// NOTE(review): presumably identifies a position in the output stream — confirm against its users.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents the current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// `Mode::Building` until `boot` installs a running `LocalMode`.
    pub mode: Mode,
    /// The task definition this session was created from.
    definition: DebugTaskDefinition,
    /// Adapter capabilities; starts out as `Capabilities::default()`.
    pub(super) capabilities: Capabilities,
    /// Identifier of this session.
    id: SessionId,
    /// Ids of the sessions registered as children of this one.
    child_session_ids: HashSet<SessionId>,
    /// The session that spawned this one, if any.
    parent_session: Option<Entity<Session>>,
    /// When `true`, breakpoint store events are not forwarded to the adapter.
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Holds at most `MAX_TRACKED_OUTPUT_EVENTS` output events.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    // NOTE(review): keyed by command type and then by the command itself; presumably
    // used to deduplicate or cache requests — confirm against the code that fills it.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    // NOTE(review): the `String` key is presumably the filter id — confirm.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    /// Tasks owned by the session; `boot` stores the adapter message loop here.
    background_tasks: Vec<Task<()>>,
}
 531
/// Object-safe equality, hashing and downcasting support for commands, so that commands
/// of different concrete types can be stored behind one trait object.
trait CacheableCommand: Any + Send + Sync {
    /// Returns `true` if `rhs` has the same concrete type as `self` and compares equal.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds this command's hash into `hasher`.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the shared command into an `Arc<dyn Any>` for later downcasting.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 537
 538impl<T> CacheableCommand for T
 539where
 540    T: DapCommand + PartialEq + Eq + Hash,
 541{
 542    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 543        (rhs as &dyn Any)
 544            .downcast_ref::<Self>()
 545            .map_or(false, |rhs| self == rhs)
 546    }
 547
 548    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 549        T::hash(self, &mut hasher);
 550    }
 551
 552    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 553        self
 554    }
 555}
 556
 557pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 558
 559impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 560    fn from(request: T) -> Self {
 561        Self(Arc::new(request))
 562    }
 563}
 564
 565impl PartialEq for RequestSlot {
 566    fn eq(&self, other: &Self) -> bool {
 567        self.0.dyn_eq(other.0.as_ref())
 568    }
 569}
 570
 571impl Eq for RequestSlot {}
 572
 573impl Hash for RequestSlot {
 574    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 575        self.0.dyn_hash(state);
 576        (&*self.0 as &dyn Any).type_id().hash(state)
 577    }
 578}
 579
 580#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 581pub struct CompletionsQuery {
 582    pub query: String,
 583    pub column: u64,
 584    pub line: Option<u64>,
 585    pub frame_id: Option<u64>,
 586}
 587
 588impl CompletionsQuery {
 589    pub fn new(
 590        buffer: &language::Buffer,
 591        cursor_position: language::Anchor,
 592        frame_id: Option<u64>,
 593    ) -> Self {
 594        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 595        Self {
 596            query: buffer.text(),
 597            column: column as u64,
 598            frame_id,
 599            line: Some(row as u64),
 600        }
 601    }
 602}
 603
/// Events a `Session` emits to its observers.
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
    InvalidateInlineValue,
    CapabilitiesLoaded,
    /// The adapter asked for a command to be run in a terminal.
    RunInTerminal {
        /// The arguments of the adapter's request.
        request: RunInTerminalRequestArguments,
        /// Channel on which the receiver reports the resulting process id, or an error.
        sender: mpsc::Sender<Result<u32>>,
    },
}
 619
/// Lifecycle events a `Session` emits.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    /// Emitted by `boot` once the session has switched to `Mode::Running`.
    Running,
    Shutdown,
    Restart,
    /// Emitted when the adapter sends a start-debugging request with parseable arguments.
    SpawnChildSession {
        request: StartDebuggingRequestArguments,
    },
}

// `Session` emits both event families.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 632
// A local session sends breakpoint updates to DAP for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session of breakpoint changes.
 636impl Session {
 637    pub(crate) fn new(
 638        breakpoint_store: Entity<BreakpointStore>,
 639        session_id: SessionId,
 640        parent_session: Option<Entity<Session>>,
 641        template: DebugTaskDefinition,
 642        cx: &mut App,
 643    ) -> Entity<Self> {
 644        cx.new::<Self>(|cx| {
 645            cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
 646                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 647                    if let Some(local) = (!this.ignore_breakpoints)
 648                        .then(|| this.as_local_mut())
 649                        .flatten()
 650                    {
 651                        local
 652                            .send_breakpoints_from_path(path.clone(), *reason, cx)
 653                            .detach();
 654                    };
 655                }
 656                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 657                    if let Some(local) = (!this.ignore_breakpoints)
 658                        .then(|| this.as_local_mut())
 659                        .flatten()
 660                    {
 661                        local.unset_breakpoints_from_paths(paths, cx).detach();
 662                    }
 663                }
 664                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 665            })
 666            .detach();
 667            cx.on_app_quit(Self::on_app_quit).detach();
 668
 669            let this = Self {
 670                mode: Mode::Building,
 671                id: session_id,
 672                child_session_ids: HashSet::default(),
 673                parent_session,
 674                capabilities: Capabilities::default(),
 675                ignore_breakpoints: false,
 676                variables: Default::default(),
 677                stack_frames: Default::default(),
 678                thread_states: ThreadStates::default(),
 679                output_token: OutputToken(0),
 680                output: circular_buffer::CircularBuffer::boxed(),
 681                requests: HashMap::default(),
 682                modules: Vec::default(),
 683                loaded_sources: Vec::default(),
 684                threads: IndexMap::default(),
 685                background_tasks: Vec::default(),
 686                locations: Default::default(),
 687                is_session_terminated: false,
 688                exception_breakpoints: Default::default(),
 689                definition: template,
 690            };
 691
 692            this
 693        })
 694    }
 695
 696    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 697        match &self.mode {
 698            Mode::Building => None,
 699            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 700        }
 701    }
 702
    /// Connects the session to its debug adapter and runs the initialization handshake.
    ///
    /// A background task is installed that consumes messages coming from the adapter:
    /// the first `Initialized` event is forwarded to the initialization sequence, other
    /// events go to `handle_dap_event`, and the `StartDebugging` / `RunInTerminal`
    /// reverse requests are dispatched to their handlers. The loop ends when the message
    /// channel closes or the session entity can no longer be updated.
    ///
    /// The returned task creates the `LocalMode`, switches the session to
    /// `Mode::Running` (emitting `SessionStateEvent::Running`), sends the initialize
    /// request, and then awaits `initialize_sequence`.
    pub fn boot(
        &mut self,
        binary: DebugAdapterBinary,
        worktree: Entity<Worktree>,
        breakpoint_store: Entity<BreakpointStore>,
        dap_store: WeakEntity<DapStore>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();

        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
            // The oneshot sender can be used once, so it is taken on the first
            // `Initialized` event; later ones are dropped silently.
            let mut initialized_tx = Some(initialized_tx);
            while let Some(message) = message_rx.next().await {
                if let Message::Event(event) = message {
                    if let Events::Initialized(_) = *event {
                        if let Some(tx) = initialized_tx.take() {
                            tx.send(()).ok();
                        }
                    } else {
                        // A failed update means the session is gone: stop the loop.
                        let Ok(_) = this.update(cx, |session, cx| {
                            session.handle_dap_event(event, cx);
                        }) else {
                            break;
                        };
                    }
                } else if let Message::Request(request) = message {
                    // Reverse requests other than these two are ignored.
                    let Ok(_) = this.update(cx, |this, cx| {
                        if request.command == StartDebugging::COMMAND {
                            this.handle_start_debugging_request(request, cx)
                                .detach_and_log_err(cx);
                        } else if request.command == RunInTerminal::COMMAND {
                            this.handle_run_in_terminal_request(request, cx)
                                .detach_and_log_err(cx);
                        }
                    }) else {
                        break;
                    };
                }
            }
        })];
        // Replaces any previously stored background tasks.
        self.background_tasks = background_tasks;
        let id = self.id;
        let parent_session = self.parent_session.clone();

        cx.spawn(async move |this, cx| {
            let mode = LocalMode::new(
                id,
                parent_session,
                worktree.downgrade(),
                breakpoint_store.clone(),
                binary,
                message_tx,
                cx.clone(),
            )
            .await?;
            this.update(cx, |this, cx| {
                this.mode = Mode::Running(mode);
                cx.emit(SessionStateEvent::Running);
            })?;

            this.update(cx, |session, cx| session.request_initialize(cx))?
                .await?;

            this.update(cx, |session, cx| {
                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
            })?
            .await
        })
    }
 773
    /// Returns the id of this session.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 777
    /// Returns a copy of the set of child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 781
    /// Registers `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 785
    /// Removes `session_id` from this session's children; does nothing if it is not registered.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 789
    /// Returns the id of the parent session, if this session has one.
    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
        self.parent_session
            .as_ref()
            .map(|session| session.read(cx).id)
    }
 795
    /// Returns the parent session entity, if this session has one.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
 799
    /// Returns the capabilities currently recorded for the adapter.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 803
 804    pub fn binary(&self) -> &DebugAdapterBinary {
 805        let Mode::Running(local_mode) = &self.mode else {
 806            panic!("Session is not local");
 807        };
 808        &local_mode.binary
 809    }
 810
    /// Returns the adapter name from the session's task definition.
    pub fn adapter_name(&self) -> SharedString {
        self.definition.adapter.clone()
    }
 814
    /// Returns the label from the session's task definition.
    pub fn label(&self) -> SharedString {
        self.definition.label.clone()
    }
 818
    /// Returns a copy of the task definition this session was created from.
    pub fn definition(&self) -> DebugTaskDefinition {
        self.definition.clone()
    }
 822
    /// Returns whether the session has been marked as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 826
    /// Returns `true` once the session is in `Mode::Running`.
    pub fn is_local(&self) -> bool {
        matches!(self.mode, Mode::Running(_))
    }
 830
 831    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 832        match &mut self.mode {
 833            Mode::Running(local_mode) => Some(local_mode),
 834            Mode::Building => None,
 835        }
 836    }
 837
 838    pub fn as_local(&self) -> Option<&LocalMode> {
 839        match &self.mode {
 840            Mode::Running(local_mode) => Some(local_mode),
 841            Mode::Building => None,
 842        }
 843    }
 844
 845    fn handle_start_debugging_request(
 846        &mut self,
 847        request: dap::messages::Request,
 848        cx: &mut Context<Self>,
 849    ) -> Task<Result<()>> {
 850        let request_seq = request.seq;
 851
 852        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
 853            .arguments
 854            .as_ref()
 855            .map(|value| serde_json::from_value(value.clone()));
 856
 857        let mut success = true;
 858        if let Some(Ok(request)) = launch_request {
 859            cx.emit(SessionStateEvent::SpawnChildSession { request });
 860        } else {
 861            log::error!(
 862                "Failed to parse launch request arguments: {:?}",
 863                request.arguments
 864            );
 865            success = false;
 866        }
 867
 868        cx.spawn(async move |this, cx| {
 869            this.update(cx, |this, cx| {
 870                this.respond_to_client(
 871                    request_seq,
 872                    success,
 873                    StartDebugging::COMMAND.to_string(),
 874                    None,
 875                    cx,
 876                )
 877            })?
 878            .await
 879        })
 880    }
 881
 882    fn handle_run_in_terminal_request(
 883        &mut self,
 884        request: dap::messages::Request,
 885        cx: &mut Context<Self>,
 886    ) -> Task<Result<()>> {
 887        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
 888            request.arguments.unwrap_or_default(),
 889        )
 890        .expect("To parse StartDebuggingRequestArguments");
 891
 892        let seq = request.seq;
 893
 894        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
 895        cx.emit(SessionEvent::RunInTerminal {
 896            request: request_args,
 897            sender: tx,
 898        });
 899        cx.notify();
 900
 901        cx.spawn(async move |session, cx| {
 902            let result = util::maybe!(async move {
 903                rx.next().await.ok_or_else(|| {
 904                    anyhow!("failed to receive response from spawn terminal".to_string())
 905                })?
 906            })
 907            .await;
 908            let (success, body) = match result {
 909                Ok(pid) => (
 910                    true,
 911                    serde_json::to_value(dap::RunInTerminalResponse {
 912                        process_id: None,
 913                        shell_process_id: Some(pid as u64),
 914                    })
 915                    .ok(),
 916                ),
 917                Err(error) => (
 918                    false,
 919                    serde_json::to_value(dap::ErrorResponse {
 920                        error: Some(dap::Message {
 921                            id: seq,
 922                            format: error.to_string(),
 923                            variables: None,
 924                            send_telemetry: None,
 925                            show_user: None,
 926                            url: None,
 927                            url_label: None,
 928                        }),
 929                    })
 930                    .ok(),
 931                ),
 932            };
 933
 934            session
 935                .update(cx, |session, cx| {
 936                    session.respond_to_client(
 937                        seq,
 938                        success,
 939                        RunInTerminal::COMMAND.to_string(),
 940                        body,
 941                        cx,
 942                    )
 943                })?
 944                .await
 945        })
 946    }
 947
 948    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 949        let adapter_id = String::from(self.definition.adapter.clone());
 950        let request = Initialize { adapter_id };
 951        match &self.mode {
 952            Mode::Running(local_mode) => {
 953                let capabilities = local_mode.request(request);
 954
 955                cx.spawn(async move |this, cx| {
 956                    let capabilities = capabilities.await?;
 957                    this.update(cx, |session, cx| {
 958                        session.capabilities = capabilities;
 959                        let filters = session
 960                            .capabilities
 961                            .exception_breakpoint_filters
 962                            .clone()
 963                            .unwrap_or_default();
 964                        for filter in filters {
 965                            let default = filter.default.unwrap_or_default();
 966                            session
 967                                .exception_breakpoints
 968                                .entry(filter.filter.clone())
 969                                .or_insert_with(|| (filter, default));
 970                        }
 971                        cx.emit(SessionEvent::CapabilitiesLoaded);
 972                    })?;
 973                    Ok(())
 974                })
 975            }
 976            Mode::Building => Task::ready(Err(anyhow!(
 977                "Cannot send initialize request, task still building"
 978            ))),
 979        }
 980    }
 981
 982    pub(super) fn initialize_sequence(
 983        &mut self,
 984        initialize_rx: oneshot::Receiver<()>,
 985        dap_store: WeakEntity<DapStore>,
 986        cx: &mut Context<Self>,
 987    ) -> Task<Result<()>> {
 988        match &self.mode {
 989            Mode::Running(local_mode) => local_mode.initialize_sequence(
 990                &self.capabilities,
 991                &self.definition,
 992                initialize_rx,
 993                dap_store,
 994                cx,
 995            ),
 996            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
 997        }
 998    }
 999
1000    pub fn run_to_position(
1001        &mut self,
1002        breakpoint: SourceBreakpoint,
1003        active_thread_id: ThreadId,
1004        cx: &mut Context<Self>,
1005    ) {
1006        match &mut self.mode {
1007            Mode::Running(local_mode) => {
1008                if !matches!(
1009                    self.thread_states.thread_state(active_thread_id),
1010                    Some(ThreadStatus::Stopped)
1011                ) {
1012                    return;
1013                };
1014                let path = breakpoint.path.clone();
1015                local_mode.tmp_breakpoint = Some(breakpoint);
1016                let task = local_mode.send_breakpoints_from_path(
1017                    path,
1018                    BreakpointUpdatedReason::Toggled,
1019                    cx,
1020                );
1021
1022                cx.spawn(async move |this, cx| {
1023                    task.await;
1024                    this.update(cx, |this, cx| {
1025                        this.continue_thread(active_thread_id, cx);
1026                    })
1027                })
1028                .detach();
1029            }
1030            Mode::Building => {}
1031        }
1032    }
1033
1034    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1035        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1036    }
1037
1038    pub fn output(
1039        &self,
1040        since: OutputToken,
1041    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1042        if self.output_token.0 == 0 {
1043            return (self.output.range(0..0), OutputToken(0));
1044        };
1045
1046        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1047
1048        let clamped_events_since = events_since.clamp(0, self.output.len());
1049        (
1050            self.output
1051                .range(self.output.len() - clamped_events_since..),
1052            self.output_token,
1053        )
1054    }
1055
1056    pub fn respond_to_client(
1057        &self,
1058        request_seq: u64,
1059        success: bool,
1060        command: String,
1061        body: Option<serde_json::Value>,
1062        cx: &mut Context<Self>,
1063    ) -> Task<Result<()>> {
1064        let Some(local_session) = self.as_local() else {
1065            unreachable!("Cannot respond to remote client");
1066        };
1067        let client = local_session.client.clone();
1068
1069        cx.background_spawn(async move {
1070            client
1071                .send_message(Message::Response(Response {
1072                    body,
1073                    success,
1074                    command,
1075                    seq: request_seq + 1,
1076                    request_seq,
1077                    message: None,
1078                }))
1079                .await
1080        })
1081    }
1082
1083    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1084        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1085            let breakpoint = local.tmp_breakpoint.take()?;
1086            let path = breakpoint.path.clone();
1087            Some((local, path))
1088        }) {
1089            local
1090                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1091                .detach();
1092        };
1093
1094        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1095            self.thread_states.stop_all_threads();
1096
1097            self.invalidate_command_type::<StackTraceCommand>();
1098        }
1099
1100        // Event if we stopped all threads we still need to insert the thread_id
1101        // to our own data
1102        if let Some(thread_id) = event.thread_id {
1103            self.thread_states.stop_thread(ThreadId(thread_id));
1104
1105            self.invalidate_state(
1106                &StackTraceCommand {
1107                    thread_id,
1108                    start_frame: None,
1109                    levels: None,
1110                }
1111                .into(),
1112            );
1113        }
1114
1115        self.invalidate_generic();
1116        self.threads.clear();
1117        self.variables.clear();
1118        cx.emit(SessionEvent::Stopped(
1119            event
1120                .thread_id
1121                .map(Into::into)
1122                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1123        ));
1124        cx.emit(SessionEvent::InvalidateInlineValue);
1125        cx.notify();
1126    }
1127
1128    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1129        match *event {
1130            Events::Initialized(_) => {
1131                debug_assert!(
1132                    false,
1133                    "Initialized event should have been handled in LocalMode"
1134                );
1135            }
1136            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1137            Events::Continued(event) => {
1138                if event.all_threads_continued.unwrap_or_default() {
1139                    self.thread_states.continue_all_threads();
1140                } else {
1141                    self.thread_states
1142                        .continue_thread(ThreadId(event.thread_id));
1143                }
1144                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1145                self.invalidate_generic();
1146            }
1147            Events::Exited(_event) => {
1148                self.clear_active_debug_line(cx);
1149            }
1150            Events::Terminated(_) => {
1151                self.is_session_terminated = true;
1152                self.clear_active_debug_line(cx);
1153            }
1154            Events::Thread(event) => {
1155                let thread_id = ThreadId(event.thread_id);
1156
1157                match event.reason {
1158                    dap::ThreadEventReason::Started => {
1159                        self.thread_states.continue_thread(thread_id);
1160                    }
1161                    dap::ThreadEventReason::Exited => {
1162                        self.thread_states.exit_thread(thread_id);
1163                    }
1164                    reason => {
1165                        log::error!("Unhandled thread event reason {:?}", reason);
1166                    }
1167                }
1168                self.invalidate_state(&ThreadsCommand.into());
1169                cx.notify();
1170            }
1171            Events::Output(event) => {
1172                if event
1173                    .category
1174                    .as_ref()
1175                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1176                {
1177                    return;
1178                }
1179
1180                self.output.push_back(event);
1181                self.output_token.0 += 1;
1182                cx.notify();
1183            }
1184            Events::Breakpoint(_) => {}
1185            Events::Module(event) => {
1186                match event.reason {
1187                    dap::ModuleEventReason::New => {
1188                        self.modules.push(event.module);
1189                    }
1190                    dap::ModuleEventReason::Changed => {
1191                        if let Some(module) = self
1192                            .modules
1193                            .iter_mut()
1194                            .find(|other| event.module.id == other.id)
1195                        {
1196                            *module = event.module;
1197                        }
1198                    }
1199                    dap::ModuleEventReason::Removed => {
1200                        self.modules.retain(|other| event.module.id != other.id);
1201                    }
1202                }
1203
1204                // todo(debugger): We should only send the invalidate command to downstream clients.
1205                // self.invalidate_state(&ModulesCommand.into());
1206            }
1207            Events::LoadedSource(_) => {
1208                self.invalidate_state(&LoadedSourcesCommand.into());
1209            }
1210            Events::Capabilities(event) => {
1211                self.capabilities = self.capabilities.merge(event.capabilities);
1212                cx.notify();
1213            }
1214            Events::Memory(_) => {}
1215            Events::Process(_) => {}
1216            Events::ProgressEnd(_) => {}
1217            Events::ProgressStart(_) => {}
1218            Events::ProgressUpdate(_) => {}
1219            Events::Invalidated(_) => {}
1220            Events::Other(_) => {}
1221        }
1222    }
1223
1224    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1225    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1226        &mut self,
1227        request: T,
1228        process_result: impl FnOnce(
1229            &mut Self,
1230            Result<T::Response>,
1231            &mut Context<Self>,
1232        ) -> Option<T::Response>
1233        + 'static,
1234        cx: &mut Context<Self>,
1235    ) {
1236        const {
1237            assert!(
1238                T::CACHEABLE,
1239                "Only requests marked as cacheable should invoke `fetch`"
1240            );
1241        }
1242
1243        if !self.thread_states.any_stopped_thread()
1244            && request.type_id() != TypeId::of::<ThreadsCommand>()
1245            || self.is_session_terminated
1246        {
1247            return;
1248        }
1249
1250        let request_map = self
1251            .requests
1252            .entry(std::any::TypeId::of::<T>())
1253            .or_default();
1254
1255        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1256            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1257
1258            let task = Self::request_inner::<Arc<T>>(
1259                &self.capabilities,
1260                &self.mode,
1261                command,
1262                process_result,
1263                cx,
1264            );
1265            let task = cx
1266                .background_executor()
1267                .spawn(async move {
1268                    let _ = task.await?;
1269                    Some(())
1270                })
1271                .shared();
1272
1273            vacant.insert(task);
1274            cx.notify();
1275        }
1276    }
1277
1278    pub async fn request2<T: DapCommand + PartialEq + Eq + Hash>(
1279        &self,
1280        request: T,
1281    ) -> Result<T::Response> {
1282        if !T::is_supported(&self.capabilities) {
1283            anyhow::bail!("DAP request {:?} is not supported", request);
1284        }
1285
1286        self.mode.request_dap(request).await
1287    }
1288
1289    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1290        capabilities: &Capabilities,
1291        mode: &Mode,
1292        request: T,
1293        process_result: impl FnOnce(
1294            &mut Self,
1295            Result<T::Response>,
1296            &mut Context<Self>,
1297        ) -> Option<T::Response>
1298        + 'static,
1299        cx: &mut Context<Self>,
1300    ) -> Task<Option<T::Response>> {
1301        if !T::is_supported(&capabilities) {
1302            log::warn!(
1303                "Attempted to send a DAP request that isn't supported: {:?}",
1304                request
1305            );
1306            let error = Err(anyhow::Error::msg(
1307                "Couldn't complete request because it's not supported",
1308            ));
1309            return cx.spawn(async move |this, cx| {
1310                this.update(cx, |this, cx| process_result(this, error, cx))
1311                    .log_err()
1312                    .flatten()
1313            });
1314        }
1315
1316        let request = mode.request_dap(request);
1317        cx.spawn(async move |this, cx| {
1318            let result = request.await;
1319            this.update(cx, |this, cx| process_result(this, result, cx))
1320                .log_err()
1321                .flatten()
1322        })
1323    }
1324
1325    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1326        &self,
1327        request: T,
1328        process_result: impl FnOnce(
1329            &mut Self,
1330            Result<T::Response>,
1331            &mut Context<Self>,
1332        ) -> Option<T::Response>
1333        + 'static,
1334        cx: &mut Context<Self>,
1335    ) -> Task<Option<T::Response>> {
1336        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1337    }
1338
1339    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1340        self.requests.remove(&std::any::TypeId::of::<Command>());
1341    }
1342
1343    fn invalidate_generic(&mut self) {
1344        self.invalidate_command_type::<ModulesCommand>();
1345        self.invalidate_command_type::<LoadedSourcesCommand>();
1346        self.invalidate_command_type::<ThreadsCommand>();
1347    }
1348
1349    fn invalidate_state(&mut self, key: &RequestSlot) {
1350        self.requests
1351            .entry((&*key.0 as &dyn Any).type_id())
1352            .and_modify(|request_map| {
1353                request_map.remove(&key);
1354            });
1355    }
1356
1357    pub fn any_stopped_thread(&self) -> bool {
1358        self.thread_states.any_stopped_thread()
1359    }
1360
1361    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1362        self.thread_states.thread_status(thread_id)
1363    }
1364
1365    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1366        self.fetch(
1367            dap_command::ThreadsCommand,
1368            |this, result, cx| {
1369                let result = result.log_err()?;
1370
1371                this.threads = result
1372                    .iter()
1373                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1374                    .collect();
1375
1376                this.invalidate_command_type::<StackTraceCommand>();
1377                cx.emit(SessionEvent::Threads);
1378                cx.notify();
1379
1380                Some(result)
1381            },
1382            cx,
1383        );
1384
1385        self.threads
1386            .values()
1387            .map(|thread| {
1388                (
1389                    thread.dap.clone(),
1390                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1391                )
1392            })
1393            .collect()
1394    }
1395
1396    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1397        self.fetch(
1398            dap_command::ModulesCommand,
1399            |this, result, cx| {
1400                let result = result.log_err()?;
1401
1402                this.modules = result.iter().cloned().collect();
1403                cx.emit(SessionEvent::Modules);
1404                cx.notify();
1405
1406                Some(result)
1407            },
1408            cx,
1409        );
1410
1411        &self.modules
1412    }
1413
1414    pub fn ignore_breakpoints(&self) -> bool {
1415        self.ignore_breakpoints
1416    }
1417
1418    pub fn toggle_ignore_breakpoints(
1419        &mut self,
1420        cx: &mut App,
1421    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1422        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1423    }
1424
1425    pub(crate) fn set_ignore_breakpoints(
1426        &mut self,
1427        ignore: bool,
1428        cx: &mut App,
1429    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1430        if self.ignore_breakpoints == ignore {
1431            return Task::ready(HashMap::default());
1432        }
1433
1434        self.ignore_breakpoints = ignore;
1435
1436        if let Some(local) = self.as_local() {
1437            local.send_source_breakpoints(ignore, cx)
1438        } else {
1439            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1440            unimplemented!()
1441        }
1442    }
1443
1444    pub fn exception_breakpoints(
1445        &self,
1446    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1447        self.exception_breakpoints.values()
1448    }
1449
1450    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1451        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1452            *is_enabled = !*is_enabled;
1453            self.send_exception_breakpoints(cx);
1454        }
1455    }
1456
1457    fn send_exception_breakpoints(&mut self, cx: &App) {
1458        if let Some(local) = self.as_local() {
1459            let exception_filters = self
1460                .exception_breakpoints
1461                .values()
1462                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1463                .collect();
1464
1465            let supports_exception_filters = self
1466                .capabilities
1467                .supports_exception_filter_options
1468                .unwrap_or_default();
1469            local
1470                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1471                .detach_and_log_err(cx);
1472        } else {
1473            debug_assert!(false, "Not implemented");
1474        }
1475    }
1476
1477    pub fn breakpoints_enabled(&self) -> bool {
1478        self.ignore_breakpoints
1479    }
1480
1481    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1482        self.fetch(
1483            dap_command::LoadedSourcesCommand,
1484            |this, result, cx| {
1485                let result = result.log_err()?;
1486                this.loaded_sources = result.iter().cloned().collect();
1487                cx.emit(SessionEvent::LoadedSources);
1488                cx.notify();
1489                Some(result)
1490            },
1491            cx,
1492        );
1493
1494        &self.loaded_sources
1495    }
1496
1497    fn fallback_to_manual_restart(
1498        &mut self,
1499        res: Result<()>,
1500        cx: &mut Context<Self>,
1501    ) -> Option<()> {
1502        if res.log_err().is_none() {
1503            cx.emit(SessionStateEvent::Restart);
1504            return None;
1505        }
1506        Some(())
1507    }
1508
1509    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1510        res.log_err()?;
1511        Some(())
1512    }
1513
1514    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1515        thread_id: ThreadId,
1516    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1517    {
1518        move |this, response, cx| match response.log_err() {
1519            Some(response) => Some(response),
1520            None => {
1521                this.thread_states.stop_thread(thread_id);
1522                cx.notify();
1523                None
1524            }
1525        }
1526    }
1527
1528    fn clear_active_debug_line_response(
1529        &mut self,
1530        response: Result<()>,
1531        cx: &mut Context<Session>,
1532    ) -> Option<()> {
1533        response.log_err()?;
1534        self.clear_active_debug_line(cx);
1535        Some(())
1536    }
1537
1538    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1539        self.as_local()
1540            .expect("Message handler will only run in local mode")
1541            .breakpoint_store
1542            .update(cx, |store, cx| {
1543                store.remove_active_position(Some(self.id), cx)
1544            });
1545    }
1546
1547    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1548        self.request(
1549            PauseCommand {
1550                thread_id: thread_id.0,
1551            },
1552            Self::empty_response,
1553            cx,
1554        )
1555        .detach();
1556    }
1557
1558    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1559        self.request(
1560            RestartStackFrameCommand { stack_frame_id },
1561            Self::empty_response,
1562            cx,
1563        )
1564        .detach();
1565    }
1566
1567    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1568        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1569            self.request(
1570                RestartCommand {
1571                    raw: args.unwrap_or(Value::Null),
1572                },
1573                Self::fallback_to_manual_restart,
1574                cx,
1575            )
1576            .detach();
1577        } else {
1578            cx.emit(SessionStateEvent::Restart);
1579        }
1580    }
1581
1582    fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1583        let debug_adapter = self.adapter_client();
1584
1585        cx.background_spawn(async move {
1586            if let Some(client) = debug_adapter {
1587                client.shutdown().await.log_err();
1588            }
1589        })
1590    }
1591
1592    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1593        self.is_session_terminated = true;
1594        self.thread_states.exit_all_threads();
1595        cx.notify();
1596
1597        let task = if self
1598            .capabilities
1599            .supports_terminate_request
1600            .unwrap_or_default()
1601        {
1602            self.request(
1603                TerminateCommand {
1604                    restart: Some(false),
1605                },
1606                Self::clear_active_debug_line_response,
1607                cx,
1608            )
1609        } else {
1610            self.request(
1611                DisconnectCommand {
1612                    restart: Some(false),
1613                    terminate_debuggee: Some(true),
1614                    suspend_debuggee: Some(false),
1615                },
1616                Self::clear_active_debug_line_response,
1617                cx,
1618            )
1619        };
1620
1621        cx.emit(SessionStateEvent::Shutdown);
1622
1623        let debug_client = self.adapter_client();
1624
1625        cx.background_spawn(async move {
1626            let _ = task.await;
1627
1628            if let Some(client) = debug_client {
1629                client.shutdown().await.log_err();
1630            }
1631        })
1632    }
1633
1634    pub fn completions(
1635        &mut self,
1636        query: CompletionsQuery,
1637        cx: &mut Context<Self>,
1638    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1639        let task = self.request(query, |_, result, _| result.log_err(), cx);
1640
1641        cx.background_executor().spawn(async move {
1642            anyhow::Ok(
1643                task.await
1644                    .map(|response| response.targets)
1645                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1646            )
1647        })
1648    }
1649
1650    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1651        self.thread_states.continue_thread(thread_id);
1652        self.request(
1653            ContinueCommand {
1654                args: ContinueArguments {
1655                    thread_id: thread_id.0,
1656                    single_thread: Some(true),
1657                },
1658            },
1659            Self::on_step_response::<ContinueCommand>(thread_id),
1660            cx,
1661        )
1662        .detach();
1663    }
1664
1665    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1666        match self.mode {
1667            Mode::Running(ref local) => Some(local.client.clone()),
1668            Mode::Building => None,
1669        }
1670    }
1671
1672    pub fn step_over(
1673        &mut self,
1674        thread_id: ThreadId,
1675        granularity: SteppingGranularity,
1676        cx: &mut Context<Self>,
1677    ) {
1678        let supports_single_thread_execution_requests =
1679            self.capabilities.supports_single_thread_execution_requests;
1680        let supports_stepping_granularity = self
1681            .capabilities
1682            .supports_stepping_granularity
1683            .unwrap_or_default();
1684
1685        let command = NextCommand {
1686            inner: StepCommand {
1687                thread_id: thread_id.0,
1688                granularity: supports_stepping_granularity.then(|| granularity),
1689                single_thread: supports_single_thread_execution_requests,
1690            },
1691        };
1692
1693        self.thread_states.process_step(thread_id);
1694        self.request(
1695            command,
1696            Self::on_step_response::<NextCommand>(thread_id),
1697            cx,
1698        )
1699        .detach();
1700    }
1701
1702    pub fn step_in(
1703        &mut self,
1704        thread_id: ThreadId,
1705        granularity: SteppingGranularity,
1706        cx: &mut Context<Self>,
1707    ) {
1708        let supports_single_thread_execution_requests =
1709            self.capabilities.supports_single_thread_execution_requests;
1710        let supports_stepping_granularity = self
1711            .capabilities
1712            .supports_stepping_granularity
1713            .unwrap_or_default();
1714
1715        let command = StepInCommand {
1716            inner: StepCommand {
1717                thread_id: thread_id.0,
1718                granularity: supports_stepping_granularity.then(|| granularity),
1719                single_thread: supports_single_thread_execution_requests,
1720            },
1721        };
1722
1723        self.thread_states.process_step(thread_id);
1724        self.request(
1725            command,
1726            Self::on_step_response::<StepInCommand>(thread_id),
1727            cx,
1728        )
1729        .detach();
1730    }
1731
1732    pub fn step_out(
1733        &mut self,
1734        thread_id: ThreadId,
1735        granularity: SteppingGranularity,
1736        cx: &mut Context<Self>,
1737    ) {
1738        let supports_single_thread_execution_requests =
1739            self.capabilities.supports_single_thread_execution_requests;
1740        let supports_stepping_granularity = self
1741            .capabilities
1742            .supports_stepping_granularity
1743            .unwrap_or_default();
1744
1745        let command = StepOutCommand {
1746            inner: StepCommand {
1747                thread_id: thread_id.0,
1748                granularity: supports_stepping_granularity.then(|| granularity),
1749                single_thread: supports_single_thread_execution_requests,
1750            },
1751        };
1752
1753        self.thread_states.process_step(thread_id);
1754        self.request(
1755            command,
1756            Self::on_step_response::<StepOutCommand>(thread_id),
1757            cx,
1758        )
1759        .detach();
1760    }
1761
1762    pub fn step_back(
1763        &mut self,
1764        thread_id: ThreadId,
1765        granularity: SteppingGranularity,
1766        cx: &mut Context<Self>,
1767    ) {
1768        let supports_single_thread_execution_requests =
1769            self.capabilities.supports_single_thread_execution_requests;
1770        let supports_stepping_granularity = self
1771            .capabilities
1772            .supports_stepping_granularity
1773            .unwrap_or_default();
1774
1775        let command = StepBackCommand {
1776            inner: StepCommand {
1777                thread_id: thread_id.0,
1778                granularity: supports_stepping_granularity.then(|| granularity),
1779                single_thread: supports_single_thread_execution_requests,
1780            },
1781        };
1782
1783        self.thread_states.process_step(thread_id);
1784
1785        self.request(
1786            command,
1787            Self::on_step_response::<StepBackCommand>(thread_id),
1788            cx,
1789        )
1790        .detach();
1791    }
1792
1793    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1794        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1795            && self.requests.contains_key(&ThreadsCommand.type_id())
1796            && self.threads.contains_key(&thread_id)
1797        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1798        // We could still be using an old thread id and have sent a new thread's request
1799        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1800        // But it very well could cause a minor bug in the future that is hard to track down
1801        {
1802            self.fetch(
1803                super::dap_command::StackTraceCommand {
1804                    thread_id: thread_id.0,
1805                    start_frame: None,
1806                    levels: None,
1807                },
1808                move |this, stack_frames, cx| {
1809                    let stack_frames = stack_frames.log_err()?;
1810
1811                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1812                        thread.stack_frame_ids =
1813                            stack_frames.iter().map(|frame| frame.id).collect();
1814                    });
1815                    debug_assert!(
1816                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1817                        "Sent request for thread_id that doesn't exist"
1818                    );
1819
1820                    this.stack_frames.extend(
1821                        stack_frames
1822                            .iter()
1823                            .cloned()
1824                            .map(|frame| (frame.id, StackFrame::from(frame))),
1825                    );
1826
1827                    this.invalidate_command_type::<ScopesCommand>();
1828                    this.invalidate_command_type::<VariablesCommand>();
1829
1830                    cx.emit(SessionEvent::StackTrace);
1831                    cx.notify();
1832                    Some(stack_frames)
1833                },
1834                cx,
1835            );
1836        }
1837
1838        self.threads
1839            .get(&thread_id)
1840            .map(|thread| {
1841                thread
1842                    .stack_frame_ids
1843                    .iter()
1844                    .filter_map(|id| self.stack_frames.get(id))
1845                    .cloned()
1846                    .collect()
1847            })
1848            .unwrap_or_default()
1849    }
1850
1851    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1852        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1853            && self
1854                .requests
1855                .contains_key(&TypeId::of::<StackTraceCommand>())
1856        {
1857            self.fetch(
1858                ScopesCommand { stack_frame_id },
1859                move |this, scopes, cx| {
1860                    let scopes = scopes.log_err()?;
1861
1862                    for scope in scopes .iter(){
1863                        this.variables(scope.variables_reference, cx);
1864                    }
1865
1866                    let entry = this
1867                        .stack_frames
1868                        .entry(stack_frame_id)
1869                        .and_modify(|stack_frame| {
1870                            stack_frame.scopes = scopes.clone();
1871                        });
1872
1873                    cx.emit(SessionEvent::Variables);
1874
1875                    debug_assert!(
1876                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1877                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1878                    );
1879
1880                    Some(scopes)
1881                },
1882                cx,
1883            );
1884        }
1885
1886        self.stack_frames
1887            .get(&stack_frame_id)
1888            .map(|frame| frame.scopes.as_slice())
1889            .unwrap_or_default()
1890    }
1891
1892    pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
1893        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
1894            return Vec::new();
1895        };
1896
1897        stack_frame
1898            .scopes
1899            .iter()
1900            .filter_map(|scope| self.variables.get(&scope.variables_reference))
1901            .flatten()
1902            .cloned()
1903            .collect()
1904    }
1905
1906    pub fn variables(
1907        &mut self,
1908        variables_reference: VariableReference,
1909        cx: &mut Context<Self>,
1910    ) -> Vec<dap::Variable> {
1911        let command = VariablesCommand {
1912            variables_reference,
1913            filter: None,
1914            start: None,
1915            count: None,
1916            format: None,
1917        };
1918
1919        self.fetch(
1920            command,
1921            move |this, variables, cx| {
1922                let variables = variables.log_err()?;
1923                this.variables
1924                    .insert(variables_reference, variables.clone());
1925
1926                cx.emit(SessionEvent::Variables);
1927                cx.emit(SessionEvent::InvalidateInlineValue);
1928                Some(variables)
1929            },
1930            cx,
1931        );
1932
1933        self.variables
1934            .get(&variables_reference)
1935            .cloned()
1936            .unwrap_or_default()
1937    }
1938
1939    pub fn set_variable_value(
1940        &mut self,
1941        variables_reference: u64,
1942        name: String,
1943        value: String,
1944        cx: &mut Context<Self>,
1945    ) {
1946        if self.capabilities.supports_set_variable.unwrap_or_default() {
1947            self.request(
1948                SetVariableValueCommand {
1949                    name,
1950                    value,
1951                    variables_reference,
1952                },
1953                move |this, response, cx| {
1954                    let response = response.log_err()?;
1955                    this.invalidate_command_type::<VariablesCommand>();
1956                    cx.notify();
1957                    Some(response)
1958                },
1959                cx,
1960            )
1961            .detach()
1962        }
1963    }
1964
1965    pub fn evaluate(
1966        &mut self,
1967        expression: String,
1968        context: Option<EvaluateArgumentsContext>,
1969        frame_id: Option<u64>,
1970        source: Option<Source>,
1971        cx: &mut Context<Self>,
1972    ) -> Task<()> {
1973        let request = self.mode.request_dap(EvaluateCommand {
1974            expression,
1975            context,
1976            frame_id,
1977            source,
1978        });
1979        cx.spawn(async move |this, cx| {
1980            let response = request.await;
1981            this.update(cx, |this, cx| {
1982                match response {
1983                    Ok(response) => {
1984                        this.output_token.0 += 1;
1985                        this.output.push_back(dap::OutputEvent {
1986                            category: None,
1987                            output: format!("< {}", &response.result),
1988                            group: None,
1989                            variables_reference: Some(response.variables_reference),
1990                            source: None,
1991                            line: None,
1992                            column: None,
1993                            data: None,
1994                            location_reference: None,
1995                        });
1996                    }
1997                    Err(e) => {
1998                        this.output_token.0 += 1;
1999                        this.output.push_back(dap::OutputEvent {
2000                            category: None,
2001                            output: format!("{}", e),
2002                            group: None,
2003                            variables_reference: None,
2004                            source: None,
2005                            line: None,
2006                            column: None,
2007                            data: None,
2008                            location_reference: None,
2009                        });
2010                    }
2011                };
2012                this.invalidate_command_type::<ScopesCommand>();
2013                cx.notify();
2014            })
2015            .ok();
2016        })
2017    }
2018
2019    pub fn location(
2020        &mut self,
2021        reference: u64,
2022        cx: &mut Context<Self>,
2023    ) -> Option<dap::LocationsResponse> {
2024        self.fetch(
2025            LocationsCommand { reference },
2026            move |this, response, _| {
2027                let response = response.log_err()?;
2028                this.locations.insert(reference, response.clone());
2029                Some(response)
2030            },
2031            cx,
2032        );
2033        self.locations.get(&reference).cloned()
2034    }
2035
2036    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2037        let command = DisconnectCommand {
2038            restart: Some(false),
2039            terminate_debuggee: Some(true),
2040            suspend_debuggee: Some(false),
2041        };
2042
2043        self.request(command, Self::empty_response, cx).detach()
2044    }
2045
2046    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2047        if self
2048            .capabilities
2049            .supports_terminate_threads_request
2050            .unwrap_or_default()
2051        {
2052            self.request(
2053                TerminateThreadsCommand {
2054                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2055                },
2056                Self::clear_active_debug_line_response,
2057                cx,
2058            )
2059            .detach();
2060        } else {
2061            self.shutdown(cx).detach();
2062        }
2063    }
2064}