session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use anyhow::{Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  16use dap::messages::Response;
  17use dap::requests::{Request, RunInTerminal, StartDebugging};
  18use dap::{
  19    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  20    SteppingGranularity, StoppedEvent, VariableReference,
  21    client::{DebugAdapterClient, SessionId},
  22    messages::{Events, Message},
  23};
  24use dap::{
  25    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory,
  26    RunInTerminalRequestArguments, StartDebuggingRequestArguments,
  27};
  28use futures::channel::{mpsc, oneshot};
  29use futures::{FutureExt, future::Shared};
  30use gpui::{
  31    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  32    Task, WeakEntity,
  33};
  34
  35use serde_json::Value;
  36use smol::stream::StreamExt;
  37use std::any::TypeId;
  38use std::collections::BTreeMap;
  39use std::u64;
  40use std::{
  41    any::Any,
  42    collections::hash_map::Entry,
  43    hash::{Hash, Hasher},
  44    path::Path,
  45    sync::Arc,
  46};
  47use text::{PointUtf16, ToPointUtf16};
  48use util::ResultExt;
  49use worktree::Worktree;
  50
  51#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  52#[repr(transparent)]
  53pub struct ThreadId(pub u64);
  54
  55impl ThreadId {
  56    pub const MIN: ThreadId = ThreadId(u64::MIN);
  57    pub const MAX: ThreadId = ThreadId(u64::MAX);
  58}
  59
  60impl From<u64> for ThreadId {
  61    fn from(id: u64) -> Self {
  62        Self(id)
  63    }
  64}
  65
  66#[derive(Clone, Debug)]
  67pub struct StackFrame {
  68    pub dap: dap::StackFrame,
  69    pub scopes: Vec<dap::Scope>,
  70}
  71
  72impl From<dap::StackFrame> for StackFrame {
  73    fn from(stack_frame: dap::StackFrame) -> Self {
  74        Self {
  75            scopes: vec![],
  76            dap: stack_frame,
  77        }
  78    }
  79}
  80
  81#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  82pub enum ThreadStatus {
  83    #[default]
  84    Running,
  85    Stopped,
  86    Stepping,
  87    Exited,
  88    Ended,
  89}
  90
  91impl ThreadStatus {
  92    pub fn label(&self) -> &'static str {
  93        match self {
  94            ThreadStatus::Running => "Running",
  95            ThreadStatus::Stopped => "Stopped",
  96            ThreadStatus::Stepping => "Stepping",
  97            ThreadStatus::Exited => "Exited",
  98            ThreadStatus::Ended => "Ended",
  99        }
 100    }
 101}
 102
 103#[derive(Debug)]
 104pub struct Thread {
 105    dap: dap::Thread,
 106    stack_frame_ids: IndexSet<StackFrameId>,
 107    _has_stopped: bool,
 108}
 109
 110impl From<dap::Thread> for Thread {
 111    fn from(dap: dap::Thread) -> Self {
 112        Self {
 113            dap,
 114            stack_frame_ids: Default::default(),
 115            _has_stopped: false,
 116        }
 117    }
 118}
 119
/// Lifecycle state of a session's connection to its debug adapter.
pub enum Mode {
    /// No adapter is running yet; `request_dap` fails every request in this state.
    Building,
    /// An adapter client is available and requests are forwarded to it.
    Running(LocalMode),
}
 124
