session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
  11    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  12    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapStore;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap, IndexSet};
  17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  18use dap::messages::Response;
  19use dap::requests::{Request, RunInTerminal, StartDebugging};
  20use dap::{
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  28    RunInTerminalRequestArguments, StartDebuggingRequestArguments,
  29};
  30use futures::channel::{mpsc, oneshot};
  31use futures::{FutureExt, future::Shared};
  32use gpui::{
  33    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  34    Task, WeakEntity,
  35};
  36
  37use serde_json::Value;
  38use smol::stream::StreamExt;
  39use std::any::TypeId;
  40use std::collections::BTreeMap;
  41use std::u64;
  42use std::{
  43    any::Any,
  44    collections::hash_map::Entry,
  45    hash::{Hash, Hasher},
  46    path::Path,
  47    sync::Arc,
  48};
  49use text::{PointUtf16, ToPointUtf16};
  50use util::ResultExt;
  51use worktree::Worktree;
  52
  53#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  54#[repr(transparent)]
  55pub struct ThreadId(pub u64);
  56
  57impl ThreadId {
  58    pub const MIN: ThreadId = ThreadId(u64::MIN);
  59    pub const MAX: ThreadId = ThreadId(u64::MAX);
  60}
  61
  62impl From<u64> for ThreadId {
  63    fn from(id: u64) -> Self {
  64        Self(id)
  65    }
  66}
  67
  68#[derive(Clone, Debug)]
  69pub struct StackFrame {
  70    pub dap: dap::StackFrame,
  71    pub scopes: Vec<dap::Scope>,
  72}
  73
  74impl From<dap::StackFrame> for StackFrame {
  75    fn from(stack_frame: dap::StackFrame) -> Self {
  76        Self {
  77            scopes: vec![],
  78            dap: stack_frame,
  79        }
  80    }
  81}
  82
  83#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  84pub enum ThreadStatus {
  85    #[default]
  86    Running,
  87    Stopped,
  88    Stepping,
  89    Exited,
  90    Ended,
  91}
  92
  93impl ThreadStatus {
  94    pub fn label(&self) -> &'static str {
  95        match self {
  96            ThreadStatus::Running => "Running",
  97            ThreadStatus::Stopped => "Stopped",
  98            ThreadStatus::Stepping => "Stepping",
  99            ThreadStatus::Exited => "Exited",
 100            ThreadStatus::Ended => "Ended",
 101        }
 102    }
 103}
 104
 105#[derive(Debug)]
 106pub struct Thread {
 107    dap: dap::Thread,
 108    stack_frame_ids: IndexSet<StackFrameId>,
 109    _has_stopped: bool,
 110}
 111
 112impl From<dap::Thread> for Thread {
 113    fn from(dap: dap::Thread) -> Self {
 114        Self {
 115            dap,
 116            stack_frame_ids: Default::default(),
 117            _has_stopped: false,
 118        }
 119    }
 120}
 121
 122pub enum Mode {
 123    Building,
 124    Running(LocalMode),
 125}
 126
 127#[derive(Clone)]
 128pub struct LocalMode {
 129    client: Arc<DebugAdapterClient>,
 130    binary: DebugAdapterBinary,
 131    tmp_breakpoint: Option<SourceBreakpoint>,
 132    worktree: WeakEntity<Worktree>,
 133    executor: BackgroundExecutor,
 134}
 135
 136fn client_source(abs_path: &Path) -> dap::Source {
 137    dap::Source {
 138        name: abs_path
 139            .file_name()
 140            .map(|filename| filename.to_string_lossy().to_string()),
 141        path: Some(abs_path.to_string_lossy().to_string()),
 142        source_reference: None,
 143        presentation_hint: None,
 144        origin: None,
 145        sources: None,
 146        adapter_data: None,
 147        checksums: None,
 148    }
 149}
 150
 151impl LocalMode {
 152    async fn new(
 153        session_id: SessionId,
 154        parent_session: Option<Entity<Session>>,
 155        worktree: WeakEntity<Worktree>,
 156        binary: DebugAdapterBinary,
 157        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 158        cx: AsyncApp,
 159    ) -> Result<Self> {
 160        let message_handler = Box::new(move |message| {
 161            messages_tx.unbounded_send(message).ok();
 162        });
 163
 164        let client = Arc::new(
 165            if let Some(client) = parent_session
 166                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 167                .flatten()
 168            {
 169                client
 170                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 171                    .await?
 172            } else {
 173                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 174                    .await?
 175            },
 176        );
 177
 178        Ok(Self {
 179            client,
 180            worktree,
 181            tmp_breakpoint: None,
 182            binary,
 183            executor: cx.background_executor().clone(),
 184        })
 185    }
 186
 187    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
 188        &self.worktree
 189    }
 190
 191    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 192        let tasks: Vec<_> = paths
 193            .into_iter()
 194            .map(|path| {
 195                self.request(dap_command::SetBreakpoints {
 196                    source: client_source(path),
 197                    source_modified: None,
 198                    breakpoints: vec![],
 199                })
 200            })
 201            .collect();
 202
 203        cx.background_spawn(async move {
 204            futures::future::join_all(tasks)
 205                .await
 206                .iter()
 207                .for_each(|res| match res {
 208                    Ok(_) => {}
 209                    Err(err) => {
 210                        log::warn!("Set breakpoints request failed: {}", err);
 211                    }
 212                });
 213        })
 214    }
 215
 216    fn send_breakpoints_from_path(
 217        &self,
 218        abs_path: Arc<Path>,
 219        reason: BreakpointUpdatedReason,
 220        breakpoint_store: &Entity<BreakpointStore>,
 221        cx: &mut App,
 222    ) -> Task<()> {
 223        let breakpoints =
 224            breakpoint_store
 225                .read_with(cx, |store, cx| {
 226                    store.source_breakpoints_from_path(&abs_path, cx)
 227                })
 228                .into_iter()
 229                .filter(|bp| bp.state.is_enabled())
 230                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 231                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 232                }))
 233                .map(Into::into)
 234                .collect();
 235
 236        let raw_breakpoints = breakpoint_store
 237            .read(cx)
 238            .breakpoints_from_path(&abs_path)
 239            .into_iter()
 240            .filter(|bp| bp.bp.state.is_enabled())
 241            .collect::<Vec<_>>();
 242
 243        let task = self.request(dap_command::SetBreakpoints {
 244            source: client_source(&abs_path),
 245            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 246            breakpoints,
 247        });
 248        let session_id = self.client.id();
 249        let breakpoint_store = breakpoint_store.downgrade();
 250        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 251            Ok(breakpoints) => {
 252                let breakpoints =
 253                    breakpoints
 254                        .into_iter()
 255                        .zip(raw_breakpoints)
 256                        .filter_map(|(dap_bp, zed_bp)| {
 257                            Some((
 258                                zed_bp,
 259                                BreakpointSessionState {
 260                                    id: dap_bp.id?,
 261                                    verified: dap_bp.verified,
 262                                },
 263                            ))
 264                        });
 265                breakpoint_store
 266                    .update(cx, |this, _| {
 267                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 268                    })
 269                    .ok();
 270            }
 271            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 272        })
 273    }
 274
 275    fn send_exception_breakpoints(
 276        &self,
 277        filters: Vec<ExceptionBreakpointsFilter>,
 278        supports_filter_options: bool,
 279    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 280        let arg = if supports_filter_options {
 281            SetExceptionBreakpoints::WithOptions {
 282                filters: filters
 283                    .into_iter()
 284                    .map(|filter| ExceptionFilterOptions {
 285                        filter_id: filter.filter,
 286                        condition: None,
 287                        mode: None,
 288                    })
 289                    .collect(),
 290            }
 291        } else {
 292            SetExceptionBreakpoints::Plain {
 293                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 294            }
 295        };
 296        self.request(arg)
 297    }
 298
 299    fn send_source_breakpoints(
 300        &self,
 301        ignore_breakpoints: bool,
 302        breakpoint_store: &Entity<BreakpointStore>,
 303        cx: &App,
 304    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 305        let mut breakpoint_tasks = Vec::new();
 306        let breakpoints =
 307            breakpoint_store.read_with(cx, |store, cx| store.all_source_breakpoints(cx));
 308        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
 309        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
 310        let session_id = self.client.id();
 311        for (path, breakpoints) in breakpoints {
 312            let breakpoints = if ignore_breakpoints {
 313                vec![]
 314            } else {
 315                breakpoints
 316                    .into_iter()
 317                    .filter(|bp| bp.state.is_enabled())
 318                    .map(Into::into)
 319                    .collect()
 320            };
 321
 322            let raw_breakpoints = raw_breakpoints
 323                .remove(&path)
 324                .unwrap_or_default()
 325                .into_iter()
 326                .filter(|bp| bp.bp.state.is_enabled());
 327            let error_path = path.clone();
 328            let send_request = self
 329                .request(dap_command::SetBreakpoints {
 330                    source: client_source(&path),
 331                    source_modified: Some(false),
 332                    breakpoints,
 333                })
 334                .map(|result| result.map_err(move |e| (error_path, e)));
 335
 336            let task = cx.spawn({
 337                let breakpoint_store = breakpoint_store.downgrade();
 338                async move |cx| {
 339                    let breakpoints = cx.background_spawn(send_request).await?;
 340
 341                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
 342                        |(dap_bp, zed_bp)| {
 343                            Some((
 344                                zed_bp,
 345                                BreakpointSessionState {
 346                                    id: dap_bp.id?,
 347                                    verified: dap_bp.verified,
 348                                },
 349                            ))
 350                        },
 351                    );
 352                    breakpoint_store
 353                        .update(cx, |this, _| {
 354                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
 355                        })
 356                        .ok();
 357
 358                    Ok(())
 359                }
 360            });
 361            breakpoint_tasks.push(task);
 362        }
 363
 364        cx.background_spawn(async move {
 365            futures::future::join_all(breakpoint_tasks)
 366                .await
 367                .into_iter()
 368                .filter_map(Result::err)
 369                .collect::<HashMap<_, _>>()
 370        })
 371    }
 372
 373    fn initialize_sequence(
 374        &self,
 375        capabilities: &Capabilities,
 376        initialized_rx: oneshot::Receiver<()>,
 377        dap_store: WeakEntity<DapStore>,
 378        cx: &App,
 379    ) -> Task<Result<()>> {
 380        let raw = self.binary.request_args.clone();
 381
 382        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 383        let launch = match raw.request {
 384            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
 385                raw: raw.configuration,
 386            }),
 387            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
 388                raw: raw.configuration,
 389            }),
 390        };
 391
 392        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 393        let exception_filters = capabilities
 394            .exception_breakpoint_filters
 395            .as_ref()
 396            .map(|exception_filters| {
 397                exception_filters
 398                    .iter()
 399                    .filter(|filter| filter.default == Some(true))
 400                    .cloned()
 401                    .collect::<Vec<_>>()
 402            })
 403            .unwrap_or_default();
 404        let supports_exception_filters = capabilities
 405            .supports_exception_filter_options
 406            .unwrap_or_default();
 407        let this = self.clone();
 408        let worktree = self.worktree().clone();
 409        let configuration_sequence = cx.spawn({
 410            async move |cx| {
 411                let breakpoint_store =
 412                    dap_store.update(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
 413                initialized_rx.await?;
 414                let errors_by_path = cx
 415                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
 416                    .await;
 417
 418                dap_store.update(cx, |_, cx| {
 419                    let Some(worktree) = worktree.upgrade() else {
 420                        return;
 421                    };
 422
 423                    for (path, error) in &errors_by_path {
 424                        log::error!("failed to set breakpoints for {path:?}: {error}");
 425                    }
 426
 427                    if let Some(failed_path) = errors_by_path.keys().next() {
 428                        let failed_path = failed_path
 429                            .strip_prefix(worktree.read(cx).abs_path())
 430                            .unwrap_or(failed_path)
 431                            .display();
 432                        let message = format!(
 433                            "Failed to set breakpoints for {failed_path}{}",
 434                            match errors_by_path.len() {
 435                                0 => unreachable!(),
 436                                1 => "".into(),
 437                                2 => " and 1 other path".into(),
 438                                n => format!(" and {} other paths", n - 1),
 439                            }
 440                        );
 441                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 442                    }
 443                })?;
 444
 445                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
 446                    .await
 447                    .ok();
 448                let ret = if configuration_done_supported {
 449                    this.request(ConfigurationDone {})
 450                } else {
 451                    Task::ready(Ok(()))
 452                }
 453                .await;
 454                ret
 455            }
 456        });
 457
 458        cx.background_spawn(async move {
 459            futures::future::try_join(launch, configuration_sequence).await?;
 460            Ok(())
 461        })
 462    }
 463
 464    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 465    where
 466        <R::DapRequest as dap::requests::Request>::Response: 'static,
 467        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 468    {
 469        let request = Arc::new(request);
 470
 471        let request_clone = request.clone();
 472        let connection = self.client.clone();
 473        self.executor.spawn(async move {
 474            let args = request_clone.to_dap();
 475            let response = connection.request::<R::DapRequest>(args).await?;
 476            request.response_from_dap(response)
 477        })
 478    }
 479}
 480
 481impl Mode {
 482    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 483    where
 484        <R::DapRequest as dap::requests::Request>::Response: 'static,
 485        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 486    {
 487        match self {
 488            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 489            Mode::Building => Task::ready(Err(anyhow!(
 490                "no adapter running to send request: {request:?}"
 491            ))),
 492        }
 493    }
 494}
 495
 496#[derive(Default)]
 497struct ThreadStates {
 498    global_state: Option<ThreadStatus>,
 499    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 500}
 501
 502impl ThreadStates {
 503    fn stop_all_threads(&mut self) {
 504        self.global_state = Some(ThreadStatus::Stopped);
 505        self.known_thread_states.clear();
 506    }
 507
 508    fn exit_all_threads(&mut self) {
 509        self.global_state = Some(ThreadStatus::Exited);
 510        self.known_thread_states.clear();
 511    }
 512
 513    fn continue_all_threads(&mut self) {
 514        self.global_state = Some(ThreadStatus::Running);
 515        self.known_thread_states.clear();
 516    }
 517
 518    fn stop_thread(&mut self, thread_id: ThreadId) {
 519        self.known_thread_states
 520            .insert(thread_id, ThreadStatus::Stopped);
 521    }
 522
 523    fn continue_thread(&mut self, thread_id: ThreadId) {
 524        self.known_thread_states
 525            .insert(thread_id, ThreadStatus::Running);
 526    }
 527
 528    fn process_step(&mut self, thread_id: ThreadId) {
 529        self.known_thread_states
 530            .insert(thread_id, ThreadStatus::Stepping);
 531    }
 532
 533    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 534        self.thread_state(thread_id)
 535            .unwrap_or(ThreadStatus::Running)
 536    }
 537
 538    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 539        self.known_thread_states
 540            .get(&thread_id)
 541            .copied()
 542            .or(self.global_state)
 543    }
 544
 545    fn exit_thread(&mut self, thread_id: ThreadId) {
 546        self.known_thread_states
 547            .insert(thread_id, ThreadStatus::Exited);
 548    }
 549
 550    fn any_stopped_thread(&self) -> bool {
 551        self.global_state
 552            .is_some_and(|state| state == ThreadStatus::Stopped)
 553            || self
 554                .known_thread_states
 555                .values()
 556                .any(|status| *status == ThreadStatus::Stopped)
 557    }
 558}
 559const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
 560
 561type IsEnabled = bool;
 562
 563#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
 564pub struct OutputToken(pub usize);
 565/// Represents a current state of a single debug adapter and provides ways to mutate it.
 566pub struct Session {
 567    pub mode: Mode,
 568    id: SessionId,
 569    label: SharedString,
 570    adapter: DebugAdapterName,
 571    pub(super) capabilities: Capabilities,
 572    child_session_ids: HashSet<SessionId>,
 573    parent_session: Option<Entity<Session>>,
 574    modules: Vec<dap::Module>,
 575    loaded_sources: Vec<dap::Source>,
 576    output_token: OutputToken,
 577    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
 578    threads: IndexMap<ThreadId, Thread>,
 579    thread_states: ThreadStates,
 580    variables: HashMap<VariableReference, Vec<dap::Variable>>,
 581    stack_frames: IndexMap<StackFrameId, StackFrame>,
 582    locations: HashMap<u64, dap::LocationsResponse>,
 583    is_session_terminated: bool,
 584    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
 585    pub(crate) breakpoint_store: Entity<BreakpointStore>,
 586    ignore_breakpoints: bool,
 587    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
 588    background_tasks: Vec<Task<()>>,
 589}
 590
 591trait CacheableCommand: Any + Send + Sync {
 592    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 593    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 594    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 595}
 596
 597impl<T> CacheableCommand for T
 598where
 599    T: DapCommand + PartialEq + Eq + Hash,
 600{
 601    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 602        (rhs as &dyn Any)
 603            .downcast_ref::<Self>()
 604            .map_or(false, |rhs| self == rhs)
 605    }
 606
 607    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 608        T::hash(self, &mut hasher);
 609    }
 610
 611    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 612        self
 613    }
 614}
 615
 616pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 617
 618impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 619    fn from(request: T) -> Self {
 620        Self(Arc::new(request))
 621    }
 622}
 623
 624impl PartialEq for RequestSlot {
 625    fn eq(&self, other: &Self) -> bool {
 626        self.0.dyn_eq(other.0.as_ref())
 627    }
 628}
 629
 630impl Eq for RequestSlot {}
 631
 632impl Hash for RequestSlot {
 633    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 634        self.0.dyn_hash(state);
 635        (&*self.0 as &dyn Any).type_id().hash(state)
 636    }
 637}
 638
 639#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 640pub struct CompletionsQuery {
 641    pub query: String,
 642    pub column: u64,
 643    pub line: Option<u64>,
 644    pub frame_id: Option<u64>,
 645}
 646
 647impl CompletionsQuery {
 648    pub fn new(
 649        buffer: &language::Buffer,
 650        cursor_position: language::Anchor,
 651        frame_id: Option<u64>,
 652    ) -> Self {
 653        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 654        Self {
 655            query: buffer.text(),
 656            column: column as u64,
 657            frame_id,
 658            line: Some(row as u64),
 659        }
 660    }
 661}
 662
 663#[derive(Debug)]
 664pub enum SessionEvent {
 665    Modules,
 666    LoadedSources,
 667    Stopped(Option<ThreadId>),
 668    StackTrace,
 669    Variables,
 670    Threads,
 671    InvalidateInlineValue,
 672    CapabilitiesLoaded,
 673    RunInTerminal {
 674        request: RunInTerminalRequestArguments,
 675        sender: mpsc::Sender<Result<u32>>,
 676    },
 677    ConsoleOutput,
 678}
 679
 680#[derive(Clone, Debug, PartialEq, Eq)]
 681pub enum SessionStateEvent {
 682    Running,
 683    Shutdown,
 684    Restart,
 685    SpawnChildSession {
 686        request: StartDebuggingRequestArguments,
 687    },
 688}
 689
 690impl EventEmitter<SessionEvent> for Session {}
 691impl EventEmitter<SessionStateEvent> for Session {}
 692
 693// local session will send breakpoint updates to DAP for all new breakpoints
 694// remote side will only send breakpoint updates when it is a breakpoint created by that peer
 695// BreakpointStore notifies session on breakpoint changes
 696impl Session {
 697    pub(crate) fn new(
 698        breakpoint_store: Entity<BreakpointStore>,
 699        session_id: SessionId,
 700        parent_session: Option<Entity<Session>>,
 701        label: SharedString,
 702        adapter: DebugAdapterName,
 703        cx: &mut App,
 704    ) -> Entity<Self> {
 705        cx.new::<Self>(|cx| {
 706            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 707                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 708                    if let Some(local) = (!this.ignore_breakpoints)
 709                        .then(|| this.as_local_mut())
 710                        .flatten()
 711                    {
 712                        local
 713                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 714                            .detach();
 715                    };
 716                }
 717                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 718                    if let Some(local) = (!this.ignore_breakpoints)
 719                        .then(|| this.as_local_mut())
 720                        .flatten()
 721                    {
 722                        local.unset_breakpoints_from_paths(paths, cx).detach();
 723                    }
 724                }
 725                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 726            })
 727            .detach();
 728            cx.on_app_quit(Self::on_app_quit).detach();
 729
 730            let this = Self {
 731                mode: Mode::Building,
 732                id: session_id,
 733                child_session_ids: HashSet::default(),
 734                parent_session,
 735                capabilities: Capabilities::default(),
 736                variables: Default::default(),
 737                stack_frames: Default::default(),
 738                thread_states: ThreadStates::default(),
 739                output_token: OutputToken(0),
 740                output: circular_buffer::CircularBuffer::boxed(),
 741                requests: HashMap::default(),
 742                modules: Vec::default(),
 743                loaded_sources: Vec::default(),
 744                threads: IndexMap::default(),
 745                background_tasks: Vec::default(),
 746                locations: Default::default(),
 747                is_session_terminated: false,
 748                ignore_breakpoints: false,
 749                breakpoint_store,
 750                exception_breakpoints: Default::default(),
 751                label,
 752                adapter,
 753            };
 754
 755            this
 756        })
 757    }
 758
 759    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 760        match &self.mode {
 761            Mode::Building => None,
 762            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 763        }
 764    }
 765
 766    pub fn boot(
 767        &mut self,
 768        binary: DebugAdapterBinary,
 769        worktree: Entity<Worktree>,
 770        dap_store: WeakEntity<DapStore>,
 771        cx: &mut Context<Self>,
 772    ) -> Task<Result<()>> {
 773        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
 774        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
 775
 776        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
 777            let mut initialized_tx = Some(initialized_tx);
 778            while let Some(message) = message_rx.next().await {
 779                if let Message::Event(event) = message {
 780                    if let Events::Initialized(_) = *event {
 781                        if let Some(tx) = initialized_tx.take() {
 782                            tx.send(()).ok();
 783                        }
 784                    } else {
 785                        let Ok(_) = this.update(cx, |session, cx| {
 786                            session.handle_dap_event(event, cx);
 787                        }) else {
 788                            break;
 789                        };
 790                    }
 791                } else if let Message::Request(request) = message {
 792                    let Ok(_) = this.update(cx, |this, cx| {
 793                        if request.command == StartDebugging::COMMAND {
 794                            this.handle_start_debugging_request(request, cx)
 795                                .detach_and_log_err(cx);
 796                        } else if request.command == RunInTerminal::COMMAND {
 797                            this.handle_run_in_terminal_request(request, cx)
 798                                .detach_and_log_err(cx);
 799                        }
 800                    }) else {
 801                        break;
 802                    };
 803                }
 804            }
 805        })];
 806        self.background_tasks = background_tasks;
 807        let id = self.id;
 808        let parent_session = self.parent_session.clone();
 809
 810        cx.spawn(async move |this, cx| {
 811            let mode = LocalMode::new(
 812                id,
 813                parent_session,
 814                worktree.downgrade(),
 815                binary,
 816                message_tx,
 817                cx.clone(),
 818            )
 819            .await?;
 820            this.update(cx, |this, cx| {
 821                this.mode = Mode::Running(mode);
 822                cx.emit(SessionStateEvent::Running);
 823            })?;
 824
 825            this.update(cx, |session, cx| session.request_initialize(cx))?
 826                .await?;
 827
 828            this.update(cx, |session, cx| {
 829                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
 830            })?
 831            .await
 832        })
 833    }
 834
 835    pub fn session_id(&self) -> SessionId {
 836        self.id
 837    }
 838
 839    pub fn child_session_ids(&self) -> HashSet<SessionId> {
 840        self.child_session_ids.clone()
 841    }
 842
 843    pub fn add_child_session_id(&mut self, session_id: SessionId) {
 844        self.child_session_ids.insert(session_id);
 845    }
 846
 847    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
 848        self.child_session_ids.remove(&session_id);
 849    }
 850
 851    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 852        self.parent_session
 853            .as_ref()
 854            .map(|session| session.read(cx).id)
 855    }
 856
 857    pub fn parent_session(&self) -> Option<&Entity<Self>> {
 858        self.parent_session.as_ref()
 859    }
 860
 861    pub fn capabilities(&self) -> &Capabilities {
 862        &self.capabilities
 863    }
 864
 865    pub fn binary(&self) -> &DebugAdapterBinary {
 866        let Mode::Running(local_mode) = &self.mode else {
 867            panic!("Session is not running");
 868        };
 869        &local_mode.binary
 870    }
 871
 872    pub fn adapter(&self) -> DebugAdapterName {
 873        self.adapter.clone()
 874    }
 875
 876    pub fn label(&self) -> SharedString {
 877        self.label.clone()
 878    }
 879
 880    pub fn is_terminated(&self) -> bool {
 881        self.is_session_terminated
 882    }
 883
 884    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
 885        let (tx, mut rx) = mpsc::unbounded();
 886
 887        cx.spawn(async move |this, cx| {
 888            while let Some(output) = rx.next().await {
 889                this.update(cx, |this, cx| {
 890                    let event = dap::OutputEvent {
 891                        category: None,
 892                        output,
 893                        group: None,
 894                        variables_reference: None,
 895                        source: None,
 896                        line: None,
 897                        column: None,
 898                        data: None,
 899                        location_reference: None,
 900                    };
 901                    this.push_output(event, cx);
 902                })?;
 903            }
 904            anyhow::Ok(())
 905        })
 906        .detach();
 907
 908        return tx;
 909    }
 910
 911    pub fn is_local(&self) -> bool {
 912        matches!(self.mode, Mode::Running(_))
 913    }
 914
 915    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 916        match &mut self.mode {
 917            Mode::Running(local_mode) => Some(local_mode),
 918            Mode::Building => None,
 919        }
 920    }
 921
 922    pub fn as_local(&self) -> Option<&LocalMode> {
 923        match &self.mode {
 924            Mode::Running(local_mode) => Some(local_mode),
 925            Mode::Building => None,
 926        }
 927    }
 928
 929    fn handle_start_debugging_request(
 930        &mut self,
 931        request: dap::messages::Request,
 932        cx: &mut Context<Self>,
 933    ) -> Task<Result<()>> {
 934        let request_seq = request.seq;
 935
 936        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
 937            .arguments
 938            .as_ref()
 939            .map(|value| serde_json::from_value(value.clone()));
 940
 941        let mut success = true;
 942        if let Some(Ok(request)) = launch_request {
 943            cx.emit(SessionStateEvent::SpawnChildSession { request });
 944        } else {
 945            log::error!(
 946                "Failed to parse launch request arguments: {:?}",
 947                request.arguments
 948            );
 949            success = false;
 950        }
 951
 952        cx.spawn(async move |this, cx| {
 953            this.update(cx, |this, cx| {
 954                this.respond_to_client(
 955                    request_seq,
 956                    success,
 957                    StartDebugging::COMMAND.to_string(),
 958                    None,
 959                    cx,
 960                )
 961            })?
 962            .await
 963        })
 964    }
 965
 966    fn handle_run_in_terminal_request(
 967        &mut self,
 968        request: dap::messages::Request,
 969        cx: &mut Context<Self>,
 970    ) -> Task<Result<()>> {
 971        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
 972            request.arguments.unwrap_or_default(),
 973        )
 974        .expect("To parse StartDebuggingRequestArguments");
 975
 976        let seq = request.seq;
 977
 978        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
 979        cx.emit(SessionEvent::RunInTerminal {
 980            request: request_args,
 981            sender: tx,
 982        });
 983        cx.notify();
 984
 985        cx.spawn(async move |session, cx| {
 986            let result = util::maybe!(async move {
 987                rx.next().await.ok_or_else(|| {
 988                    anyhow!("failed to receive response from spawn terminal".to_string())
 989                })?
 990            })
 991            .await;
 992            let (success, body) = match result {
 993                Ok(pid) => (
 994                    true,
 995                    serde_json::to_value(dap::RunInTerminalResponse {
 996                        process_id: None,
 997                        shell_process_id: Some(pid as u64),
 998                    })
 999                    .ok(),
1000                ),
1001                Err(error) => (
1002                    false,
1003                    serde_json::to_value(dap::ErrorResponse {
1004                        error: Some(dap::Message {
1005                            id: seq,
1006                            format: error.to_string(),
1007                            variables: None,
1008                            send_telemetry: None,
1009                            show_user: None,
1010                            url: None,
1011                            url_label: None,
1012                        }),
1013                    })
1014                    .ok(),
1015                ),
1016            };
1017
1018            session
1019                .update(cx, |session, cx| {
1020                    session.respond_to_client(
1021                        seq,
1022                        success,
1023                        RunInTerminal::COMMAND.to_string(),
1024                        body,
1025                        cx,
1026                    )
1027                })?
1028                .await
1029        })
1030    }
1031
1032    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1033        let adapter_id = self.adapter().to_string();
1034        let request = Initialize { adapter_id };
1035        match &self.mode {
1036            Mode::Running(local_mode) => {
1037                let capabilities = local_mode.request(request);
1038
1039                cx.spawn(async move |this, cx| {
1040                    let capabilities = capabilities.await?;
1041                    this.update(cx, |session, cx| {
1042                        session.capabilities = capabilities;
1043                        let filters = session
1044                            .capabilities
1045                            .exception_breakpoint_filters
1046                            .clone()
1047                            .unwrap_or_default();
1048                        for filter in filters {
1049                            let default = filter.default.unwrap_or_default();
1050                            session
1051                                .exception_breakpoints
1052                                .entry(filter.filter.clone())
1053                                .or_insert_with(|| (filter, default));
1054                        }
1055                        cx.emit(SessionEvent::CapabilitiesLoaded);
1056                    })?;
1057                    Ok(())
1058                })
1059            }
1060            Mode::Building => Task::ready(Err(anyhow!(
1061                "Cannot send initialize request, task still building"
1062            ))),
1063        }
1064    }
1065
1066    pub(super) fn initialize_sequence(
1067        &mut self,
1068        initialize_rx: oneshot::Receiver<()>,
1069        dap_store: WeakEntity<DapStore>,
1070        cx: &mut Context<Self>,
1071    ) -> Task<Result<()>> {
1072        match &self.mode {
1073            Mode::Running(local_mode) => {
1074                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1075            }
1076            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1077        }
1078    }
1079
1080    pub fn run_to_position(
1081        &mut self,
1082        breakpoint: SourceBreakpoint,
1083        active_thread_id: ThreadId,
1084        cx: &mut Context<Self>,
1085    ) {
1086        match &mut self.mode {
1087            Mode::Running(local_mode) => {
1088                if !matches!(
1089                    self.thread_states.thread_state(active_thread_id),
1090                    Some(ThreadStatus::Stopped)
1091                ) {
1092                    return;
1093                };
1094                let path = breakpoint.path.clone();
1095                local_mode.tmp_breakpoint = Some(breakpoint);
1096                let task = local_mode.send_breakpoints_from_path(
1097                    path,
1098                    BreakpointUpdatedReason::Toggled,
1099                    &self.breakpoint_store,
1100                    cx,
1101                );
1102
1103                cx.spawn(async move |this, cx| {
1104                    task.await;
1105                    this.update(cx, |this, cx| {
1106                        this.continue_thread(active_thread_id, cx);
1107                    })
1108                })
1109                .detach();
1110            }
1111            Mode::Building => {}
1112        }
1113    }
1114
1115    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1116        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1117    }
1118
1119    pub fn output(
1120        &self,
1121        since: OutputToken,
1122    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1123        if self.output_token.0 == 0 {
1124            return (self.output.range(0..0), OutputToken(0));
1125        };
1126
1127        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1128
1129        let clamped_events_since = events_since.clamp(0, self.output.len());
1130        (
1131            self.output
1132                .range(self.output.len() - clamped_events_since..),
1133            self.output_token,
1134        )
1135    }
1136
1137    pub fn respond_to_client(
1138        &self,
1139        request_seq: u64,
1140        success: bool,
1141        command: String,
1142        body: Option<serde_json::Value>,
1143        cx: &mut Context<Self>,
1144    ) -> Task<Result<()>> {
1145        let Some(local_session) = self.as_local() else {
1146            unreachable!("Cannot respond to remote client");
1147        };
1148        let client = local_session.client.clone();
1149
1150        cx.background_spawn(async move {
1151            client
1152                .send_message(Message::Response(Response {
1153                    body,
1154                    success,
1155                    command,
1156                    seq: request_seq + 1,
1157                    request_seq,
1158                    message: None,
1159                }))
1160                .await
1161        })
1162    }
1163
1164    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1165        // todo(debugger): Find a clean way to get around the clone
1166        let breakpoint_store = self.breakpoint_store.clone();
1167        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1168            let breakpoint = local.tmp_breakpoint.take()?;
1169            let path = breakpoint.path.clone();
1170            Some((local, path))
1171        }) {
1172            local
1173                .send_breakpoints_from_path(
1174                    path,
1175                    BreakpointUpdatedReason::Toggled,
1176                    &breakpoint_store,
1177                    cx,
1178                )
1179                .detach();
1180        };
1181
1182        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1183            self.thread_states.stop_all_threads();
1184
1185            self.invalidate_command_type::<StackTraceCommand>();
1186        }
1187
1188        // Event if we stopped all threads we still need to insert the thread_id
1189        // to our own data
1190        if let Some(thread_id) = event.thread_id {
1191            self.thread_states.stop_thread(ThreadId(thread_id));
1192
1193            self.invalidate_state(
1194                &StackTraceCommand {
1195                    thread_id,
1196                    start_frame: None,
1197                    levels: None,
1198                }
1199                .into(),
1200            );
1201        }
1202
1203        self.invalidate_generic();
1204        self.threads.clear();
1205        self.variables.clear();
1206        cx.emit(SessionEvent::Stopped(
1207            event
1208                .thread_id
1209                .map(Into::into)
1210                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1211        ));
1212        cx.emit(SessionEvent::InvalidateInlineValue);
1213        cx.notify();
1214    }
1215
1216    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1217        match *event {
1218            Events::Initialized(_) => {
1219                debug_assert!(
1220                    false,
1221                    "Initialized event should have been handled in LocalMode"
1222                );
1223            }
1224            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1225            Events::Continued(event) => {
1226                if event.all_threads_continued.unwrap_or_default() {
1227                    self.thread_states.continue_all_threads();
1228                    self.breakpoint_store.update(cx, |store, cx| {
1229                        store.remove_active_position(Some(self.session_id()), cx)
1230                    });
1231                } else {
1232                    self.thread_states
1233                        .continue_thread(ThreadId(event.thread_id));
1234                }
1235                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1236                self.invalidate_generic();
1237            }
1238            Events::Exited(_event) => {
1239                self.clear_active_debug_line(cx);
1240            }
1241            Events::Terminated(_) => {
1242                self.shutdown(cx).detach();
1243            }
1244            Events::Thread(event) => {
1245                let thread_id = ThreadId(event.thread_id);
1246
1247                match event.reason {
1248                    dap::ThreadEventReason::Started => {
1249                        self.thread_states.continue_thread(thread_id);
1250                    }
1251                    dap::ThreadEventReason::Exited => {
1252                        self.thread_states.exit_thread(thread_id);
1253                    }
1254                    reason => {
1255                        log::error!("Unhandled thread event reason {:?}", reason);
1256                    }
1257                }
1258                self.invalidate_state(&ThreadsCommand.into());
1259                cx.notify();
1260            }
1261            Events::Output(event) => {
1262                if event
1263                    .category
1264                    .as_ref()
1265                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1266                {
1267                    return;
1268                }
1269
1270                self.push_output(event, cx);
1271                cx.notify();
1272            }
1273            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1274                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1275            }),
1276            Events::Module(event) => {
1277                match event.reason {
1278                    dap::ModuleEventReason::New => {
1279                        self.modules.push(event.module);
1280                    }
1281                    dap::ModuleEventReason::Changed => {
1282                        if let Some(module) = self
1283                            .modules
1284                            .iter_mut()
1285                            .find(|other| event.module.id == other.id)
1286                        {
1287                            *module = event.module;
1288                        }
1289                    }
1290                    dap::ModuleEventReason::Removed => {
1291                        self.modules.retain(|other| event.module.id != other.id);
1292                    }
1293                }
1294
1295                // todo(debugger): We should only send the invalidate command to downstream clients.
1296                // self.invalidate_state(&ModulesCommand.into());
1297            }
1298            Events::LoadedSource(_) => {
1299                self.invalidate_state(&LoadedSourcesCommand.into());
1300            }
1301            Events::Capabilities(event) => {
1302                self.capabilities = self.capabilities.merge(event.capabilities);
1303                cx.notify();
1304            }
1305            Events::Memory(_) => {}
1306            Events::Process(_) => {}
1307            Events::ProgressEnd(_) => {}
1308            Events::ProgressStart(_) => {}
1309            Events::ProgressUpdate(_) => {}
1310            Events::Invalidated(_) => {}
1311            Events::Other(_) => {}
1312        }
1313    }
1314
1315    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1316    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1317        &mut self,
1318        request: T,
1319        process_result: impl FnOnce(
1320            &mut Self,
1321            Result<T::Response>,
1322            &mut Context<Self>,
1323        ) -> Option<T::Response>
1324        + 'static,
1325        cx: &mut Context<Self>,
1326    ) {
1327        const {
1328            assert!(
1329                T::CACHEABLE,
1330                "Only requests marked as cacheable should invoke `fetch`"
1331            );
1332        }
1333
1334        if !self.thread_states.any_stopped_thread()
1335            && request.type_id() != TypeId::of::<ThreadsCommand>()
1336            || self.is_session_terminated
1337        {
1338            return;
1339        }
1340
1341        let request_map = self
1342            .requests
1343            .entry(std::any::TypeId::of::<T>())
1344            .or_default();
1345
1346        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1347            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1348
1349            let task = Self::request_inner::<Arc<T>>(
1350                &self.capabilities,
1351                &self.mode,
1352                command,
1353                process_result,
1354                cx,
1355            );
1356            let task = cx
1357                .background_executor()
1358                .spawn(async move {
1359                    let _ = task.await?;
1360                    Some(())
1361                })
1362                .shared();
1363
1364            vacant.insert(task);
1365            cx.notify();
1366        }
1367    }
1368
1369    pub async fn request2<T: DapCommand + PartialEq + Eq + Hash>(
1370        &self,
1371        request: T,
1372    ) -> Result<T::Response> {
1373        if !T::is_supported(&self.capabilities) {
1374            anyhow::bail!("DAP request {:?} is not supported", request);
1375        }
1376
1377        self.mode.request_dap(request).await
1378    }
1379
1380    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1381        capabilities: &Capabilities,
1382        mode: &Mode,
1383        request: T,
1384        process_result: impl FnOnce(
1385            &mut Self,
1386            Result<T::Response>,
1387            &mut Context<Self>,
1388        ) -> Option<T::Response>
1389        + 'static,
1390        cx: &mut Context<Self>,
1391    ) -> Task<Option<T::Response>> {
1392        if !T::is_supported(&capabilities) {
1393            log::warn!(
1394                "Attempted to send a DAP request that isn't supported: {:?}",
1395                request
1396            );
1397            let error = Err(anyhow::Error::msg(
1398                "Couldn't complete request because it's not supported",
1399            ));
1400            return cx.spawn(async move |this, cx| {
1401                this.update(cx, |this, cx| process_result(this, error, cx))
1402                    .log_err()
1403                    .flatten()
1404            });
1405        }
1406
1407        let request = mode.request_dap(request);
1408        cx.spawn(async move |this, cx| {
1409            let result = request.await;
1410            this.update(cx, |this, cx| process_result(this, result, cx))
1411                .log_err()
1412                .flatten()
1413        })
1414    }
1415
1416    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1417        &self,
1418        request: T,
1419        process_result: impl FnOnce(
1420            &mut Self,
1421            Result<T::Response>,
1422            &mut Context<Self>,
1423        ) -> Option<T::Response>
1424        + 'static,
1425        cx: &mut Context<Self>,
1426    ) -> Task<Option<T::Response>> {
1427        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1428    }
1429
1430    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1431        self.requests.remove(&std::any::TypeId::of::<Command>());
1432    }
1433
1434    fn invalidate_generic(&mut self) {
1435        self.invalidate_command_type::<ModulesCommand>();
1436        self.invalidate_command_type::<LoadedSourcesCommand>();
1437        self.invalidate_command_type::<ThreadsCommand>();
1438    }
1439
1440    fn invalidate_state(&mut self, key: &RequestSlot) {
1441        self.requests
1442            .entry((&*key.0 as &dyn Any).type_id())
1443            .and_modify(|request_map| {
1444                request_map.remove(&key);
1445            });
1446    }
1447
1448    fn push_output(&mut self, event: OutputEvent, cx: &mut Context<Self>) {
1449        self.output.push_back(event);
1450        self.output_token.0 += 1;
1451        cx.emit(SessionEvent::ConsoleOutput);
1452    }
1453
1454    pub fn any_stopped_thread(&self) -> bool {
1455        self.thread_states.any_stopped_thread()
1456    }
1457
1458    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1459        self.thread_states.thread_status(thread_id)
1460    }
1461
1462    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1463        self.fetch(
1464            dap_command::ThreadsCommand,
1465            |this, result, cx| {
1466                let result = result.log_err()?;
1467
1468                this.threads = result
1469                    .iter()
1470                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1471                    .collect();
1472
1473                this.invalidate_command_type::<StackTraceCommand>();
1474                cx.emit(SessionEvent::Threads);
1475                cx.notify();
1476
1477                Some(result)
1478            },
1479            cx,
1480        );
1481
1482        self.threads
1483            .values()
1484            .map(|thread| {
1485                (
1486                    thread.dap.clone(),
1487                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1488                )
1489            })
1490            .collect()
1491    }
1492
1493    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1494        self.fetch(
1495            dap_command::ModulesCommand,
1496            |this, result, cx| {
1497                let result = result.log_err()?;
1498
1499                this.modules = result.iter().cloned().collect();
1500                cx.emit(SessionEvent::Modules);
1501                cx.notify();
1502
1503                Some(result)
1504            },
1505            cx,
1506        );
1507
1508        &self.modules
1509    }
1510
1511    pub fn ignore_breakpoints(&self) -> bool {
1512        self.ignore_breakpoints
1513    }
1514
1515    pub fn toggle_ignore_breakpoints(
1516        &mut self,
1517        cx: &mut App,
1518    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1519        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1520    }
1521
1522    pub(crate) fn set_ignore_breakpoints(
1523        &mut self,
1524        ignore: bool,
1525        cx: &mut App,
1526    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1527        if self.ignore_breakpoints == ignore {
1528            return Task::ready(HashMap::default());
1529        }
1530
1531        self.ignore_breakpoints = ignore;
1532
1533        if let Some(local) = self.as_local() {
1534            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1535        } else {
1536            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1537            unimplemented!()
1538        }
1539    }
1540
1541    pub fn exception_breakpoints(
1542        &self,
1543    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1544        self.exception_breakpoints.values()
1545    }
1546
1547    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1548        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1549            *is_enabled = !*is_enabled;
1550            self.send_exception_breakpoints(cx);
1551        }
1552    }
1553
1554    fn send_exception_breakpoints(&mut self, cx: &App) {
1555        if let Some(local) = self.as_local() {
1556            let exception_filters = self
1557                .exception_breakpoints
1558                .values()
1559                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1560                .collect();
1561
1562            let supports_exception_filters = self
1563                .capabilities
1564                .supports_exception_filter_options
1565                .unwrap_or_default();
1566            local
1567                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1568                .detach_and_log_err(cx);
1569        } else {
1570            debug_assert!(false, "Not implemented");
1571        }
1572    }
1573
1574    pub fn breakpoints_enabled(&self) -> bool {
1575        self.ignore_breakpoints
1576    }
1577
1578    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1579        self.fetch(
1580            dap_command::LoadedSourcesCommand,
1581            |this, result, cx| {
1582                let result = result.log_err()?;
1583                this.loaded_sources = result.iter().cloned().collect();
1584                cx.emit(SessionEvent::LoadedSources);
1585                cx.notify();
1586                Some(result)
1587            },
1588            cx,
1589        );
1590
1591        &self.loaded_sources
1592    }
1593
1594    fn fallback_to_manual_restart(
1595        &mut self,
1596        res: Result<()>,
1597        cx: &mut Context<Self>,
1598    ) -> Option<()> {
1599        if res.log_err().is_none() {
1600            cx.emit(SessionStateEvent::Restart);
1601            return None;
1602        }
1603        Some(())
1604    }
1605
1606    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1607        res.log_err()?;
1608        Some(())
1609    }
1610
1611    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1612        thread_id: ThreadId,
1613    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1614    {
1615        move |this, response, cx| match response.log_err() {
1616            Some(response) => {
1617                this.breakpoint_store.update(cx, |store, cx| {
1618                    store.remove_active_position(Some(this.session_id()), cx)
1619                });
1620                Some(response)
1621            }
1622            None => {
1623                this.thread_states.stop_thread(thread_id);
1624                cx.notify();
1625                None
1626            }
1627        }
1628    }
1629
1630    fn clear_active_debug_line_response(
1631        &mut self,
1632        response: Result<()>,
1633        cx: &mut Context<Session>,
1634    ) -> Option<()> {
1635        response.log_err()?;
1636        self.clear_active_debug_line(cx);
1637        Some(())
1638    }
1639
1640    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1641        self.breakpoint_store.update(cx, |store, cx| {
1642            store.remove_active_position(Some(self.id), cx)
1643        });
1644    }
1645
1646    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1647        self.request(
1648            PauseCommand {
1649                thread_id: thread_id.0,
1650            },
1651            Self::empty_response,
1652            cx,
1653        )
1654        .detach();
1655    }
1656
1657    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1658        self.request(
1659            RestartStackFrameCommand { stack_frame_id },
1660            Self::empty_response,
1661            cx,
1662        )
1663        .detach();
1664    }
1665
1666    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1667        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1668            self.request(
1669                RestartCommand {
1670                    raw: args.unwrap_or(Value::Null),
1671                },
1672                Self::fallback_to_manual_restart,
1673                cx,
1674            )
1675            .detach();
1676        } else {
1677            cx.emit(SessionStateEvent::Restart);
1678        }
1679    }
1680
1681    fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1682        let debug_adapter = self.adapter_client();
1683
1684        cx.background_spawn(async move {
1685            if let Some(client) = debug_adapter {
1686                client.shutdown().await.log_err();
1687            }
1688        })
1689    }
1690
1691    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1692        self.is_session_terminated = true;
1693        self.thread_states.exit_all_threads();
1694        cx.notify();
1695
1696        let task = if self
1697            .capabilities
1698            .supports_terminate_request
1699            .unwrap_or_default()
1700        {
1701            self.request(
1702                TerminateCommand {
1703                    restart: Some(false),
1704                },
1705                Self::clear_active_debug_line_response,
1706                cx,
1707            )
1708        } else {
1709            self.request(
1710                DisconnectCommand {
1711                    restart: Some(false),
1712                    terminate_debuggee: Some(true),
1713                    suspend_debuggee: Some(false),
1714                },
1715                Self::clear_active_debug_line_response,
1716                cx,
1717            )
1718        };
1719
1720        cx.emit(SessionStateEvent::Shutdown);
1721
1722        let debug_client = self.adapter_client();
1723
1724        cx.background_spawn(async move {
1725            let _ = task.await;
1726
1727            if let Some(client) = debug_client {
1728                client.shutdown().await.log_err();
1729            }
1730        })
1731    }
1732
1733    pub fn completions(
1734        &mut self,
1735        query: CompletionsQuery,
1736        cx: &mut Context<Self>,
1737    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1738        let task = self.request(query, |_, result, _| result.log_err(), cx);
1739
1740        cx.background_executor().spawn(async move {
1741            anyhow::Ok(
1742                task.await
1743                    .map(|response| response.targets)
1744                    .context("failed to fetch completions")?,
1745            )
1746        })
1747    }
1748
1749    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1750        self.thread_states.continue_thread(thread_id);
1751        self.request(
1752            ContinueCommand {
1753                args: ContinueArguments {
1754                    thread_id: thread_id.0,
1755                    single_thread: Some(true),
1756                },
1757            },
1758            Self::on_step_response::<ContinueCommand>(thread_id),
1759            cx,
1760        )
1761        .detach();
1762    }
1763
1764    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1765        match self.mode {
1766            Mode::Running(ref local) => Some(local.client.clone()),
1767            Mode::Building => None,
1768        }
1769    }
1770
1771    pub fn step_over(
1772        &mut self,
1773        thread_id: ThreadId,
1774        granularity: SteppingGranularity,
1775        cx: &mut Context<Self>,
1776    ) {
1777        let supports_single_thread_execution_requests =
1778            self.capabilities.supports_single_thread_execution_requests;
1779        let supports_stepping_granularity = self
1780            .capabilities
1781            .supports_stepping_granularity
1782            .unwrap_or_default();
1783
1784        let command = NextCommand {
1785            inner: StepCommand {
1786                thread_id: thread_id.0,
1787                granularity: supports_stepping_granularity.then(|| granularity),
1788                single_thread: supports_single_thread_execution_requests,
1789            },
1790        };
1791
1792        self.thread_states.process_step(thread_id);
1793        self.request(
1794            command,
1795            Self::on_step_response::<NextCommand>(thread_id),
1796            cx,
1797        )
1798        .detach();
1799    }
1800
1801    pub fn step_in(
1802        &mut self,
1803        thread_id: ThreadId,
1804        granularity: SteppingGranularity,
1805        cx: &mut Context<Self>,
1806    ) {
1807        let supports_single_thread_execution_requests =
1808            self.capabilities.supports_single_thread_execution_requests;
1809        let supports_stepping_granularity = self
1810            .capabilities
1811            .supports_stepping_granularity
1812            .unwrap_or_default();
1813
1814        let command = StepInCommand {
1815            inner: StepCommand {
1816                thread_id: thread_id.0,
1817                granularity: supports_stepping_granularity.then(|| granularity),
1818                single_thread: supports_single_thread_execution_requests,
1819            },
1820        };
1821
1822        self.thread_states.process_step(thread_id);
1823        self.request(
1824            command,
1825            Self::on_step_response::<StepInCommand>(thread_id),
1826            cx,
1827        )
1828        .detach();
1829    }
1830
1831    pub fn step_out(
1832        &mut self,
1833        thread_id: ThreadId,
1834        granularity: SteppingGranularity,
1835        cx: &mut Context<Self>,
1836    ) {
1837        let supports_single_thread_execution_requests =
1838            self.capabilities.supports_single_thread_execution_requests;
1839        let supports_stepping_granularity = self
1840            .capabilities
1841            .supports_stepping_granularity
1842            .unwrap_or_default();
1843
1844        let command = StepOutCommand {
1845            inner: StepCommand {
1846                thread_id: thread_id.0,
1847                granularity: supports_stepping_granularity.then(|| granularity),
1848                single_thread: supports_single_thread_execution_requests,
1849            },
1850        };
1851
1852        self.thread_states.process_step(thread_id);
1853        self.request(
1854            command,
1855            Self::on_step_response::<StepOutCommand>(thread_id),
1856            cx,
1857        )
1858        .detach();
1859    }
1860
1861    pub fn step_back(
1862        &mut self,
1863        thread_id: ThreadId,
1864        granularity: SteppingGranularity,
1865        cx: &mut Context<Self>,
1866    ) {
1867        let supports_single_thread_execution_requests =
1868            self.capabilities.supports_single_thread_execution_requests;
1869        let supports_stepping_granularity = self
1870            .capabilities
1871            .supports_stepping_granularity
1872            .unwrap_or_default();
1873
1874        let command = StepBackCommand {
1875            inner: StepCommand {
1876                thread_id: thread_id.0,
1877                granularity: supports_stepping_granularity.then(|| granularity),
1878                single_thread: supports_single_thread_execution_requests,
1879            },
1880        };
1881
1882        self.thread_states.process_step(thread_id);
1883
1884        self.request(
1885            command,
1886            Self::on_step_response::<StepBackCommand>(thread_id),
1887            cx,
1888        )
1889        .detach();
1890    }
1891
1892    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1893        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1894            && self.requests.contains_key(&ThreadsCommand.type_id())
1895            && self.threads.contains_key(&thread_id)
1896        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1897        // We could still be using an old thread id and have sent a new thread's request
1898        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1899        // But it very well could cause a minor bug in the future that is hard to track down
1900        {
1901            self.fetch(
1902                super::dap_command::StackTraceCommand {
1903                    thread_id: thread_id.0,
1904                    start_frame: None,
1905                    levels: None,
1906                },
1907                move |this, stack_frames, cx| {
1908                    let stack_frames = stack_frames.log_err()?;
1909
1910                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1911                        thread.stack_frame_ids =
1912                            stack_frames.iter().map(|frame| frame.id).collect();
1913                    });
1914                    debug_assert!(
1915                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1916                        "Sent request for thread_id that doesn't exist"
1917                    );
1918
1919                    this.stack_frames.extend(
1920                        stack_frames
1921                            .iter()
1922                            .cloned()
1923                            .map(|frame| (frame.id, StackFrame::from(frame))),
1924                    );
1925
1926                    this.invalidate_command_type::<ScopesCommand>();
1927                    this.invalidate_command_type::<VariablesCommand>();
1928
1929                    cx.emit(SessionEvent::StackTrace);
1930                    cx.notify();
1931                    Some(stack_frames)
1932                },
1933                cx,
1934            );
1935        }
1936
1937        self.threads
1938            .get(&thread_id)
1939            .map(|thread| {
1940                thread
1941                    .stack_frame_ids
1942                    .iter()
1943                    .filter_map(|id| self.stack_frames.get(id))
1944                    .cloned()
1945                    .collect()
1946            })
1947            .unwrap_or_default()
1948    }
1949
1950    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1951        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1952            && self
1953                .requests
1954                .contains_key(&TypeId::of::<StackTraceCommand>())
1955        {
1956            self.fetch(
1957                ScopesCommand { stack_frame_id },
1958                move |this, scopes, cx| {
1959                    let scopes = scopes.log_err()?;
1960
1961                    for scope in scopes .iter(){
1962                        this.variables(scope.variables_reference, cx);
1963                    }
1964
1965                    let entry = this
1966                        .stack_frames
1967                        .entry(stack_frame_id)
1968                        .and_modify(|stack_frame| {
1969                            stack_frame.scopes = scopes.clone();
1970                        });
1971
1972                    cx.emit(SessionEvent::Variables);
1973
1974                    debug_assert!(
1975                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1976                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1977                    );
1978
1979                    Some(scopes)
1980                },
1981                cx,
1982            );
1983        }
1984
1985        self.stack_frames
1986            .get(&stack_frame_id)
1987            .map(|frame| frame.scopes.as_slice())
1988            .unwrap_or_default()
1989    }
1990
1991    pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
1992        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
1993            return Vec::new();
1994        };
1995
1996        stack_frame
1997            .scopes
1998            .iter()
1999            .filter_map(|scope| self.variables.get(&scope.variables_reference))
2000            .flatten()
2001            .cloned()
2002            .collect()
2003    }
2004
2005    pub fn variables(
2006        &mut self,
2007        variables_reference: VariableReference,
2008        cx: &mut Context<Self>,
2009    ) -> Vec<dap::Variable> {
2010        let command = VariablesCommand {
2011            variables_reference,
2012            filter: None,
2013            start: None,
2014            count: None,
2015            format: None,
2016        };
2017
2018        self.fetch(
2019            command,
2020            move |this, variables, cx| {
2021                let variables = variables.log_err()?;
2022                this.variables
2023                    .insert(variables_reference, variables.clone());
2024
2025                cx.emit(SessionEvent::Variables);
2026                cx.emit(SessionEvent::InvalidateInlineValue);
2027                Some(variables)
2028            },
2029            cx,
2030        );
2031
2032        self.variables
2033            .get(&variables_reference)
2034            .cloned()
2035            .unwrap_or_default()
2036    }
2037
2038    pub fn set_variable_value(
2039        &mut self,
2040        variables_reference: u64,
2041        name: String,
2042        value: String,
2043        cx: &mut Context<Self>,
2044    ) {
2045        if self.capabilities.supports_set_variable.unwrap_or_default() {
2046            self.request(
2047                SetVariableValueCommand {
2048                    name,
2049                    value,
2050                    variables_reference,
2051                },
2052                move |this, response, cx| {
2053                    let response = response.log_err()?;
2054                    this.invalidate_command_type::<VariablesCommand>();
2055                    cx.notify();
2056                    Some(response)
2057                },
2058                cx,
2059            )
2060            .detach()
2061        }
2062    }
2063
2064    pub fn evaluate(
2065        &mut self,
2066        expression: String,
2067        context: Option<EvaluateArgumentsContext>,
2068        frame_id: Option<u64>,
2069        source: Option<Source>,
2070        cx: &mut Context<Self>,
2071    ) -> Task<()> {
2072        let event = dap::OutputEvent {
2073            category: None,
2074            output: format!("> {expression}"),
2075            group: None,
2076            variables_reference: None,
2077            source: None,
2078            line: None,
2079            column: None,
2080            data: None,
2081            location_reference: None,
2082        };
2083        self.push_output(event, cx);
2084        let request = self.mode.request_dap(EvaluateCommand {
2085            expression,
2086            context,
2087            frame_id,
2088            source,
2089        });
2090        cx.spawn(async move |this, cx| {
2091            let response = request.await;
2092            this.update(cx, |this, cx| {
2093                match response {
2094                    Ok(response) => {
2095                        let event = dap::OutputEvent {
2096                            category: None,
2097                            output: format!("< {}", &response.result),
2098                            group: None,
2099                            variables_reference: Some(response.variables_reference),
2100                            source: None,
2101                            line: None,
2102                            column: None,
2103                            data: None,
2104                            location_reference: None,
2105                        };
2106                        this.push_output(event, cx);
2107                    }
2108                    Err(e) => {
2109                        let event = dap::OutputEvent {
2110                            category: None,
2111                            output: format!("{}", e),
2112                            group: None,
2113                            variables_reference: None,
2114                            source: None,
2115                            line: None,
2116                            column: None,
2117                            data: None,
2118                            location_reference: None,
2119                        };
2120                        this.push_output(event, cx);
2121                    }
2122                };
2123                this.invalidate_command_type::<ScopesCommand>();
2124                cx.notify();
2125            })
2126            .ok();
2127        })
2128    }
2129
2130    pub fn location(
2131        &mut self,
2132        reference: u64,
2133        cx: &mut Context<Self>,
2134    ) -> Option<dap::LocationsResponse> {
2135        self.fetch(
2136            LocationsCommand { reference },
2137            move |this, response, _| {
2138                let response = response.log_err()?;
2139                this.locations.insert(reference, response.clone());
2140                Some(response)
2141            },
2142            cx,
2143        );
2144        self.locations.get(&reference).cloned()
2145    }
2146
2147    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2148        let command = DisconnectCommand {
2149            restart: Some(false),
2150            terminate_debuggee: Some(true),
2151            suspend_debuggee: Some(false),
2152        };
2153
2154        self.request(command, Self::empty_response, cx).detach()
2155    }
2156
2157    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2158        if self
2159            .capabilities
2160            .supports_terminate_threads_request
2161            .unwrap_or_default()
2162        {
2163            self.request(
2164                TerminateThreadsCommand {
2165                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2166                },
2167                Self::clear_active_debug_line_response,
2168                cx,
2169            )
2170            .detach();
2171        } else {
2172            self.shutdown(cx).detach();
2173        }
2174    }
2175}