session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
  11    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  12    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapStore;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap, IndexSet};
  17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  18use dap::messages::Response;
  19use dap::requests::{Request, RunInTerminal, StartDebugging};
  20use dap::{
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  28    RunInTerminalRequestArguments, StartDebuggingRequestArguments,
  29};
  30use futures::channel::{mpsc, oneshot};
  31use futures::{FutureExt, future::Shared};
  32use gpui::{
  33    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  34    Task, WeakEntity,
  35};
  36
  37use serde_json::Value;
  38use smol::stream::StreamExt;
  39use std::any::TypeId;
  40use std::collections::BTreeMap;
  41use std::u64;
  42use std::{
  43    any::Any,
  44    collections::hash_map::Entry,
  45    hash::{Hash, Hasher},
  46    path::Path,
  47    sync::Arc,
  48};
  49use text::{PointUtf16, ToPointUtf16};
  50use util::ResultExt;
  51use worktree::Worktree;
  52
  53#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  54#[repr(transparent)]
  55pub struct ThreadId(pub u64);
  56
  57impl ThreadId {
  58    pub const MIN: ThreadId = ThreadId(u64::MIN);
  59    pub const MAX: ThreadId = ThreadId(u64::MAX);
  60}
  61
  62impl From<u64> for ThreadId {
  63    fn from(id: u64) -> Self {
  64        Self(id)
  65    }
  66}
  67
  68#[derive(Clone, Debug)]
  69pub struct StackFrame {
  70    pub dap: dap::StackFrame,
  71    pub scopes: Vec<dap::Scope>,
  72}
  73
  74impl From<dap::StackFrame> for StackFrame {
  75    fn from(stack_frame: dap::StackFrame) -> Self {
  76        Self {
  77            scopes: vec![],
  78            dap: stack_frame,
  79        }
  80    }
  81}
  82
  83#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  84pub enum ThreadStatus {
  85    #[default]
  86    Running,
  87    Stopped,
  88    Stepping,
  89    Exited,
  90    Ended,
  91}
  92
  93impl ThreadStatus {
  94    pub fn label(&self) -> &'static str {
  95        match self {
  96            ThreadStatus::Running => "Running",
  97            ThreadStatus::Stopped => "Stopped",
  98            ThreadStatus::Stepping => "Stepping",
  99            ThreadStatus::Exited => "Exited",
 100            ThreadStatus::Ended => "Ended",
 101        }
 102    }
 103}
 104
 105#[derive(Debug)]
 106pub struct Thread {
 107    dap: dap::Thread,
 108    stack_frame_ids: IndexSet<StackFrameId>,
 109    _has_stopped: bool,
 110}
 111
 112impl From<dap::Thread> for Thread {
 113    fn from(dap: dap::Thread) -> Self {
 114        Self {
 115            dap,
 116            stack_frame_ids: Default::default(),
 117            _has_stopped: false,
 118        }
 119    }
 120}
 121
 122pub enum Mode {
 123    Building,
 124    Running(RunningMode),
 125}
 126
 127#[derive(Clone)]
 128pub struct RunningMode {
 129    client: Arc<DebugAdapterClient>,
 130    binary: DebugAdapterBinary,
 131    tmp_breakpoint: Option<SourceBreakpoint>,
 132    worktree: WeakEntity<Worktree>,
 133    executor: BackgroundExecutor,
 134    is_started: bool,
 135}
 136
 137fn client_source(abs_path: &Path) -> dap::Source {
 138    dap::Source {
 139        name: abs_path
 140            .file_name()
 141            .map(|filename| filename.to_string_lossy().to_string()),
 142        path: Some(abs_path.to_string_lossy().to_string()),
 143        source_reference: None,
 144        presentation_hint: None,
 145        origin: None,
 146        sources: None,
 147        adapter_data: None,
 148        checksums: None,
 149    }
 150}
 151
 152impl RunningMode {
 153    async fn new(
 154        session_id: SessionId,
 155        parent_session: Option<Entity<Session>>,
 156        worktree: WeakEntity<Worktree>,
 157        binary: DebugAdapterBinary,
 158        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 159        cx: AsyncApp,
 160    ) -> Result<Self> {
 161        let message_handler = Box::new(move |message| {
 162            messages_tx.unbounded_send(message).ok();
 163        });
 164
 165        let client = Arc::new(
 166            if let Some(client) = parent_session
 167                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 168                .flatten()
 169            {
 170                client
 171                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 172                    .await?
 173            } else {
 174                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 175                    .await?
 176            },
 177        );
 178
 179        Ok(Self {
 180            client,
 181            worktree,
 182            tmp_breakpoint: None,
 183            binary,
 184            executor: cx.background_executor().clone(),
 185            is_started: false,
 186        })
 187    }
 188
 189    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 190        &self.worktree
 191    }
 192
 193    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 194        let tasks: Vec<_> = paths
 195            .into_iter()
 196            .map(|path| {
 197                self.request(dap_command::SetBreakpoints {
 198                    source: client_source(path),
 199                    source_modified: None,
 200                    breakpoints: vec![],
 201                })
 202            })
 203            .collect();
 204
 205        cx.background_spawn(async move {
 206            futures::future::join_all(tasks)
 207                .await
 208                .iter()
 209                .for_each(|res| match res {
 210                    Ok(_) => {}
 211                    Err(err) => {
 212                        log::warn!("Set breakpoints request failed: {}", err);
 213                    }
 214                });
 215        })
 216    }
 217
 218    fn send_breakpoints_from_path(
 219        &self,
 220        abs_path: Arc<Path>,
 221        reason: BreakpointUpdatedReason,
 222        breakpoint_store: &Entity<BreakpointStore>,
 223        cx: &mut App,
 224    ) -> Task<()> {
 225        let breakpoints =
 226            breakpoint_store
 227                .read(cx)
 228                .source_breakpoints_from_path(&abs_path, cx)
 229                .into_iter()
 230                .filter(|bp| bp.state.is_enabled())
 231                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 232                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 233                }))
 234                .map(Into::into)
 235                .collect();
 236
 237        let raw_breakpoints = breakpoint_store
 238            .read(cx)
 239            .breakpoints_from_path(&abs_path)
 240            .into_iter()
 241            .filter(|bp| bp.bp.state.is_enabled())
 242            .collect::<Vec<_>>();
 243
 244        let task = self.request(dap_command::SetBreakpoints {
 245            source: client_source(&abs_path),
 246            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 247            breakpoints,
 248        });
 249        let session_id = self.client.id();
 250        let breakpoint_store = breakpoint_store.downgrade();
 251        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 252            Ok(breakpoints) => {
 253                let breakpoints =
 254                    breakpoints
 255                        .into_iter()
 256                        .zip(raw_breakpoints)
 257                        .filter_map(|(dap_bp, zed_bp)| {
 258                            Some((
 259                                zed_bp,
 260                                BreakpointSessionState {
 261                                    id: dap_bp.id?,
 262                                    verified: dap_bp.verified,
 263                                },
 264                            ))
 265                        });
 266                breakpoint_store
 267                    .update(cx, |this, _| {
 268                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 269                    })
 270                    .ok();
 271            }
 272            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 273        })
 274    }
 275
 276    fn send_exception_breakpoints(
 277        &self,
 278        filters: Vec<ExceptionBreakpointsFilter>,
 279        supports_filter_options: bool,
 280    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 281        let arg = if supports_filter_options {
 282            SetExceptionBreakpoints::WithOptions {
 283                filters: filters
 284                    .into_iter()
 285                    .map(|filter| ExceptionFilterOptions {
 286                        filter_id: filter.filter,
 287                        condition: None,
 288                        mode: None,
 289                    })
 290                    .collect(),
 291            }
 292        } else {
 293            SetExceptionBreakpoints::Plain {
 294                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 295            }
 296        };
 297        self.request(arg)
 298    }
 299
 300    fn send_source_breakpoints(
 301        &self,
 302        ignore_breakpoints: bool,
 303        breakpoint_store: &Entity<BreakpointStore>,
 304        cx: &App,
 305    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 306        let mut breakpoint_tasks = Vec::new();
 307        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 308        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 309        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 310        let session_id = self.client.id();
 311        for (path, breakpoints) in breakpoints {
 312            let breakpoints = if ignore_breakpoints {
 313                vec![]
 314            } else {
 315                breakpoints
 316                    .into_iter()
 317                    .filter(|bp| bp.state.is_enabled())
 318                    .map(Into::into)
 319                    .collect()
 320            };
 321
 322            let raw_breakpoints = raw_breakpoints
 323                .remove(&path)
 324                .unwrap_or_default()
 325                .into_iter()
 326                .filter(|bp| bp.bp.state.is_enabled());
 327            let error_path = path.clone();
 328            let send_request = self
 329                .request(dap_command::SetBreakpoints {
 330                    source: client_source(&path),
 331                    source_modified: Some(false),
 332                    breakpoints,
 333                })
 334                .map(|result| result.map_err(move |e| (error_path, e)));
 335
 336            let task = cx.spawn({
 337                let breakpoint_store = breakpoint_store.downgrade();
 338                async move |cx| {
 339                    let breakpoints = cx.background_spawn(send_request).await?;
 340
 341                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 342                        |(dap_bp, zed_bp)| {
 343                            Some((
 344                                zed_bp,
 345                                BreakpointSessionState {
 346                                    id: dap_bp.id?,
 347                                    verified: dap_bp.verified,
 348                                },
 349                            ))
 350                        },
 351                    );
 352                    breakpoint_store
 353                        .update(cx, |this, _| {
 354                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 355                        })
 356                        .ok();
 357
 358                    Ok(())
 359                }
 360            });
 361            breakpoint_tasks.push(task);
 362        }
 363
 364        cx.background_spawn(async move {
 365            futures::future::join_all(breakpoint_tasks)
 366                .await
 367                .into_iter()
 368                .filter_map(Result::err)
 369                .collect::<HashMap<_, _>>()
 370        })
 371    }
 372
 373    fn initialize_sequence(
 374        &self,
 375        capabilities: &Capabilities,
 376        initialized_rx: oneshot::Receiver<()>,
 377        dap_store: WeakEntity<DapStore>,
 378        cx: &mut Context<Session>,
 379    ) -> Task<Result<()>> {
 380        let raw = self.binary.request_args.clone();
 381
 382        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 383        let launch = match raw.request {
 384            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 385                raw: raw.configuration,
 386            }),
 387            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 388                raw: raw.configuration,
 389            }),
 390        };
 391
 392        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 393        let exception_filters = capabilities
 394            .exception_breakpoint_filters
 395            .as_ref()
 396            .map(|exception_filters| {
 397                exception_filters
 398                    .iter()
 399                    .filter(|filter| filter.default == Some(true))
 400                    .cloned()
 401                    .collect::<Vec<_>>()
 402            })
 403            .unwrap_or_default();
 404        let supports_exception_filters = capabilities
 405            .supports_exception_filter_options
 406            .unwrap_or_default();
 407        let this = self.clone();
 408        let worktree = self.worktree().clone();
 409        let configuration_sequence = cx.spawn({
 410            async move |_, cx| {
 411                let breakpoint_store =
 412                    dap_store.read_with(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
 413                initialized_rx.await?;
 414                let errors_by_path = cx
 415                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 416                    .await;
 417
 418                dap_store.update(cx, |_, cx| {
 419                    let Some(worktree) = worktree.upgrade() else {
 420                        return;
 421                    };
 422
 423                    for (path, error) in &errors_by_path {
 424                        log::error!("failed to set breakpoints for {path:?}: {error}");
 425                    }
 426
 427                    if let Some(failed_path) = errors_by_path.keys().next() {
 428                        let failed_path = failed_path
 429                            .strip_prefix(worktree.read(cx).abs_path())
 430                            .unwrap_or(failed_path)
 431                            .display();
 432                        let message = format!(
 433                            "Failed to set breakpoints for {failed_path}{}",
 434                            match errors_by_path.len() {
 435                                0 => unreachable!(),
 436                                1 => "".into(),
 437                                2 => " and 1 other path".into(),
 438                                n => format!(" and {} other paths", n - 1),
 439                            }
 440                        );
 441                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 442                    }
 443                })?;
 444
 445                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
 446                    .await
 447                    .ok();
 448                let ret = if configuration_done_supported {
 449                    this.request(ConfigurationDone {})
 450                } else {
 451                    Task::ready(Ok(()))
 452                }
 453                .await;
 454                ret
 455            }
 456        });
 457
 458        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 459
 460        cx.spawn(async move |this, cx| {
 461            task.await?;
 462
 463            this.update(cx, |this, cx| {
 464                if let Some(this) = this.as_running_mut() {
 465                    this.is_started = true;
 466                    cx.notify();
 467                }
 468            })
 469            .ok();
 470
 471            anyhow::Ok(())
 472        })
 473    }
 474
 475    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 476    where
 477        <R::DapRequest as dap::requests::Request>::Response: 'static,
 478        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 479    {
 480        let request = Arc::new(request);
 481
 482        let request_clone = request.clone();
 483        let connection = self.client.clone();
 484        self.executor.spawn(async move {
 485            let args = request_clone.to_dap();
 486            let response = connection.request::<R::DapRequest>(args).await?;
 487            request.response_from_dap(response)
 488        })
 489    }
 490}
 491
 492impl Mode {
 493    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 494    where
 495        <R::DapRequest as dap::requests::Request>::Response: 'static,
 496        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 497    {
 498        match self {
 499            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 500            Mode::Building => Task::ready(Err(anyhow!(
 501                "no adapter running to send request: {request:?}"
 502            ))),
 503        }
 504    }
 505}
 506
 507#[derive(Default)]
 508struct ThreadStates {
 509    global_state: Option<ThreadStatus>,
 510    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 511}
 512
 513impl ThreadStates {
 514    fn stop_all_threads(&mut self) {
 515        self.global_state = Some(ThreadStatus::Stopped);
 516        self.known_thread_states.clear();
 517    }
 518
 519    fn exit_all_threads(&mut self) {
 520        self.global_state = Some(ThreadStatus::Exited);
 521        self.known_thread_states.clear();
 522    }
 523
 524    fn continue_all_threads(&mut self) {
 525        self.global_state = Some(ThreadStatus::Running);
 526        self.known_thread_states.clear();
 527    }
 528
 529    fn stop_thread(&mut self, thread_id: ThreadId) {
 530        self.known_thread_states
 531            .insert(thread_id, ThreadStatus::Stopped);
 532    }
 533
 534    fn continue_thread(&mut self, thread_id: ThreadId) {
 535        self.known_thread_states
 536            .insert(thread_id, ThreadStatus::Running);
 537    }
 538
 539    fn process_step(&mut self, thread_id: ThreadId) {
 540        self.known_thread_states
 541            .insert(thread_id, ThreadStatus::Stepping);
 542    }
 543
 544    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 545        self.thread_state(thread_id)
 546            .unwrap_or(ThreadStatus::Running)
 547    }
 548
 549    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 550        self.known_thread_states
 551            .get(&thread_id)
 552            .copied()
 553            .or(self.global_state)
 554    }
 555
 556    fn exit_thread(&mut self, thread_id: ThreadId) {
 557        self.known_thread_states
 558            .insert(thread_id, ThreadStatus::Exited);
 559    }
 560
 561    fn any_stopped_thread(&self) -> bool {
 562        self.global_state
 563            .is_some_and(|state| state == ThreadStatus::Stopped)
 564            || self
 565                .known_thread_states
 566                .values()
 567                .any(|status| *status == ThreadStatus::Stopped)
 568    }
 569}
 570const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 571
 572type IsEnabled = bool;
 573
 574#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 575pub struct OutputToken(pub usize);
 576/// Represents a current state of a single debug adapter and provides ways to mutate it.
 577pub struct Session {
 578    pub mode: Mode,
 579    id: SessionId,
 580    label: SharedString,
 581    adapter: DebugAdapterName,
 582    pub(super) capabilities: Capabilities,
 583    child_session_ids: HashSet<SessionId>,
 584    parent_session: Option<Entity<Session>>,
 585    modules: Vec<dap::Module>,
 586    loaded_sources: Vec<dap::Source>,
 587    output_token: OutputToken,
 588    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 589    threads: IndexMap<ThreadId, Thread>,
 590    thread_states: ThreadStates,
 591    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 592    stack_frames: IndexMap<StackFrameId, StackFrame>,
 593    locations: HashMap<u64, dap::LocationsResponse>,
 594    is_session_terminated: bool,
 595    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 596    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 597    ignore_breakpoints: bool,
 598    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 599    background_tasks: Vec<Task<()>>,
 600}
 601
 602trait CacheableCommand: Any + Send + Sync {
 603    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 604    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 605    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 606}
 607
 608impl<T> CacheableCommand for T
 609where
 610    T: DapCommand + PartialEq + Eq + Hash,
 611{
 612    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 613        (rhs as &dyn Any)
 614            .downcast_ref::<Self>()
 615            .map_or(false, |rhs| self == rhs)
 616    }
 617
 618    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 619        T::hash(self, &mut hasher);
 620    }
 621
 622    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 623        self
 624    }
 625}
 626
 627pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 628
 629impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 630    fn from(request: T) -> Self {
 631        Self(Arc::new(request))
 632    }
 633}
 634
 635impl PartialEq for RequestSlot {
 636    fn eq(&self, other: &Self) -> bool {
 637        self.0.dyn_eq(other.0.as_ref())
 638    }
 639}
 640
 641impl Eq for RequestSlot {}
 642
 643impl Hash for RequestSlot {
 644    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 645        self.0.dyn_hash(state);
 646        (&*self.0 as &dyn Any).type_id().hash(state)
 647    }
 648}
 649
 650#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 651pub struct CompletionsQuery {
 652    pub query: String,
 653    pub column: u64,
 654    pub line: Option<u64>,
 655    pub frame_id: Option<u64>,
 656}
 657
 658impl CompletionsQuery {
 659    pub fn new(
 660        buffer: &language::Buffer,
 661        cursor_position: language::Anchor,
 662        frame_id: Option<u64>,
 663    ) -> Self {
 664        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 665        Self {
 666            query: buffer.text(),
 667            column: column as u64,
 668            frame_id,
 669            line: Some(row as u64),
 670        }
 671    }
 672}
 673
 674#[derive(Debug)]
 675pub enum SessionEvent {
 676    Modules,
 677    LoadedSources,
 678    Stopped(Option<ThreadId>),
 679    StackTrace,
 680    Variables,
 681    Threads,
 682    InvalidateInlineValue,
 683    CapabilitiesLoaded,
 684    RunInTerminal {
 685        request: RunInTerminalRequestArguments,
 686        sender: mpsc::Sender<Result<u32>>,
 687    },
 688    ConsoleOutput,
 689}
 690
 691#[derive(Clone, Debug, PartialEq, Eq)]
 692pub enum SessionStateEvent {
 693    Running,
 694    Shutdown,
 695    Restart,
 696    SpawnChildSession {
 697        request: StartDebuggingRequestArguments,
 698    },
 699}
 700
 701impl EventEmitter<SessionEvent> for Session {}
 702impl EventEmitter<SessionStateEvent> for Session {}
 703
 704// local session will send breakpoint updates to DAP for all new breakpoints
 705// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 706// BreakpointStore notifies session on breakpoint changes
 707impl Session {
 708    pub(crate) fn new(
 709        breakpoint_store: Entity<BreakpointStore>,
 710        session_id: SessionId,
 711        parent_session: Option<Entity<Session>>,
 712        label: SharedString,
 713        adapter: DebugAdapterName,
 714        cx: &mut App,
 715    ) -> Entity<Self> {
 716        cx.new::<Self>(|cx| {
 717            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 718                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 719                    if let Some(local) = (!this.ignore_breakpoints)
 720                        .then(|| this.as_running_mut())
 721                        .flatten()
 722                    {
 723                        local
 724                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 725                            .detach();
 726                    };
 727                }
 728                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 729                    if let Some(local) = (!this.ignore_breakpoints)
 730                        .then(|| this.as_running_mut())
 731                        .flatten()
 732                    {
 733                        local.unset_breakpoints_from_paths(paths, cx).detach();
 734                    }
 735                }
 736                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 737            })
 738            .detach();
 739            cx.on_app_quit(Self::on_app_quit).detach();
 740
 741            let this = Self {
 742                mode: Mode::Building,
 743                id: session_id,
 744                child_session_ids: HashSet::default(),
 745                parent_session,
 746                capabilities: Capabilities::default(),
 747                variables: Default::default(),
 748                stack_frames: Default::default(),
 749                thread_states: ThreadStates::default(),
 750                output_token: OutputToken(0),
 751                output: circular_buffer::CircularBuffer::boxed(),
 752                requests: HashMap::default(),
 753                modules: Vec::default(),
 754                loaded_sources: Vec::default(),
 755                threads: IndexMap::default(),
 756                background_tasks: Vec::default(),
 757                locations: Default::default(),
 758                is_session_terminated: false,
 759                ignore_breakpoints: false,
 760                breakpoint_store,
 761                exception_breakpoints: Default::default(),
 762                label,
 763                adapter,
 764            };
 765
 766            this
 767        })
 768    }
 769
 770    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 771        match &self.mode {
 772            Mode::Building => None,
 773            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 774        }
 775    }
 776
 777    pub fn boot(
 778        &mut self,
 779        binary: DebugAdapterBinary,
 780        worktree: Entity<Worktree>,
 781        dap_store: WeakEntity<DapStore>,
 782        cx: &mut Context<Self>,
 783    ) -> Task<Result<()>> {
 784        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 785        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 786
 787        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 788            let mut initialized_tx = Some(initialized_tx);
 789            while let Some(message) = message_rx.next().await {
 790                if let Message::Event(event) = message {
 791                    if let Events::Initialized(_) = *event {
 792                        if let Some(tx) = initialized_tx.take() {
 793                            tx.send(()).ok();
 794                        }
 795                    } else {
 796                        let Ok(_) = this.update(cx, |session, cx| {
 797                            session.handle_dap_event(event, cx);
 798                        }) else {
 799                            break;
 800                        };
 801                    }
 802                } else if let Message::Request(request) = message {
 803                    let Ok(_) = this.update(cx, |this, cx| {
 804                        if request.command == StartDebugging::COMMAND {
 805                            this.handle_start_debugging_request(request, cx)
 806                                .detach_and_log_err(cx);
 807                        } else if request.command == RunInTerminal::COMMAND {
 808                            this.handle_run_in_terminal_request(request, cx)
 809                                .detach_and_log_err(cx);
 810                        }
 811                    }) else {
 812                        break;
 813                    };
 814                }
 815            }
 816        })];
 817        self.background_tasks = background_tasks;
 818        let id = self.id;
 819        let parent_session = self.parent_session.clone();
 820
 821        cx.spawn(async move |this, cx| {
 822            let mode = RunningMode::new(
 823                id,
 824                parent_session,
 825                worktree.downgrade(),
 826                binary,
 827                message_tx,
 828                cx.clone(),
 829            )
 830            .await?;
 831            this.update(cx, |this, cx| {
 832                this.mode = Mode::Running(mode);
 833                cx.emit(SessionStateEvent::Running);
 834            })?;
 835
 836            this.update(cx, |session, cx| session.request_initialize(cx))?
 837                .await?;
 838
 839            this.update(cx, |session, cx| {
 840                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 841            })?
 842            .await
 843        })
 844    }
 845
 846    pub fn session_id(&self) -> SessionId {
 847        self.id
 848    }
 849
 850    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 851        self.child_session_ids.clone()
 852    }
 853
 854    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 855        self.child_session_ids.insert(session_id);
 856    }
 857
 858    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 859        self.child_session_ids.remove(&session_id);
 860    }
 861
 862    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 863        self.parent_session
 864            .as_ref()
 865            .map(|session| session.read(cx).id)
 866    }
 867
 868    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 869        self.parent_session.as_ref()
 870    }
 871
 872    pub fn capabilities(&self) -> &Capabilities {
 873        &self.capabilities
 874    }
 875
 876    pub fn binary(&self) -> &DebugAdapterBinary {
 877        let Mode::Running(local_mode) = &self.mode else {
 878            panic!("Session is not running");
 879        };
 880        &local_mode.binary
 881    }
 882
 883    pub fn adapter(&self) -> DebugAdapterName {
 884        self.adapter.clone()
 885    }
 886
 887    pub fn label(&self) -> SharedString {
 888        self.label.clone()
 889    }
 890
 891    pub fn is_terminated(&self) -> bool {
 892        self.is_session_terminated
 893    }
 894
 895    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
 896        let (tx, mut rx) = mpsc::unbounded();
 897
 898        cx.spawn(async move |this, cx| {
 899            while let Some(output) = rx.next().await {
 900                this.update(cx, |this, cx| {
 901                    let event = dap::OutputEvent {
 902                        category: None,
 903                        output,
 904                        group: None,
 905                        variables_reference: None,
 906                        source: None,
 907                        line: None,
 908                        column: None,
 909                        data: None,
 910                        location_reference: None,
 911                    };
 912                    this.push_output(event, cx);
 913                })?;
 914            }
 915            anyhow::Ok(())
 916        })
 917        .detach();
 918
 919        return tx;
 920    }
 921
 922    pub fn is_started(&self) -> bool {
 923        match &self.mode {
 924            Mode::Building => false,
 925            Mode::Running(running) => running.is_started,
 926        }
 927    }
 928
 929    pub fn is_building(&self) -> bool {
 930        matches!(self.mode, Mode::Building)
 931    }
 932
 933    pub fn is_running(&self) -> bool {
 934        matches!(self.mode, Mode::Running(_))
 935    }
 936
 937    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
 938        match &mut self.mode {
 939            Mode::Running(local_mode) => Some(local_mode),
 940            Mode::Building => None,
 941        }
 942    }
 943
 944    pub fn as_running(&self) -> Option<&RunningMode> {
 945        match &self.mode {
 946            Mode::Running(local_mode) => Some(local_mode),
 947            Mode::Building => None,
 948        }
 949    }
 950
 951    fn handle_start_debugging_request(
 952        &mut self,
 953        request: dap::messages::Request,
 954        cx: &mut Context<Self>,
 955    ) -> Task<Result<()>> {
 956        let request_seq = request.seq;
 957
 958        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
 959            .arguments
 960            .as_ref()
 961            .map(|value| serde_json::from_value(value.clone()));
 962
 963        let mut success = true;
 964        if let Some(Ok(request)) = launch_request {
 965            cx.emit(SessionStateEvent::SpawnChildSession { request });
 966        } else {
 967            log::error!(
 968                "Failed to parse launch request arguments: {:?}",
 969                request.arguments
 970            );
 971            success = false;
 972        }
 973
 974        cx.spawn(async move |this, cx| {
 975            this.update(cx, |this, cx| {
 976                this.respond_to_client(
 977                    request_seq,
 978                    success,
 979                    StartDebugging::COMMAND.to_string(),
 980                    None,
 981                    cx,
 982                )
 983            })?
 984            .await
 985        })
 986    }
 987
 988    fn handle_run_in_terminal_request(
 989        &mut self,
 990        request: dap::messages::Request,
 991        cx: &mut Context<Self>,
 992    ) -> Task<Result<()>> {
 993        let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
 994            request.arguments.unwrap_or_default(),
 995        ) {
 996            Ok(args) => args,
 997            Err(error) => {
 998                return cx.spawn(async move |session, cx| {
 999                    let error = serde_json::to_value(dap::ErrorResponse {
1000                        error: Some(dap::Message {
1001                            id: request.seq,
1002                            format: error.to_string(),
1003                            variables: None,
1004                            send_telemetry: None,
1005                            show_user: None,
1006                            url: None,
1007                            url_label: None,
1008                        }),
1009                    })
1010                    .ok();
1011
1012                    session
1013                        .update(cx, |this, cx| {
1014                            this.respond_to_client(
1015                                request.seq,
1016                                false,
1017                                StartDebugging::COMMAND.to_string(),
1018                                error,
1019                                cx,
1020                            )
1021                        })?
1022                        .await?;
1023
1024                    Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1025                });
1026            }
1027        };
1028
1029        let seq = request.seq;
1030
1031        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1032        cx.emit(SessionEvent::RunInTerminal {
1033            request: request_args,
1034            sender: tx,
1035        });
1036        cx.notify();
1037
1038        cx.spawn(async move |session, cx| {
1039            let result = util::maybe!(async move {
1040                rx.next().await.ok_or_else(|| {
1041                    anyhow!("failed to receive response from spawn terminal".to_string())
1042                })?
1043            })
1044            .await;
1045            let (success, body) = match result {
1046                Ok(pid) => (
1047                    true,
1048                    serde_json::to_value(dap::RunInTerminalResponse {
1049                        process_id: None,
1050                        shell_process_id: Some(pid as u64),
1051                    })
1052                    .ok(),
1053                ),
1054                Err(error) => (
1055                    false,
1056                    serde_json::to_value(dap::ErrorResponse {
1057                        error: Some(dap::Message {
1058                            id: seq,
1059                            format: error.to_string(),
1060                            variables: None,
1061                            send_telemetry: None,
1062                            show_user: None,
1063                            url: None,
1064                            url_label: None,
1065                        }),
1066                    })
1067                    .ok(),
1068                ),
1069            };
1070
1071            session
1072                .update(cx, |session, cx| {
1073                    session.respond_to_client(
1074                        seq,
1075                        success,
1076                        RunInTerminal::COMMAND.to_string(),
1077                        body,
1078                        cx,
1079                    )
1080                })?
1081                .await
1082        })
1083    }
1084
1085    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1086        let adapter_id = self.adapter().to_string();
1087        let request = Initialize { adapter_id };
1088        match &self.mode {
1089            Mode::Running(local_mode) => {
1090                let capabilities = local_mode.request(request);
1091
1092                cx.spawn(async move |this, cx| {
1093                    let capabilities = capabilities.await?;
1094                    this.update(cx, |session, cx| {
1095                        session.capabilities = capabilities;
1096                        let filters = session
1097                            .capabilities
1098                            .exception_breakpoint_filters
1099                            .clone()
1100                            .unwrap_or_default();
1101                        for filter in filters {
1102                            let default = filter.default.unwrap_or_default();
1103                            session
1104                                .exception_breakpoints
1105                                .entry(filter.filter.clone())
1106                                .or_insert_with(|| (filter, default));
1107                        }
1108                        cx.emit(SessionEvent::CapabilitiesLoaded);
1109                    })?;
1110                    Ok(())
1111                })
1112            }
1113            Mode::Building => Task::ready(Err(anyhow!(
1114                "Cannot send initialize request, task still building"
1115            ))),
1116        }
1117    }
1118
1119    pub(super) fn initialize_sequence(
1120        &mut self,
1121        initialize_rx: oneshot::Receiver<()>,
1122        dap_store: WeakEntity<DapStore>,
1123        cx: &mut Context<Self>,
1124    ) -> Task<Result<()>> {
1125        match &self.mode {
1126            Mode::Running(local_mode) => {
1127                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1128            }
1129            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1130        }
1131    }
1132
1133    pub fn run_to_position(
1134        &mut self,
1135        breakpoint: SourceBreakpoint,
1136        active_thread_id: ThreadId,
1137        cx: &mut Context<Self>,
1138    ) {
1139        match &mut self.mode {
1140            Mode::Running(local_mode) => {
1141                if !matches!(
1142                    self.thread_states.thread_state(active_thread_id),
1143                    Some(ThreadStatus::Stopped)
1144                ) {
1145                    return;
1146                };
1147                let path = breakpoint.path.clone();
1148                local_mode.tmp_breakpoint = Some(breakpoint);
1149                let task = local_mode.send_breakpoints_from_path(
1150                    path,
1151                    BreakpointUpdatedReason::Toggled,
1152                    &self.breakpoint_store,
1153                    cx,
1154                );
1155
1156                cx.spawn(async move |this, cx| {
1157                    task.await;
1158                    this.update(cx, |this, cx| {
1159                        this.continue_thread(active_thread_id, cx);
1160                    })
1161                })
1162                .detach();
1163            }
1164            Mode::Building => {}
1165        }
1166    }
1167
1168    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1169        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1170    }
1171
1172    pub fn output(
1173        &self,
1174        since: OutputToken,
1175    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1176        if self.output_token.0 == 0 {
1177            return (self.output.range(0..0), OutputToken(0));
1178        };
1179
1180        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1181
1182        let clamped_events_since = events_since.clamp(0, self.output.len());
1183        (
1184            self.output
1185                .range(self.output.len() - clamped_events_since..),
1186            self.output_token,
1187        )
1188    }
1189
1190    pub fn respond_to_client(
1191        &self,
1192        request_seq: u64,
1193        success: bool,
1194        command: String,
1195        body: Option<serde_json::Value>,
1196        cx: &mut Context<Self>,
1197    ) -> Task<Result<()>> {
1198        let Some(local_session) = self.as_running() else {
1199            unreachable!("Cannot respond to remote client");
1200        };
1201        let client = local_session.client.clone();
1202
1203        cx.background_spawn(async move {
1204            client
1205                .send_message(Message::Response(Response {
1206                    body,
1207                    success,
1208                    command,
1209                    seq: request_seq + 1,
1210                    request_seq,
1211                    message: None,
1212                }))
1213                .await
1214        })
1215    }
1216
1217    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1218        // todo(debugger): Find a clean way to get around the clone
1219        let breakpoint_store = self.breakpoint_store.clone();
1220        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1221            let breakpoint = local.tmp_breakpoint.take()?;
1222            let path = breakpoint.path.clone();
1223            Some((local, path))
1224        }) {
1225            local
1226                .send_breakpoints_from_path(
1227                    path,
1228                    BreakpointUpdatedReason::Toggled,
1229                    &breakpoint_store,
1230                    cx,
1231                )
1232                .detach();
1233        };
1234
1235        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1236            self.thread_states.stop_all_threads();
1237
1238            self.invalidate_command_type::<StackTraceCommand>();
1239        }
1240
1241        // Event if we stopped all threads we still need to insert the thread_id
1242        // to our own data
1243        if let Some(thread_id) = event.thread_id {
1244            self.thread_states.stop_thread(ThreadId(thread_id));
1245
1246            self.invalidate_state(
1247                &StackTraceCommand {
1248                    thread_id,
1249                    start_frame: None,
1250                    levels: None,
1251                }
1252                .into(),
1253            );
1254        }
1255
1256        self.invalidate_generic();
1257        self.threads.clear();
1258        self.variables.clear();
1259        cx.emit(SessionEvent::Stopped(
1260            event
1261                .thread_id
1262                .map(Into::into)
1263                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1264        ));
1265        cx.emit(SessionEvent::InvalidateInlineValue);
1266        cx.notify();
1267    }
1268
1269    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1270        match *event {
1271            Events::Initialized(_) => {
1272                debug_assert!(
1273                    false,
1274                    "Initialized event should have been handled in LocalMode"
1275                );
1276            }
1277            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1278            Events::Continued(event) => {
1279                if event.all_threads_continued.unwrap_or_default() {
1280                    self.thread_states.continue_all_threads();
1281                    self.breakpoint_store.update(cx, |store, cx| {
1282                        store.remove_active_position(Some(self.session_id()), cx)
1283                    });
1284                } else {
1285                    self.thread_states
1286                        .continue_thread(ThreadId(event.thread_id));
1287                }
1288                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1289                self.invalidate_generic();
1290            }
1291            Events::Exited(_event) => {
1292                self.clear_active_debug_line(cx);
1293            }
1294            Events::Terminated(_) => {
1295                self.shutdown(cx).detach();
1296            }
1297            Events::Thread(event) => {
1298                let thread_id = ThreadId(event.thread_id);
1299
1300                match event.reason {
1301                    dap::ThreadEventReason::Started => {
1302                        self.thread_states.continue_thread(thread_id);
1303                    }
1304                    dap::ThreadEventReason::Exited => {
1305                        self.thread_states.exit_thread(thread_id);
1306                    }
1307                    reason => {
1308                        log::error!("Unhandled thread event reason {:?}", reason);
1309                    }
1310                }
1311                self.invalidate_state(&ThreadsCommand.into());
1312                cx.notify();
1313            }
1314            Events::Output(event) => {
1315                if event
1316                    .category
1317                    .as_ref()
1318                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1319                {
1320                    return;
1321                }
1322
1323                self.push_output(event, cx);
1324                cx.notify();
1325            }
1326            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1327                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1328            }),
1329            Events::Module(event) => {
1330                match event.reason {
1331                    dap::ModuleEventReason::New => {
1332                        self.modules.push(event.module);
1333                    }
1334                    dap::ModuleEventReason::Changed => {
1335                        if let Some(module) = self
1336                            .modules
1337                            .iter_mut()
1338                            .find(|other| event.module.id == other.id)
1339                        {
1340                            *module = event.module;
1341                        }
1342                    }
1343                    dap::ModuleEventReason::Removed => {
1344                        self.modules.retain(|other| event.module.id != other.id);
1345                    }
1346                }
1347
1348                // todo(debugger): We should only send the invalidate command to downstream clients.
1349                // self.invalidate_state(&ModulesCommand.into());
1350            }
1351            Events::LoadedSource(_) => {
1352                self.invalidate_state(&LoadedSourcesCommand.into());
1353            }
1354            Events::Capabilities(event) => {
1355                self.capabilities = self.capabilities.merge(event.capabilities);
1356                cx.notify();
1357            }
1358            Events::Memory(_) => {}
1359            Events::Process(_) => {}
1360            Events::ProgressEnd(_) => {}
1361            Events::ProgressStart(_) => {}
1362            Events::ProgressUpdate(_) => {}
1363            Events::Invalidated(_) => {}
1364            Events::Other(_) => {}
1365        }
1366    }
1367
1368    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1369    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1370        &mut self,
1371        request: T,
1372        process_result: impl FnOnce(
1373            &mut Self,
1374            Result<T::Response>,
1375            &mut Context<Self>,
1376        ) -> Option<T::Response>
1377        + 'static,
1378        cx: &mut Context<Self>,
1379    ) {
1380        const {
1381            assert!(
1382                T::CACHEABLE,
1383                "Only requests marked as cacheable should invoke `fetch`"
1384            );
1385        }
1386
1387        if !self.thread_states.any_stopped_thread()
1388            && request.type_id() != TypeId::of::<ThreadsCommand>()
1389            || self.is_session_terminated
1390        {
1391            return;
1392        }
1393
1394        let request_map = self
1395            .requests
1396            .entry(std::any::TypeId::of::<T>())
1397            .or_default();
1398
1399        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1400            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1401
1402            let task = Self::request_inner::<Arc<T>>(
1403                &self.capabilities,
1404                &self.mode,
1405                command,
1406                process_result,
1407                cx,
1408            );
1409            let task = cx
1410                .background_executor()
1411                .spawn(async move {
1412                    let _ = task.await?;
1413                    Some(())
1414                })
1415                .shared();
1416
1417            vacant.insert(task);
1418            cx.notify();
1419        }
1420    }
1421
1422    pub async fn request2<T: DapCommand + PartialEq + Eq + Hash>(
1423        &self,
1424        request: T,
1425    ) -> Result<T::Response> {
1426        if !T::is_supported(&self.capabilities) {
1427            anyhow::bail!("DAP request {:?} is not supported", request);
1428        }
1429
1430        self.mode.request_dap(request).await
1431    }
1432
1433    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1434        capabilities: &Capabilities,
1435        mode: &Mode,
1436        request: T,
1437        process_result: impl FnOnce(
1438            &mut Self,
1439            Result<T::Response>,
1440            &mut Context<Self>,
1441        ) -> Option<T::Response>
1442        + 'static,
1443        cx: &mut Context<Self>,
1444    ) -> Task<Option<T::Response>> {
1445        if !T::is_supported(&capabilities) {
1446            log::warn!(
1447                "Attempted to send a DAP request that isn't supported: {:?}",
1448                request
1449            );
1450            let error = Err(anyhow::Error::msg(
1451                "Couldn't complete request because it's not supported",
1452            ));
1453            return cx.spawn(async move |this, cx| {
1454                this.update(cx, |this, cx| process_result(this, error, cx))
1455                    .ok()
1456                    .flatten()
1457            });
1458        }
1459
1460        let request = mode.request_dap(request);
1461        cx.spawn(async move |this, cx| {
1462            let result = request.await;
1463            this.update(cx, |this, cx| process_result(this, result, cx))
1464                .ok()
1465                .flatten()
1466        })
1467    }
1468
1469    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1470        &self,
1471        request: T,
1472        process_result: impl FnOnce(
1473            &mut Self,
1474            Result<T::Response>,
1475            &mut Context<Self>,
1476        ) -> Option<T::Response>
1477        + 'static,
1478        cx: &mut Context<Self>,
1479    ) -> Task<Option<T::Response>> {
1480        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1481    }
1482
1483    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1484        self.requests.remove(&std::any::TypeId::of::<Command>());
1485    }
1486
1487    fn invalidate_generic(&mut self) {
1488        self.invalidate_command_type::<ModulesCommand>();
1489        self.invalidate_command_type::<LoadedSourcesCommand>();
1490        self.invalidate_command_type::<ThreadsCommand>();
1491    }
1492
1493    fn invalidate_state(&mut self, key: &RequestSlot) {
1494        self.requests
1495            .entry((&*key.0 as &dyn Any).type_id())
1496            .and_modify(|request_map| {
1497                request_map.remove(&key);
1498            });
1499    }
1500
1501    fn push_output(&mut self, event: OutputEvent, cx: &mut Context<Self>) {
1502        self.output.push_back(event);
1503        self.output_token.0 += 1;
1504        cx.emit(SessionEvent::ConsoleOutput);
1505    }
1506
1507    pub fn any_stopped_thread(&self) -> bool {
1508        self.thread_states.any_stopped_thread()
1509    }
1510
1511    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1512        self.thread_states.thread_status(thread_id)
1513    }
1514
1515    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1516        self.fetch(
1517            dap_command::ThreadsCommand,
1518            |this, result, cx| {
1519                let result = result.log_err()?;
1520
1521                this.threads = result
1522                    .iter()
1523                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1524                    .collect();
1525
1526                this.invalidate_command_type::<StackTraceCommand>();
1527                cx.emit(SessionEvent::Threads);
1528                cx.notify();
1529
1530                Some(result)
1531            },
1532            cx,
1533        );
1534
1535        self.threads
1536            .values()
1537            .map(|thread| {
1538                (
1539                    thread.dap.clone(),
1540                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1541                )
1542            })
1543            .collect()
1544    }
1545
1546    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1547        self.fetch(
1548            dap_command::ModulesCommand,
1549            |this, result, cx| {
1550                let result = result.log_err()?;
1551
1552                this.modules = result.iter().cloned().collect();
1553                cx.emit(SessionEvent::Modules);
1554                cx.notify();
1555
1556                Some(result)
1557            },
1558            cx,
1559        );
1560
1561        &self.modules
1562    }
1563
1564    pub fn ignore_breakpoints(&self) -> bool {
1565        self.ignore_breakpoints
1566    }
1567
1568    pub fn toggle_ignore_breakpoints(
1569        &mut self,
1570        cx: &mut App,
1571    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1572        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1573    }
1574
1575    pub(crate) fn set_ignore_breakpoints(
1576        &mut self,
1577        ignore: bool,
1578        cx: &mut App,
1579    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1580        if self.ignore_breakpoints == ignore {
1581            return Task::ready(HashMap::default());
1582        }
1583
1584        self.ignore_breakpoints = ignore;
1585
1586        if let Some(local) = self.as_running() {
1587            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1588        } else {
1589            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1590            unimplemented!()
1591        }
1592    }
1593
1594    pub fn exception_breakpoints(
1595        &self,
1596    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1597        self.exception_breakpoints.values()
1598    }
1599
1600    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1601        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1602            *is_enabled = !*is_enabled;
1603            self.send_exception_breakpoints(cx);
1604        }
1605    }
1606
1607    fn send_exception_breakpoints(&mut self, cx: &App) {
1608        if let Some(local) = self.as_running() {
1609            let exception_filters = self
1610                .exception_breakpoints
1611                .values()
1612                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1613                .collect();
1614
1615            let supports_exception_filters = self
1616                .capabilities
1617                .supports_exception_filter_options
1618                .unwrap_or_default();
1619            local
1620                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1621                .detach_and_log_err(cx);
1622        } else {
1623            debug_assert!(false, "Not implemented");
1624        }
1625    }
1626
1627    pub fn breakpoints_enabled(&self) -> bool {
1628        self.ignore_breakpoints
1629    }
1630
1631    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1632        self.fetch(
1633            dap_command::LoadedSourcesCommand,
1634            |this, result, cx| {
1635                let result = result.log_err()?;
1636                this.loaded_sources = result.iter().cloned().collect();
1637                cx.emit(SessionEvent::LoadedSources);
1638                cx.notify();
1639                Some(result)
1640            },
1641            cx,
1642        );
1643
1644        &self.loaded_sources
1645    }
1646
1647    fn fallback_to_manual_restart(
1648        &mut self,
1649        res: Result<()>,
1650        cx: &mut Context<Self>,
1651    ) -> Option<()> {
1652        if res.log_err().is_none() {
1653            cx.emit(SessionStateEvent::Restart);
1654            return None;
1655        }
1656        Some(())
1657    }
1658
1659    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1660        res.log_err()?;
1661        Some(())
1662    }
1663
1664    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1665        thread_id: ThreadId,
1666    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1667    {
1668        move |this, response, cx| match response.log_err() {
1669            Some(response) => {
1670                this.breakpoint_store.update(cx, |store, cx| {
1671                    store.remove_active_position(Some(this.session_id()), cx)
1672                });
1673                Some(response)
1674            }
1675            None => {
1676                this.thread_states.stop_thread(thread_id);
1677                cx.notify();
1678                None
1679            }
1680        }
1681    }
1682
1683    fn clear_active_debug_line_response(
1684        &mut self,
1685        response: Result<()>,
1686        cx: &mut Context<Session>,
1687    ) -> Option<()> {
1688        response.log_err()?;
1689        self.clear_active_debug_line(cx);
1690        Some(())
1691    }
1692
1693    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1694        self.breakpoint_store.update(cx, |store, cx| {
1695            store.remove_active_position(Some(self.id), cx)
1696        });
1697    }
1698
1699    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1700        self.request(
1701            PauseCommand {
1702                thread_id: thread_id.0,
1703            },
1704            Self::empty_response,
1705            cx,
1706        )
1707        .detach();
1708    }
1709
1710    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1711        self.request(
1712            RestartStackFrameCommand { stack_frame_id },
1713            Self::empty_response,
1714            cx,
1715        )
1716        .detach();
1717    }
1718
1719    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1720        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1721            self.request(
1722                RestartCommand {
1723                    raw: args.unwrap_or(Value::Null),
1724                },
1725                Self::fallback_to_manual_restart,
1726                cx,
1727            )
1728            .detach();
1729        } else {
1730            cx.emit(SessionStateEvent::Restart);
1731        }
1732    }
1733
1734    fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1735        let debug_adapter = self.adapter_client();
1736
1737        cx.background_spawn(async move {
1738            if let Some(client) = debug_adapter {
1739                client.shutdown().await.log_err();
1740            }
1741        })
1742    }
1743
1744    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1745        self.is_session_terminated = true;
1746        self.thread_states.exit_all_threads();
1747        cx.notify();
1748
1749        let task = if self
1750            .capabilities
1751            .supports_terminate_request
1752            .unwrap_or_default()
1753        {
1754            self.request(
1755                TerminateCommand {
1756                    restart: Some(false),
1757                },
1758                Self::clear_active_debug_line_response,
1759                cx,
1760            )
1761        } else {
1762            self.request(
1763                DisconnectCommand {
1764                    restart: Some(false),
1765                    terminate_debuggee: Some(true),
1766                    suspend_debuggee: Some(false),
1767                },
1768                Self::clear_active_debug_line_response,
1769                cx,
1770            )
1771        };
1772
1773        cx.emit(SessionStateEvent::Shutdown);
1774
1775        let debug_client = self.adapter_client();
1776
1777        cx.background_spawn(async move {
1778            let _ = task.await;
1779
1780            if let Some(client) = debug_client {
1781                client.shutdown().await.log_err();
1782            }
1783        })
1784    }
1785
1786    pub fn completions(
1787        &mut self,
1788        query: CompletionsQuery,
1789        cx: &mut Context<Self>,
1790    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1791        let task = self.request(query, |_, result, _| result.log_err(), cx);
1792
1793        cx.background_executor().spawn(async move {
1794            anyhow::Ok(
1795                task.await
1796                    .map(|response| response.targets)
1797                    .context("failed to fetch completions")?,
1798            )
1799        })
1800    }
1801
1802    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1803        self.thread_states.continue_thread(thread_id);
1804        self.request(
1805            ContinueCommand {
1806                args: ContinueArguments {
1807                    thread_id: thread_id.0,
1808                    single_thread: Some(true),
1809                },
1810            },
1811            Self::on_step_response::<ContinueCommand>(thread_id),
1812            cx,
1813        )
1814        .detach();
1815    }
1816
1817    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1818        match self.mode {
1819            Mode::Running(ref local) => Some(local.client.clone()),
1820            Mode::Building => None,
1821        }
1822    }
1823
1824    pub fn step_over(
1825        &mut self,
1826        thread_id: ThreadId,
1827        granularity: SteppingGranularity,
1828        cx: &mut Context<Self>,
1829    ) {
1830        let supports_single_thread_execution_requests =
1831            self.capabilities.supports_single_thread_execution_requests;
1832        let supports_stepping_granularity = self
1833            .capabilities
1834            .supports_stepping_granularity
1835            .unwrap_or_default();
1836
1837        let command = NextCommand {
1838            inner: StepCommand {
1839                thread_id: thread_id.0,
1840                granularity: supports_stepping_granularity.then(|| granularity),
1841                single_thread: supports_single_thread_execution_requests,
1842            },
1843        };
1844
1845        self.thread_states.process_step(thread_id);
1846        self.request(
1847            command,
1848            Self::on_step_response::<NextCommand>(thread_id),
1849            cx,
1850        )
1851        .detach();
1852    }
1853
1854    pub fn step_in(
1855        &mut self,
1856        thread_id: ThreadId,
1857        granularity: SteppingGranularity,
1858        cx: &mut Context<Self>,
1859    ) {
1860        let supports_single_thread_execution_requests =
1861            self.capabilities.supports_single_thread_execution_requests;
1862        let supports_stepping_granularity = self
1863            .capabilities
1864            .supports_stepping_granularity
1865            .unwrap_or_default();
1866
1867        let command = StepInCommand {
1868            inner: StepCommand {
1869                thread_id: thread_id.0,
1870                granularity: supports_stepping_granularity.then(|| granularity),
1871                single_thread: supports_single_thread_execution_requests,
1872            },
1873        };
1874
1875        self.thread_states.process_step(thread_id);
1876        self.request(
1877            command,
1878            Self::on_step_response::<StepInCommand>(thread_id),
1879            cx,
1880        )
1881        .detach();
1882    }
1883
1884    pub fn step_out(
1885        &mut self,
1886        thread_id: ThreadId,
1887        granularity: SteppingGranularity,
1888        cx: &mut Context<Self>,
1889    ) {
1890        let supports_single_thread_execution_requests =
1891            self.capabilities.supports_single_thread_execution_requests;
1892        let supports_stepping_granularity = self
1893            .capabilities
1894            .supports_stepping_granularity
1895            .unwrap_or_default();
1896
1897        let command = StepOutCommand {
1898            inner: StepCommand {
1899                thread_id: thread_id.0,
1900                granularity: supports_stepping_granularity.then(|| granularity),
1901                single_thread: supports_single_thread_execution_requests,
1902            },
1903        };
1904
1905        self.thread_states.process_step(thread_id);
1906        self.request(
1907            command,
1908            Self::on_step_response::<StepOutCommand>(thread_id),
1909            cx,
1910        )
1911        .detach();
1912    }
1913
1914    pub fn step_back(
1915        &mut self,
1916        thread_id: ThreadId,
1917        granularity: SteppingGranularity,
1918        cx: &mut Context<Self>,
1919    ) {
1920        let supports_single_thread_execution_requests =
1921            self.capabilities.supports_single_thread_execution_requests;
1922        let supports_stepping_granularity = self
1923            .capabilities
1924            .supports_stepping_granularity
1925            .unwrap_or_default();
1926
1927        let command = StepBackCommand {
1928            inner: StepCommand {
1929                thread_id: thread_id.0,
1930                granularity: supports_stepping_granularity.then(|| granularity),
1931                single_thread: supports_single_thread_execution_requests,
1932            },
1933        };
1934
1935        self.thread_states.process_step(thread_id);
1936
1937        self.request(
1938            command,
1939            Self::on_step_response::<StepBackCommand>(thread_id),
1940            cx,
1941        )
1942        .detach();
1943    }
1944
1945    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1946        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1947            && self.requests.contains_key(&ThreadsCommand.type_id())
1948            && self.threads.contains_key(&thread_id)
1949        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1950        // We could still be using an old thread id and have sent a new thread's request
1951        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1952        // But it very well could cause a minor bug in the future that is hard to track down
1953        {
1954            self.fetch(
1955                super::dap_command::StackTraceCommand {
1956                    thread_id: thread_id.0,
1957                    start_frame: None,
1958                    levels: None,
1959                },
1960                move |this, stack_frames, cx| {
1961                    let stack_frames = stack_frames.log_err()?;
1962
1963                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1964                        thread.stack_frame_ids =
1965                            stack_frames.iter().map(|frame| frame.id).collect();
1966                    });
1967                    debug_assert!(
1968                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1969                        "Sent request for thread_id that doesn't exist"
1970                    );
1971
1972                    this.stack_frames.extend(
1973                        stack_frames
1974                            .iter()
1975                            .cloned()
1976                            .map(|frame| (frame.id, StackFrame::from(frame))),
1977                    );
1978
1979                    this.invalidate_command_type::<ScopesCommand>();
1980                    this.invalidate_command_type::<VariablesCommand>();
1981
1982                    cx.emit(SessionEvent::StackTrace);
1983                    cx.notify();
1984                    Some(stack_frames)
1985                },
1986                cx,
1987            );
1988        }
1989
1990        self.threads
1991            .get(&thread_id)
1992            .map(|thread| {
1993                thread
1994                    .stack_frame_ids
1995                    .iter()
1996                    .filter_map(|id| self.stack_frames.get(id))
1997                    .cloned()
1998                    .collect()
1999            })
2000            .unwrap_or_default()
2001    }
2002
2003    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2004        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2005            && self
2006                .requests
2007                .contains_key(&TypeId::of::<StackTraceCommand>())
2008        {
2009            self.fetch(
2010                ScopesCommand { stack_frame_id },
2011                move |this, scopes, cx| {
2012                    let scopes = scopes.log_err()?;
2013
2014                    for scope in scopes .iter(){
2015                        this.variables(scope.variables_reference, cx);
2016                    }
2017
2018                    let entry = this
2019                        .stack_frames
2020                        .entry(stack_frame_id)
2021                        .and_modify(|stack_frame| {
2022                            stack_frame.scopes = scopes.clone();
2023                        });
2024
2025                    cx.emit(SessionEvent::Variables);
2026
2027                    debug_assert!(
2028                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2029                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2030                    );
2031
2032                    Some(scopes)
2033                },
2034                cx,
2035            );
2036        }
2037
2038        self.stack_frames
2039            .get(&stack_frame_id)
2040            .map(|frame| frame.scopes.as_slice())
2041            .unwrap_or_default()
2042    }
2043
2044    pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
2045        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2046            return Vec::new();
2047        };
2048
2049        stack_frame
2050            .scopes
2051            .iter()
2052            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2053            .flatten()
2054            .cloned()
2055            .collect()
2056    }
2057
2058    pub fn variables(
2059        &mut self,
2060        variables_reference: VariableReference,
2061        cx: &mut Context<Self>,
2062    ) -> Vec<dap::Variable> {
2063        let command = VariablesCommand {
2064            variables_reference,
2065            filter: None,
2066            start: None,
2067            count: None,
2068            format: None,
2069        };
2070
2071        self.fetch(
2072            command,
2073            move |this, variables, cx| {
2074                let variables = variables.log_err()?;
2075                this.variables
2076                    .insert(variables_reference, variables.clone());
2077
2078                cx.emit(SessionEvent::Variables);
2079                cx.emit(SessionEvent::InvalidateInlineValue);
2080                Some(variables)
2081            },
2082            cx,
2083        );
2084
2085        self.variables
2086            .get(&variables_reference)
2087            .cloned()
2088            .unwrap_or_default()
2089    }
2090
2091    pub fn set_variable_value(
2092        &mut self,
2093        variables_reference: u64,
2094        name: String,
2095        value: String,
2096        cx: &mut Context<Self>,
2097    ) {
2098        if self.capabilities.supports_set_variable.unwrap_or_default() {
2099            self.request(
2100                SetVariableValueCommand {
2101                    name,
2102                    value,
2103                    variables_reference,
2104                },
2105                move |this, response, cx| {
2106                    let response = response.log_err()?;
2107                    this.invalidate_command_type::<VariablesCommand>();
2108                    cx.notify();
2109                    Some(response)
2110                },
2111                cx,
2112            )
2113            .detach()
2114        }
2115    }
2116
2117    pub fn evaluate(
2118        &mut self,
2119        expression: String,
2120        context: Option<EvaluateArgumentsContext>,
2121        frame_id: Option<u64>,
2122        source: Option<Source>,
2123        cx: &mut Context<Self>,
2124    ) -> Task<()> {
2125        let event = dap::OutputEvent {
2126            category: None,
2127            output: format!("> {expression}"),
2128            group: None,
2129            variables_reference: None,
2130            source: None,
2131            line: None,
2132            column: None,
2133            data: None,
2134            location_reference: None,
2135        };
2136        self.push_output(event, cx);
2137        let request = self.mode.request_dap(EvaluateCommand {
2138            expression,
2139            context,
2140            frame_id,
2141            source,
2142        });
2143        cx.spawn(async move |this, cx| {
2144            let response = request.await;
2145            this.update(cx, |this, cx| {
2146                match response {
2147                    Ok(response) => {
2148                        let event = dap::OutputEvent {
2149                            category: None,
2150                            output: format!("< {}", &response.result),
2151                            group: None,
2152                            variables_reference: Some(response.variables_reference),
2153                            source: None,
2154                            line: None,
2155                            column: None,
2156                            data: None,
2157                            location_reference: None,
2158                        };
2159                        this.push_output(event, cx);
2160                    }
2161                    Err(e) => {
2162                        let event = dap::OutputEvent {
2163                            category: None,
2164                            output: format!("{}", e),
2165                            group: None,
2166                            variables_reference: None,
2167                            source: None,
2168                            line: None,
2169                            column: None,
2170                            data: None,
2171                            location_reference: None,
2172                        };
2173                        this.push_output(event, cx);
2174                    }
2175                };
2176                this.invalidate_command_type::<ScopesCommand>();
2177                cx.notify();
2178            })
2179            .ok();
2180        })
2181    }
2182
2183    pub fn location(
2184        &mut self,
2185        reference: u64,
2186        cx: &mut Context<Self>,
2187    ) -> Option<dap::LocationsResponse> {
2188        self.fetch(
2189            LocationsCommand { reference },
2190            move |this, response, _| {
2191                let response = response.log_err()?;
2192                this.locations.insert(reference, response.clone());
2193                Some(response)
2194            },
2195            cx,
2196        );
2197        self.locations.get(&reference).cloned()
2198    }
2199
2200    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2201        let command = DisconnectCommand {
2202            restart: Some(false),
2203            terminate_debuggee: Some(true),
2204            suspend_debuggee: Some(false),
2205        };
2206
2207        self.request(command, Self::empty_response, cx).detach()
2208    }
2209
2210    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2211        if self
2212            .capabilities
2213            .supports_terminate_threads_request
2214            .unwrap_or_default()
2215        {
2216            self.request(
2217                TerminateThreadsCommand {
2218                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2219                },
2220                Self::clear_active_debug_line_response,
2221                cx,
2222            )
2223            .detach();
2224        } else {
2225            self.shutdown(cx).detach();
2226        }
2227    }
2228
2229    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2230        self.thread_states.thread_state(thread_id)
2231    }
2232}