/// State needed to talk to a running debug adapter.
#[derive(Clone)]
pub struct LocalMode {
    /// Client through which every request is sent to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Adapter binary description; its `request_args` drive the launch/attach request.
    binary: DebugAdapterBinary,
    /// Optional extra breakpoint appended to the per-file list in `send_breakpoints_from_path`.
    tmp_breakpoint: Option<SourceBreakpoint>,
    /// Weak handle to the worktree associated with this session.
    worktree: WeakEntity<Worktree>,
    /// Executor on which adapter requests are spawned.
    executor: BackgroundExecutor,
}
 133
 134fn client_source(abs_path: &Path) -> dap::Source {
 135    dap::Source {
 136        name: abs_path
 137            .file_name()
 138            .map(|filename| filename.to_string_lossy().to_string()),
 139        path: Some(abs_path.to_string_lossy().to_string()),
 140        source_reference: None,
 141        presentation_hint: None,
 142        origin: None,
 143        sources: None,
 144        adapter_data: None,
 145        checksums: None,
 146    }
 147}
 148
 149impl LocalMode {
 150    async fn new(
 151        session_id: SessionId,
 152        parent_session: Option<Entity<Session>>,
 153        worktree: WeakEntity<Worktree>,
 154        binary: DebugAdapterBinary,
 155        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 156        cx: AsyncApp,
 157    ) -> Result<Self> {
 158        let message_handler = Box::new(move |message| {
 159            messages_tx.unbounded_send(message).ok();
 160        });
 161
 162        let client = Arc::new(
 163            if let Some(client) = parent_session
 164                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 165                .flatten()
 166            {
 167                client
 168                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 169                    .await?
 170            } else {
 171                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 172                    .await?
 173            },
 174        );
 175
 176        Ok(Self {
 177            client,
 178            worktree,
 179            tmp_breakpoint: None,
 180            binary,
 181            executor: cx.background_executor().clone(),
 182        })
 183    }
 184
    /// Returns the weak worktree handle stored in this mode.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 188
 189    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 190        let tasks: Vec<_> = paths
 191            .into_iter()
 192            .map(|path| {
 193                self.request(dap_command::SetBreakpoints {
 194                    source: client_source(path),
 195                    source_modified: None,
 196                    breakpoints: vec![],
 197                })
 198            })
 199            .collect();
 200
 201        cx.background_spawn(async move {
 202            futures::future::join_all(tasks)
 203                .await
 204                .iter()
 205                .for_each(|res| match res {
 206                    Ok(_) => {}
 207                    Err(err) => {
 208                        log::warn!("Set breakpoints request failed: {}", err);
 209                    }
 210                });
 211        })
 212    }
 213
 214    fn send_breakpoints_from_path(
 215        &self,
 216        abs_path: Arc<Path>,
 217        reason: BreakpointUpdatedReason,
 218        breakpoint_store: &Entity<BreakpointStore>,
 219        cx: &mut App,
 220    ) -> Task<()> {
 221        let breakpoints = breakpoint_store
 222            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 223            .into_iter()
 224            .filter(|bp| bp.state.is_enabled())
 225            .chain(self.tmp_breakpoint.clone())
 226            .map(Into::into)
 227            .collect();
 228
 229        let task = self.request(dap_command::SetBreakpoints {
 230            source: client_source(&abs_path),
 231            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 232            breakpoints,
 233        });
 234
 235        cx.background_spawn(async move {
 236            match task.await {
 237                Ok(_) => {}
 238                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 239            }
 240        })
 241    }
 242
 243    fn send_exception_breakpoints(
 244        &self,
 245        filters: Vec<ExceptionBreakpointsFilter>,
 246        supports_filter_options: bool,
 247    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 248        let arg = if supports_filter_options {
 249            SetExceptionBreakpoints::WithOptions {
 250                filters: filters
 251                    .into_iter()
 252                    .map(|filter| ExceptionFilterOptions {
 253                        filter_id: filter.filter,
 254                        condition: None,
 255                        mode: None,
 256                    })
 257                    .collect(),
 258            }
 259        } else {
 260            SetExceptionBreakpoints::Plain {
 261                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 262            }
 263        };
 264        self.request(arg)
 265    }
 266
 267    fn send_source_breakpoints(
 268        &self,
 269        ignore_breakpoints: bool,
 270        breakpoint_store: &Entity<BreakpointStore>,
 271        cx: &App,
 272    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 273        let mut breakpoint_tasks = Vec::new();
 274        let breakpoints = breakpoint_store.read_with(cx, |store, cx| store.all_breakpoints(cx));
 275
 276        for (path, breakpoints) in breakpoints {
 277            let breakpoints = if ignore_breakpoints {
 278                vec![]
 279            } else {
 280                breakpoints
 281                    .into_iter()
 282                    .filter(|bp| bp.state.is_enabled())
 283                    .map(Into::into)
 284                    .collect()
 285            };
 286
 287            breakpoint_tasks.push(
 288                self.request(dap_command::SetBreakpoints {
 289                    source: client_source(&path),
 290                    source_modified: Some(false),
 291                    breakpoints,
 292                })
 293                .map(|result| result.map_err(|e| (path, e))),
 294            );
 295        }
 296
 297        cx.background_spawn(async move {
 298            futures::future::join_all(breakpoint_tasks)
 299                .await
 300                .into_iter()
 301                .filter_map(Result::err)
 302                .collect::<HashMap<_, _>>()
 303        })
 304    }
 305
    /// Starts the debuggee and configures the adapter.
    ///
    /// Two things run concurrently and the returned task succeeds only if both do:
    /// - the launch or attach request, chosen from the binary's request arguments;
    /// - the configuration sequence, which waits on `initialized_rx`, sends all source
    ///   breakpoints, sends the exception breakpoint filters whose `default` is `Some(true)`,
    ///   and finally sends `ConfigurationDone` when the adapter supports it.
    ///
    /// Source breakpoint failures are logged and reported through a
    /// `DapStoreEvent::Notification`; exception breakpoint failures are ignored.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        dap_store: WeakEntity<DapStore>,
        cx: &App,
    ) -> Task<Result<()>> {
        let raw = self.binary.request_args.clone();

        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = match raw.request {
            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
                raw: raw.configuration,
            }),
            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
                raw: raw.configuration,
            }),
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
        // Only the filters the adapter marks as enabled by default are sent.
        let exception_filters = capabilities
            .exception_breakpoint_filters
            .as_ref()
            .map(|exception_filters| {
                exception_filters
                    .iter()
                    .filter(|filter| filter.default == Some(true))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        let supports_exception_filters = capabilities
            .supports_exception_filter_options
            .unwrap_or_default();
        let this = self.clone();
        let worktree = self.worktree().clone();
        let configuration_sequence = cx.spawn({
            async move |cx| {
                let breakpoint_store =
                    dap_store.update(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
                // Nothing is configured until the initialized signal has been received.
                initialized_rx.await?;
                let errors_by_path = cx
                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
                    .await;

                dap_store.update(cx, |_, cx| {
                    // NOTE(review): when the worktree is gone, failures are neither logged nor
                    // reported.
                    let Some(worktree) = worktree.upgrade() else {
                        return;
                    };

                    for (path, error) in &errors_by_path {
                        log::error!("failed to set breakpoints for {path:?}: {error}");
                    }

                    // The notification names one failed path (shown relative to the worktree
                    // when possible) and summarizes the rest by count.
                    if let Some(failed_path) = errors_by_path.keys().next() {
                        let failed_path = failed_path
                            .strip_prefix(worktree.read(cx).abs_path())
                            .unwrap_or(failed_path)
                            .display();
                        let message = format!(
                            "Failed to set breakpoints for {failed_path}{}",
                            match errors_by_path.len() {
                                0 => unreachable!(),
                                1 => "".into(),
                                2 => " and 1 other path".into(),
                                n => format!(" and {} other paths", n - 1),
                            }
                        );
                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
                    }
                })?;

                // Best effort: a failure here does not abort the sequence.
                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
                    .await
                    .ok();
                let ret = if configuration_done_supported {
                    this.request(ConfigurationDone {})
                } else {
                    Task::ready(Ok(()))
                }
                .await;
                ret
            }
        });

        cx.background_spawn(async move {
            futures::future::try_join(launch, configuration_sequence).await?;
            Ok(())
        })
    }
 396
 397    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 398    where
 399        <R::DapRequest as dap::requests::Request>::Response: 'static,
 400        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 401    {
 402        let request = Arc::new(request);
 403
 404        let request_clone = request.clone();
 405        let connection = self.client.clone();
 406        self.executor.spawn(async move {
 407            let args = request_clone.to_dap();
 408            let response = connection.request::<R::DapRequest>(args).await?;
 409            request.response_from_dap(response)
 410        })
 411    }
 412}
 413
 414impl Mode {
 415    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 416    where
 417        <R::DapRequest as dap::requests::Request>::Response: 'static,
 418        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 419    {
 420        match self {
 421            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 422            Mode::Building => Task::ready(Err(anyhow!(
 423                "no adapter running to send request: {:?}",
 424                request
 425            ))),
 426        }
 427    }
 428}
 429
 430#[derive(Default)]
 431struct ThreadStates {
 432    global_state: Option<ThreadStatus>,
 433    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 434}
 435
 436impl ThreadStates {
 437    fn stop_all_threads(&mut self) {
 438        self.global_state = Some(ThreadStatus::Stopped);
 439        self.known_thread_states.clear();
 440    }
 441
 442    fn exit_all_threads(&mut self) {
 443        self.global_state = Some(ThreadStatus::Exited);
 444        self.known_thread_states.clear();
 445    }
 446
 447    fn continue_all_threads(&mut self) {
 448        self.global_state = Some(ThreadStatus::Running);
 449        self.known_thread_states.clear();
 450    }
 451
 452    fn stop_thread(&mut self, thread_id: ThreadId) {
 453        self.known_thread_states
 454            .insert(thread_id, ThreadStatus::Stopped);
 455    }
 456
 457    fn continue_thread(&mut self, thread_id: ThreadId) {
 458        self.known_thread_states
 459            .insert(thread_id, ThreadStatus::Running);
 460    }
 461
 462    fn process_step(&mut self, thread_id: ThreadId) {
 463        self.known_thread_states
 464            .insert(thread_id, ThreadStatus::Stepping);
 465    }
 466
 467    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 468        self.thread_state(thread_id)
 469            .unwrap_or(ThreadStatus::Running)
 470    }
 471
 472    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 473        self.known_thread_states
 474            .get(&thread_id)
 475            .copied()
 476            .or(self.global_state)
 477    }
 478
 479    fn exit_thread(&mut self, thread_id: ThreadId) {
 480        self.known_thread_states
 481            .insert(thread_id, ThreadStatus::Exited);
 482    }
 483
 484    fn any_stopped_thread(&self) -> bool {
 485        self.global_state
 486            .is_some_and(|state| state == ThreadStatus::Stopped)
 487            || self
 488                .known_thread_states
 489                .values()
 490                .any(|status| *status == ThreadStatus::Stopped)
 491    }
 492}
/// Capacity of the circular buffer in which a session keeps its output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Flag stored alongside each exception breakpoint filter of a session.
type IsEnabled = bool;

