session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  16use dap::messages::Response;
  17use dap::requests::{Request, RunInTerminal, StartDebugging};
  18use dap::{
  19    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  20    SteppingGranularity, StoppedEvent, VariableReference,
  21    client::{DebugAdapterClient, SessionId},
  22    messages::{Events, Message},
  23};
  24use dap::{
  25    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory,
  26    RunInTerminalRequestArguments, StartDebuggingRequestArguments,
  27};
  28use futures::channel::{mpsc, oneshot};
  29use futures::{FutureExt, future::Shared};
  30use gpui::{
  31    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  32    Task, WeakEntity,
  33};
  34
  35use serde_json::Value;
  36use smol::stream::StreamExt;
  37use std::any::TypeId;
  38use std::collections::BTreeMap;
  39use std::u64;
  40use std::{
  41    any::Any,
  42    collections::hash_map::Entry,
  43    hash::{Hash, Hasher},
  44    path::Path,
  45    sync::Arc,
  46};
  47use text::{PointUtf16, ToPointUtf16};
  48use util::ResultExt;
  49use worktree::Worktree;
  50
  51#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  52#[repr(transparent)]
  53pub struct ThreadId(pub u64);
  54
  55impl ThreadId {
  56    pub const MIN: ThreadId = ThreadId(u64::MIN);
  57    pub const MAX: ThreadId = ThreadId(u64::MAX);
  58}
  59
  60impl From<u64> for ThreadId {
  61    fn from(id: u64) -> Self {
  62        Self(id)
  63    }
  64}
  65
  66#[derive(Clone, Debug)]
  67pub struct StackFrame {
  68    pub dap: dap::StackFrame,
  69    pub scopes: Vec<dap::Scope>,
  70}
  71
  72impl From<dap::StackFrame> for StackFrame {
  73    fn from(stack_frame: dap::StackFrame) -> Self {
  74        Self {
  75            scopes: vec![],
  76            dap: stack_frame,
  77        }
  78    }
  79}
  80
  81#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  82pub enum ThreadStatus {
  83    #[default]
  84    Running,
  85    Stopped,
  86    Stepping,
  87    Exited,
  88    Ended,
  89}
  90
  91impl ThreadStatus {
  92    pub fn label(&self) -> &'static str {
  93        match self {
  94            ThreadStatus::Running => "Running",
  95            ThreadStatus::Stopped => "Stopped",
  96            ThreadStatus::Stepping => "Stepping",
  97            ThreadStatus::Exited => "Exited",
  98            ThreadStatus::Ended => "Ended",
  99        }
 100    }
 101}
 102
 103#[derive(Debug)]
 104pub struct Thread {
 105    dap: dap::Thread,
 106    stack_frame_ids: IndexSet<StackFrameId>,
 107    _has_stopped: bool,
 108}
 109
 110impl From<dap::Thread> for Thread {
 111    fn from(dap: dap::Thread) -> Self {
 112        Self {
 113            dap,
 114            stack_frame_ids: Default::default(),
 115            _has_stopped: false,
 116        }
 117    }
 118}
 119
 120pub enum Mode {
 121    Building,
 122    Running(LocalMode),
 123}
 124
 125#[derive(Clone)]
 126pub struct LocalMode {
 127    client: Arc<DebugAdapterClient>,
 128    binary: DebugAdapterBinary,
 129    tmp_breakpoint: Option<SourceBreakpoint>,
 130    worktree: WeakEntity<Worktree>,
 131    executor: BackgroundExecutor,
 132}
 133
 134fn client_source(abs_path: &Path) -> dap::Source {
 135    dap::Source {
 136        name: abs_path
 137            .file_name()
 138            .map(|filename| filename.to_string_lossy().to_string()),
 139        path: Some(abs_path.to_string_lossy().to_string()),
 140        source_reference: None,
 141        presentation_hint: None,
 142        origin: None,
 143        sources: None,
 144        adapter_data: None,
 145        checksums: None,
 146    }
 147}
 148
 149impl LocalMode {
 150    async fn new(
 151        session_id: SessionId,
 152        parent_session: Option<Entity<Session>>,
 153        worktree: WeakEntity<Worktree>,
 154        binary: DebugAdapterBinary,
 155        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 156        cx: AsyncApp,
 157    ) -> Result<Self> {
 158        let message_handler = Box::new(move |message| {
 159            messages_tx.unbounded_send(message).ok();
 160        });
 161
 162        let client = Arc::new(
 163            if let Some(client) = parent_session
 164                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 165                .flatten()
 166            {
 167                client
 168                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 169                    .await?
 170            } else {
 171                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 172                    .await
 173                    .with_context(|| format!("Failed to start {:?}", &binary.command))?
 174            },
 175        );
 176
 177        Ok(Self {
 178            client,
 179            worktree,
 180            tmp_breakpoint: None,
 181            binary,
 182            executor: cx.background_executor().clone(),
 183        })
 184    }
 185
 186    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 187        &self.worktree
 188    }
 189
 190    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 191        let tasks: Vec<_> = paths
 192            .into_iter()
 193            .map(|path| {
 194                self.request(dap_command::SetBreakpoints {
 195                    source: client_source(path),
 196                    source_modified: None,
 197                    breakpoints: vec![],
 198                })
 199            })
 200            .collect();
 201
 202        cx.background_spawn(async move {
 203            futures::future::join_all(tasks)
 204                .await
 205                .iter()
 206                .for_each(|res| match res {
 207                    Ok(_) => {}
 208                    Err(err) => {
 209                        log::warn!("Set breakpoints request failed: {}", err);
 210                    }
 211                });
 212        })
 213    }
 214
 215    fn send_breakpoints_from_path(
 216        &self,
 217        abs_path: Arc<Path>,
 218        reason: BreakpointUpdatedReason,
 219        breakpoint_store: &Entity<BreakpointStore>,
 220        cx: &mut App,
 221    ) -> Task<()> {
 222        let breakpoints = breakpoint_store
 223            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 224            .into_iter()
 225            .filter(|bp| bp.state.is_enabled())
 226            .chain(self.tmp_breakpoint.clone())
 227            .map(Into::into)
 228            .collect();
 229
 230        let task = self.request(dap_command::SetBreakpoints {
 231            source: client_source(&abs_path),
 232            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 233            breakpoints,
 234        });
 235
 236        cx.background_spawn(async move {
 237            match task.await {
 238                Ok(_) => {}
 239                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 240            }
 241        })
 242    }
 243
 244    fn send_exception_breakpoints(
 245        &self,
 246        filters: Vec<ExceptionBreakpointsFilter>,
 247        supports_filter_options: bool,
 248    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 249        let arg = if supports_filter_options {
 250            SetExceptionBreakpoints::WithOptions {
 251                filters: filters
 252                    .into_iter()
 253                    .map(|filter| ExceptionFilterOptions {
 254                        filter_id: filter.filter,
 255                        condition: None,
 256                        mode: None,
 257                    })
 258                    .collect(),
 259            }
 260        } else {
 261            SetExceptionBreakpoints::Plain {
 262                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 263            }
 264        };
 265        self.request(arg)
 266    }
 267
 268    fn send_source_breakpoints(
 269        &self,
 270        ignore_breakpoints: bool,
 271        breakpoint_store: &Entity<BreakpointStore>,
 272        cx: &App,
 273    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 274        let mut breakpoint_tasks = Vec::new();
 275        let breakpoints = breakpoint_store.read_with(cx, |store, cx| store.all_breakpoints(cx));
 276
 277        for (path, breakpoints) in breakpoints {
 278            let breakpoints = if ignore_breakpoints {
 279                vec![]
 280            } else {
 281                breakpoints
 282                    .into_iter()
 283                    .filter(|bp| bp.state.is_enabled())
 284                    .map(Into::into)
 285                    .collect()
 286            };
 287
 288            breakpoint_tasks.push(
 289                self.request(dap_command::SetBreakpoints {
 290                    source: client_source(&path),
 291                    source_modified: Some(false),
 292                    breakpoints,
 293                })
 294                .map(|result| result.map_err(|e| (path, e))),
 295            );
 296        }
 297
 298        cx.background_spawn(async move {
 299            futures::future::join_all(breakpoint_tasks)
 300                .await
 301                .into_iter()
 302                .filter_map(Result::err)
 303                .collect::<HashMap<_, _>>()
 304        })
 305    }
 306
 307    fn initialize_sequence(
 308        &self,
 309        capabilities: &Capabilities,
 310        initialized_rx: oneshot::Receiver<()>,
 311        dap_store: WeakEntity<DapStore>,
 312        cx: &App,
 313    ) -> Task<Result<()>> {
 314        let raw = self.binary.request_args.clone();
 315
 316        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 317        let launch = match raw.request {
 318            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 319                raw: raw.configuration,
 320            }),
 321            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 322                raw: raw.configuration,
 323            }),
 324        };
 325
 326        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 327        let exception_filters = capabilities
 328            .exception_breakpoint_filters
 329            .as_ref()
 330            .map(|exception_filters| {
 331                exception_filters
 332                    .iter()
 333                    .filter(|filter| filter.default == Some(true))
 334                    .cloned()
 335                    .collect::<Vec<_>>()
 336            })
 337            .unwrap_or_default();
 338        let supports_exception_filters = capabilities
 339            .supports_exception_filter_options
 340            .unwrap_or_default();
 341        let this = self.clone();
 342        let worktree = self.worktree().clone();
 343        let configuration_sequence = cx.spawn({
 344            async move |cx| {
 345                let breakpoint_store =
 346                    dap_store.update(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
 347                initialized_rx.await?;
 348                let errors_by_path = cx
 349                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 350                    .await;
 351
 352                dap_store.update(cx, |_, cx| {
 353                    let Some(worktree) = worktree.upgrade() else {
 354                        return;
 355                    };
 356
 357                    for (path, error) in &errors_by_path {
 358                        log::error!("failed to set breakpoints for {path:?}: {error}");
 359                    }
 360
 361                    if let Some(failed_path) = errors_by_path.keys().next() {
 362                        let failed_path = failed_path
 363                            .strip_prefix(worktree.read(cx).abs_path())
 364                            .unwrap_or(failed_path)
 365                            .display();
 366                        let message = format!(
 367                            "Failed to set breakpoints for {failed_path}{}",
 368                            match errors_by_path.len() {
 369                                0 => unreachable!(),
 370                                1 => "".into(),
 371                                2 => " and 1 other path".into(),
 372                                n => format!(" and {} other paths", n - 1),
 373                            }
 374                        );
 375                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 376                    }
 377                })?;
 378
 379                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
 380                    .await
 381                    .ok();
 382                let ret = if configuration_done_supported {
 383                    this.request(ConfigurationDone {})
 384                } else {
 385                    Task::ready(Ok(()))
 386                }
 387                .await;
 388                ret
 389            }
 390        });
 391
 392        cx.background_spawn(async move {
 393            futures::future::try_join(launch, configuration_sequence).await?;
 394            Ok(())
 395        })
 396    }
 397
 398    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 399    where
 400        <R::DapRequest as dap::requests::Request>::Response: 'static,
 401        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 402    {
 403        let request = Arc::new(request);
 404
 405        let request_clone = request.clone();
 406        let connection = self.client.clone();
 407        self.executor.spawn(async move {
 408            let args = request_clone.to_dap();
 409            let response = connection.request::<R::DapRequest>(args).await?;
 410            request.response_from_dap(response)
 411        })
 412    }
 413}
 414
 415impl Mode {
 416    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 417    where
 418        <R::DapRequest as dap::requests::Request>::Response: 'static,
 419        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 420    {
 421        match self {
 422            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 423            Mode::Building => Task::ready(Err(anyhow!(
 424                "no adapter running to send request: {:?}",
 425                request
 426            ))),
 427        }
 428    }
 429}
 430
 431#[derive(Default)]
 432struct ThreadStates {
 433    global_state: Option<ThreadStatus>,
 434    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 435}
 436
 437impl ThreadStates {
 438    fn stop_all_threads(&mut self) {
 439        self.global_state = Some(ThreadStatus::Stopped);
 440        self.known_thread_states.clear();
 441    }
 442
 443    fn exit_all_threads(&mut self) {
 444        self.global_state = Some(ThreadStatus::Exited);
 445        self.known_thread_states.clear();
 446    }
 447
 448    fn continue_all_threads(&mut self) {
 449        self.global_state = Some(ThreadStatus::Running);
 450        self.known_thread_states.clear();
 451    }
 452
 453    fn stop_thread(&mut self, thread_id: ThreadId) {
 454        self.known_thread_states
 455            .insert(thread_id, ThreadStatus::Stopped);
 456    }
 457
 458    fn continue_thread(&mut self, thread_id: ThreadId) {
 459        self.known_thread_states
 460            .insert(thread_id, ThreadStatus::Running);
 461    }
 462
 463    fn process_step(&mut self, thread_id: ThreadId) {
 464        self.known_thread_states
 465            .insert(thread_id, ThreadStatus::Stepping);
 466    }
 467
 468    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 469        self.thread_state(thread_id)
 470            .unwrap_or(ThreadStatus::Running)
 471    }
 472
 473    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 474        self.known_thread_states
 475            .get(&thread_id)
 476            .copied()
 477            .or(self.global_state)
 478    }
 479
 480    fn exit_thread(&mut self, thread_id: ThreadId) {
 481        self.known_thread_states
 482            .insert(thread_id, ThreadStatus::Exited);
 483    }
 484
 485    fn any_stopped_thread(&self) -> bool {
 486        self.global_state
 487            .is_some_and(|state| state == ThreadStatus::Stopped)
 488            || self
 489                .known_thread_states
 490                .values()
 491                .any(|status| *status == ThreadStatus::Stopped)
 492    }
 493}
 494const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 495
 496type IsEnabled = bool;
 497
 498#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 499pub struct OutputToken(pub usize);
 500/// Represents a current state of a single debug adapter and provides ways to mutate it.
 501pub struct Session {
 502    pub mode: Mode,
 503    id: SessionId,
 504    label: SharedString,
 505    adapter: DebugAdapterName,
 506    pub(super) capabilities: Capabilities,
 507    child_session_ids: HashSet<SessionId>,
 508    parent_session: Option<Entity<Session>>,
 509    modules: Vec<dap::Module>,
 510    loaded_sources: Vec<dap::Source>,
 511    output_token: OutputToken,
 512    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 513    threads: IndexMap<ThreadId, Thread>,
 514    thread_states: ThreadStates,
 515    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 516    stack_frames: IndexMap<StackFrameId, StackFrame>,
 517    locations: HashMap<u64, dap::LocationsResponse>,
 518    is_session_terminated: bool,
 519    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 520    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 521    ignore_breakpoints: bool,
 522    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 523    background_tasks: Vec<Task<()>>,
 524}
 525
 526trait CacheableCommand: Any + Send + Sync {
 527    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 528    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 529    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 530}
 531
 532impl<T> CacheableCommand for T
 533where
 534    T: DapCommand + PartialEq + Eq + Hash,
 535{
 536    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 537        (rhs as &dyn Any)
 538            .downcast_ref::<Self>()
 539            .map_or(false, |rhs| self == rhs)
 540    }
 541
 542    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 543        T::hash(self, &mut hasher);
 544    }
 545
 546    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 547        self
 548    }
 549}
 550
 551pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 552
 553impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 554    fn from(request: T) -> Self {
 555        Self(Arc::new(request))
 556    }
 557}
 558
 559impl PartialEq for RequestSlot {
 560    fn eq(&self, other: &Self) -> bool {
 561        self.0.dyn_eq(other.0.as_ref())
 562    }
 563}
 564
 565impl Eq for RequestSlot {}
 566
 567impl Hash for RequestSlot {
 568    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 569        self.0.dyn_hash(state);
 570        (&*self.0 as &dyn Any).type_id().hash(state)
 571    }
 572}
 573
 574#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 575pub struct CompletionsQuery {
 576    pub query: String,
 577    pub column: u64,
 578    pub line: Option<u64>,
 579    pub frame_id: Option<u64>,
 580}
 581
 582impl CompletionsQuery {
 583    pub fn new(
 584        buffer: &language::Buffer,
 585        cursor_position: language::Anchor,
 586        frame_id: Option<u64>,
 587    ) -> Self {
 588        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 589        Self {
 590            query: buffer.text(),
 591            column: column as u64,
 592            frame_id,
 593            line: Some(row as u64),
 594        }
 595    }
 596}
 597
 598#[derive(Debug)]
 599pub enum SessionEvent {
 600    Modules,
 601    LoadedSources,
 602    Stopped(Option<ThreadId>),
 603    StackTrace,
 604    Variables,
 605    Threads,
 606    InvalidateInlineValue,
 607    CapabilitiesLoaded,
 608    RunInTerminal {
 609        request: RunInTerminalRequestArguments,
 610        sender: mpsc::Sender<Result<u32>>,
 611    },
 612}
 613
 614#[derive(Clone, Debug, PartialEq, Eq)]
 615pub enum SessionStateEvent {
 616    Running,
 617    Shutdown,
 618    Restart,
 619    SpawnChildSession {
 620        request: StartDebuggingRequestArguments,
 621    },
 622}
 623
 624impl EventEmitter<SessionEvent> for Session {}
 625impl EventEmitter<SessionStateEvent> for Session {}
 626
 627// local session will send breakpoint updates to DAP for all new breakpoints
 628// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 629// BreakpointStore notifies session on breakpoint changes
 630impl Session {
 631    pub(crate) fn new(
 632        breakpoint_store: Entity<BreakpointStore>,
 633        session_id: SessionId,
 634        parent_session: Option<Entity<Session>>,
 635        label: SharedString,
 636        adapter: DebugAdapterName,
 637        cx: &mut App,
 638    ) -> Entity<Self> {
 639        cx.new::<Self>(|cx| {
 640            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 641                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 642                    if let Some(local) = (!this.ignore_breakpoints)
 643                        .then(|| this.as_local_mut())
 644                        .flatten()
 645                    {
 646                        local
 647                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 648                            .detach();
 649                    };
 650                }
 651                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 652                    if let Some(local) = (!this.ignore_breakpoints)
 653                        .then(|| this.as_local_mut())
 654                        .flatten()
 655                    {
 656                        local.unset_breakpoints_from_paths(paths, cx).detach();
 657                    }
 658                }
 659                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 660            })
 661            .detach();
 662            cx.on_app_quit(Self::on_app_quit).detach();
 663
 664            let this = Self {
 665                mode: Mode::Building,
 666                id: session_id,
 667                child_session_ids: HashSet::default(),
 668                parent_session,
 669                capabilities: Capabilities::default(),
 670                variables: Default::default(),
 671                stack_frames: Default::default(),
 672                thread_states: ThreadStates::default(),
 673                output_token: OutputToken(0),
 674                output: circular_buffer::CircularBuffer::boxed(),
 675                requests: HashMap::default(),
 676                modules: Vec::default(),
 677                loaded_sources: Vec::default(),
 678                threads: IndexMap::default(),
 679                background_tasks: Vec::default(),
 680                locations: Default::default(),
 681                is_session_terminated: false,
 682                ignore_breakpoints: false,
 683                breakpoint_store,
 684                exception_breakpoints: Default::default(),
 685                label,
 686                adapter,
 687            };
 688
 689            this
 690        })
 691    }
 692
 693    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 694        match &self.mode {
 695            Mode::Building => None,
 696            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 697        }
 698    }
 699
 700    pub fn boot(
 701        &mut self,
 702        binary: DebugAdapterBinary,
 703        worktree: Entity<Worktree>,
 704        dap_store: WeakEntity<DapStore>,
 705        cx: &mut Context<Self>,
 706    ) -> Task<Result<()>> {
 707        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 708        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 709
 710        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 711            let mut initialized_tx = Some(initialized_tx);
 712            while let Some(message) = message_rx.next().await {
 713                if let Message::Event(event) = message {
 714                    if let Events::Initialized(_) = *event {
 715                        if let Some(tx) = initialized_tx.take() {
 716                            tx.send(()).ok();
 717                        }
 718                    } else {
 719                        let Ok(_) = this.update(cx, |session, cx| {
 720                            session.handle_dap_event(event, cx);
 721                        }) else {
 722                            break;
 723                        };
 724                    }
 725                } else if let Message::Request(request) = message {
 726                    let Ok(_) = this.update(cx, |this, cx| {
 727                        if request.command == StartDebugging::COMMAND {
 728                            this.handle_start_debugging_request(request, cx)
 729                                .detach_and_log_err(cx);
 730                        } else if request.command == RunInTerminal::COMMAND {
 731                            this.handle_run_in_terminal_request(request, cx)
 732                                .detach_and_log_err(cx);
 733                        }
 734                    }) else {
 735                        break;
 736                    };
 737                }
 738            }
 739        })];
 740        self.background_tasks = background_tasks;
 741        let id = self.id;
 742        let parent_session = self.parent_session.clone();
 743
 744        cx.spawn(async move |this, cx| {
 745            let mode = LocalMode::new(
 746                id,
 747                parent_session,
 748                worktree.downgrade(),
 749                binary,
 750                message_tx,
 751                cx.clone(),
 752            )
 753            .await?;
 754            this.update(cx, |this, cx| {
 755                this.mode = Mode::Running(mode);
 756                cx.emit(SessionStateEvent::Running);
 757            })?;
 758
 759            this.update(cx, |session, cx| session.request_initialize(cx))?
 760                .await?;
 761
 762            this.update(cx, |session, cx| {
 763                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 764            })?
 765            .await
 766        })
 767    }
 768
 769    pub fn session_id(&self) -> SessionId {
 770        self.id
 771    }
 772
 773    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 774        self.child_session_ids.clone()
 775    }
 776
 777    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 778        self.child_session_ids.insert(session_id);
 779    }
 780
 781    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 782        self.child_session_ids.remove(&session_id);
 783    }
 784
 785    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 786        self.parent_session
 787            .as_ref()
 788            .map(|session| session.read(cx).id)
 789    }
 790
 791    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 792        self.parent_session.as_ref()
 793    }
 794
 795    pub fn capabilities(&self) -> &Capabilities {
 796        &self.capabilities
 797    }
 798
 799    pub fn binary(&self) -> &DebugAdapterBinary {
 800        let Mode::Running(local_mode) = &self.mode else {
 801            panic!("Session is not local");
 802        };
 803        &local_mode.binary
 804    }
 805
 806    pub fn adapter(&self) -> DebugAdapterName {
 807        self.adapter.clone()
 808    }
 809
 810    pub fn label(&self) -> SharedString {
 811        self.label.clone()
 812    }
 813
 814    pub fn is_terminated(&self) -> bool {
 815        self.is_session_terminated
 816    }
 817
 818    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
 819        let (tx, mut rx) = mpsc::unbounded();
 820
 821        cx.spawn(async move |this, cx| {
 822            while let Some(output) = rx.next().await {
 823                this.update(cx, |this, _| {
 824                    this.output_token.0 += 1;
 825                    this.output.push_back(dap::OutputEvent {
 826                        category: None,
 827                        output,
 828                        group: None,
 829                        variables_reference: None,
 830                        source: None,
 831                        line: None,
 832                        column: None,
 833                        data: None,
 834                        location_reference: None,
 835                    });
 836                })?;
 837            }
 838            anyhow::Ok(())
 839        })
 840        .detach();
 841
 842        return tx;
 843    }
 844
 845    pub fn is_local(&self) -> bool {
 846        matches!(self.mode, Mode::Running(_))
 847    }
 848
 849    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 850        match &mut self.mode {
 851            Mode::Running(local_mode) => Some(local_mode),
 852            Mode::Building => None,
 853        }
 854    }
 855
 856    pub fn as_local(&self) -> Option<&LocalMode> {
 857        match &self.mode {
 858            Mode::Running(local_mode) => Some(local_mode),
 859            Mode::Building => None,
 860        }
 861    }
 862
 863    fn handle_start_debugging_request(
 864        &mut self,
 865        request: dap::messages::Request,
 866        cx: &mut Context<Self>,
 867    ) -> Task<Result<()>> {
 868        let request_seq = request.seq;
 869
 870        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
 871            .arguments
 872            .as_ref()
 873            .map(|value| serde_json::from_value(value.clone()));
 874
 875        let mut success = true;
 876        if let Some(Ok(request)) = launch_request {
 877            cx.emit(SessionStateEvent::SpawnChildSession { request });
 878        } else {
 879            log::error!(
 880                "Failed to parse launch request arguments: {:?}",
 881                request.arguments
 882            );
 883            success = false;
 884        }
 885
 886        cx.spawn(async move |this, cx| {
 887            this.update(cx, |this, cx| {
 888                this.respond_to_client(
 889                    request_seq,
 890                    success,
 891                    StartDebugging::COMMAND.to_string(),
 892                    None,
 893                    cx,
 894                )
 895            })?
 896            .await
 897        })
 898    }
 899
 900    fn handle_run_in_terminal_request(
 901        &mut self,
 902        request: dap::messages::Request,
 903        cx: &mut Context<Self>,
 904    ) -> Task<Result<()>> {
 905        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
 906            request.arguments.unwrap_or_default(),
 907        )
 908        .expect("To parse StartDebuggingRequestArguments");
 909
 910        let seq = request.seq;
 911
 912        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
 913        cx.emit(SessionEvent::RunInTerminal {
 914            request: request_args,
 915            sender: tx,
 916        });
 917        cx.notify();
 918
 919        cx.spawn(async move |session, cx| {
 920            let result = util::maybe!(async move {
 921                rx.next().await.ok_or_else(|| {
 922                    anyhow!("failed to receive response from spawn terminal".to_string())
 923                })?
 924            })
 925            .await;
 926            let (success, body) = match result {
 927                Ok(pid) => (
 928                    true,
 929                    serde_json::to_value(dap::RunInTerminalResponse {
 930                        process_id: None,
 931                        shell_process_id: Some(pid as u64),
 932                    })
 933                    .ok(),
 934                ),
 935                Err(error) => (
 936                    false,
 937                    serde_json::to_value(dap::ErrorResponse {
 938                        error: Some(dap::Message {
 939                            id: seq,
 940                            format: error.to_string(),
 941                            variables: None,
 942                            send_telemetry: None,
 943                            show_user: None,
 944                            url: None,
 945                            url_label: None,
 946                        }),
 947                    })
 948                    .ok(),
 949                ),
 950            };
 951
 952            session
 953                .update(cx, |session, cx| {
 954                    session.respond_to_client(
 955                        seq,
 956                        success,
 957                        RunInTerminal::COMMAND.to_string(),
 958                        body,
 959                        cx,
 960                    )
 961                })?
 962                .await
 963        })
 964    }
 965
 966    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 967        let adapter_id = self.adapter().to_string();
 968        let request = Initialize { adapter_id };
 969        match &self.mode {
 970            Mode::Running(local_mode) => {
 971                let capabilities = local_mode.request(request);
 972
 973                cx.spawn(async move |this, cx| {
 974                    let capabilities = capabilities.await?;
 975                    this.update(cx, |session, cx| {
 976                        session.capabilities = capabilities;
 977                        let filters = session
 978                            .capabilities
 979                            .exception_breakpoint_filters
 980                            .clone()
 981                            .unwrap_or_default();
 982                        for filter in filters {
 983                            let default = filter.default.unwrap_or_default();
 984                            session
 985                                .exception_breakpoints
 986                                .entry(filter.filter.clone())
 987                                .or_insert_with(|| (filter, default));
 988                        }
 989                        cx.emit(SessionEvent::CapabilitiesLoaded);
 990                    })?;
 991                    Ok(())
 992                })
 993            }
 994            Mode::Building => Task::ready(Err(anyhow!(
 995                "Cannot send initialize request, task still building"
 996            ))),
 997        }
 998    }
 999
