session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
  11    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  12    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapStore;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap, IndexSet};
  17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  18use dap::messages::Response;
  19use dap::requests::{Request, RunInTerminal, StartDebugging};
  20use dap::{
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  28    RunInTerminalRequestArguments, StartDebuggingRequestArguments,
  29};
  30use futures::SinkExt;
  31use futures::channel::{mpsc, oneshot};
  32use futures::{FutureExt, future::Shared};
  33use gpui::{
  34    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  35    Task, WeakEntity,
  36};
  37
  38use serde_json::Value;
  39use smol::stream::StreamExt;
  40use std::any::TypeId;
  41use std::collections::BTreeMap;
  42use std::u64;
  43use std::{
  44    any::Any,
  45    collections::hash_map::Entry,
  46    hash::{Hash, Hasher},
  47    path::Path,
  48    sync::Arc,
  49};
  50use text::{PointUtf16, ToPointUtf16};
  51use util::ResultExt;
  52use worktree::Worktree;
  53
  54#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  55#[repr(transparent)]
  56pub struct ThreadId(pub u64);
  57
  58impl ThreadId {
  59    pub const MIN: ThreadId = ThreadId(u64::MIN);
  60    pub const MAX: ThreadId = ThreadId(u64::MAX);
  61}
  62
  63impl From<u64> for ThreadId {
  64    fn from(id: u64) -> Self {
  65        Self(id)
  66    }
  67}
  68
  69#[derive(Clone, Debug)]
  70pub struct StackFrame {
  71    pub dap: dap::StackFrame,
  72    pub scopes: Vec<dap::Scope>,
  73}
  74
  75impl From<dap::StackFrame> for StackFrame {
  76    fn from(stack_frame: dap::StackFrame) -> Self {
  77        Self {
  78            scopes: vec![],
  79            dap: stack_frame,
  80        }
  81    }
  82}
  83
  84#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  85pub enum ThreadStatus {
  86    #[default]
  87    Running,
  88    Stopped,
  89    Stepping,
  90    Exited,
  91    Ended,
  92}
  93
  94impl ThreadStatus {
  95    pub fn label(&self) -> &'static str {
  96        match self {
  97            ThreadStatus::Running => "Running",
  98            ThreadStatus::Stopped => "Stopped",
  99            ThreadStatus::Stepping => "Stepping",
 100            ThreadStatus::Exited => "Exited",
 101            ThreadStatus::Ended => "Ended",
 102        }
 103    }
 104}
 105
 106#[derive(Debug)]
 107pub struct Thread {
 108    dap: dap::Thread,
 109    stack_frame_ids: IndexSet<StackFrameId>,
 110    _has_stopped: bool,
 111}
 112
 113impl From<dap::Thread> for Thread {
 114    fn from(dap: dap::Thread) -> Self {
 115        Self {
 116            dap,
 117            stack_frame_ids: Default::default(),
 118            _has_stopped: false,
 119        }
 120    }
 121}
 122
 123pub enum Mode {
 124    Building,
 125    Running(RunningMode),
 126}
 127
 128#[derive(Clone)]
 129pub struct RunningMode {
 130    client: Arc<DebugAdapterClient>,
 131    binary: DebugAdapterBinary,
 132    tmp_breakpoint: Option<SourceBreakpoint>,
 133    worktree: WeakEntity<Worktree>,
 134    executor: BackgroundExecutor,
 135    is_started: bool,
 136}
 137
 138fn client_source(abs_path: &Path) -> dap::Source {
 139    dap::Source {
 140        name: abs_path
 141            .file_name()
 142            .map(|filename| filename.to_string_lossy().to_string()),
 143        path: Some(abs_path.to_string_lossy().to_string()),
 144        source_reference: None,
 145        presentation_hint: None,
 146        origin: None,
 147        sources: None,
 148        adapter_data: None,
 149        checksums: None,
 150    }
 151}
 152
 153impl RunningMode {
 154    async fn new(
 155        session_id: SessionId,
 156        parent_session: Option<Entity<Session>>,
 157        worktree: WeakEntity<Worktree>,
 158        binary: DebugAdapterBinary,
 159        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 160        cx: AsyncApp,
 161    ) -> Result<Self> {
 162        let message_handler = Box::new(move |message| {
 163            messages_tx.unbounded_send(message).ok();
 164        });
 165
 166        let client = Arc::new(
 167            if let Some(client) = parent_session
 168                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 169                .flatten()
 170            {
 171                client
 172                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 173                    .await?
 174            } else {
 175                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 176                    .await?
 177            },
 178        );
 179
 180        Ok(Self {
 181            client,
 182            worktree,
 183            tmp_breakpoint: None,
 184            binary,
 185            executor: cx.background_executor().clone(),
 186            is_started: false,
 187        })
 188    }
 189
 190    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 191        &self.worktree
 192    }
 193
 194    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 195        let tasks: Vec<_> = paths
 196            .into_iter()
 197            .map(|path| {
 198                self.request(dap_command::SetBreakpoints {
 199                    source: client_source(path),
 200                    source_modified: None,
 201                    breakpoints: vec![],
 202                })
 203            })
 204            .collect();
 205
 206        cx.background_spawn(async move {
 207            futures::future::join_all(tasks)
 208                .await
 209                .iter()
 210                .for_each(|res| match res {
 211                    Ok(_) => {}
 212                    Err(err) => {
 213                        log::warn!("Set breakpoints request failed: {}", err);
 214                    }
 215                });
 216        })
 217    }
 218
 219    fn send_breakpoints_from_path(
 220        &self,
 221        abs_path: Arc<Path>,
 222        reason: BreakpointUpdatedReason,
 223        breakpoint_store: &Entity<BreakpointStore>,
 224        cx: &mut App,
 225    ) -> Task<()> {
 226        let breakpoints =
 227            breakpoint_store
 228                .read(cx)
 229                .source_breakpoints_from_path(&abs_path, cx)
 230                .into_iter()
 231                .filter(|bp| bp.state.is_enabled())
 232                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 233                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 234                }))
 235                .map(Into::into)
 236                .collect();
 237
 238        let raw_breakpoints = breakpoint_store
 239            .read(cx)
 240            .breakpoints_from_path(&abs_path)
 241            .into_iter()
 242            .filter(|bp| bp.bp.state.is_enabled())
 243            .collect::<Vec<_>>();
 244
 245        let task = self.request(dap_command::SetBreakpoints {
 246            source: client_source(&abs_path),
 247            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 248            breakpoints,
 249        });
 250        let session_id = self.client.id();
 251        let breakpoint_store = breakpoint_store.downgrade();
 252        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 253            Ok(breakpoints) => {
 254                let breakpoints =
 255                    breakpoints
 256                        .into_iter()
 257                        .zip(raw_breakpoints)
 258                        .filter_map(|(dap_bp, zed_bp)| {
 259                            Some((
 260                                zed_bp,
 261                                BreakpointSessionState {
 262                                    id: dap_bp.id?,
 263                                    verified: dap_bp.verified,
 264                                },
 265                            ))
 266                        });
 267                breakpoint_store
 268                    .update(cx, |this, _| {
 269                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 270                    })
 271                    .ok();
 272            }
 273            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 274        })
 275    }
 276
 277    fn send_exception_breakpoints(
 278        &self,
 279        filters: Vec<ExceptionBreakpointsFilter>,
 280        supports_filter_options: bool,
 281    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 282        let arg = if supports_filter_options {
 283            SetExceptionBreakpoints::WithOptions {
 284                filters: filters
 285                    .into_iter()
 286                    .map(|filter| ExceptionFilterOptions {
 287                        filter_id: filter.filter,
 288                        condition: None,
 289                        mode: None,
 290                    })
 291                    .collect(),
 292            }
 293        } else {
 294            SetExceptionBreakpoints::Plain {
 295                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 296            }
 297        };
 298        self.request(arg)
 299    }
 300
 301    fn send_source_breakpoints(
 302        &self,
 303        ignore_breakpoints: bool,
 304        breakpoint_store: &Entity<BreakpointStore>,
 305        cx: &App,
 306    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 307        let mut breakpoint_tasks = Vec::new();
 308        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
 309        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 310        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 311        let session_id = self.client.id();
 312        for (path, breakpoints) in breakpoints {
 313            let breakpoints = if ignore_breakpoints {
 314                vec![]
 315            } else {
 316                breakpoints
 317                    .into_iter()
 318                    .filter(|bp| bp.state.is_enabled())
 319                    .map(Into::into)
 320                    .collect()
 321            };
 322
 323            let raw_breakpoints = raw_breakpoints
 324                .remove(&path)
 325                .unwrap_or_default()
 326                .into_iter()
 327                .filter(|bp| bp.bp.state.is_enabled());
 328            let error_path = path.clone();
 329            let send_request = self
 330                .request(dap_command::SetBreakpoints {
 331                    source: client_source(&path),
 332                    source_modified: Some(false),
 333                    breakpoints,
 334                })
 335                .map(|result| result.map_err(move |e| (error_path, e)));
 336
 337            let task = cx.spawn({
 338                let breakpoint_store = breakpoint_store.downgrade();
 339                async move |cx| {
 340                    let breakpoints = cx.background_spawn(send_request).await?;
 341
 342                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 343                        |(dap_bp, zed_bp)| {
 344                            Some((
 345                                zed_bp,
 346                                BreakpointSessionState {
 347                                    id: dap_bp.id?,
 348                                    verified: dap_bp.verified,
 349                                },
 350                            ))
 351                        },
 352                    );
 353                    breakpoint_store
 354                        .update(cx, |this, _| {
 355                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 356                        })
 357                        .ok();
 358
 359                    Ok(())
 360                }
 361            });
 362            breakpoint_tasks.push(task);
 363        }
 364
 365        cx.background_spawn(async move {
 366            futures::future::join_all(breakpoint_tasks)
 367                .await
 368                .into_iter()
 369                .filter_map(Result::err)
 370                .collect::<HashMap<_, _>>()
 371        })
 372    }
 373
 374    fn initialize_sequence(
 375        &self,
 376        capabilities: &Capabilities,
 377        initialized_rx: oneshot::Receiver<()>,
 378        dap_store: WeakEntity<DapStore>,
 379        cx: &mut Context<Session>,
 380    ) -> Task<Result<()>> {
 381        let raw = self.binary.request_args.clone();
 382
 383        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 384        let launch = match raw.request {
 385            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 386                raw: raw.configuration,
 387            }),
 388            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 389                raw: raw.configuration,
 390            }),
 391        };
 392
 393        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 394        let exception_filters = capabilities
 395            .exception_breakpoint_filters
 396            .as_ref()
 397            .map(|exception_filters| {
 398                exception_filters
 399                    .iter()
 400                    .filter(|filter| filter.default == Some(true))
 401                    .cloned()
 402                    .collect::<Vec<_>>()
 403            })
 404            .unwrap_or_default();
 405        let supports_exception_filters = capabilities
 406            .supports_exception_filter_options
 407            .unwrap_or_default();
 408        let this = self.clone();
 409        let worktree = self.worktree().clone();
 410        let configuration_sequence = cx.spawn({
 411            async move |_, cx| {
 412                let breakpoint_store =
 413                    dap_store.read_with(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
 414                initialized_rx.await?;
 415                let errors_by_path = cx
 416                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 417                    .await;
 418
 419                dap_store.update(cx, |_, cx| {
 420                    let Some(worktree) = worktree.upgrade() else {
 421                        return;
 422                    };
 423
 424                    for (path, error) in &errors_by_path {
 425                        log::error!("failed to set breakpoints for {path:?}: {error}");
 426                    }
 427
 428                    if let Some(failed_path) = errors_by_path.keys().next() {
 429                        let failed_path = failed_path
 430                            .strip_prefix(worktree.read(cx).abs_path())
 431                            .unwrap_or(failed_path)
 432                            .display();
 433                        let message = format!(
 434                            "Failed to set breakpoints for {failed_path}{}",
 435                            match errors_by_path.len() {
 436                                0 => unreachable!(),
 437                                1 => "".into(),
 438                                2 => " and 1 other path".into(),
 439                                n => format!(" and {} other paths", n - 1),
 440                            }
 441                        );
 442                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 443                    }
 444                })?;
 445
 446                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
 447                    .await
 448                    .ok();
 449                let ret = if configuration_done_supported {
 450                    this.request(ConfigurationDone {})
 451                } else {
 452                    Task::ready(Ok(()))
 453                }
 454                .await;
 455                ret
 456            }
 457        });
 458
 459        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
 460
 461        cx.spawn(async move |this, cx| {
 462            let result = task.await;
 463
 464            this.update(cx, |this, cx| {
 465                if let Some(this) = this.as_running_mut() {
 466                    this.is_started = true;
 467                    cx.notify();
 468                }
 469            })
 470            .ok();
 471
 472            result?;
 473            anyhow::Ok(())
 474        })
 475    }
 476
 477    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 478    where
 479        <R::DapRequest as dap::requests::Request>::Response: 'static,
 480        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 481    {
 482        let request = Arc::new(request);
 483
 484        let request_clone = request.clone();
 485        let connection = self.client.clone();
 486        self.executor.spawn(async move {
 487            let args = request_clone.to_dap();
 488            let response = connection.request::<R::DapRequest>(args).await?;
 489            request.response_from_dap(response)
 490        })
 491    }
 492}
 493
 494impl Mode {
 495    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 496    where
 497        <R::DapRequest as dap::requests::Request>::Response: 'static,
 498        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 499    {
 500        match self {
 501            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 502            Mode::Building => Task::ready(Err(anyhow!(
 503                "no adapter running to send request: {request:?}"
 504            ))),
 505        }
 506    }
 507}
 508
 509#[derive(Default)]
 510struct ThreadStates {
 511    global_state: Option<ThreadStatus>,
 512    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 513}
 514
 515impl ThreadStates {
 516    fn stop_all_threads(&mut self) {
 517        self.global_state = Some(ThreadStatus::Stopped);
 518        self.known_thread_states.clear();
 519    }
 520
 521    fn exit_all_threads(&mut self) {
 522        self.global_state = Some(ThreadStatus::Exited);
 523        self.known_thread_states.clear();
 524    }
 525
 526    fn continue_all_threads(&mut self) {
 527        self.global_state = Some(ThreadStatus::Running);
 528        self.known_thread_states.clear();
 529    }
 530
 531    fn stop_thread(&mut self, thread_id: ThreadId) {
 532        self.known_thread_states
 533            .insert(thread_id, ThreadStatus::Stopped);
 534    }
 535
 536    fn continue_thread(&mut self, thread_id: ThreadId) {
 537        self.known_thread_states
 538            .insert(thread_id, ThreadStatus::Running);
 539    }
 540
 541    fn process_step(&mut self, thread_id: ThreadId) {
 542        self.known_thread_states
 543            .insert(thread_id, ThreadStatus::Stepping);
 544    }
 545
 546    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 547        self.thread_state(thread_id)
 548            .unwrap_or(ThreadStatus::Running)
 549    }
 550
 551    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 552        self.known_thread_states
 553            .get(&thread_id)
 554            .copied()
 555            .or(self.global_state)
 556    }
 557
 558    fn exit_thread(&mut self, thread_id: ThreadId) {
 559        self.known_thread_states
 560            .insert(thread_id, ThreadStatus::Exited);
 561    }
 562
 563    fn any_stopped_thread(&self) -> bool {
 564        self.global_state
 565            .is_some_and(|state| state == ThreadStatus::Stopped)
 566            || self
 567                .known_thread_states
 568                .values()
 569                .any(|status| *status == ThreadStatus::Stopped)
 570    }
 571}
 572const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 573
 574type IsEnabled = bool;
 575
 576#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 577pub struct OutputToken(pub usize);
 578/// Represents a current state of a single debug adapter and provides ways to mutate it.
 579pub struct Session {
 580    pub mode: Mode,
 581    id: SessionId,
 582    label: SharedString,
 583    adapter: DebugAdapterName,
 584    pub(super) capabilities: Capabilities,
 585    child_session_ids: HashSet<SessionId>,
 586    parent_session: Option<Entity<Session>>,
 587    modules: Vec<dap::Module>,
 588    loaded_sources: Vec<dap::Source>,
 589    output_token: OutputToken,
 590    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 591    threads: IndexMap<ThreadId, Thread>,
 592    thread_states: ThreadStates,
 593    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 594    stack_frames: IndexMap<StackFrameId, StackFrame>,
 595    locations: HashMap<u64, dap::LocationsResponse>,
 596    is_session_terminated: bool,
 597    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 598    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 599    ignore_breakpoints: bool,
 600    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 601    background_tasks: Vec<Task<()>>,
 602}
 603
 604trait CacheableCommand: Any + Send + Sync {
 605    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 606    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 607    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 608}
 609
 610impl<T> CacheableCommand for T
 611where
 612    T: DapCommand + PartialEq + Eq + Hash,
 613{
 614    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 615        (rhs as &dyn Any)
 616            .downcast_ref::<Self>()
 617            .map_or(false, |rhs| self == rhs)
 618    }
 619
 620    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 621        T::hash(self, &mut hasher);
 622    }
 623
 624    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 625        self
 626    }
 627}
 628
 629pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 630
 631impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 632    fn from(request: T) -> Self {
 633        Self(Arc::new(request))
 634    }
 635}
 636
 637impl PartialEq for RequestSlot {
 638    fn eq(&self, other: &Self) -> bool {
 639        self.0.dyn_eq(other.0.as_ref())
 640    }
 641}
 642
 643impl Eq for RequestSlot {}
 644
 645impl Hash for RequestSlot {
 646    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 647        self.0.dyn_hash(state);
 648        (&*self.0 as &dyn Any).type_id().hash(state)
 649    }
 650}
 651
 652#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 653pub struct CompletionsQuery {
 654    pub query: String,
 655    pub column: u64,
 656    pub line: Option<u64>,
 657    pub frame_id: Option<u64>,
 658}
 659
 660impl CompletionsQuery {
 661    pub fn new(
 662        buffer: &language::Buffer,
 663        cursor_position: language::Anchor,
 664        frame_id: Option<u64>,
 665    ) -> Self {
 666        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 667        Self {
 668            query: buffer.text(),
 669            column: column as u64,
 670            frame_id,
 671            line: Some(row as u64),
 672        }
 673    }
 674}
 675
 676#[derive(Debug)]
 677pub enum SessionEvent {
 678    Modules,
 679    LoadedSources,
 680    Stopped(Option<ThreadId>),
 681    StackTrace,
 682    Variables,
 683    Threads,
 684    InvalidateInlineValue,
 685    CapabilitiesLoaded,
 686    RunInTerminal {
 687        request: RunInTerminalRequestArguments,
 688        sender: mpsc::Sender<Result<u32>>,
 689    },
 690    ConsoleOutput,
 691}
 692
 693#[derive(Clone, Debug, PartialEq, Eq)]
 694pub enum SessionStateEvent {
 695    Running,
 696    Shutdown,
 697    Restart,
 698    SpawnChildSession {
 699        request: StartDebuggingRequestArguments,
 700    },
 701}
 702
 703impl EventEmitter<SessionEvent> for Session {}
 704impl EventEmitter<SessionStateEvent> for Session {}
 705
 706// local session will send breakpoint updates to DAP for all new breakpoints
 707// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 708// BreakpointStore notifies session on breakpoint changes
 709impl Session {
 710    pub(crate) fn new(
 711        breakpoint_store: Entity<BreakpointStore>,
 712        session_id: SessionId,
 713        parent_session: Option<Entity<Session>>,
 714        label: SharedString,
 715        adapter: DebugAdapterName,
 716        cx: &mut App,
 717    ) -> Entity<Self> {
 718        cx.new::<Self>(|cx| {
 719            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 720                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 721                    if let Some(local) = (!this.ignore_breakpoints)
 722                        .then(|| this.as_running_mut())
 723                        .flatten()
 724                    {
 725                        local
 726                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 727                            .detach();
 728                    };
 729                }
 730                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 731                    if let Some(local) = (!this.ignore_breakpoints)
 732                        .then(|| this.as_running_mut())
 733                        .flatten()
 734                    {
 735                        local.unset_breakpoints_from_paths(paths, cx).detach();
 736                    }
 737                }
 738                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 739            })
 740            .detach();
 741            cx.on_app_quit(Self::on_app_quit).detach();
 742
 743            let this = Self {
 744                mode: Mode::Building,
 745                id: session_id,
 746                child_session_ids: HashSet::default(),
 747                parent_session,
 748                capabilities: Capabilities::default(),
 749                variables: Default::default(),
 750                stack_frames: Default::default(),
 751                thread_states: ThreadStates::default(),
 752                output_token: OutputToken(0),
 753                output: circular_buffer::CircularBuffer::boxed(),
 754                requests: HashMap::default(),
 755                modules: Vec::default(),
 756                loaded_sources: Vec::default(),
 757                threads: IndexMap::default(),
 758                background_tasks: Vec::default(),
 759                locations: Default::default(),
 760                is_session_terminated: false,
 761                ignore_breakpoints: false,
 762                breakpoint_store,
 763                exception_breakpoints: Default::default(),
 764                label,
 765                adapter,
 766            };
 767
 768            this
 769        })
 770    }
 771
 772    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 773        match &self.mode {
 774            Mode::Building => None,
 775            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 776        }
 777    }
 778
 779    pub fn boot(
 780        &mut self,
 781        binary: DebugAdapterBinary,
 782        worktree: Entity<Worktree>,
 783        dap_store: WeakEntity<DapStore>,
 784        cx: &mut Context<Self>,
 785    ) -> Task<Result<()>> {
 786        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 787        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 788
 789        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 790            let mut initialized_tx = Some(initialized_tx);
 791            while let Some(message) = message_rx.next().await {
 792                if let Message::Event(event) = message {
 793                    if let Events::Initialized(_) = *event {
 794                        if let Some(tx) = initialized_tx.take() {
 795                            tx.send(()).ok();
 796                        }
 797                    } else {
 798                        let Ok(_) = this.update(cx, |session, cx| {
 799                            session.handle_dap_event(event, cx);
 800                        }) else {
 801                            break;
 802                        };
 803                    }
 804                } else if let Message::Request(request) = message {
 805                    let Ok(_) = this.update(cx, |this, cx| {
 806                        if request.command == StartDebugging::COMMAND {
 807                            this.handle_start_debugging_request(request, cx)
 808                                .detach_and_log_err(cx);
 809                        } else if request.command == RunInTerminal::COMMAND {
 810                            this.handle_run_in_terminal_request(request, cx)
 811                                .detach_and_log_err(cx);
 812                        }
 813                    }) else {
 814                        break;
 815                    };
 816                }
 817            }
 818        })];
 819        self.background_tasks = background_tasks;
 820        let id = self.id;
 821        let parent_session = self.parent_session.clone();
 822
 823        cx.spawn(async move |this, cx| {
 824            let mode = RunningMode::new(
 825                id,
 826                parent_session,
 827                worktree.downgrade(),
 828                binary.clone(),
 829                message_tx,
 830                cx.clone(),
 831            )
 832            .await?;
 833            this.update(cx, |this, cx| {
 834                this.mode = Mode::Running(mode);
 835                cx.emit(SessionStateEvent::Running);
 836            })?;
 837
 838            this.update(cx, |session, cx| session.request_initialize(cx))?
 839                .await?;
 840
 841            let result = this
 842                .update(cx, |session, cx| {
 843                    session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 844                })?
 845                .await;
 846
 847            if result.is_err() {
 848                let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
 849
 850                console
 851                    .send(format!(
 852                        "Tried to launch debugger with: {}",
 853                        serde_json::to_string_pretty(&binary.request_args.configuration)
 854                            .unwrap_or_default(),
 855                    ))
 856                    .await
 857                    .ok();
 858            }
 859
 860            result
 861        })
 862    }
 863
 864    pub fn session_id(&self) -> SessionId {
 865        self.id
 866    }
 867
 868    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 869        self.child_session_ids.clone()
 870    }
 871
 872    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 873        self.child_session_ids.insert(session_id);
 874    }
 875
 876    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 877        self.child_session_ids.remove(&session_id);
 878    }
 879
 880    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 881        self.parent_session
 882            .as_ref()
 883            .map(|session| session.read(cx).id)
 884    }
 885
 886    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 887        self.parent_session.as_ref()
 888    }
 889
 890    pub fn capabilities(&self) -> &Capabilities {
 891        &self.capabilities
 892    }
 893
 894    pub fn binary(&self) -> &DebugAdapterBinary {
 895        let Mode::Running(local_mode) = &self.mode else {
 896            panic!("Session is not running");
 897        };
 898        &local_mode.binary
 899    }
 900
 901    pub fn adapter(&self) -> DebugAdapterName {
 902        self.adapter.clone()
 903    }
 904
 905    pub fn label(&self) -> SharedString {
 906        self.label.clone()
 907    }
 908
 909    pub fn is_terminated(&self) -> bool {
 910        self.is_session_terminated
 911    }
 912
 913    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
 914        let (tx, mut rx) = mpsc::unbounded();
 915
 916        cx.spawn(async move |this, cx| {
 917            while let Some(output) = rx.next().await {
 918                this.update(cx, |this, cx| {
 919                    let event = dap::OutputEvent {
 920                        category: None,
 921                        output,
 922                        group: None,
 923                        variables_reference: None,
 924                        source: None,
 925                        line: None,
 926                        column: None,
 927                        data: None,
 928                        location_reference: None,
 929                    };
 930                    this.push_output(event, cx);
 931                })?;
 932            }
 933            anyhow::Ok(())
 934        })
 935        .detach();
 936
 937        return tx;
 938    }
 939
 940    pub fn is_started(&self) -> bool {
 941        match &self.mode {
 942            Mode::Building => false,
 943            Mode::Running(running) => running.is_started,
 944        }
 945    }
 946
 947    pub fn is_building(&self) -> bool {
 948        matches!(self.mode, Mode::Building)
 949    }
 950
 951    pub fn is_running(&self) -> bool {
 952        matches!(self.mode, Mode::Running(_))
 953    }
 954
 955    pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
 956        match &mut self.mode {
 957            Mode::Running(local_mode) => Some(local_mode),
 958            Mode::Building => None,
 959        }
 960    }
 961
 962    pub fn as_running(&self) -> Option<&RunningMode> {
 963        match &self.mode {
 964            Mode::Running(local_mode) => Some(local_mode),
 965            Mode::Building => None,
 966        }
 967    }
 968
 969    fn handle_start_debugging_request(
 970        &mut self,
 971        request: dap::messages::Request,
 972        cx: &mut Context<Self>,
 973    ) -> Task<Result<()>> {
 974        let request_seq = request.seq;
 975
 976        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
 977            .arguments
 978            .as_ref()
 979            .map(|value| serde_json::from_value(value.clone()));
 980
 981        let mut success = true;
 982        if let Some(Ok(request)) = launch_request {
 983            cx.emit(SessionStateEvent::SpawnChildSession { request });
 984        } else {
 985            log::error!(
 986                "Failed to parse launch request arguments: {:?}",
 987                request.arguments
 988            );
 989            success = false;
 990        }
 991
 992        cx.spawn(async move |this, cx| {
 993            this.update(cx, |this, cx| {
 994                this.respond_to_client(
 995                    request_seq,
 996                    success,
 997                    StartDebugging::COMMAND.to_string(),
 998                    None,
 999                    cx,
1000                )
1001            })?
1002            .await
1003        })
1004    }
1005
1006    fn handle_run_in_terminal_request(
1007        &mut self,
1008        request: dap::messages::Request,
1009        cx: &mut Context<Self>,
1010    ) -> Task<Result<()>> {
1011        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
1012            request.arguments.unwrap_or_default(),
1013        )
1014        .expect("To parse StartDebuggingRequestArguments");
1015
1016        let seq = request.seq;
1017
1018        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1019        cx.emit(SessionEvent::RunInTerminal {
1020            request: request_args,
1021            sender: tx,
1022        });
1023        cx.notify();
1024
1025        cx.spawn(async move |session, cx| {
1026            let result = util::maybe!(async move {
1027                rx.next().await.ok_or_else(|| {
1028                    anyhow!("failed to receive response from spawn terminal".to_string())
1029                })?
1030            })
1031            .await;
1032            let (success, body) = match result {
1033                Ok(pid) => (
1034                    true,
1035                    serde_json::to_value(dap::RunInTerminalResponse {
1036                        process_id: None,
1037                        shell_process_id: Some(pid as u64),
1038                    })
1039                    .ok(),
1040                ),
1041                Err(error) => (
1042                    false,
1043                    serde_json::to_value(dap::ErrorResponse {
1044                        error: Some(dap::Message {
1045                            id: seq,
1046                            format: error.to_string(),
1047                            variables: None,
1048                            send_telemetry: None,
1049                            show_user: None,
1050                            url: None,
1051                            url_label: None,
1052                        }),
1053                    })
1054                    .ok(),
1055                ),
1056            };
1057
1058            session
1059                .update(cx, |session, cx| {
1060                    session.respond_to_client(
1061                        seq,
1062                        success,
1063                        RunInTerminal::COMMAND.to_string(),
1064                        body,
1065                        cx,
1066                    )
1067                })?
1068                .await
1069        })
1070    }
1071
1072    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1073        let adapter_id = self.adapter().to_string();
1074        let request = Initialize { adapter_id };
1075        match &self.mode {
1076            Mode::Running(local_mode) => {
1077                let capabilities = local_mode.request(request);
1078
1079                cx.spawn(async move |this, cx| {
1080                    let capabilities = capabilities.await?;
1081                    this.update(cx, |session, cx| {
1082                        session.capabilities = capabilities;
1083                        let filters = session
1084                            .capabilities
1085                            .exception_breakpoint_filters
1086                            .clone()
1087                            .unwrap_or_default();
1088                        for filter in filters {
1089                            let default = filter.default.unwrap_or_default();
1090                            session
1091                                .exception_breakpoints
1092                                .entry(filter.filter.clone())
1093                                .or_insert_with(|| (filter, default));
1094                        }
1095                        cx.emit(SessionEvent::CapabilitiesLoaded);
1096                    })?;
1097                    Ok(())
1098                })
1099            }
1100            Mode::Building => Task::ready(Err(anyhow!(
1101                "Cannot send initialize request, task still building"
1102            ))),
1103        }
1104    }
1105
1106    pub(super) fn initialize_sequence(
1107        &mut self,
1108        initialize_rx: oneshot::Receiver<()>,
1109        dap_store: WeakEntity<DapStore>,
1110        cx: &mut Context<Self>,
1111    ) -> Task<Result<()>> {
1112        match &self.mode {
1113            Mode::Running(local_mode) => {
1114                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1115            }
1116            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1117        }
1118    }
1119
1120    pub fn run_to_position(
1121        &mut self,
1122        breakpoint: SourceBreakpoint,
1123        active_thread_id: ThreadId,
1124        cx: &mut Context<Self>,
1125    ) {
1126        match &mut self.mode {
1127            Mode::Running(local_mode) => {
1128                if !matches!(
1129                    self.thread_states.thread_state(active_thread_id),
1130                    Some(ThreadStatus::Stopped)
1131                ) {
1132                    return;
1133                };
1134                let path = breakpoint.path.clone();
1135                local_mode.tmp_breakpoint = Some(breakpoint);
1136                let task = local_mode.send_breakpoints_from_path(
1137                    path,
1138                    BreakpointUpdatedReason::Toggled,
1139                    &self.breakpoint_store,
1140                    cx,
1141                );
1142
1143                cx.spawn(async move |this, cx| {
1144                    task.await;
1145                    this.update(cx, |this, cx| {
1146                        this.continue_thread(active_thread_id, cx);
1147                    })
1148                })
1149                .detach();
1150            }
1151            Mode::Building => {}
1152        }
1153    }
1154
1155    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1156        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1157    }
1158
1159    pub fn output(
1160        &self,
1161        since: OutputToken,
1162    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1163        if self.output_token.0 == 0 {
1164            return (self.output.range(0..0), OutputToken(0));
1165        };
1166
1167        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1168
1169        let clamped_events_since = events_since.clamp(0, self.output.len());
1170        (
1171            self.output
1172                .range(self.output.len() - clamped_events_since..),
1173            self.output_token,
1174        )
1175    }
1176
1177    pub fn respond_to_client(
1178        &self,
1179        request_seq: u64,
1180        success: bool,
1181        command: String,
1182        body: Option<serde_json::Value>,
1183        cx: &mut Context<Self>,
1184    ) -> Task<Result<()>> {
1185        let Some(local_session) = self.as_running() else {
1186            unreachable!("Cannot respond to remote client");
1187        };
1188        let client = local_session.client.clone();
1189
1190        cx.background_spawn(async move {
1191            client
1192                .send_message(Message::Response(Response {
1193                    body,
1194                    success,
1195                    command,
1196                    seq: request_seq + 1,
1197                    request_seq,
1198                    message: None,
1199                }))
1200                .await
1201        })
1202    }
1203
1204    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1205        // todo(debugger): Find a clean way to get around the clone
1206        let breakpoint_store = self.breakpoint_store.clone();
1207        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1208            let breakpoint = local.tmp_breakpoint.take()?;
1209            let path = breakpoint.path.clone();
1210            Some((local, path))
1211        }) {
1212            local
1213                .send_breakpoints_from_path(
1214                    path,
1215                    BreakpointUpdatedReason::Toggled,
1216                    &breakpoint_store,
1217                    cx,
1218                )
1219                .detach();
1220        };
1221
1222        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1223            self.thread_states.stop_all_threads();
1224
1225            self.invalidate_command_type::<StackTraceCommand>();
1226        }
1227
1228        // Event if we stopped all threads we still need to insert the thread_id
1229        // to our own data
1230        if let Some(thread_id) = event.thread_id {
1231            self.thread_states.stop_thread(ThreadId(thread_id));
1232
1233            self.invalidate_state(
1234                &StackTraceCommand {
1235                    thread_id,
1236                    start_frame: None,
1237                    levels: None,
1238                }
1239                .into(),
1240            );
1241        }
1242
1243        self.invalidate_generic();
1244        self.threads.clear();
1245        self.variables.clear();
1246        cx.emit(SessionEvent::Stopped(
1247            event
1248                .thread_id
1249                .map(Into::into)
1250                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1251        ));
1252        cx.emit(SessionEvent::InvalidateInlineValue);
1253        cx.notify();
1254    }
1255
1256    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1257        match *event {
1258            Events::Initialized(_) => {
1259                debug_assert!(
1260                    false,
1261                    "Initialized event should have been handled in LocalMode"
1262                );
1263            }
1264            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1265            Events::Continued(event) => {
1266                if event.all_threads_continued.unwrap_or_default() {
1267                    self.thread_states.continue_all_threads();
1268                    self.breakpoint_store.update(cx, |store, cx| {
1269                        store.remove_active_position(Some(self.session_id()), cx)
1270                    });
1271                } else {
1272                    self.thread_states
1273                        .continue_thread(ThreadId(event.thread_id));
1274                }
1275                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1276                self.invalidate_generic();
1277            }
1278            Events::Exited(_event) => {
1279                self.clear_active_debug_line(cx);
1280            }
1281            Events::Terminated(_) => {
1282                self.shutdown(cx).detach();
1283            }
1284            Events::Thread(event) => {
1285                let thread_id = ThreadId(event.thread_id);
1286
1287                match event.reason {
1288                    dap::ThreadEventReason::Started => {
1289                        self.thread_states.continue_thread(thread_id);
1290                    }
1291                    dap::ThreadEventReason::Exited => {
1292                        self.thread_states.exit_thread(thread_id);
1293                    }
1294                    reason => {
1295                        log::error!("Unhandled thread event reason {:?}", reason);
1296                    }
1297                }
1298                self.invalidate_state(&ThreadsCommand.into());
1299                cx.notify();
1300            }
1301            Events::Output(event) => {
1302                if event
1303                    .category
1304                    .as_ref()
1305                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1306                {
1307                    return;
1308                }
1309
1310                self.push_output(event, cx);
1311                cx.notify();
1312            }
1313            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1314                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1315            }),
1316            Events::Module(event) => {
1317                match event.reason {
1318                    dap::ModuleEventReason::New => {
1319                        self.modules.push(event.module);
1320                    }
1321                    dap::ModuleEventReason::Changed => {
1322                        if let Some(module) = self
1323                            .modules
1324                            .iter_mut()
1325                            .find(|other| event.module.id == other.id)
1326                        {
1327                            *module = event.module;
1328                        }
1329                    }
1330                    dap::ModuleEventReason::Removed => {
1331                        self.modules.retain(|other| event.module.id != other.id);
1332                    }
1333                }
1334
1335                // todo(debugger): We should only send the invalidate command to downstream clients.
1336                // self.invalidate_state(&ModulesCommand.into());
1337            }
1338            Events::LoadedSource(_) => {
1339                self.invalidate_state(&LoadedSourcesCommand.into());
1340            }
1341            Events::Capabilities(event) => {
1342                self.capabilities = self.capabilities.merge(event.capabilities);
1343                cx.notify();
1344            }
1345            Events::Memory(_) => {}
1346            Events::Process(_) => {}
1347            Events::ProgressEnd(_) => {}
1348            Events::ProgressStart(_) => {}
1349            Events::ProgressUpdate(_) => {}
1350            Events::Invalidated(_) => {}
1351            Events::Other(_) => {}
1352        }
1353    }
1354
1355    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1356    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1357        &mut self,
1358        request: T,
1359        process_result: impl FnOnce(
1360            &mut Self,
1361            Result<T::Response>,
1362            &mut Context<Self>,
1363        ) -> Option<T::Response>
1364        + 'static,
1365        cx: &mut Context<Self>,
1366    ) {
1367        const {
1368            assert!(
1369                T::CACHEABLE,
1370                "Only requests marked as cacheable should invoke `fetch`"
1371            );
1372        }
1373
1374        if !self.thread_states.any_stopped_thread()
1375            && request.type_id() != TypeId::of::<ThreadsCommand>()
1376            || self.is_session_terminated
1377        {
1378            return;
1379        }
1380
1381        let request_map = self
1382            .requests
1383            .entry(std::any::TypeId::of::<T>())
1384            .or_default();
1385
1386        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1387            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1388
1389            let task = Self::request_inner::<Arc<T>>(
1390                &self.capabilities,
1391                &self.mode,
1392                command,
1393                process_result,
1394                cx,
1395            );
1396            let task = cx
1397                .background_executor()
1398                .spawn(async move {
1399                    let _ = task.await?;
1400                    Some(())
1401                })
1402                .shared();
1403
1404            vacant.insert(task);
1405            cx.notify();
1406        }
1407    }
1408
1409    pub async fn request2<T: DapCommand + PartialEq + Eq + Hash>(
1410        &self,
1411        request: T,
1412    ) -> Result<T::Response> {
1413        if !T::is_supported(&self.capabilities) {
1414            anyhow::bail!("DAP request {:?} is not supported", request);
1415        }
1416
1417        self.mode.request_dap(request).await
1418    }
1419
1420    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1421        capabilities: &Capabilities,
1422        mode: &Mode,
1423        request: T,
1424        process_result: impl FnOnce(
1425            &mut Self,
1426            Result<T::Response>,
1427            &mut Context<Self>,
1428        ) -> Option<T::Response>
1429        + 'static,
1430        cx: &mut Context<Self>,
1431    ) -> Task<Option<T::Response>> {
1432        if !T::is_supported(&capabilities) {
1433            log::warn!(
1434                "Attempted to send a DAP request that isn't supported: {:?}",
1435                request
1436            );
1437            let error = Err(anyhow::Error::msg(
1438                "Couldn't complete request because it's not supported",
1439            ));
1440            return cx.spawn(async move |this, cx| {
1441                this.update(cx, |this, cx| process_result(this, error, cx))
1442                    .ok()
1443                    .flatten()
1444            });
1445        }
1446
1447        let request = mode.request_dap(request);
1448        cx.spawn(async move |this, cx| {
1449            let result = request.await;
1450            this.update(cx, |this, cx| process_result(this, result, cx))
1451                .ok()
1452                .flatten()
1453        })
1454    }
1455
1456    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1457        &self,
1458        request: T,
1459        process_result: impl FnOnce(
1460            &mut Self,
1461            Result<T::Response>,
1462            &mut Context<Self>,
1463        ) -> Option<T::Response>
1464        + 'static,
1465        cx: &mut Context<Self>,
1466    ) -> Task<Option<T::Response>> {
1467        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1468    }
1469
1470    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1471        self.requests.remove(&std::any::TypeId::of::<Command>());
1472    }
1473
1474    fn invalidate_generic(&mut self) {
1475        self.invalidate_command_type::<ModulesCommand>();
1476        self.invalidate_command_type::<LoadedSourcesCommand>();
1477        self.invalidate_command_type::<ThreadsCommand>();
1478    }
1479
1480    fn invalidate_state(&mut self, key: &RequestSlot) {
1481        self.requests
1482            .entry((&*key.0 as &dyn Any).type_id())
1483            .and_modify(|request_map| {
1484                request_map.remove(&key);
1485            });
1486    }
1487
1488    fn push_output(&mut self, event: OutputEvent, cx: &mut Context<Self>) {
1489        self.output.push_back(event);
1490        self.output_token.0 += 1;
1491        cx.emit(SessionEvent::ConsoleOutput);
1492    }
1493
1494    pub fn any_stopped_thread(&self) -> bool {
1495        self.thread_states.any_stopped_thread()
1496    }
1497
1498    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1499        self.thread_states.thread_status(thread_id)
1500    }
1501
1502    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1503        self.fetch(
1504            dap_command::ThreadsCommand,
1505            |this, result, cx| {
1506                let result = result.log_err()?;
1507
1508                this.threads = result
1509                    .iter()
1510                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1511                    .collect();
1512
1513                this.invalidate_command_type::<StackTraceCommand>();
1514                cx.emit(SessionEvent::Threads);
1515                cx.notify();
1516
1517                Some(result)
1518            },
1519            cx,
1520        );
1521
1522        self.threads
1523            .values()
1524            .map(|thread| {
1525                (
1526                    thread.dap.clone(),
1527                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1528                )
1529            })
1530            .collect()
1531    }
1532
1533    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1534        self.fetch(
1535            dap_command::ModulesCommand,
1536            |this, result, cx| {
1537                let result = result.log_err()?;
1538
1539                this.modules = result.iter().cloned().collect();
1540                cx.emit(SessionEvent::Modules);
1541                cx.notify();
1542
1543                Some(result)
1544            },
1545            cx,
1546        );
1547
1548        &self.modules
1549    }
1550
1551    pub fn ignore_breakpoints(&self) -> bool {
1552        self.ignore_breakpoints
1553    }
1554
1555    pub fn toggle_ignore_breakpoints(
1556        &mut self,
1557        cx: &mut App,
1558    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1559        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1560    }
1561
1562    pub(crate) fn set_ignore_breakpoints(
1563        &mut self,
1564        ignore: bool,
1565        cx: &mut App,
1566    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1567        if self.ignore_breakpoints == ignore {
1568            return Task::ready(HashMap::default());
1569        }
1570
1571        self.ignore_breakpoints = ignore;
1572
1573        if let Some(local) = self.as_running() {
1574            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1575        } else {
1576            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1577            unimplemented!()
1578        }
1579    }
1580
1581    pub fn exception_breakpoints(
1582        &self,
1583    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1584        self.exception_breakpoints.values()
1585    }
1586
1587    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1588        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1589            *is_enabled = !*is_enabled;
1590            self.send_exception_breakpoints(cx);
1591        }
1592    }
1593
1594    fn send_exception_breakpoints(&mut self, cx: &App) {
1595        if let Some(local) = self.as_running() {
1596            let exception_filters = self
1597                .exception_breakpoints
1598                .values()
1599                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1600                .collect();
1601
1602            let supports_exception_filters = self
1603                .capabilities
1604                .supports_exception_filter_options
1605                .unwrap_or_default();
1606            local
1607                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1608                .detach_and_log_err(cx);
1609        } else {
1610            debug_assert!(false, "Not implemented");
1611        }
1612    }
1613
1614    pub fn breakpoints_enabled(&self) -> bool {
1615        self.ignore_breakpoints
1616    }
1617
1618    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1619        self.fetch(
1620            dap_command::LoadedSourcesCommand,
1621            |this, result, cx| {
1622                let result = result.log_err()?;
1623                this.loaded_sources = result.iter().cloned().collect();
1624                cx.emit(SessionEvent::LoadedSources);
1625                cx.notify();
1626                Some(result)
1627            },
1628            cx,
1629        );
1630
1631        &self.loaded_sources
1632    }
1633
1634    fn fallback_to_manual_restart(
1635        &mut self,
1636        res: Result<()>,
1637        cx: &mut Context<Self>,
1638    ) -> Option<()> {
1639        if res.log_err().is_none() {
1640            cx.emit(SessionStateEvent::Restart);
1641            return None;
1642        }
1643        Some(())
1644    }
1645
1646    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1647        res.log_err()?;
1648        Some(())
1649    }
1650
1651    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1652        thread_id: ThreadId,
1653    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1654    {
1655        move |this, response, cx| match response.log_err() {
1656            Some(response) => {
1657                this.breakpoint_store.update(cx, |store, cx| {
1658                    store.remove_active_position(Some(this.session_id()), cx)
1659                });
1660                Some(response)
1661            }
1662            None => {
1663                this.thread_states.stop_thread(thread_id);
1664                cx.notify();
1665                None
1666            }
1667        }
1668    }
1669
1670    fn clear_active_debug_line_response(
1671        &mut self,
1672        response: Result<()>,
1673        cx: &mut Context<Session>,
1674    ) -> Option<()> {
1675        response.log_err()?;
1676        self.clear_active_debug_line(cx);
1677        Some(())
1678    }
1679
1680    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1681        self.breakpoint_store.update(cx, |store, cx| {
1682            store.remove_active_position(Some(self.id), cx)
1683        });
1684    }
1685
1686    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1687        self.request(
1688            PauseCommand {
1689                thread_id: thread_id.0,
1690            },
1691            Self::empty_response,
1692            cx,
1693        )
1694        .detach();
1695    }
1696
1697    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1698        self.request(
1699            RestartStackFrameCommand { stack_frame_id },
1700            Self::empty_response,
1701            cx,
1702        )
1703        .detach();
1704    }
1705
1706    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1707        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1708            self.request(
1709                RestartCommand {
1710                    raw: args.unwrap_or(Value::Null),
1711                },
1712                Self::fallback_to_manual_restart,
1713                cx,
1714            )
1715            .detach();
1716        } else {
1717            cx.emit(SessionStateEvent::Restart);
1718        }
1719    }
1720
1721    fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1722        let debug_adapter = self.adapter_client();
1723
1724        cx.background_spawn(async move {
1725            if let Some(client) = debug_adapter {
1726                client.shutdown().await.log_err();
1727            }
1728        })
1729    }
1730
1731    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1732        self.is_session_terminated = true;
1733        self.thread_states.exit_all_threads();
1734        cx.notify();
1735
1736        let task = if self
1737            .capabilities
1738            .supports_terminate_request
1739            .unwrap_or_default()
1740        {
1741            self.request(
1742                TerminateCommand {
1743                    restart: Some(false),
1744                },
1745                Self::clear_active_debug_line_response,
1746                cx,
1747            )
1748        } else {
1749            self.request(
1750                DisconnectCommand {
1751                    restart: Some(false),
1752                    terminate_debuggee: Some(true),
1753                    suspend_debuggee: Some(false),
1754                },
1755                Self::clear_active_debug_line_response,
1756                cx,
1757            )
1758        };
1759
1760        cx.emit(SessionStateEvent::Shutdown);
1761
1762        let debug_client = self.adapter_client();
1763
1764        cx.background_spawn(async move {
1765            let _ = task.await;
1766
1767            if let Some(client) = debug_client {
1768                client.shutdown().await.log_err();
1769            }
1770        })
1771    }
1772
1773    pub fn completions(
1774        &mut self,
1775        query: CompletionsQuery,
1776        cx: &mut Context<Self>,
1777    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1778        let task = self.request(query, |_, result, _| result.log_err(), cx);
1779
1780        cx.background_executor().spawn(async move {
1781            anyhow::Ok(
1782                task.await
1783                    .map(|response| response.targets)
1784                    .context("failed to fetch completions")?,
1785            )
1786        })
1787    }
1788
1789    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1790        self.thread_states.continue_thread(thread_id);
1791        self.request(
1792            ContinueCommand {
1793                args: ContinueArguments {
1794                    thread_id: thread_id.0,
1795                    single_thread: Some(true),
1796                },
1797            },
1798            Self::on_step_response::<ContinueCommand>(thread_id),
1799            cx,
1800        )
1801        .detach();
1802    }
1803
1804    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1805        match self.mode {
1806            Mode::Running(ref local) => Some(local.client.clone()),
1807            Mode::Building => None,
1808        }
1809    }
1810
1811    pub fn step_over(
1812        &mut self,
1813        thread_id: ThreadId,
1814        granularity: SteppingGranularity,
1815        cx: &mut Context<Self>,
1816    ) {
1817        let supports_single_thread_execution_requests =
1818            self.capabilities.supports_single_thread_execution_requests;
1819        let supports_stepping_granularity = self
1820            .capabilities
1821            .supports_stepping_granularity
1822            .unwrap_or_default();
1823
1824        let command = NextCommand {
1825            inner: StepCommand {
1826                thread_id: thread_id.0,
1827                granularity: supports_stepping_granularity.then(|| granularity),
1828                single_thread: supports_single_thread_execution_requests,
1829            },
1830        };
1831
1832        self.thread_states.process_step(thread_id);
1833        self.request(
1834            command,
1835            Self::on_step_response::<NextCommand>(thread_id),
1836            cx,
1837        )
1838        .detach();
1839    }
1840
1841    pub fn step_in(
1842        &mut self,
1843        thread_id: ThreadId,
1844        granularity: SteppingGranularity,
1845        cx: &mut Context<Self>,
1846    ) {
1847        let supports_single_thread_execution_requests =
1848            self.capabilities.supports_single_thread_execution_requests;
1849        let supports_stepping_granularity = self
1850            .capabilities
1851            .supports_stepping_granularity
1852            .unwrap_or_default();
1853
1854        let command = StepInCommand {
1855            inner: StepCommand {
1856                thread_id: thread_id.0,
1857                granularity: supports_stepping_granularity.then(|| granularity),
1858                single_thread: supports_single_thread_execution_requests,
1859            },
1860        };
1861
1862        self.thread_states.process_step(thread_id);
1863        self.request(
1864            command,
1865            Self::on_step_response::<StepInCommand>(thread_id),
1866            cx,
1867        )
1868        .detach();
1869    }
1870
1871    pub fn step_out(
1872        &mut self,
1873        thread_id: ThreadId,
1874        granularity: SteppingGranularity,
1875        cx: &mut Context<Self>,
1876    ) {
1877        let supports_single_thread_execution_requests =
1878            self.capabilities.supports_single_thread_execution_requests;
1879        let supports_stepping_granularity = self
1880            .capabilities
1881            .supports_stepping_granularity
1882            .unwrap_or_default();
1883
1884        let command = StepOutCommand {
1885            inner: StepCommand {
1886                thread_id: thread_id.0,
1887                granularity: supports_stepping_granularity.then(|| granularity),
1888                single_thread: supports_single_thread_execution_requests,
1889            },
1890        };
1891
1892        self.thread_states.process_step(thread_id);
1893        self.request(
1894            command,
1895            Self::on_step_response::<StepOutCommand>(thread_id),
1896            cx,
1897        )
1898        .detach();
1899    }
1900
1901    pub fn step_back(
1902        &mut self,
1903        thread_id: ThreadId,
1904        granularity: SteppingGranularity,
1905        cx: &mut Context<Self>,
1906    ) {
1907        let supports_single_thread_execution_requests =
1908            self.capabilities.supports_single_thread_execution_requests;
1909        let supports_stepping_granularity = self
1910            .capabilities
1911            .supports_stepping_granularity
1912            .unwrap_or_default();
1913
1914        let command = StepBackCommand {
1915            inner: StepCommand {
1916                thread_id: thread_id.0,
1917                granularity: supports_stepping_granularity.then(|| granularity),
1918                single_thread: supports_single_thread_execution_requests,
1919            },
1920        };
1921
1922        self.thread_states.process_step(thread_id);
1923
1924        self.request(
1925            command,
1926            Self::on_step_response::<StepBackCommand>(thread_id),
1927            cx,
1928        )
1929        .detach();
1930    }
1931
1932    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1933        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1934            && self.requests.contains_key(&ThreadsCommand.type_id())
1935            && self.threads.contains_key(&thread_id)
1936        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1937        // We could still be using an old thread id and have sent a new thread's request
1938        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1939        // But it very well could cause a minor bug in the future that is hard to track down
1940        {
1941            self.fetch(
1942                super::dap_command::StackTraceCommand {
1943                    thread_id: thread_id.0,
1944                    start_frame: None,
1945                    levels: None,
1946                },
1947                move |this, stack_frames, cx| {
1948                    let stack_frames = stack_frames.log_err()?;
1949
1950                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1951                        thread.stack_frame_ids =
1952                            stack_frames.iter().map(|frame| frame.id).collect();
1953                    });
1954                    debug_assert!(
1955                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1956                        "Sent request for thread_id that doesn't exist"
1957                    );
1958
1959                    this.stack_frames.extend(
1960                        stack_frames
1961                            .iter()
1962                            .cloned()
1963                            .map(|frame| (frame.id, StackFrame::from(frame))),
1964                    );
1965
1966                    this.invalidate_command_type::<ScopesCommand>();
1967                    this.invalidate_command_type::<VariablesCommand>();
1968
1969                    cx.emit(SessionEvent::StackTrace);
1970                    cx.notify();
1971                    Some(stack_frames)
1972                },
1973                cx,
1974            );
1975        }
1976
1977        self.threads
1978            .get(&thread_id)
1979            .map(|thread| {
1980                thread
1981                    .stack_frame_ids
1982                    .iter()
1983                    .filter_map(|id| self.stack_frames.get(id))
1984                    .cloned()
1985                    .collect()
1986            })
1987            .unwrap_or_default()
1988    }
1989
1990    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1991        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1992            && self
1993                .requests
1994                .contains_key(&TypeId::of::<StackTraceCommand>())
1995        {
1996            self.fetch(
1997                ScopesCommand { stack_frame_id },
1998                move |this, scopes, cx| {
1999                    let scopes = scopes.log_err()?;
2000
2001                    for scope in scopes .iter(){
2002                        this.variables(scope.variables_reference, cx);
2003                    }
2004
2005                    let entry = this
2006                        .stack_frames
2007                        .entry(stack_frame_id)
2008                        .and_modify(|stack_frame| {
2009                            stack_frame.scopes = scopes.clone();
2010                        });
2011
2012                    cx.emit(SessionEvent::Variables);
2013
2014                    debug_assert!(
2015                        matches!(entry, indexmap::map::Entry::Occupied(_)),
2016                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2017                    );
2018
2019                    Some(scopes)
2020                },
2021                cx,
2022            );
2023        }
2024
2025        self.stack_frames
2026            .get(&stack_frame_id)
2027            .map(|frame| frame.scopes.as_slice())
2028            .unwrap_or_default()
2029    }
2030
2031    pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
2032        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2033            return Vec::new();
2034        };
2035
2036        stack_frame
2037            .scopes
2038            .iter()
2039            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2040            .flatten()
2041            .cloned()
2042            .collect()
2043    }
2044
2045    pub fn variables(
2046        &mut self,
2047        variables_reference: VariableReference,
2048        cx: &mut Context<Self>,
2049    ) -> Vec<dap::Variable> {
2050        let command = VariablesCommand {
2051            variables_reference,
2052            filter: None,
2053            start: None,
2054            count: None,
2055            format: None,
2056        };
2057
2058        self.fetch(
2059            command,
2060            move |this, variables, cx| {
2061                let variables = variables.log_err()?;
2062                this.variables
2063                    .insert(variables_reference, variables.clone());
2064
2065                cx.emit(SessionEvent::Variables);
2066                cx.emit(SessionEvent::InvalidateInlineValue);
2067                Some(variables)
2068            },
2069            cx,
2070        );
2071
2072        self.variables
2073            .get(&variables_reference)
2074            .cloned()
2075            .unwrap_or_default()
2076    }
2077
2078    pub fn set_variable_value(
2079        &mut self,
2080        variables_reference: u64,
2081        name: String,
2082        value: String,
2083        cx: &mut Context<Self>,
2084    ) {
2085        if self.capabilities.supports_set_variable.unwrap_or_default() {
2086            self.request(
2087                SetVariableValueCommand {
2088                    name,
2089                    value,
2090                    variables_reference,
2091                },
2092                move |this, response, cx| {
2093                    let response = response.log_err()?;
2094                    this.invalidate_command_type::<VariablesCommand>();
2095                    cx.notify();
2096                    Some(response)
2097                },
2098                cx,
2099            )
2100            .detach()
2101        }
2102    }
2103
2104    pub fn evaluate(
2105        &mut self,
2106        expression: String,
2107        context: Option<EvaluateArgumentsContext>,
2108        frame_id: Option<u64>,
2109        source: Option<Source>,
2110        cx: &mut Context<Self>,
2111    ) -> Task<()> {
2112        let event = dap::OutputEvent {
2113            category: None,
2114            output: format!("> {expression}"),
2115            group: None,
2116            variables_reference: None,
2117            source: None,
2118            line: None,
2119            column: None,
2120            data: None,
2121            location_reference: None,
2122        };
2123        self.push_output(event, cx);
2124        let request = self.mode.request_dap(EvaluateCommand {
2125            expression,
2126            context,
2127            frame_id,
2128            source,
2129        });
2130        cx.spawn(async move |this, cx| {
2131            let response = request.await;
2132            this.update(cx, |this, cx| {
2133                match response {
2134                    Ok(response) => {
2135                        let event = dap::OutputEvent {
2136                            category: None,
2137                            output: format!("< {}", &response.result),
2138                            group: None,
2139                            variables_reference: Some(response.variables_reference),
2140                            source: None,
2141                            line: None,
2142                            column: None,
2143                            data: None,
2144                            location_reference: None,
2145                        };
2146                        this.push_output(event, cx);
2147                    }
2148                    Err(e) => {
2149                        let event = dap::OutputEvent {
2150                            category: None,
2151                            output: format!("{}", e),
2152                            group: None,
2153                            variables_reference: None,
2154                            source: None,
2155                            line: None,
2156                            column: None,
2157                            data: None,
2158                            location_reference: None,
2159                        };
2160                        this.push_output(event, cx);
2161                    }
2162                };
2163                this.invalidate_command_type::<ScopesCommand>();
2164                cx.notify();
2165            })
2166            .ok();
2167        })
2168    }
2169
2170    pub fn location(
2171        &mut self,
2172        reference: u64,
2173        cx: &mut Context<Self>,
2174    ) -> Option<dap::LocationsResponse> {
2175        self.fetch(
2176            LocationsCommand { reference },
2177            move |this, response, _| {
2178                let response = response.log_err()?;
2179                this.locations.insert(reference, response.clone());
2180                Some(response)
2181            },
2182            cx,
2183        );
2184        self.locations.get(&reference).cloned()
2185    }
2186
2187    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2188        let command = DisconnectCommand {
2189            restart: Some(false),
2190            terminate_debuggee: Some(true),
2191            suspend_debuggee: Some(false),
2192        };
2193
2194        self.request(command, Self::empty_response, cx).detach()
2195    }
2196
2197    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2198        if self
2199            .capabilities
2200            .supports_terminate_threads_request
2201            .unwrap_or_default()
2202        {
2203            self.request(
2204                TerminateThreadsCommand {
2205                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2206                },
2207                Self::clear_active_debug_line_response,
2208                cx,
2209            )
2210            .detach();
2211        } else {
2212            self.shutdown(cx).detach();
2213        }
2214    }
2215
2216    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2217        self.thread_states.thread_state(thread_id)
2218    }
2219}