/// Counter kept by a session; `console_output` increments it for each output line it records.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents the current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// `Mode::Building` until `boot` has connected to the adapter, `Mode::Running` afterwards.
    pub mode: Mode,
    id: SessionId,
    label: SharedString,
    adapter: DebugAdapterName,
    pub(super) capabilities: Capabilities,
    child_session_ids: HashSet<SessionId>,
    parent_session: Option<Entity<Session>>,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Incremented whenever `console_output` appends to `output`.
    output_token: OutputToken,
    /// Fixed-capacity buffer holding up to `MAX_TRACKED_OUTPUT_EVENTS` output events.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    // NOTE(review): presumably in-flight/cached requests, grouped by command type and then by
    // the command's value — confirm against the code that populates this map.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// While set, breakpoint store events are not forwarded to the adapter.
    ignore_breakpoints: bool,
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    /// Tasks owned by the session; `boot` stores its adapter message loop here.
    background_tasks: Vec<Task<()>>,
}
 524
 525trait CacheableCommand: Any + Send + Sync {
 526    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 527    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 528    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 529}
 530
 531impl<T> CacheableCommand for T
 532where
 533    T: DapCommand + PartialEq + Eq + Hash,
 534{
 535    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 536        (rhs as &dyn Any)
 537            .downcast_ref::<Self>()
 538            .map_or(false, |rhs| self == rhs)
 539    }
 540
 541    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 542        T::hash(self, &mut hasher);
 543    }
 544
 545    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 546        self
 547    }
 548}
 549
 550pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 551
 552impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 553    fn from(request: T) -> Self {
 554        Self(Arc::new(request))
 555    }
 556}
 557
 558impl PartialEq for RequestSlot {
 559    fn eq(&self, other: &Self) -> bool {
 560        self.0.dyn_eq(other.0.as_ref())
 561    }
 562}
 563
 564impl Eq for RequestSlot {}
 565
 566impl Hash for RequestSlot {
 567    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 568        self.0.dyn_hash(state);
 569        (&*self.0 as &dyn Any).type_id().hash(state)
 570    }
 571}
 572
 573#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 574pub struct CompletionsQuery {
 575    pub query: String,
 576    pub column: u64,
 577    pub line: Option<u64>,
 578    pub frame_id: Option<u64>,
 579}
 580
 581impl CompletionsQuery {
 582    pub fn new(
 583        buffer: &language::Buffer,
 584        cursor_position: language::Anchor,
 585        frame_id: Option<u64>,
 586    ) -> Self {
 587        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 588        Self {
 589            query: buffer.text(),
 590            column: column as u64,
 591            frame_id,
 592            line: Some(row as u64),
 593        }
 594    }
 595}
 596
/// Events emitted by a [`Session`] to its subscribers.
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
    InvalidateInlineValue,
    CapabilitiesLoaded,
    /// Emitted when the adapter sends a run-in-terminal request. The session waits on the
    /// receiving end of `sender` for the outcome; an `Ok` value is reported back to the adapter
    /// as the shell process id.
    RunInTerminal {
        request: RunInTerminalRequestArguments,
        sender: mpsc::Sender<Result<u32>>,
    },
}
 612
/// Lifecycle events emitted by a [`Session`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    /// Emitted by `boot` once the session's mode has switched to `Mode::Running`.
    Running,
    Shutdown,
    Restart,
    /// Emitted when the adapter's start-debugging request was parsed successfully; `request`
    /// carries the parsed arguments.
    SpawnChildSession {
        request: StartDebuggingRequestArguments,
    },
}
 622