1000    pub(super) fn initialize_sequence(
1001        &mut self,
1002        initialize_rx: oneshot::Receiver<()>,
1003        dap_store: WeakEntity<DapStore>,
1004        cx: &mut Context<Self>,
1005    ) -> Task<Result<()>> {
1006        match &self.mode {
1007            Mode::Running(local_mode) => {
1008                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1009            }
1010            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1011        }
1012    }
1013
1014    pub fn run_to_position(
1015        &mut self,
1016        breakpoint: SourceBreakpoint,
1017        active_thread_id: ThreadId,
1018        cx: &mut Context<Self>,
1019    ) {
1020        match &mut self.mode {
1021            Mode::Running(local_mode) => {
1022                if !matches!(
1023                    self.thread_states.thread_state(active_thread_id),
1024                    Some(ThreadStatus::Stopped)
1025                ) {
1026                    return;
1027                };
1028                let path = breakpoint.path.clone();
1029                local_mode.tmp_breakpoint = Some(breakpoint);
1030                let task = local_mode.send_breakpoints_from_path(
1031                    path,
1032                    BreakpointUpdatedReason::Toggled,
1033                    &self.breakpoint_store,
1034                    cx,
1035                );
1036
1037                cx.spawn(async move |this, cx| {
1038                    task.await;
1039                    this.update(cx, |this, cx| {
1040                        this.continue_thread(active_thread_id, cx);
1041                    })
1042                })
1043                .detach();
1044            }
1045            Mode::Building => {}
1046        }
1047    }
1048
1049    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1050        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1051    }
1052
1053    pub fn output(
1054        &self,
1055        since: OutputToken,
1056    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1057        if self.output_token.0 == 0 {
1058            return (self.output.range(0..0), OutputToken(0));
1059        };
1060
1061        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1062
1063        let clamped_events_since = events_since.clamp(0, self.output.len());
1064        (
1065            self.output
1066                .range(self.output.len() - clamped_events_since..),
1067            self.output_token,
1068        )
1069    }
1070
1071    pub fn respond_to_client(
1072        &self,
1073        request_seq: u64,
1074        success: bool,
1075        command: String,
1076        body: Option<serde_json::Value>,
1077        cx: &mut Context<Self>,
1078    ) -> Task<Result<()>> {
1079        let Some(local_session) = self.as_local() else {
1080            unreachable!("Cannot respond to remote client");
1081        };
1082        let client = local_session.client.clone();
1083
1084        cx.background_spawn(async move {
1085            client
1086                .send_message(Message::Response(Response {
1087                    body,
1088                    success,
1089                    command,
1090                    seq: request_seq + 1,
1091                    request_seq,
1092                    message: None,
1093                }))
1094                .await
1095        })
1096    }
1097
1098    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1099        // todo(debugger): Find a clean way to get around the clone
1100        let breakpoint_store = self.breakpoint_store.clone();
1101        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1102            let breakpoint = local.tmp_breakpoint.take()?;
1103            let path = breakpoint.path.clone();
1104            Some((local, path))
1105        }) {
1106            local
1107                .send_breakpoints_from_path(
1108                    path,
1109                    BreakpointUpdatedReason::Toggled,
1110                    &breakpoint_store,
1111                    cx,
1112                )
1113                .detach();
1114        };
1115
1116        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1117            self.thread_states.stop_all_threads();
1118
1119            self.invalidate_command_type::<StackTraceCommand>();
1120        }
1121
1122        // Event if we stopped all threads we still need to insert the thread_id
1123        // to our own data
1124        if let Some(thread_id) = event.thread_id {
1125            self.thread_states.stop_thread(ThreadId(thread_id));
1126
1127            self.invalidate_state(
1128                &StackTraceCommand {
1129                    thread_id,
1130                    start_frame: None,
1131                    levels: None,
1132                }
1133                .into(),
1134            );
1135        }
1136
1137        self.invalidate_generic();
1138        self.threads.clear();
1139        self.variables.clear();
1140        cx.emit(SessionEvent::Stopped(
1141            event
1142                .thread_id
1143                .map(Into::into)
1144                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1145        ));
1146        cx.emit(SessionEvent::InvalidateInlineValue);
1147        cx.notify();
1148    }
1149
1150    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1151        match *event {
1152            Events::Initialized(_) => {
1153                debug_assert!(
1154                    false,
1155                    "Initialized event should have been handled in LocalMode"
1156                );
1157            }
1158            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1159            Events::Continued(event) => {
1160                if event.all_threads_continued.unwrap_or_default() {
1161                    self.thread_states.continue_all_threads();
1162                    self.breakpoint_store.update(cx, |store, cx| {
1163                        store.remove_active_position(Some(self.session_id()), cx)
1164                    });
1165                } else {
1166                    self.thread_states
1167                        .continue_thread(ThreadId(event.thread_id));
1168                }
1169                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1170                self.invalidate_generic();
1171            }
1172            Events::Exited(_event) => {
1173                self.clear_active_debug_line(cx);
1174            }
1175            Events::Terminated(_) => {
1176                self.is_session_terminated = true;
1177                self.clear_active_debug_line(cx);
1178            }
1179            Events::Thread(event) => {
1180                let thread_id = ThreadId(event.thread_id);
1181
1182                match event.reason {
1183                    dap::ThreadEventReason::Started => {
1184                        self.thread_states.continue_thread(thread_id);
1185                    }
1186                    dap::ThreadEventReason::Exited => {
1187                        self.thread_states.exit_thread(thread_id);
1188                    }
1189                    reason => {
1190                        log::error!("Unhandled thread event reason {:?}", reason);
1191                    }
1192                }
1193                self.invalidate_state(&ThreadsCommand.into());
1194                cx.notify();
1195            }
1196            Events::Output(event) => {
1197                if event
1198                    .category
1199                    .as_ref()
1200                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1201                {
1202                    return;
1203                }
1204
1205                self.output.push_back(event);
1206                self.output_token.0 += 1;
1207                cx.notify();
1208            }
1209            Events::Breakpoint(_) => {}
1210            Events::Module(event) => {
1211                match event.reason {
1212                    dap::ModuleEventReason::New => {
1213                        self.modules.push(event.module);
1214                    }
1215                    dap::ModuleEventReason::Changed => {
1216                        if let Some(module) = self
1217                            .modules
1218                            .iter_mut()
1219                            .find(|other| event.module.id == other.id)
1220                        {
1221                            *module = event.module;
1222                        }
1223                    }
1224                    dap::ModuleEventReason::Removed => {
1225                        self.modules.retain(|other| event.module.id != other.id);
1226                    }
1227                }
1228
1229                // todo(debugger): We should only send the invalidate command to downstream clients.
1230                // self.invalidate_state(&ModulesCommand.into());
1231            }
1232            Events::LoadedSource(_) => {
1233                self.invalidate_state(&LoadedSourcesCommand.into());
1234            }
1235            Events::Capabilities(event) => {
1236                self.capabilities = self.capabilities.merge(event.capabilities);
1237                cx.notify();
1238            }
1239            Events::Memory(_) => {}
1240            Events::Process(_) => {}
1241            Events::ProgressEnd(_) => {}
1242            Events::ProgressStart(_) => {}
1243            Events::ProgressUpdate(_) => {}
1244            Events::Invalidated(_) => {}
1245            Events::Other(_) => {}
1246        }
1247    }
1248
1249    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1250    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1251        &mut self,
1252        request: T,
1253        process_result: impl FnOnce(
1254            &mut Self,
1255            Result<T::Response>,
1256            &mut Context<Self>,
1257        ) -> Option<T::Response>
1258        + 'static,
1259        cx: &mut Context<Self>,
1260    ) {
1261        const {
1262            assert!(
1263                T::CACHEABLE,
1264                "Only requests marked as cacheable should invoke `fetch`"
1265            );
1266        }
1267
1268        if !self.thread_states.any_stopped_thread()
1269            && request.type_id() != TypeId::of::<ThreadsCommand>()
1270            || self.is_session_terminated
1271        {
1272            return;
1273        }
1274
1275        let request_map = self
1276            .requests
1277            .entry(std::any::TypeId::of::<T>())
1278            .or_default();
1279
1280        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1281            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1282
1283            let task = Self::request_inner::<Arc<T>>(
1284                &self.capabilities,
1285                &self.mode,
1286                command,
1287                process_result,
1288                cx,
1289            );
1290            let task = cx
1291                .background_executor()
1292                .spawn(async move {
1293                    let _ = task.await?;
1294                    Some(())
1295                })
1296                .shared();
1297
1298            vacant.insert(task);
1299            cx.notify();
1300        }
1301    }
1302
1303    pub async fn request2<T: DapCommand + PartialEq + Eq + Hash>(
1304        &self,
1305        request: T,
1306    ) -> Result<T::Response> {
1307        if !T::is_supported(&self.capabilities) {
1308            anyhow::bail!("DAP request {:?} is not supported", request);
1309        }
1310
1311        self.mode.request_dap(request).await
1312    }
1313
1314    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1315        capabilities: &Capabilities,
1316        mode: &Mode,
1317        request: T,
1318        process_result: impl FnOnce(
1319            &mut Self,
1320            Result<T::Response>,
1321            &mut Context<Self>,
1322        ) -> Option<T::Response>
1323        + 'static,
1324        cx: &mut Context<Self>,
1325    ) -> Task<Option<T::Response>> {
1326        if !T::is_supported(&capabilities) {
1327            log::warn!(
1328                "Attempted to send a DAP request that isn't supported: {:?}",
1329                request
1330            );
1331            let error = Err(anyhow::Error::msg(
1332                "Couldn't complete request because it's not supported",
1333            ));
1334            return cx.spawn(async move |this, cx| {
1335                this.update(cx, |this, cx| process_result(this, error, cx))
1336                    .log_err()
1337                    .flatten()
1338            });
1339        }
1340
1341        let request = mode.request_dap(request);
1342        cx.spawn(async move |this, cx| {
1343            let result = request.await;
1344            this.update(cx, |this, cx| process_result(this, result, cx))
1345                .log_err()
1346                .flatten()
1347        })
1348    }
1349
1350    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1351        &self,
1352        request: T,
1353        process_result: impl FnOnce(
1354            &mut Self,
1355            Result<T::Response>,
1356            &mut Context<Self>,
1357        ) -> Option<T::Response>
1358        + 'static,
1359        cx: &mut Context<Self>,
1360    ) -> Task<Option<T::Response>> {
1361        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1362    }
1363
1364    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1365        self.requests.remove(&std::any::TypeId::of::<Command>());
1366    }
1367
1368    fn invalidate_generic(&mut self) {
1369        self.invalidate_command_type::<ModulesCommand>();
1370        self.invalidate_command_type::<LoadedSourcesCommand>();
1371        self.invalidate_command_type::<ThreadsCommand>();
1372    }
1373
1374    fn invalidate_state(&mut self, key: &RequestSlot) {
1375        self.requests
1376            .entry((&*key.0 as &dyn Any).type_id())
1377            .and_modify(|request_map| {
1378                request_map.remove(&key);
1379            });
1380    }
1381
1382    pub fn any_stopped_thread(&self) -> bool {
1383        self.thread_states.any_stopped_thread()
1384    }
1385
1386    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1387        self.thread_states.thread_status(thread_id)
1388    }
1389
1390    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1391        self.fetch(
1392            dap_command::ThreadsCommand,
1393            |this, result, cx| {
1394                let result = result.log_err()?;
1395
1396                this.threads = result
1397                    .iter()
1398                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1399                    .collect();
1400
1401                this.invalidate_command_type::<StackTraceCommand>();
1402                cx.emit(SessionEvent::Threads);
1403                cx.notify();
1404
1405                Some(result)
1406            },
1407            cx,
1408        );
1409
1410        self.threads
1411            .values()
1412            .map(|thread| {
1413                (
1414                    thread.dap.clone(),
1415                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1416                )
1417            })
1418            .collect()
1419    }
1420
1421    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1422        self.fetch(
1423            dap_command::ModulesCommand,
1424            |this, result, cx| {
1425                let result = result.log_err()?;
1426
1427                this.modules = result.iter().cloned().collect();
1428                cx.emit(SessionEvent::Modules);
1429                cx.notify();
1430
1431                Some(result)
1432            },
1433            cx,
1434        );
1435
1436        &self.modules
1437    }
1438
1439    pub fn ignore_breakpoints(&self) -> bool {
1440        self.ignore_breakpoints
1441    }
1442
1443    pub fn toggle_ignore_breakpoints(
1444        &mut self,
1445        cx: &mut App,
1446    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1447        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1448    }
1449
1450    pub(crate) fn set_ignore_breakpoints(
1451        &mut self,
1452        ignore: bool,
1453        cx: &mut App,
1454    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1455        if self.ignore_breakpoints == ignore {
1456            return Task::ready(HashMap::default());
1457        }
1458
1459        self.ignore_breakpoints = ignore;
1460
1461        if let Some(local) = self.as_local() {
1462            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1463        } else {
1464            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1465            unimplemented!()
1466        }
1467    }
1468
1469    pub fn exception_breakpoints(
1470        &self,
1471    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1472        self.exception_breakpoints.values()
1473    }
1474
1475    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1476        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1477            *is_enabled = !*is_enabled;
1478            self.send_exception_breakpoints(cx);
1479        }
1480    }
1481
1482    fn send_exception_breakpoints(&mut self, cx: &App) {
1483        if let Some(local) = self.as_local() {
1484            let exception_filters = self
1485                .exception_breakpoints
1486                .values()
1487                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1488                .collect();
1489
1490            let supports_exception_filters = self
1491                .capabilities
1492                .supports_exception_filter_options
1493                .unwrap_or_default();
1494            local
1495                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1496                .detach_and_log_err(cx);
1497        } else {
1498            debug_assert!(false, "Not implemented");
1499        }
1500    }
1501
1502    pub fn breakpoints_enabled(&self) -> bool {
1503        self.ignore_breakpoints
1504    }
1505
1506    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1507        self.fetch(
1508            dap_command::LoadedSourcesCommand,
1509            |this, result, cx| {
1510                let result = result.log_err()?;
1511                this.loaded_sources = result.iter().cloned().collect();
1512                cx.emit(SessionEvent::LoadedSources);
1513                cx.notify();
1514                Some(result)
1515            },
1516            cx,
1517        );
1518
1519        &self.loaded_sources
1520    }
1521
1522    fn fallback_to_manual_restart(
1523        &mut self,
1524        res: Result<()>,
1525        cx: &mut Context<Self>,
1526    ) -> Option<()> {
1527        if res.log_err().is_none() {
1528            cx.emit(SessionStateEvent::Restart);
1529            return None;
1530        }
1531        Some(())
1532    }
1533
1534    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1535        res.log_err()?;
1536        Some(())
1537    }
1538
1539    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1540        thread_id: ThreadId,
1541    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1542    {
1543        move |this, response, cx| match response.log_err() {
1544            Some(response) => {
1545                this.breakpoint_store.update(cx, |store, cx| {
1546                    store.remove_active_position(Some(this.session_id()), cx)
1547                });
1548                Some(response)
1549            }
1550            None => {
1551                this.thread_states.stop_thread(thread_id);
1552                cx.notify();
1553                None
1554            }
1555        }
1556    }
1557
1558    fn clear_active_debug_line_response(
1559        &mut self,
1560        response: Result<()>,
1561        cx: &mut Context<Session>,
1562    ) -> Option<()> {
1563        response.log_err()?;
1564        self.clear_active_debug_line(cx);
1565        Some(())
1566    }
1567
1568    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1569        self.breakpoint_store.update(cx, |store, cx| {
1570            store.remove_active_position(Some(self.id), cx)
1571        });
1572    }
1573
1574    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1575        self.request(
1576            PauseCommand {
1577                thread_id: thread_id.0,
1578            },
1579            Self::empty_response,
1580            cx,
1581        )
1582        .detach();
1583    }
1584
1585    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1586        self.request(
1587            RestartStackFrameCommand { stack_frame_id },
1588            Self::empty_response,
1589            cx,
1590        )
1591        .detach();
1592    }
1593
1594    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1595        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1596            self.request(
1597                RestartCommand {
1598                    raw: args.unwrap_or(Value::Null),
1599                },
1600                Self::fallback_to_manual_restart,
1601                cx,
1602            )
1603            .detach();
1604        } else {
1605            cx.emit(SessionStateEvent::Restart);
1606        }
1607    }
1608
1609    fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1610        let debug_adapter = self.adapter_client();
1611
1612        cx.background_spawn(async move {
1613            if let Some(client) = debug_adapter {
1614                client.shutdown().await.log_err();
1615            }
1616        })
1617    }
1618
1619    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1620        self.is_session_terminated = true;
1621        self.thread_states.exit_all_threads();
1622        cx.notify();
1623
1624        let task = if self
1625            .capabilities
1626            .supports_terminate_request
1627            .unwrap_or_default()
1628        {
1629            self.request(
1630                TerminateCommand {
1631                    restart: Some(false),
1632                },
1633                Self::clear_active_debug_line_response,
1634                cx,
1635            )
1636        } else {
1637            self.request(
1638                DisconnectCommand {
1639                    restart: Some(false),
1640                    terminate_debuggee: Some(true),
1641                    suspend_debuggee: Some(false),
1642                },
1643                Self::clear_active_debug_line_response,
1644                cx,
1645            )
1646        };
1647
1648        cx.emit(SessionStateEvent::Shutdown);
1649
1650        let debug_client = self.adapter_client();
1651
1652        cx.background_spawn(async move {
1653            let _ = task.await;
1654
1655            if let Some(client) = debug_client {
1656                client.shutdown().await.log_err();
1657            }
1658        })
1659    }
1660
1661    pub fn completions(
1662        &mut self,
1663        query: CompletionsQuery,
1664        cx: &mut Context<Self>,
1665    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1666        let task = self.request(query, |_, result, _| result.log_err(), cx);
1667
1668        cx.background_executor().spawn(async move {
1669            anyhow::Ok(
1670                task.await
1671                    .map(|response| response.targets)
1672                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1673            )
1674        })
1675    }
1676
1677    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1678        self.thread_states.continue_thread(thread_id);
1679        self.request(
1680            ContinueCommand {
1681                args: ContinueArguments {
1682                    thread_id: thread_id.0,
1683                    single_thread: Some(true),
1684                },
1685            },
1686            Self::on_step_response::<ContinueCommand>(thread_id),
1687            cx,
1688        )
1689        .detach();
1690    }
1691
1692    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1693        match self.mode {
1694            Mode::Running(ref local) => Some(local.client.clone()),
1695            Mode::Building => None,
1696        }
1697    }
1698
1699    pub fn step_over(
1700        &mut self,
1701        thread_id: ThreadId,
1702        granularity: SteppingGranularity,
1703        cx: &mut Context<Self>,
1704    ) {
1705        let supports_single_thread_execution_requests =
1706            self.capabilities.supports_single_thread_execution_requests;
1707        let supports_stepping_granularity = self
1708            .capabilities
1709            .supports_stepping_granularity
1710            .unwrap_or_default();
1711
1712        let command = NextCommand {
1713            inner: StepCommand {
1714                thread_id: thread_id.0,
1715                granularity: supports_stepping_granularity.then(|| granularity),
1716                single_thread: supports_single_thread_execution_requests,
1717            },
1718        };
1719
1720        self.thread_states.process_step(thread_id);
1721        self.request(
1722            command,
1723            Self::on_step_response::<NextCommand>(thread_id),
1724            cx,
1725        )
1726        .detach();
1727    }
1728
1729    pub fn step_in(
1730        &mut self,
1731        thread_id: ThreadId,
1732        granularity: SteppingGranularity,
1733        cx: &mut Context<Self>,
1734    ) {
1735        let supports_single_thread_execution_requests =
1736            self.capabilities.supports_single_thread_execution_requests;
1737        let supports_stepping_granularity = self
1738            .capabilities
1739            .supports_stepping_granularity
1740            .unwrap_or_default();
1741
1742        let command = StepInCommand {
1743            inner: StepCommand {
1744                thread_id: thread_id.0,
1745                granularity: supports_stepping_granularity.then(|| granularity),
1746                single_thread: supports_single_thread_execution_requests,
1747            },
1748        };
1749
1750        self.thread_states.process_step(thread_id);
1751        self.request(
1752            command,
1753            Self::on_step_response::<StepInCommand>(thread_id),
1754            cx,
1755        )
1756        .detach();
1757    }
1758
1759    pub fn step_out(
1760        &mut self,
1761        thread_id: ThreadId,
1762        granularity: SteppingGranularity,
1763        cx: &mut Context<Self>,
1764    ) {
1765        let supports_single_thread_execution_requests =
1766            self.capabilities.supports_single_thread_execution_requests;
1767        let supports_stepping_granularity = self
1768            .capabilities
1769            .supports_stepping_granularity
1770            .unwrap_or_default();
1771
1772        let command = StepOutCommand {
1773            inner: StepCommand {
1774                thread_id: thread_id.0,
1775                granularity: supports_stepping_granularity.then(|| granularity),
1776                single_thread: supports_single_thread_execution_requests,
1777            },
1778        };
1779
1780        self.thread_states.process_step(thread_id);
1781        self.request(
1782            command,
1783            Self::on_step_response::<StepOutCommand>(thread_id),
1784            cx,
1785        )
1786        .detach();
1787    }
1788
1789    pub fn step_back(
1790        &mut self,
1791        thread_id: ThreadId,
1792        granularity: SteppingGranularity,
1793        cx: &mut Context<Self>,
1794    ) {
1795        let supports_single_thread_execution_requests =
1796            self.capabilities.supports_single_thread_execution_requests;
1797        let supports_stepping_granularity = self
1798            .capabilities
1799            .supports_stepping_granularity
1800            .unwrap_or_default();
1801
1802        let command = StepBackCommand {
1803            inner: StepCommand {
1804                thread_id: thread_id.0,
1805                granularity: supports_stepping_granularity.then(|| granularity),
1806                single_thread: supports_single_thread_execution_requests,
1807            },
1808        };
1809
1810        self.thread_states.process_step(thread_id);
1811
1812        self.request(
1813            command,
1814            Self::on_step_response::<StepBackCommand>(thread_id),
1815            cx,
1816        )
1817        .detach();
1818    }
1819
1820    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1821        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1822            && self.requests.contains_key(&ThreadsCommand.type_id())
1823            && self.threads.contains_key(&thread_id)
1824        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1825        // We could still be using an old thread id and have sent a new thread's request
1826        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1827        // But it very well could cause a minor bug in the future that is hard to track down
1828        {
1829            self.fetch(
1830                super::dap_command::StackTraceCommand {
1831                    thread_id: thread_id.0,
1832                    start_frame: None,
1833                    levels: None,
1834                },
1835                move |this, stack_frames, cx| {
1836                    let stack_frames = stack_frames.log_err()?;
1837
1838                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1839                        thread.stack_frame_ids =
1840                            stack_frames.iter().map(|frame| frame.id).collect();
1841                    });
1842                    debug_assert!(
1843                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1844                        "Sent request for thread_id that doesn't exist"
1845                    );
1846
1847                    this.stack_frames.extend(
1848                        stack_frames
1849                            .iter()
1850                            .cloned()
1851                            .map(|frame| (frame.id, StackFrame::from(frame))),
1852                    );
1853
1854                    this.invalidate_command_type::<ScopesCommand>();
1855                    this.invalidate_command_type::<VariablesCommand>();
1856
1857                    cx.emit(SessionEvent::StackTrace);
1858                    cx.notify();
1859                    Some(stack_frames)
1860                },
1861                cx,
1862            );
1863        }
1864
1865        self.threads
1866            .get(&thread_id)
1867            .map(|thread| {
1868                thread
1869                    .stack_frame_ids
1870                    .iter()
1871                    .filter_map(|id| self.stack_frames.get(id))
1872                    .cloned()
1873                    .collect()
1874            })
1875            .unwrap_or_default()
1876    }
1877
1878    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1879        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1880            && self
1881                .requests
1882                .contains_key(&TypeId::of::<StackTraceCommand>())
1883        {
1884            self.fetch(
1885                ScopesCommand { stack_frame_id },
1886                move |this, scopes, cx| {
1887                    let scopes = scopes.log_err()?;
1888
1889                    for scope in scopes .iter(){
1890                        this.variables(scope.variables_reference, cx);
1891                    }
1892
1893                    let entry = this
1894                        .stack_frames
1895                        .entry(stack_frame_id)
1896                        .and_modify(|stack_frame| {
1897                            stack_frame.scopes = scopes.clone();
1898                        });
1899
1900                    cx.emit(SessionEvent::Variables);
1901
1902                    debug_assert!(
1903                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1904                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1905                    );
1906
1907                    Some(scopes)
1908                },
1909                cx,
1910            );
1911        }
1912
1913        self.stack_frames
1914            .get(&stack_frame_id)
1915            .map(|frame| frame.scopes.as_slice())
1916            .unwrap_or_default()
1917    }
1918
1919    pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
1920        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
1921            return Vec::new();
1922        };
1923
1924        stack_frame
1925            .scopes
1926            .iter()
1927            .filter_map(|scope| self.variables.get(&scope.variables_reference))
1928            .flatten()
1929            .cloned()
1930            .collect()
1931    }
1932
1933    pub fn variables(
1934        &mut self,
1935        variables_reference: VariableReference,
1936        cx: &mut Context<Self>,
1937    ) -> Vec<dap::Variable> {
1938        let command = VariablesCommand {
1939            variables_reference,
1940            filter: None,
1941            start: None,
1942            count: None,
1943            format: None,
1944        };
1945
1946        self.fetch(
1947            command,
1948            move |this, variables, cx| {
1949                let variables = variables.log_err()?;
1950                this.variables
1951                    .insert(variables_reference, variables.clone());
1952
1953                cx.emit(SessionEvent::Variables);
1954                cx.emit(SessionEvent::InvalidateInlineValue);
1955                Some(variables)
1956            },
1957            cx,
1958        );
1959
1960        self.variables
1961            .get(&variables_reference)
1962            .cloned()
1963            .unwrap_or_default()
1964    }
1965
1966    pub fn set_variable_value(
1967        &mut self,
1968        variables_reference: u64,
1969        name: String,
1970        value: String,
1971        cx: &mut Context<Self>,
1972    ) {
1973        if self.capabilities.supports_set_variable.unwrap_or_default() {
1974            self.request(
1975                SetVariableValueCommand {
1976                    name,
1977                    value,
1978                    variables_reference,
1979                },
1980                move |this, response, cx| {
1981                    let response = response.log_err()?;
1982                    this.invalidate_command_type::<VariablesCommand>();
1983                    cx.notify();
1984                    Some(response)
1985                },
1986                cx,
1987            )
1988            .detach()
1989        }
1990    }
1991
1992    pub fn evaluate(
1993        &mut self,
1994        expression: String,
1995        context: Option<EvaluateArgumentsContext>,
1996        frame_id: Option<u64>,
1997        source: Option<Source>,
1998        cx: &mut Context<Self>,
1999    ) -> Task<()> {
2000        self.output_token.0 += 1;
2001        self.output.push_back(dap::OutputEvent {
2002            category: None,
2003            output: format!("> {expression}"),
2004            group: None,
2005            variables_reference: None,
2006            source: None,
2007            line: None,
2008            column: None,
2009            data: None,
2010            location_reference: None,
2011        });
2012        let request = self.mode.request_dap(EvaluateCommand {
2013            expression,
2014            context,
2015            frame_id,
2016            source,
2017        });
2018        cx.spawn(async move |this, cx| {
2019            let response = request.await;
2020            this.update(cx, |this, cx| {
2021                match response {
2022                    Ok(response) => {
2023                        this.output_token.0 += 1;
2024                        this.output.push_back(dap::OutputEvent {
2025                            category: None,
2026                            output: format!("< {}", &response.result),
2027                            group: None,
2028                            variables_reference: Some(response.variables_reference),
2029                            source: None,
2030                            line: None,
2031                            column: None,
2032                            data: None,
2033                            location_reference: None,
2034                        });
2035                    }
2036                    Err(e) => {
2037                        this.output_token.0 += 1;
2038                        this.output.push_back(dap::OutputEvent {
2039                            category: None,
2040                            output: format!("{}", e),
2041                            group: None,
2042                            variables_reference: None,
2043                            source: None,
2044                            line: None,
2045                            column: None,
2046                            data: None,
2047                            location_reference: None,
2048                        });
2049                    }
2050                };
2051                this.invalidate_command_type::<ScopesCommand>();
2052                cx.notify();
2053            })
2054            .ok();
2055        })
2056    }
2057
2058    pub fn location(
2059        &mut self,
2060        reference: u64,
2061        cx: &mut Context<Self>,
2062    ) -> Option<dap::LocationsResponse> {
2063        self.fetch(
2064            LocationsCommand { reference },
2065            move |this, response, _| {
2066                let response = response.log_err()?;
2067                this.locations.insert(reference, response.clone());
2068                Some(response)
2069            },
2070            cx,
2071        );
2072        self.locations.get(&reference).cloned()
2073    }
2074
2075    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2076        let command = DisconnectCommand {
2077            restart: Some(false),
2078            terminate_debuggee: Some(true),
2079            suspend_debuggee: Some(false),
2080        };
2081
2082        self.request(command, Self::empty_response, cx).detach()
2083    }
2084
2085    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2086        if self
2087            .capabilities
2088            .supports_terminate_threads_request
2089            .unwrap_or_default()
2090        {
2091            self.request(
2092                TerminateThreadsCommand {
2093                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2094                },
2095                Self::clear_active_debug_line_response,
2096                cx,
2097            )
2098            .detach();
2099        } else {
2100            self.shutdown(cx).detach();
2101        }
2102    }
2103}