// A session emits both its debugging events and its lifecycle events.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 625
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session whenever breakpoints change.
 629impl Session {
 630    pub(crate) fn new(
 631        breakpoint_store: Entity<BreakpointStore>,
 632        session_id: SessionId,
 633        parent_session: Option<Entity<Session>>,
 634        label: SharedString,
 635        adapter: DebugAdapterName,
 636        cx: &mut App,
 637    ) -> Entity<Self> {
 638        cx.new::<Self>(|cx| {
 639            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 640                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 641                    if let Some(local) = (!this.ignore_breakpoints)
 642                        .then(|| this.as_local_mut())
 643                        .flatten()
 644                    {
 645                        local
 646                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 647                            .detach();
 648                    };
 649                }
 650                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 651                    if let Some(local) = (!this.ignore_breakpoints)
 652                        .then(|| this.as_local_mut())
 653                        .flatten()
 654                    {
 655                        local.unset_breakpoints_from_paths(paths, cx).detach();
 656                    }
 657                }
 658                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 659            })
 660            .detach();
 661            cx.on_app_quit(Self::on_app_quit).detach();
 662
 663            let this = Self {
 664                mode: Mode::Building,
 665                id: session_id,
 666                child_session_ids: HashSet::default(),
 667                parent_session,
 668                capabilities: Capabilities::default(),
 669                variables: Default::default(),
 670                stack_frames: Default::default(),
 671                thread_states: ThreadStates::default(),
 672                output_token: OutputToken(0),
 673                output: circular_buffer::CircularBuffer::boxed(),
 674                requests: HashMap::default(),
 675                modules: Vec::default(),
 676                loaded_sources: Vec::default(),
 677                threads: IndexMap::default(),
 678                background_tasks: Vec::default(),
 679                locations: Default::default(),
 680                is_session_terminated: false,
 681                ignore_breakpoints: false,
 682                breakpoint_store,
 683                exception_breakpoints: Default::default(),
 684                label,
 685                adapter,
 686            };
 687
 688            this
 689        })
 690    }
 691
 692    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 693        match &self.mode {
 694            Mode::Building => None,
 695            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 696        }
 697    }
 698
    /// Connects to the debug adapter and runs the initialization handshake.
    ///
    /// A background task is installed that drains messages from the adapter: the first
    /// `Initialized` event unblocks the configuration sequence, all other events go to
    /// `handle_dap_event`, and start-debugging / run-in-terminal requests are dispatched to
    /// their handlers. The returned task creates the `LocalMode`, switches the session to
    /// `Mode::Running` (emitting `SessionStateEvent::Running`), sends the initialize request and
    /// then runs the initialize sequence.
    pub fn boot(
        &mut self,
        binary: DebugAdapterBinary,
        worktree: Entity<Worktree>,
        dap_store: WeakEntity<DapStore>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();

        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
            // Held in an `Option` so the signal is sent at most once.
            let mut initialized_tx = Some(initialized_tx);
            while let Some(message) = message_rx.next().await {
                if let Message::Event(event) = message {
                    if let Events::Initialized(_) = *event {
                        if let Some(tx) = initialized_tx.take() {
                            tx.send(()).ok();
                        }
                    } else {
                        // Stop pumping messages once the session entity is gone.
                        let Ok(_) = this.update(cx, |session, cx| {
                            session.handle_dap_event(event, cx);
                        }) else {
                            break;
                        };
                    }
                } else if let Message::Request(request) = message {
                    // Requests with any other command are ignored here.
                    let Ok(_) = this.update(cx, |this, cx| {
                        if request.command == StartDebugging::COMMAND {
                            this.handle_start_debugging_request(request, cx)
                                .detach_and_log_err(cx);
                        } else if request.command == RunInTerminal::COMMAND {
                            this.handle_run_in_terminal_request(request, cx)
                                .detach_and_log_err(cx);
                        }
                    }) else {
                        break;
                    };
                }
            }
        })];
        self.background_tasks = background_tasks;
        let id = self.id;
        let parent_session = self.parent_session.clone();

        cx.spawn(async move |this, cx| {
            let mode = LocalMode::new(
                id,
                parent_session,
                worktree.downgrade(),
                binary,
                message_tx,
                cx.clone(),
            )
            .await?;
            this.update(cx, |this, cx| {
                this.mode = Mode::Running(mode);
                cx.emit(SessionStateEvent::Running);
            })?;

            this.update(cx, |session, cx| session.request_initialize(cx))?
                .await?;

            this.update(cx, |session, cx| {
                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
            })?
            .await
        })
    }
 767
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 771
    /// Returns a copy of the set of child session ids registered on this session.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 775
    /// Registers `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 779
    /// Removes `session_id` from this session's children; unknown ids are ignored.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 783
 784    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 785        self.parent_session
 786            .as_ref()
 787            .map(|session| session.read(cx).id)
 788    }
 789
    /// Returns the parent session entity, if this session has one.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
 793
    /// Returns the capabilities currently stored for this session's adapter.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 797
 798    pub fn binary(&self) -> &DebugAdapterBinary {
 799        let Mode::Running(local_mode) = &self.mode else {
 800            panic!("Session is not local");
 801        };
 802        &local_mode.binary
 803    }
 804
    /// Returns a copy of the debug adapter name this session was created with.
    pub fn adapter(&self) -> DebugAdapterName {
        self.adapter.clone()
    }
 808
    /// Returns a copy of the label this session was created with.
    pub fn label(&self) -> SharedString {
        self.label.clone()
    }
 812
    /// Returns whether the session has been marked as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 816
 817    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
 818        let (tx, mut rx) = mpsc::unbounded();
 819
 820        cx.spawn(async move |this, cx| {
 821            while let Some(output) = rx.next().await {
 822                this.update(cx, |this, _| {
 823                    this.output_token.0 += 1;
 824                    this.output.push_back(dap::OutputEvent {
 825                        category: None,
 826                        output,
 827                        group: None,
 828                        variables_reference: None,
 829                        source: None,
 830                        line: None,
 831                        column: None,
 832                        data: None,
 833                        location_reference: None,
 834                    });
 835                })?;
 836            }
 837            anyhow::Ok(())
 838        })
 839        .detach();
 840
 841        return tx;
 842    }
 843
 844    pub fn is_local(&self) -> bool {
 845        matches!(self.mode, Mode::Running(_))
 846    }
 847
 848    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 849        match &mut self.mode {
 850            Mode::Running(local_mode) => Some(local_mode),
 851            Mode::Building => None,
 852        }
 853    }
 854
 855    pub fn as_local(&self) -> Option<&LocalMode> {
 856        match &self.mode {
 857            Mode::Running(local_mode) => Some(local_mode),
 858            Mode::Building => None,
 859        }
 860    }
 861
 862    fn handle_start_debugging_request(
 863        &mut self,
 864        request: dap::messages::Request,
 865        cx: &mut Context<Self>,
 866    ) -> Task<Result<()>> {
 867        let request_seq = request.seq;
 868
 869        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
 870            .arguments
 871            .as_ref()
 872            .map(|value| serde_json::from_value(value.clone()));
 873
 874        let mut success = true;
 875        if let Some(Ok(request)) = launch_request {
 876            cx.emit(SessionStateEvent::SpawnChildSession { request });
 877        } else {
 878            log::error!(
 879                "Failed to parse launch request arguments: {:?}",
 880                request.arguments
 881            );
 882            success = false;
 883        }
 884
 885        cx.spawn(async move |this, cx| {
 886            this.update(cx, |this, cx| {
 887                this.respond_to_client(
 888                    request_seq,
 889                    success,
 890                    StartDebugging::COMMAND.to_string(),
 891                    None,
 892                    cx,
 893                )
 894            })?
 895            .await
 896        })
 897    }
 898
 899    fn handle_run_in_terminal_request(
 900        &mut self,
 901        request: dap::messages::Request,
 902        cx: &mut Context<Self>,
 903    ) -> Task<Result<()>> {
 904        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
 905            request.arguments.unwrap_or_default(),
 906        )
 907        .expect("To parse StartDebuggingRequestArguments");
 908
 909        let seq = request.seq;
 910
 911        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
 912        cx.emit(SessionEvent::RunInTerminal {
 913            request: request_args,
 914            sender: tx,
 915        });
 916        cx.notify();
 917
 918        cx.spawn(async move |session, cx| {
 919            let result = util::maybe!(async move {
 920                rx.next().await.ok_or_else(|| {
 921                    anyhow!("failed to receive response from spawn terminal".to_string())
 922                })?
 923            })
 924            .await;
 925            let (success, body) = match result {
 926                Ok(pid) => (
 927                    true,
 928                    serde_json::to_value(dap::RunInTerminalResponse {
 929                        process_id: None,
 930                        shell_process_id: Some(pid as u64),
 931                    })
 932                    .ok(),
 933                ),
 934                Err(error) => (
 935                    false,
 936                    serde_json::to_value(dap::ErrorResponse {
 937                        error: Some(dap::Message {
 938                            id: seq,
 939                            format: error.to_string(),
 940                            variables: None,
 941                            send_telemetry: None,
 942                            show_user: None,
 943                            url: None,
 944                            url_label: None,
 945                        }),
 946                    })
 947                    .ok(),
 948                ),
 949            };
 950
 951            session
 952                .update(cx, |session, cx| {
 953                    session.respond_to_client(
 954                        seq,
 955                        success,
 956                        RunInTerminal::COMMAND.to_string(),
 957                        body,
 958                        cx,
 959                    )
 960                })?
 961                .await
 962        })
 963    }
 964
 965    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 966        let adapter_id = self.adapter().to_string();
 967        let request = Initialize { adapter_id };
 968        match &self.mode {
 969            Mode::Running(local_mode) => {
 970                let capabilities = local_mode.request(request);
 971
 972                cx.spawn(async move |this, cx| {
 973                    let capabilities = capabilities.await?;
 974                    this.update(cx, |session, cx| {
 975                        session.capabilities = capabilities;
 976                        let filters = session
 977                            .capabilities
 978                            .exception_breakpoint_filters
 979                            .clone()
 980                            .unwrap_or_default();
 981                        for filter in filters {
 982                            let default = filter.default.unwrap_or_default();
 983                            session
 984                                .exception_breakpoints
 985                                .entry(filter.filter.clone())
 986                                .or_insert_with(|| (filter, default));
 987                        }
 988                        cx.emit(SessionEvent::CapabilitiesLoaded);
 989                    })?;
 990                    Ok(())
 991                })
 992            }
 993            Mode::Building => Task::ready(Err(anyhow!(
 994                "Cannot send initialize request, task still building"
 995            ))),
 996        }
 997    }
 998
 999    pub(super) fn initialize_sequence(
1000        &mut self,
1001        initialize_rx: oneshot::Receiver<()>,
1002        dap_store: WeakEntity<DapStore>,
1003        cx: &mut Context<Self>,
1004    ) -> Task<Result<()>> {
1005        match &self.mode {
1006            Mode::Running(local_mode) => {
1007                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1008            }
1009            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1010        }
1011    }
1012
1013    pub fn run_to_position(
1014        &mut self,
1015        breakpoint: SourceBreakpoint,
1016        active_thread_id: ThreadId,
1017        cx: &mut Context<Self>,
1018    ) {
1019        match &mut self.mode {
1020            Mode::Running(local_mode) => {
1021                if !matches!(
1022                    self.thread_states.thread_state(active_thread_id),
1023                    Some(ThreadStatus::Stopped)
1024                ) {
1025                    return;
1026                };
1027                let path = breakpoint.path.clone();
1028                local_mode.tmp_breakpoint = Some(breakpoint);
1029                let task = local_mode.send_breakpoints_from_path(
1030                    path,
1031                    BreakpointUpdatedReason::Toggled,
1032                    &self.breakpoint_store,
1033                    cx,
1034                );
1035
1036                cx.spawn(async move |this, cx| {
1037                    task.await;
1038                    this.update(cx, |this, cx| {
1039                        this.continue_thread(active_thread_id, cx);
1040                    })
1041                })
1042                .detach();
1043            }
1044            Mode::Building => {}
1045        }
1046    }
1047
1048    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1049        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1050    }
1051
1052    pub fn output(
1053        &self,
1054        since: OutputToken,
1055    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1056        if self.output_token.0 == 0 {
1057            return (self.output.range(0..0), OutputToken(0));
1058        };
1059
1060        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1061
1062        let clamped_events_since = events_since.clamp(0, self.output.len());
1063        (
1064            self.output
1065                .range(self.output.len() - clamped_events_since..),
1066            self.output_token,
1067        )
1068    }
1069
1070    pub fn respond_to_client(
1071        &self,
1072        request_seq: u64,
1073        success: bool,
1074        command: String,
1075        body: Option<serde_json::Value>,
1076        cx: &mut Context<Self>,
1077    ) -> Task<Result<()>> {
1078        let Some(local_session) = self.as_local() else {
1079            unreachable!("Cannot respond to remote client");
1080        };
1081        let client = local_session.client.clone();
1082
1083        cx.background_spawn(async move {
1084            client
1085                .send_message(Message::Response(Response {
1086                    body,
1087                    success,
1088                    command,
1089                    seq: request_seq + 1,
1090                    request_seq,
1091                    message: None,
1092                }))
1093                .await
1094        })
1095    }
1096
1097    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1098        // todo(debugger): Find a clean way to get around the clone
1099        let breakpoint_store = self.breakpoint_store.clone();
1100        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1101            let breakpoint = local.tmp_breakpoint.take()?;
1102            let path = breakpoint.path.clone();
1103            Some((local, path))
1104        }) {
1105            local
1106                .send_breakpoints_from_path(
1107                    path,
1108                    BreakpointUpdatedReason::Toggled,
1109                    &breakpoint_store,
1110                    cx,
1111                )
1112                .detach();
1113        };
1114
1115        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1116            self.thread_states.stop_all_threads();
1117
1118            self.invalidate_command_type::<StackTraceCommand>();
1119        }
1120
1121        // Event if we stopped all threads we still need to insert the thread_id
1122        // to our own data
1123        if let Some(thread_id) = event.thread_id {
1124            self.thread_states.stop_thread(ThreadId(thread_id));
1125
1126            self.invalidate_state(
1127                &StackTraceCommand {
1128                    thread_id,
1129                    start_frame: None,
1130                    levels: None,
1131                }
1132                .into(),
1133            );
1134        }
1135
1136        self.invalidate_generic();
1137        self.threads.clear();
1138        self.variables.clear();
1139        cx.emit(SessionEvent::Stopped(
1140            event
1141                .thread_id
1142                .map(Into::into)
1143                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1144        ));
1145        cx.emit(SessionEvent::InvalidateInlineValue);
1146        cx.notify();
1147    }
1148
1149    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1150        match *event {
1151            Events::Initialized(_) => {
1152                debug_assert!(
1153                    false,
1154                    "Initialized event should have been handled in LocalMode"
1155                );
1156            }
1157            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1158            Events::Continued(event) => {
1159                if event.all_threads_continued.unwrap_or_default() {
1160                    self.thread_states.continue_all_threads();
1161                    self.breakpoint_store.update(cx, |store, cx| {
1162                        store.remove_active_position(Some(self.session_id()), cx)
1163                    });
1164                } else {
1165                    self.thread_states
1166                        .continue_thread(ThreadId(event.thread_id));
1167                }
1168                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1169                self.invalidate_generic();
1170            }
1171            Events::Exited(_event) => {
1172                self.clear_active_debug_line(cx);
1173            }
1174            Events::Terminated(_) => {
1175                self.is_session_terminated = true;
1176                self.clear_active_debug_line(cx);
1177            }
1178            Events::Thread(event) => {
1179                let thread_id = ThreadId(event.thread_id);
1180
1181                match event.reason {
1182                    dap::ThreadEventReason::Started => {
1183                        self.thread_states.continue_thread(thread_id);
1184                    }
1185                    dap::ThreadEventReason::Exited => {
1186                        self.thread_states.exit_thread(thread_id);
1187                    }
1188                    reason => {
1189                        log::error!("Unhandled thread event reason {:?}", reason);
1190                    }
1191                }
1192                self.invalidate_state(&ThreadsCommand.into());
1193                cx.notify();
1194            }
1195            Events::Output(event) => {
1196                if event
1197                    .category
1198                    .as_ref()
1199                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1200                {
1201                    return;
1202                }
1203
1204                self.output.push_back(event);
1205                self.output_token.0 += 1;
1206                cx.notify();
1207            }
1208            Events::Breakpoint(_) => {}
1209            Events::Module(event) => {
1210                match event.reason {
1211                    dap::ModuleEventReason::New => {
1212                        self.modules.push(event.module);
1213                    }
1214                    dap::ModuleEventReason::Changed => {
1215                        if let Some(module) = self
1216                            .modules
1217                            .iter_mut()
1218                            .find(|other| event.module.id == other.id)
1219                        {
1220                            *module = event.module;
1221                        }
1222                    }
1223                    dap::ModuleEventReason::Removed => {
1224                        self.modules.retain(|other| event.module.id != other.id);
1225                    }
1226                }
1227
1228                // todo(debugger): We should only send the invalidate command to downstream clients.
1229                // self.invalidate_state(&ModulesCommand.into());
1230            }
1231            Events::LoadedSource(_) => {
1232                self.invalidate_state(&LoadedSourcesCommand.into());
1233            }
1234            Events::Capabilities(event) => {
1235                self.capabilities = self.capabilities.merge(event.capabilities);
1236                cx.notify();
1237            }
1238            Events::Memory(_) => {}
1239            Events::Process(_) => {}
1240            Events::ProgressEnd(_) => {}
1241            Events::ProgressStart(_) => {}
1242            Events::ProgressUpdate(_) => {}
1243            Events::Invalidated(_) => {}
1244            Events::Other(_) => {}
1245        }
1246    }
1247
1248    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1249    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1250        &mut self,
1251        request: T,
1252        process_result: impl FnOnce(
1253            &mut Self,
1254            Result<T::Response>,
1255            &mut Context<Self>,
1256        ) -> Option<T::Response>
1257        + 'static,
1258        cx: &mut Context<Self>,
1259    ) {
1260        const {
1261            assert!(
1262                T::CACHEABLE,
1263                "Only requests marked as cacheable should invoke `fetch`"
1264            );
1265        }
1266
1267        if !self.thread_states.any_stopped_thread()
1268            && request.type_id() != TypeId::of::<ThreadsCommand>()
1269            || self.is_session_terminated
1270        {
1271            return;
1272        }
1273
1274        let request_map = self
1275            .requests
1276            .entry(std::any::TypeId::of::<T>())
1277            .or_default();
1278
1279        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1280            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1281
1282            let task = Self::request_inner::<Arc<T>>(
1283                &self.capabilities,
1284                &self.mode,
1285                command,
1286                process_result,
1287                cx,
1288            );
1289            let task = cx
1290                .background_executor()
1291                .spawn(async move {
1292                    let _ = task.await?;
1293                    Some(())
1294                })
1295                .shared();
1296
1297            vacant.insert(task);
1298            cx.notify();
1299        }
1300    }
1301
1302    pub async fn request2<T: DapCommand + PartialEq + Eq + Hash>(
1303        &self,
1304        request: T,
1305    ) -> Result<T::Response> {
1306        if !T::is_supported(&self.capabilities) {
1307            anyhow::bail!("DAP request {:?} is not supported", request);
1308        }
1309
1310        self.mode.request_dap(request).await
1311    }
1312
1313    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1314        capabilities: &Capabilities,
1315        mode: &Mode,
1316        request: T,
1317        process_result: impl FnOnce(
1318            &mut Self,
1319            Result<T::Response>,
1320            &mut Context<Self>,
1321        ) -> Option<T::Response>
1322        + 'static,
1323        cx: &mut Context<Self>,
1324    ) -> Task<Option<T::Response>> {
1325        if !T::is_supported(&capabilities) {
1326            log::warn!(
1327                "Attempted to send a DAP request that isn't supported: {:?}",
1328                request
1329            );
1330            let error = Err(anyhow::Error::msg(
1331                "Couldn't complete request because it's not supported",
1332            ));
1333            return cx.spawn(async move |this, cx| {
1334                this.update(cx, |this, cx| process_result(this, error, cx))
1335                    .log_err()
1336                    .flatten()
1337            });
1338        }
1339
1340        let request = mode.request_dap(request);
1341        cx.spawn(async move |this, cx| {
1342            let result = request.await;
1343            this.update(cx, |this, cx| process_result(this, result, cx))
1344                .log_err()
1345                .flatten()
1346        })
1347    }
1348
1349    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1350        &self,
1351        request: T,
1352        process_result: impl FnOnce(
1353            &mut Self,
1354            Result<T::Response>,
1355            &mut Context<Self>,
1356        ) -> Option<T::Response>
1357        + 'static,
1358        cx: &mut Context<Self>,
1359    ) -> Task<Option<T::Response>> {
1360        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1361    }
1362
1363    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1364        self.requests.remove(&std::any::TypeId::of::<Command>());
1365    }
1366
1367    fn invalidate_generic(&mut self) {
1368        self.invalidate_command_type::<ModulesCommand>();
1369        self.invalidate_command_type::<LoadedSourcesCommand>();
1370        self.invalidate_command_type::<ThreadsCommand>();
1371    }
1372
1373    fn invalidate_state(&mut self, key: &RequestSlot) {
1374        self.requests
1375            .entry((&*key.0 as &dyn Any).type_id())
1376            .and_modify(|request_map| {
1377                request_map.remove(&key);
1378            });
1379    }
1380
1381    pub fn any_stopped_thread(&self) -> bool {
1382        self.thread_states.any_stopped_thread()
1383    }
1384
1385    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1386        self.thread_states.thread_status(thread_id)
1387    }
1388
1389    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1390        self.fetch(
1391            dap_command::ThreadsCommand,
1392            |this, result, cx| {
1393                let result = result.log_err()?;
1394
1395                this.threads = result
1396                    .iter()
1397                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1398                    .collect();
1399
1400                this.invalidate_command_type::<StackTraceCommand>();
1401                cx.emit(SessionEvent::Threads);
1402                cx.notify();
1403
1404                Some(result)
1405            },
1406            cx,
1407        );
1408
1409        self.threads
1410            .values()
1411            .map(|thread| {
1412                (
1413                    thread.dap.clone(),
1414                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1415                )
1416            })
1417            .collect()
1418    }
1419
1420    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1421        self.fetch(
1422            dap_command::ModulesCommand,
1423            |this, result, cx| {
1424                let result = result.log_err()?;
1425
1426                this.modules = result.iter().cloned().collect();
1427                cx.emit(SessionEvent::Modules);
1428                cx.notify();
1429
1430                Some(result)
1431            },
1432            cx,
1433        );
1434
1435        &self.modules
1436    }
1437
1438    pub fn ignore_breakpoints(&self) -> bool {
1439        self.ignore_breakpoints
1440    }
1441
1442    pub fn toggle_ignore_breakpoints(
1443        &mut self,
1444        cx: &mut App,
1445    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1446        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1447    }
1448
1449    pub(crate) fn set_ignore_breakpoints(
1450        &mut self,
1451        ignore: bool,
1452        cx: &mut App,
1453    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1454        if self.ignore_breakpoints == ignore {
1455            return Task::ready(HashMap::default());
1456        }
1457
1458        self.ignore_breakpoints = ignore;
1459
1460        if let Some(local) = self.as_local() {
1461            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1462        } else {
1463            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1464            unimplemented!()
1465        }
1466    }
1467
1468    pub fn exception_breakpoints(
1469        &self,
1470    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1471        self.exception_breakpoints.values()
1472    }
1473
1474    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1475        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1476            *is_enabled = !*is_enabled;
1477            self.send_exception_breakpoints(cx);
1478        }
1479    }
1480
1481    fn send_exception_breakpoints(&mut self, cx: &App) {
1482        if let Some(local) = self.as_local() {
1483            let exception_filters = self
1484                .exception_breakpoints
1485                .values()
1486                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1487                .collect();
1488
1489            let supports_exception_filters = self
1490                .capabilities
1491                .supports_exception_filter_options
1492                .unwrap_or_default();
1493            local
1494                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1495                .detach_and_log_err(cx);
1496        } else {
1497            debug_assert!(false, "Not implemented");
1498        }
1499    }
1500
1501    pub fn breakpoints_enabled(&self) -> bool {
1502        self.ignore_breakpoints
1503    }
1504
1505    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1506        self.fetch(
1507            dap_command::LoadedSourcesCommand,
1508            |this, result, cx| {
1509                let result = result.log_err()?;
1510                this.loaded_sources = result.iter().cloned().collect();
1511                cx.emit(SessionEvent::LoadedSources);
1512                cx.notify();
1513                Some(result)
1514            },
1515            cx,
1516        );
1517
1518        &self.loaded_sources
1519    }
1520
1521    fn fallback_to_manual_restart(
1522        &mut self,
1523        res: Result<()>,
1524        cx: &mut Context<Self>,
1525    ) -> Option<()> {
1526        if res.log_err().is_none() {
1527            cx.emit(SessionStateEvent::Restart);
1528            return None;
1529        }
1530        Some(())
1531    }
1532
1533    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1534        res.log_err()?;
1535        Some(())
1536    }
1537
1538    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1539        thread_id: ThreadId,
1540    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1541    {
1542        move |this, response, cx| match response.log_err() {
1543            Some(response) => {
1544                this.breakpoint_store.update(cx, |store, cx| {
1545                    store.remove_active_position(Some(this.session_id()), cx)
1546                });
1547                Some(response)
1548            }
1549            None => {
1550                this.thread_states.stop_thread(thread_id);
1551                cx.notify();
1552                None
1553            }
1554        }
1555    }
1556
1557    fn clear_active_debug_line_response(
1558        &mut self,
1559        response: Result<()>,
1560        cx: &mut Context<Session>,
1561    ) -> Option<()> {
1562        response.log_err()?;
1563        self.clear_active_debug_line(cx);
1564        Some(())
1565    }
1566
1567    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1568        self.breakpoint_store.update(cx, |store, cx| {
1569            store.remove_active_position(Some(self.id), cx)
1570        });
1571    }
1572
1573    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1574        self.request(
1575            PauseCommand {
1576                thread_id: thread_id.0,
1577            },
1578            Self::empty_response,
1579            cx,
1580        )
1581        .detach();
1582    }
1583
1584    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1585        self.request(
1586            RestartStackFrameCommand { stack_frame_id },
1587            Self::empty_response,
1588            cx,
1589        )
1590        .detach();
1591    }
1592
1593    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1594        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1595            self.request(
1596                RestartCommand {
1597                    raw: args.unwrap_or(Value::Null),
1598                },
1599                Self::fallback_to_manual_restart,
1600                cx,
1601            )
1602            .detach();
1603        } else {
1604            cx.emit(SessionStateEvent::Restart);
1605        }
1606    }
1607
1608    fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1609        let debug_adapter = self.adapter_client();
1610
1611        cx.background_spawn(async move {
1612            if let Some(client) = debug_adapter {
1613                client.shutdown().await.log_err();
1614            }
1615        })
1616    }
1617
1618    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1619        self.is_session_terminated = true;
1620        self.thread_states.exit_all_threads();
1621        cx.notify();
1622
1623        let task = if self
1624            .capabilities
1625            .supports_terminate_request
1626            .unwrap_or_default()
1627        {
1628            self.request(
1629                TerminateCommand {
1630                    restart: Some(false),
1631                },
1632                Self::clear_active_debug_line_response,
1633                cx,
1634            )
1635        } else {
1636            self.request(
1637                DisconnectCommand {
1638                    restart: Some(false),
1639                    terminate_debuggee: Some(true),
1640                    suspend_debuggee: Some(false),
1641                },
1642                Self::clear_active_debug_line_response,
1643                cx,
1644            )
1645        };
1646
1647        cx.emit(SessionStateEvent::Shutdown);
1648
1649        let debug_client = self.adapter_client();
1650
1651        cx.background_spawn(async move {
1652            let _ = task.await;
1653
1654            if let Some(client) = debug_client {
1655                client.shutdown().await.log_err();
1656            }
1657        })
1658    }
1659
1660    pub fn completions(
1661        &mut self,
1662        query: CompletionsQuery,
1663        cx: &mut Context<Self>,
1664    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1665        let task = self.request(query, |_, result, _| result.log_err(), cx);
1666
1667        cx.background_executor().spawn(async move {
1668            anyhow::Ok(
1669                task.await
1670                    .map(|response| response.targets)
1671                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1672            )
1673        })
1674    }
1675
1676    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1677        self.thread_states.continue_thread(thread_id);
1678        self.request(
1679            ContinueCommand {
1680                args: ContinueArguments {
1681                    thread_id: thread_id.0,
1682                    single_thread: Some(true),
1683                },
1684            },
1685            Self::on_step_response::<ContinueCommand>(thread_id),
1686            cx,
1687        )
1688        .detach();
1689    }
1690
1691    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1692        match self.mode {
1693            Mode::Running(ref local) => Some(local.client.clone()),
1694            Mode::Building => None,
1695        }
1696    }
1697
1698    pub fn step_over(
1699        &mut self,
1700        thread_id: ThreadId,
1701        granularity: SteppingGranularity,
1702        cx: &mut Context<Self>,
1703    ) {
1704        let supports_single_thread_execution_requests =
1705            self.capabilities.supports_single_thread_execution_requests;
1706        let supports_stepping_granularity = self
1707            .capabilities
1708            .supports_stepping_granularity
1709            .unwrap_or_default();
1710
1711        let command = NextCommand {
1712            inner: StepCommand {
1713                thread_id: thread_id.0,
1714                granularity: supports_stepping_granularity.then(|| granularity),
1715                single_thread: supports_single_thread_execution_requests,
1716            },
1717        };
1718
1719        self.thread_states.process_step(thread_id);
1720        self.request(
1721            command,
1722            Self::on_step_response::<NextCommand>(thread_id),
1723            cx,
1724        )
1725        .detach();
1726    }
1727
1728    pub fn step_in(
1729        &mut self,
1730        thread_id: ThreadId,
1731        granularity: SteppingGranularity,
1732        cx: &mut Context<Self>,
1733    ) {
1734        let supports_single_thread_execution_requests =
1735            self.capabilities.supports_single_thread_execution_requests;
1736        let supports_stepping_granularity = self
1737            .capabilities
1738            .supports_stepping_granularity
1739            .unwrap_or_default();
1740
1741        let command = StepInCommand {
1742            inner: StepCommand {
1743                thread_id: thread_id.0,
1744                granularity: supports_stepping_granularity.then(|| granularity),
1745                single_thread: supports_single_thread_execution_requests,
1746            },
1747        };
1748
1749        self.thread_states.process_step(thread_id);
1750        self.request(
1751            command,
1752            Self::on_step_response::<StepInCommand>(thread_id),
1753            cx,
1754        )
1755        .detach();
1756    }
1757
1758    pub fn step_out(
1759        &mut self,
1760        thread_id: ThreadId,
1761        granularity: SteppingGranularity,
1762        cx: &mut Context<Self>,
1763    ) {
1764        let supports_single_thread_execution_requests =
1765            self.capabilities.supports_single_thread_execution_requests;
1766        let supports_stepping_granularity = self
1767            .capabilities
1768            .supports_stepping_granularity
1769            .unwrap_or_default();
1770
1771        let command = StepOutCommand {
1772            inner: StepCommand {
1773                thread_id: thread_id.0,
1774                granularity: supports_stepping_granularity.then(|| granularity),
1775                single_thread: supports_single_thread_execution_requests,
1776            },
1777        };
1778
1779        self.thread_states.process_step(thread_id);
1780        self.request(
1781            command,
1782            Self::on_step_response::<StepOutCommand>(thread_id),
1783            cx,
1784        )
1785        .detach();
1786    }
1787
1788    pub fn step_back(
1789        &mut self,
1790        thread_id: ThreadId,
1791        granularity: SteppingGranularity,
1792        cx: &mut Context<Self>,
1793    ) {
1794        let supports_single_thread_execution_requests =
1795            self.capabilities.supports_single_thread_execution_requests;
1796        let supports_stepping_granularity = self
1797            .capabilities
1798            .supports_stepping_granularity
1799            .unwrap_or_default();
1800
1801        let command = StepBackCommand {
1802            inner: StepCommand {
1803                thread_id: thread_id.0,
1804                granularity: supports_stepping_granularity.then(|| granularity),
1805                single_thread: supports_single_thread_execution_requests,
1806            },
1807        };
1808
1809        self.thread_states.process_step(thread_id);
1810
1811        self.request(
1812            command,
1813            Self::on_step_response::<StepBackCommand>(thread_id),
1814            cx,
1815        )
1816        .detach();
1817    }
1818
1819    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1820        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1821            && self.requests.contains_key(&ThreadsCommand.type_id())
1822            && self.threads.contains_key(&thread_id)
1823        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1824        // We could still be using an old thread id and have sent a new thread's request
1825        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1826        // But it very well could cause a minor bug in the future that is hard to track down
1827        {
1828            self.fetch(
1829                super::dap_command::StackTraceCommand {
1830                    thread_id: thread_id.0,
1831                    start_frame: None,
1832                    levels: None,
1833                },
1834                move |this, stack_frames, cx| {
1835                    let stack_frames = stack_frames.log_err()?;
1836
1837                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1838                        thread.stack_frame_ids =
1839                            stack_frames.iter().map(|frame| frame.id).collect();
1840                    });
1841                    debug_assert!(
1842                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1843                        "Sent request for thread_id that doesn't exist"
1844                    );
1845
1846                    this.stack_frames.extend(
1847                        stack_frames
1848                            .iter()
1849                            .cloned()
1850                            .map(|frame| (frame.id, StackFrame::from(frame))),
1851                    );
1852
1853                    this.invalidate_command_type::<ScopesCommand>();
1854                    this.invalidate_command_type::<VariablesCommand>();
1855
1856                    cx.emit(SessionEvent::StackTrace);
1857                    cx.notify();
1858                    Some(stack_frames)
1859                },
1860                cx,
1861            );
1862        }
1863
1864        self.threads
1865            .get(&thread_id)
1866            .map(|thread| {
1867                thread
1868                    .stack_frame_ids
1869                    .iter()
1870                    .filter_map(|id| self.stack_frames.get(id))
1871                    .cloned()
1872                    .collect()
1873            })
1874            .unwrap_or_default()
1875    }
1876
1877    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1878        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1879            && self
1880                .requests
1881                .contains_key(&TypeId::of::<StackTraceCommand>())
1882        {
1883            self.fetch(
1884                ScopesCommand { stack_frame_id },
1885                move |this, scopes, cx| {
1886                    let scopes = scopes.log_err()?;
1887
1888                    for scope in scopes .iter(){
1889                        this.variables(scope.variables_reference, cx);
1890                    }
1891
1892                    let entry = this
1893                        .stack_frames
1894                        .entry(stack_frame_id)
1895                        .and_modify(|stack_frame| {
1896                            stack_frame.scopes = scopes.clone();
1897                        });
1898
1899                    cx.emit(SessionEvent::Variables);
1900
1901                    debug_assert!(
1902                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1903                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1904                    );
1905
1906                    Some(scopes)
1907                },
1908                cx,
1909            );
1910        }
1911
1912        self.stack_frames
1913            .get(&stack_frame_id)
1914            .map(|frame| frame.scopes.as_slice())
1915            .unwrap_or_default()
1916    }
1917
1918    pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
1919        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
1920            return Vec::new();
1921        };
1922
1923        stack_frame
1924            .scopes
1925            .iter()
1926            .filter_map(|scope| self.variables.get(&scope.variables_reference))
1927            .flatten()
1928            .cloned()
1929            .collect()
1930    }
1931
1932    pub fn variables(
1933        &mut self,
1934        variables_reference: VariableReference,
1935        cx: &mut Context<Self>,
1936    ) -> Vec<dap::Variable> {
1937        let command = VariablesCommand {
1938            variables_reference,
1939            filter: None,
1940            start: None,
1941            count: None,
1942            format: None,
1943        };
1944
1945        self.fetch(
1946            command,
1947            move |this, variables, cx| {
1948                let variables = variables.log_err()?;
1949                this.variables
1950                    .insert(variables_reference, variables.clone());
1951
1952                cx.emit(SessionEvent::Variables);
1953                cx.emit(SessionEvent::InvalidateInlineValue);
1954                Some(variables)
1955            },
1956            cx,
1957        );
1958
1959        self.variables
1960            .get(&variables_reference)
1961            .cloned()
1962            .unwrap_or_default()
1963    }
1964
1965    pub fn set_variable_value(
1966        &mut self,
1967        variables_reference: u64,
1968        name: String,
1969        value: String,
1970        cx: &mut Context<Self>,
1971    ) {
1972        if self.capabilities.supports_set_variable.unwrap_or_default() {
1973            self.request(
1974                SetVariableValueCommand {
1975                    name,
1976                    value,
1977                    variables_reference,
1978                },
1979                move |this, response, cx| {
1980                    let response = response.log_err()?;
1981                    this.invalidate_command_type::<VariablesCommand>();
1982                    cx.notify();
1983                    Some(response)
1984                },
1985                cx,
1986            )
1987            .detach()
1988        }
1989    }
1990
1991    pub fn evaluate(
1992        &mut self,
1993        expression: String,
1994        context: Option<EvaluateArgumentsContext>,
1995        frame_id: Option<u64>,
1996        source: Option<Source>,
1997        cx: &mut Context<Self>,
1998    ) -> Task<()> {
1999        self.output_token.0 += 1;
2000        self.output.push_back(dap::OutputEvent {
2001            category: None,
2002            output: format!("> {expression}"),
2003            group: None,
2004            variables_reference: None,
2005            source: None,
2006            line: None,
2007            column: None,
2008            data: None,
2009            location_reference: None,
2010        });
2011        let request = self.mode.request_dap(EvaluateCommand {
2012            expression,
2013            context,
2014            frame_id,
2015            source,
2016        });
2017        cx.spawn(async move |this, cx| {
2018            let response = request.await;
2019            this.update(cx, |this, cx| {
2020                match response {
2021                    Ok(response) => {
2022                        this.output_token.0 += 1;
2023                        this.output.push_back(dap::OutputEvent {
2024                            category: None,
2025                            output: format!("< {}", &response.result),
2026                            group: None,
2027                            variables_reference: Some(response.variables_reference),
2028                            source: None,
2029                            line: None,
2030                            column: None,
2031                            data: None,
2032                            location_reference: None,
2033                        });
2034                    }
2035                    Err(e) => {
2036                        this.output_token.0 += 1;
2037                        this.output.push_back(dap::OutputEvent {
2038                            category: None,
2039                            output: format!("{}", e),
2040                            group: None,
2041                            variables_reference: None,
2042                            source: None,
2043                            line: None,
2044                            column: None,
2045                            data: None,
2046                            location_reference: None,
2047                        });
2048                    }
2049                };
2050                this.invalidate_command_type::<ScopesCommand>();
2051                cx.notify();
2052            })
2053            .ok();
2054        })
2055    }
2056
2057    pub fn location(
2058        &mut self,
2059        reference: u64,
2060        cx: &mut Context<Self>,
2061    ) -> Option<dap::LocationsResponse> {
2062        self.fetch(
2063            LocationsCommand { reference },
2064            move |this, response, _| {
2065                let response = response.log_err()?;
2066                this.locations.insert(reference, response.clone());
2067                Some(response)
2068            },
2069            cx,
2070        );
2071        self.locations.get(&reference).cloned()
2072    }
2073
2074    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2075        let command = DisconnectCommand {
2076            restart: Some(false),
2077            terminate_debuggee: Some(true),
2078            suspend_debuggee: Some(false),
2079        };
2080
2081        self.request(command, Self::empty_response, cx).detach()
2082    }
2083
2084    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2085        if self
2086            .capabilities
2087            .supports_terminate_threads_request
2088            .unwrap_or_default()
2089        {
2090            self.request(
2091                TerminateThreadsCommand {
2092                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2093                },
2094                Self::clear_active_debug_line_response,
2095                cx,
2096            )
2097            .detach();
2098        } else {
2099            self.shutdown(cx).detach();
2100        }
2101    }
2102}