session.rs

   1use crate::debugger::breakpoint_store::BreakpointSessionState;
   2
   3use super::breakpoint_store::{
   4    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   5};
   6use super::dap_command::{
   7    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   8    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   9    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
  10    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
  11    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  12    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  13};
  14use super::dap_store::DapStore;
  15use anyhow::{Context as _, Result, anyhow};
  16use collections::{HashMap, HashSet, IndexMap, IndexSet};
  17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
  18use dap::messages::Response;
  19use dap::requests::{Request, RunInTerminal, StartDebugging};
  20use dap::{
  21    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  22    SteppingGranularity, StoppedEvent, VariableReference,
  23    client::{DebugAdapterClient, SessionId},
  24    messages::{Events, Message},
  25};
  26use dap::{
  27    ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
  28    RunInTerminalRequestArguments, StartDebuggingRequestArguments,
  29};
  30use futures::channel::{mpsc, oneshot};
  31use futures::{FutureExt, future::Shared};
  32use gpui::{
  33    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  34    Task, WeakEntity,
  35};
  36
  37use serde_json::Value;
  38use smol::stream::StreamExt;
  39use std::any::TypeId;
  40use std::collections::BTreeMap;
  41use std::u64;
  42use std::{
  43    any::Any,
  44    collections::hash_map::Entry,
  45    hash::{Hash, Hasher},
  46    path::Path,
  47    sync::Arc,
  48};
  49use text::{PointUtf16, ToPointUtf16};
  50use util::ResultExt;
  51use worktree::Worktree;
  52
  53#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  54#[repr(transparent)]
  55pub struct ThreadId(pub u64);
  56
  57impl ThreadId {
  58    pub const MIN: ThreadId = ThreadId(u64::MIN);
  59    pub const MAX: ThreadId = ThreadId(u64::MAX);
  60}
  61
  62impl From<u64> for ThreadId {
  63    fn from(id: u64) -> Self {
  64        Self(id)
  65    }
  66}
  67
  68#[derive(Clone, Debug)]
  69pub struct StackFrame {
  70    pub dap: dap::StackFrame,
  71    pub scopes: Vec<dap::Scope>,
  72}
  73
  74impl From<dap::StackFrame> for StackFrame {
  75    fn from(stack_frame: dap::StackFrame) -> Self {
  76        Self {
  77            scopes: vec![],
  78            dap: stack_frame,
  79        }
  80    }
  81}
  82
  83#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  84pub enum ThreadStatus {
  85    #[default]
  86    Running,
  87    Stopped,
  88    Stepping,
  89    Exited,
  90    Ended,
  91}
  92
  93impl ThreadStatus {
  94    pub fn label(&self) -> &'static str {
  95        match self {
  96            ThreadStatus::Running => "Running",
  97            ThreadStatus::Stopped => "Stopped",
  98            ThreadStatus::Stepping => "Stepping",
  99            ThreadStatus::Exited => "Exited",
 100            ThreadStatus::Ended => "Ended",
 101        }
 102    }
 103}
 104
 105#[derive(Debug)]
 106pub struct Thread {
 107    dap: dap::Thread,
 108    stack_frame_ids: IndexSet<StackFrameId>,
 109    _has_stopped: bool,
 110}
 111
 112impl From<dap::Thread> for Thread {
 113    fn from(dap: dap::Thread) -> Self {
 114        Self {
 115            dap,
 116            stack_frame_ids: Default::default(),
 117            _has_stopped: false,
 118        }
 119    }
 120}
 121
/// Whether a session has an adapter connection yet.
pub enum Mode {
    /// No adapter client exists yet; a session is created in this mode and
    /// requests issued through `Mode::request_dap` fail immediately.
    Building,
    /// An adapter client is available; requests are forwarded to the `LocalMode`.
    Running(LocalMode),
}
 126
/// State needed to talk to a debug adapter client.
#[derive(Clone)]
pub struct LocalMode {
    /// Connection used to send requests to the adapter.
    client: Arc<DebugAdapterClient>,
    /// The adapter binary description; its `request_args` drive launch/attach.
    binary: DebugAdapterBinary,
    /// Extra breakpoint that is sent along with the store's breakpoints for its path.
    tmp_breakpoint: Option<SourceBreakpoint>,
    worktree: WeakEntity<Worktree>,
    /// Executor on which adapter requests are spawned.
    executor: BackgroundExecutor,
}
 135
 136fn client_source(abs_path: &Path) -> dap::Source {
 137    dap::Source {
 138        name: abs_path
 139            .file_name()
 140            .map(|filename| filename.to_string_lossy().to_string()),
 141        path: Some(abs_path.to_string_lossy().to_string()),
 142        source_reference: None,
 143        presentation_hint: None,
 144        origin: None,
 145        sources: None,
 146        adapter_data: None,
 147        checksums: None,
 148    }
 149}
 150
 151impl LocalMode {
 152    async fn new(
 153        session_id: SessionId,
 154        parent_session: Option<Entity<Session>>,
 155        worktree: WeakEntity<Worktree>,
 156        binary: DebugAdapterBinary,
 157        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 158        cx: AsyncApp,
 159    ) -> Result<Self> {
 160        let message_handler = Box::new(move |message| {
 161            messages_tx.unbounded_send(message).ok();
 162        });
 163
 164        let client = Arc::new(
 165            if let Some(client) = parent_session
 166                .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 167                .flatten()
 168            {
 169                client
 170                    .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 171                    .await?
 172            } else {
 173                DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
 174                    .await?
 175            },
 176        );
 177
 178        Ok(Self {
 179            client,
 180            worktree,
 181            tmp_breakpoint: None,
 182            binary,
 183            executor: cx.background_executor().clone(),
 184        })
 185    }
 186
    /// Returns the weak handle to the worktree this mode was created with.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 190
 191    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 192        let tasks: Vec<_> = paths
 193            .into_iter()
 194            .map(|path| {
 195                self.request(dap_command::SetBreakpoints {
 196                    source: client_source(path),
 197                    source_modified: None,
 198                    breakpoints: vec![],
 199                })
 200            })
 201            .collect();
 202
 203        cx.background_spawn(async move {
 204            futures::future::join_all(tasks)
 205                .await
 206                .iter()
 207                .for_each(|res| match res {
 208                    Ok(_) => {}
 209                    Err(err) => {
 210                        log::warn!("Set breakpoints request failed: {}", err);
 211                    }
 212                });
 213        })
 214    }
 215
 216    fn send_breakpoints_from_path(
 217        &self,
 218        abs_path: Arc<Path>,
 219        reason: BreakpointUpdatedReason,
 220        breakpoint_store: &Entity<BreakpointStore>,
 221        cx: &mut App,
 222    ) -> Task<()> {
 223        let breakpoints =
 224            breakpoint_store
 225                .read(cx)
 226                .source_breakpoints_from_path(&abs_path, cx)
 227                .into_iter()
 228                .filter(|bp| bp.state.is_enabled())
 229                .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
 230                    breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
 231                }))
 232                .map(Into::into)
 233                .collect();
 234
 235        let raw_breakpoints = breakpoint_store
 236            .read(cx)
 237            .breakpoints_from_path(&abs_path)
 238            .into_iter()
 239            .filter(|bp| bp.bp.state.is_enabled())
 240            .collect::<Vec<_>>();
 241
 242        let task = self.request(dap_command::SetBreakpoints {
 243            source: client_source(&abs_path),
 244            source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 245            breakpoints,
 246        });
 247        let session_id = self.client.id();
 248        let breakpoint_store = breakpoint_store.downgrade();
 249        cx.spawn(async move |cx| match cx.background_spawn(task).await {
 250            Ok(breakpoints) => {
 251                let breakpoints =
 252                    breakpoints
 253                        .into_iter()
 254                        .zip(raw_breakpoints)
 255                        .filter_map(|(dap_bp, zed_bp)| {
 256                            Some((
 257                                zed_bp,
 258                                BreakpointSessionState {
 259                                    id: dap_bp.id?,
 260                                    verified: dap_bp.verified,
 261                                },
 262                            ))
 263                        });
 264                breakpoint_store
 265                    .update(cx, |this, _| {
 266                        this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
 267                    })
 268                    .ok();
 269            }
 270            Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 271        })
 272    }
 273
 274    fn send_exception_breakpoints(
 275        &self,
 276        filters: Vec<ExceptionBreakpointsFilter>,
 277        supports_filter_options: bool,
 278    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 279        let arg = if supports_filter_options {
 280            SetExceptionBreakpoints::WithOptions {
 281                filters: filters
 282                    .into_iter()
 283                    .map(|filter| ExceptionFilterOptions {
 284                        filter_id: filter.filter,
 285                        condition: None,
 286                        mode: None,
 287                    })
 288                    .collect(),
 289            }
 290        } else {
 291            SetExceptionBreakpoints::Plain {
 292                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 293            }
 294        };
 295        self.request(arg)
 296    }
 297
    /// Sends a `SetBreakpoints` request for every path the breakpoint store knows about.
    ///
    /// When `ignore_breakpoints` is true an empty breakpoint list is sent for each path;
    /// otherwise only the enabled breakpoints are sent. After a successful request the
    /// adapter's response is recorded in the store via `mark_breakpoints_verified`.
    ///
    /// The returned task resolves to a map from path to error for the requests that failed.
    fn send_source_breakpoints(
        &self,
        ignore_breakpoints: bool,
        breakpoint_store: &Entity<BreakpointStore>,
        cx: &App,
    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
        let mut breakpoint_tasks = Vec::new();
        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
        // Both views are expected to cover the same set of paths.
        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
        let session_id = self.client.id();
        for (path, breakpoints) in breakpoints {
            let breakpoints = if ignore_breakpoints {
                vec![]
            } else {
                breakpoints
                    .into_iter()
                    .filter(|bp| bp.state.is_enabled())
                    .map(Into::into)
                    .collect()
            };

            // Store-side breakpoints for this path, filtered the same way as the request,
            // so they can be paired positionally with the adapter's response below.
            let raw_breakpoints = raw_breakpoints
                .remove(&path)
                .unwrap_or_default()
                .into_iter()
                .filter(|bp| bp.bp.state.is_enabled());
            let error_path = path.clone();
            let send_request = self
                .request(dap_command::SetBreakpoints {
                    source: client_source(&path),
                    source_modified: Some(false),
                    breakpoints,
                })
                // Tag a failure with its path so it can be collected into the result map.
                .map(|result| result.map_err(move |e| (error_path, e)));

            let task = cx.spawn({
                let breakpoint_store = breakpoint_store.downgrade();
                async move |cx| {
                    let breakpoints = cx.background_spawn(send_request).await?;

                    // NOTE(review): pairing by position assumes the adapter answers in
                    // request order — confirm against the adapter protocol.
                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
                        |(dap_bp, zed_bp)| {
                            Some((
                                zed_bp,
                                BreakpointSessionState {
                                    id: dap_bp.id?,
                                    verified: dap_bp.verified,
                                },
                            ))
                        },
                    );
                    // A dropped store is ignored.
                    breakpoint_store
                        .update(cx, |this, _| {
                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
                        })
                        .ok();

                    Ok(())
                }
            });
            breakpoint_tasks.push(task);
        }

        cx.background_spawn(async move {
            // Keep only the failures, keyed by path.
            futures::future::join_all(breakpoint_tasks)
                .await
                .into_iter()
                .filter_map(Result::err)
                .collect::<HashMap<_, _>>()
        })
    }
 370
    /// Runs the launch/attach request together with the configuration sequence.
    ///
    /// The launch or attach request (chosen from `binary.request_args.request`) is
    /// issued first. The configuration sequence waits for `initialized_rx`, sends all
    /// source breakpoints, emits a `DapStoreEvent::Notification` if any path failed,
    /// sends the adapter's default exception filters (ignoring errors), and finally
    /// sends `ConfigurationDone` when the adapter supports it.
    ///
    /// The returned task fails if either the launch/attach request or the
    /// configuration sequence fails.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        dap_store: WeakEntity<DapStore>,
        cx: &App,
    ) -> Task<Result<()>> {
        let raw = self.binary.request_args.clone();

        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = match raw.request {
            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
                raw: raw.configuration,
            }),
            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
                raw: raw.configuration,
            }),
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
        // Only the filters the adapter marks as enabled by default are sent.
        let exception_filters = capabilities
            .exception_breakpoint_filters
            .as_ref()
            .map(|exception_filters| {
                exception_filters
                    .iter()
                    .filter(|filter| filter.default == Some(true))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        let supports_exception_filters = capabilities
            .supports_exception_filter_options
            .unwrap_or_default();
        let this = self.clone();
        let worktree = self.worktree().clone();
        let configuration_sequence = cx.spawn({
            async move |cx| {
                let breakpoint_store =
                    dap_store.read_with(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
                // Wait until the session's message loop has seen the `Initialized` event.
                initialized_rx.await?;
                let errors_by_path = cx
                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
                    .await;

                dap_store.update(cx, |_, cx| {
                    // Without a live worktree nothing is logged or reported.
                    let Some(worktree) = worktree.upgrade() else {
                        return;
                    };

                    for (path, error) in &errors_by_path {
                        log::error!("failed to set breakpoints for {path:?}: {error}");
                    }

                    // Report one failed path (worktree-relative when possible) plus a
                    // count of the remaining ones.
                    if let Some(failed_path) = errors_by_path.keys().next() {
                        let failed_path = failed_path
                            .strip_prefix(worktree.read(cx).abs_path())
                            .unwrap_or(failed_path)
                            .display();
                        let message = format!(
                            "Failed to set breakpoints for {failed_path}{}",
                            match errors_by_path.len() {
                                0 => unreachable!(),
                                1 => "".into(),
                                2 => " and 1 other path".into(),
                                n => format!(" and {} other paths", n - 1),
                            }
                        );
                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
                    }
                })?;

                // Failure to set exception breakpoints does not abort the sequence.
                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
                    .await
                    .ok();
                let ret = if configuration_done_supported {
                    this.request(ConfigurationDone {})
                } else {
                    Task::ready(Ok(()))
                }
                .await;
                ret
            }
        });

        cx.background_spawn(async move {
            futures::future::try_join(launch, configuration_sequence).await?;
            Ok(())
        })
    }
 461
 462    fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
 463    where
 464        <R::DapRequest as dap::requests::Request>::Response: 'static,
 465        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 466    {
 467        let request = Arc::new(request);
 468
 469        let request_clone = request.clone();
 470        let connection = self.client.clone();
 471        self.executor.spawn(async move {
 472            let args = request_clone.to_dap();
 473            let response = connection.request::<R::DapRequest>(args).await?;
 474            request.response_from_dap(response)
 475        })
 476    }
 477}
 478
 479impl Mode {
 480    pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
 481    where
 482        <R::DapRequest as dap::requests::Request>::Response: 'static,
 483        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 484    {
 485        match self {
 486            Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
 487            Mode::Building => Task::ready(Err(anyhow!(
 488                "no adapter running to send request: {request:?}"
 489            ))),
 490        }
 491    }
 492}
 493
 494#[derive(Default)]
 495struct ThreadStates {
 496    global_state: Option<ThreadStatus>,
 497    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 498}
 499
 500impl ThreadStates {
 501    fn stop_all_threads(&mut self) {
 502        self.global_state = Some(ThreadStatus::Stopped);
 503        self.known_thread_states.clear();
 504    }
 505
 506    fn exit_all_threads(&mut self) {
 507        self.global_state = Some(ThreadStatus::Exited);
 508        self.known_thread_states.clear();
 509    }
 510
 511    fn continue_all_threads(&mut self) {
 512        self.global_state = Some(ThreadStatus::Running);
 513        self.known_thread_states.clear();
 514    }
 515
 516    fn stop_thread(&mut self, thread_id: ThreadId) {
 517        self.known_thread_states
 518            .insert(thread_id, ThreadStatus::Stopped);
 519    }
 520
 521    fn continue_thread(&mut self, thread_id: ThreadId) {
 522        self.known_thread_states
 523            .insert(thread_id, ThreadStatus::Running);
 524    }
 525
 526    fn process_step(&mut self, thread_id: ThreadId) {
 527        self.known_thread_states
 528            .insert(thread_id, ThreadStatus::Stepping);
 529    }
 530
 531    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 532        self.thread_state(thread_id)
 533            .unwrap_or(ThreadStatus::Running)
 534    }
 535
 536    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 537        self.known_thread_states
 538            .get(&thread_id)
 539            .copied()
 540            .or(self.global_state)
 541    }
 542
 543    fn exit_thread(&mut self, thread_id: ThreadId) {
 544        self.known_thread_states
 545            .insert(thread_id, ThreadStatus::Exited);
 546    }
 547
 548    fn any_stopped_thread(&self) -> bool {
 549        self.global_state
 550            .is_some_and(|state| state == ThreadStatus::Stopped)
 551            || self
 552                .known_thread_states
 553                .values()
 554                .any(|status| *status == ThreadStatus::Stopped)
 555    }
 556}
/// Capacity of the circular buffer in which a `Session` keeps adapter output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Flag stored next to each exception breakpoint filter in `Session::exception_breakpoints`.
type IsEnabled = bool;
 560
/// Ordered counter associated with a session's output; a new session starts at
/// `OutputToken(0)`.
// NOTE(review): how the token advances is not visible in this part of the file.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents the current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// `Mode::Building` until `boot` has created the adapter client.
    pub mode: Mode,
    id: SessionId,
    label: SharedString,
    adapter: DebugAdapterName,
    pub(super) capabilities: Capabilities,
    child_session_ids: HashSet<SessionId>,
    parent_session: Option<Entity<Session>>,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Fixed-capacity buffer of output events (`MAX_TRACKED_OUTPUT_EVENTS` entries).
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Shared request tasks, grouped by command type and keyed by the command value.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// While set, breakpoint store changes are not forwarded to the adapter.
    ignore_breakpoints: bool,
    // NOTE(review): the `String` key is presumably the filter id — confirm.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    /// Tasks owned by the session; `boot` stores the adapter message loop here.
    background_tasks: Vec<Task<()>>,
}
 588
/// Object-safe equality and hashing for commands, so that commands of different
/// concrete types can be stored behind one trait object (see `RequestSlot`).
trait CacheableCommand: Any + Send + Sync {
    /// Returns true when `rhs` has the same concrete type as `self` and compares equal.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds `self` into `hasher`.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the shared command into a type-erased `Any` handle.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 594
 595impl<T> CacheableCommand for T
 596where
 597    T: DapCommand + PartialEq + Eq + Hash,
 598{
 599    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 600        (rhs as &dyn Any)
 601            .downcast_ref::<Self>()
 602            .map_or(false, |rhs| self == rhs)
 603    }
 604
 605    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 606        T::hash(self, &mut hasher);
 607    }
 608
 609    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 610        self
 611    }
 612}
 613
 614pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 615
 616impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 617    fn from(request: T) -> Self {
 618        Self(Arc::new(request))
 619    }
 620}
 621
 622impl PartialEq for RequestSlot {
 623    fn eq(&self, other: &Self) -> bool {
 624        self.0.dyn_eq(other.0.as_ref())
 625    }
 626}
 627
 628impl Eq for RequestSlot {}
 629
 630impl Hash for RequestSlot {
 631    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 632        self.0.dyn_hash(state);
 633        (&*self.0 as &dyn Any).type_id().hash(state)
 634    }
 635}
 636
 637#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 638pub struct CompletionsQuery {
 639    pub query: String,
 640    pub column: u64,
 641    pub line: Option<u64>,
 642    pub frame_id: Option<u64>,
 643}
 644
 645impl CompletionsQuery {
 646    pub fn new(
 647        buffer: &language::Buffer,
 648        cursor_position: language::Anchor,
 649        frame_id: Option<u64>,
 650    ) -> Self {
 651        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 652        Self {
 653            query: buffer.text(),
 654            column: column as u64,
 655            frame_id,
 656            line: Some(row as u64),
 657        }
 658    }
 659}
 660
/// Events a `Session` can emit to its observers.
// NOTE(review): the emit sites for these variants are outside this part of the
// file; the variant names are the only description available here.
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
    InvalidateInlineValue,
    CapabilitiesLoaded,
    /// Carries the adapter's run-in-terminal arguments and a channel for the result.
    // NOTE(review): the `u32` is presumably the spawned process id — confirm.
    RunInTerminal {
        request: RunInTerminalRequestArguments,
        sender: mpsc::Sender<Result<u32>>,
    },
    ConsoleOutput,
}
 677
/// Lifecycle events a `Session` can emit.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    /// Emitted by `Session::boot` once the session has switched to `Mode::Running`.
    Running,
    Shutdown,
    Restart,
    /// Carries the arguments of a start-debugging request.
    SpawnChildSession {
        request: StartDebuggingRequestArguments,
    },
}
 687
// `Session` emits both `SessionEvent` and `SessionStateEvent`.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 690
// A local session sends breakpoint updates to the debug adapter (DAP) for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// `BreakpointStore` notifies the session of breakpoint changes.
 694impl Session {
 695    pub(crate) fn new(
 696        breakpoint_store: Entity<BreakpointStore>,
 697        session_id: SessionId,
 698        parent_session: Option<Entity<Session>>,
 699        label: SharedString,
 700        adapter: DebugAdapterName,
 701        cx: &mut App,
 702    ) -> Entity<Self> {
 703        cx.new::<Self>(|cx| {
 704            cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
 705                BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
 706                    if let Some(local) = (!this.ignore_breakpoints)
 707                        .then(|| this.as_local_mut())
 708                        .flatten()
 709                    {
 710                        local
 711                            .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
 712                            .detach();
 713                    };
 714                }
 715                BreakpointStoreEvent::BreakpointsCleared(paths) => {
 716                    if let Some(local) = (!this.ignore_breakpoints)
 717                        .then(|| this.as_local_mut())
 718                        .flatten()
 719                    {
 720                        local.unset_breakpoints_from_paths(paths, cx).detach();
 721                    }
 722                }
 723                BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
 724            })
 725            .detach();
 726            cx.on_app_quit(Self::on_app_quit).detach();
 727
 728            let this = Self {
 729                mode: Mode::Building,
 730                id: session_id,
 731                child_session_ids: HashSet::default(),
 732                parent_session,
 733                capabilities: Capabilities::default(),
 734                variables: Default::default(),
 735                stack_frames: Default::default(),
 736                thread_states: ThreadStates::default(),
 737                output_token: OutputToken(0),
 738                output: circular_buffer::CircularBuffer::boxed(),
 739                requests: HashMap::default(),
 740                modules: Vec::default(),
 741                loaded_sources: Vec::default(),
 742                threads: IndexMap::default(),
 743                background_tasks: Vec::default(),
 744                locations: Default::default(),
 745                is_session_terminated: false,
 746                ignore_breakpoints: false,
 747                breakpoint_store,
 748                exception_breakpoints: Default::default(),
 749                label,
 750                adapter,
 751            };
 752
 753            this
 754        })
 755    }
 756
 757    pub fn worktree(&self) -> Option<Entity<Worktree>> {
 758        match &self.mode {
 759            Mode::Building => None,
 760            Mode::Running(local_mode) => local_mode.worktree.upgrade(),
 761        }
 762    }
 763
    /// Starts the adapter for this session and runs its initialization.
    ///
    /// A message loop task is stored in `background_tasks`; it dispatches adapter
    /// events and reverse requests to the session. The returned task creates the
    /// `LocalMode`, switches the session to `Mode::Running` (emitting
    /// `SessionStateEvent::Running`), sends the initialize request and then runs the
    /// initialization sequence.
    ///
    /// # Errors
    ///
    /// The task fails if the client cannot be created, the session entity has been
    /// dropped, or any initialization step fails.
    pub fn boot(
        &mut self,
        binary: DebugAdapterBinary,
        worktree: Entity<Worktree>,
        dap_store: WeakEntity<DapStore>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();

        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
            // The oneshot can fire only once, so it is taken on the first `Initialized` event.
            let mut initialized_tx = Some(initialized_tx);
            while let Some(message) = message_rx.next().await {
                if let Message::Event(event) = message {
                    if let Events::Initialized(_) = *event {
                        if let Some(tx) = initialized_tx.take() {
                            tx.send(()).ok();
                        }
                    } else {
                        // Stop the loop once the session entity is gone.
                        let Ok(_) = this.update(cx, |session, cx| {
                            session.handle_dap_event(event, cx);
                        }) else {
                            break;
                        };
                    }
                } else if let Message::Request(request) = message {
                    // Only these two reverse requests are handled; others are ignored here.
                    let Ok(_) = this.update(cx, |this, cx| {
                        if request.command == StartDebugging::COMMAND {
                            this.handle_start_debugging_request(request, cx)
                                .detach_and_log_err(cx);
                        } else if request.command == RunInTerminal::COMMAND {
                            this.handle_run_in_terminal_request(request, cx)
                                .detach_and_log_err(cx);
                        }
                    }) else {
                        break;
                    };
                }
            }
        })];
        self.background_tasks = background_tasks;
        let id = self.id;
        let parent_session = self.parent_session.clone();

        cx.spawn(async move |this, cx| {
            let mode = LocalMode::new(
                id,
                parent_session,
                worktree.downgrade(),
                binary,
                message_tx,
                cx.clone(),
            )
            .await?;
            this.update(cx, |this, cx| {
                this.mode = Mode::Running(mode);
                cx.emit(SessionStateEvent::Running);
            })?;

            this.update(cx, |session, cx| session.request_initialize(cx))?
                .await?;

            this.update(cx, |session, cx| {
                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
            })?
            .await
        })
    }
 832
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 836
    /// Returns a copy of the set of child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 840
    /// Registers `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 844
    /// Removes `session_id` from this session's children; unknown ids are ignored.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 848
 849    pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
 850        self.parent_session
 851            .as_ref()
 852            .map(|session| session.read(cx).id)
 853    }
 854
    /// Returns the parent session entity, if this session has one.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
 858
    /// Returns the capabilities currently stored for this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 862
 863    pub fn binary(&self) -> &DebugAdapterBinary {
 864        let Mode::Running(local_mode) = &self.mode else {
 865            panic!("Session is not running");
 866        };
 867        &local_mode.binary
 868    }
 869
    /// Returns a clone of the debug adapter name stored on this session.
    pub fn adapter(&self) -> DebugAdapterName {
        self.adapter.clone()
    }
 873
    /// Returns a clone of this session's label.
    pub fn label(&self) -> SharedString {
        self.label.clone()
    }
 877
    /// Returns the `is_session_terminated` flag (set to `true` by `shutdown`).
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 881
 882    pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
 883        let (tx, mut rx) = mpsc::unbounded();
 884
 885        cx.spawn(async move |this, cx| {
 886            while let Some(output) = rx.next().await {
 887                this.update(cx, |this, cx| {
 888                    let event = dap::OutputEvent {
 889                        category: None,
 890                        output,
 891                        group: None,
 892                        variables_reference: None,
 893                        source: None,
 894                        line: None,
 895                        column: None,
 896                        data: None,
 897                        location_reference: None,
 898                    };
 899                    this.push_output(event, cx);
 900                })?;
 901            }
 902            anyhow::Ok(())
 903        })
 904        .detach();
 905
 906        return tx;
 907    }
 908
    /// Returns `true` when the session's mode is `Mode::Running`.
    pub fn is_local(&self) -> bool {
        matches!(self.mode, Mode::Running(_))
    }
 912
 913    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 914        match &mut self.mode {
 915            Mode::Running(local_mode) => Some(local_mode),
 916            Mode::Building => None,
 917        }
 918    }
 919
 920    pub fn as_local(&self) -> Option<&LocalMode> {
 921        match &self.mode {
 922            Mode::Running(local_mode) => Some(local_mode),
 923            Mode::Building => None,
 924        }
 925    }
 926
 927    fn handle_start_debugging_request(
 928        &mut self,
 929        request: dap::messages::Request,
 930        cx: &mut Context<Self>,
 931    ) -> Task<Result<()>> {
 932        let request_seq = request.seq;
 933
 934        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
 935            .arguments
 936            .as_ref()
 937            .map(|value| serde_json::from_value(value.clone()));
 938
 939        let mut success = true;
 940        if let Some(Ok(request)) = launch_request {
 941            cx.emit(SessionStateEvent::SpawnChildSession { request });
 942        } else {
 943            log::error!(
 944                "Failed to parse launch request arguments: {:?}",
 945                request.arguments
 946            );
 947            success = false;
 948        }
 949
 950        cx.spawn(async move |this, cx| {
 951            this.update(cx, |this, cx| {
 952                this.respond_to_client(
 953                    request_seq,
 954                    success,
 955                    StartDebugging::COMMAND.to_string(),
 956                    None,
 957                    cx,
 958                )
 959            })?
 960            .await
 961        })
 962    }
 963
 964    fn handle_run_in_terminal_request(
 965        &mut self,
 966        request: dap::messages::Request,
 967        cx: &mut Context<Self>,
 968    ) -> Task<Result<()>> {
 969        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
 970            request.arguments.unwrap_or_default(),
 971        )
 972        .expect("To parse StartDebuggingRequestArguments");
 973
 974        let seq = request.seq;
 975
 976        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
 977        cx.emit(SessionEvent::RunInTerminal {
 978            request: request_args,
 979            sender: tx,
 980        });
 981        cx.notify();
 982
 983        cx.spawn(async move |session, cx| {
 984            let result = util::maybe!(async move {
 985                rx.next().await.ok_or_else(|| {
 986                    anyhow!("failed to receive response from spawn terminal".to_string())
 987                })?
 988            })
 989            .await;
 990            let (success, body) = match result {
 991                Ok(pid) => (
 992                    true,
 993                    serde_json::to_value(dap::RunInTerminalResponse {
 994                        process_id: None,
 995                        shell_process_id: Some(pid as u64),
 996                    })
 997                    .ok(),
 998                ),
 999                Err(error) => (
1000                    false,
1001                    serde_json::to_value(dap::ErrorResponse {
1002                        error: Some(dap::Message {
1003                            id: seq,
1004                            format: error.to_string(),
1005                            variables: None,
1006                            send_telemetry: None,
1007                            show_user: None,
1008                            url: None,
1009                            url_label: None,
1010                        }),
1011                    })
1012                    .ok(),
1013                ),
1014            };
1015
1016            session
1017                .update(cx, |session, cx| {
1018                    session.respond_to_client(
1019                        seq,
1020                        success,
1021                        RunInTerminal::COMMAND.to_string(),
1022                        body,
1023                        cx,
1024                    )
1025                })?
1026                .await
1027        })
1028    }
1029
1030    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1031        let adapter_id = self.adapter().to_string();
1032        let request = Initialize { adapter_id };
1033        match &self.mode {
1034            Mode::Running(local_mode) => {
1035                let capabilities = local_mode.request(request);
1036
1037                cx.spawn(async move |this, cx| {
1038                    let capabilities = capabilities.await?;
1039                    this.update(cx, |session, cx| {
1040                        session.capabilities = capabilities;
1041                        let filters = session
1042                            .capabilities
1043                            .exception_breakpoint_filters
1044                            .clone()
1045                            .unwrap_or_default();
1046                        for filter in filters {
1047                            let default = filter.default.unwrap_or_default();
1048                            session
1049                                .exception_breakpoints
1050                                .entry(filter.filter.clone())
1051                                .or_insert_with(|| (filter, default));
1052                        }
1053                        cx.emit(SessionEvent::CapabilitiesLoaded);
1054                    })?;
1055                    Ok(())
1056                })
1057            }
1058            Mode::Building => Task::ready(Err(anyhow!(
1059                "Cannot send initialize request, task still building"
1060            ))),
1061        }
1062    }
1063
1064    pub(super) fn initialize_sequence(
1065        &mut self,
1066        initialize_rx: oneshot::Receiver<()>,
1067        dap_store: WeakEntity<DapStore>,
1068        cx: &mut Context<Self>,
1069    ) -> Task<Result<()>> {
1070        match &self.mode {
1071            Mode::Running(local_mode) => {
1072                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1073            }
1074            Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1075        }
1076    }
1077
1078    pub fn run_to_position(
1079        &mut self,
1080        breakpoint: SourceBreakpoint,
1081        active_thread_id: ThreadId,
1082        cx: &mut Context<Self>,
1083    ) {
1084        match &mut self.mode {
1085            Mode::Running(local_mode) => {
1086                if !matches!(
1087                    self.thread_states.thread_state(active_thread_id),
1088                    Some(ThreadStatus::Stopped)
1089                ) {
1090                    return;
1091                };
1092                let path = breakpoint.path.clone();
1093                local_mode.tmp_breakpoint = Some(breakpoint);
1094                let task = local_mode.send_breakpoints_from_path(
1095                    path,
1096                    BreakpointUpdatedReason::Toggled,
1097                    &self.breakpoint_store,
1098                    cx,
1099                );
1100
1101                cx.spawn(async move |this, cx| {
1102                    task.await;
1103                    this.update(cx, |this, cx| {
1104                        this.continue_thread(active_thread_id, cx);
1105                    })
1106                })
1107                .detach();
1108            }
1109            Mode::Building => {}
1110        }
1111    }
1112
1113    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1114        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1115    }
1116
1117    pub fn output(
1118        &self,
1119        since: OutputToken,
1120    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1121        if self.output_token.0 == 0 {
1122            return (self.output.range(0..0), OutputToken(0));
1123        };
1124
1125        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1126
1127        let clamped_events_since = events_since.clamp(0, self.output.len());
1128        (
1129            self.output
1130                .range(self.output.len() - clamped_events_since..),
1131            self.output_token,
1132        )
1133    }
1134
1135    pub fn respond_to_client(
1136        &self,
1137        request_seq: u64,
1138        success: bool,
1139        command: String,
1140        body: Option<serde_json::Value>,
1141        cx: &mut Context<Self>,
1142    ) -> Task<Result<()>> {
1143        let Some(local_session) = self.as_local() else {
1144            unreachable!("Cannot respond to remote client");
1145        };
1146        let client = local_session.client.clone();
1147
1148        cx.background_spawn(async move {
1149            client
1150                .send_message(Message::Response(Response {
1151                    body,
1152                    success,
1153                    command,
1154                    seq: request_seq + 1,
1155                    request_seq,
1156                    message: None,
1157                }))
1158                .await
1159        })
1160    }
1161
    /// Applies a DAP `stopped` event to the session state.
    ///
    /// If a temporary breakpoint was pending (see `run_to_position`), it is
    /// taken out and the breakpoints for its path are re-sent. Thread states
    /// and cached requests are then updated, the cached threads and variables
    /// are cleared, and `Stopped` / `InvalidateInlineValue` events are emitted.
    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
        // todo(debugger): Find a clean way to get around the clone
        let breakpoint_store = self.breakpoint_store.clone();
        // `take()` clears the temporary breakpoint so the re-send below is
        // performed without it.
        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
            let breakpoint = local.tmp_breakpoint.take()?;
            let path = breakpoint.path.clone();
            Some((local, path))
        }) {
            local
                .send_breakpoints_from_path(
                    path,
                    BreakpointUpdatedReason::Toggled,
                    &breakpoint_store,
                    cx,
                )
                .detach();
        };

        // An event without a thread id is treated the same as "all threads stopped".
        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
            self.thread_states.stop_all_threads();

            self.invalidate_command_type::<StackTraceCommand>();
        }

        // Even if we stopped all threads we still need to insert the thread_id
        // to our own data
        if let Some(thread_id) = event.thread_id {
            self.thread_states.stop_thread(ThreadId(thread_id));

            self.invalidate_state(
                &StackTraceCommand {
                    thread_id,
                    start_frame: None,
                    levels: None,
                }
                .into(),
            );
        }

        self.invalidate_generic();
        self.threads.clear();
        self.variables.clear();
        // The thread id is only forwarded when the adapter did not set
        // `preserve_focus_hint`.
        cx.emit(SessionEvent::Stopped(
            event
                .thread_id
                .map(Into::into)
                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
        ));
        cx.emit(SessionEvent::InvalidateInlineValue);
        cx.notify();
    }
1213
    /// Dispatches a DAP event received from the adapter to the matching
    /// state update on this session.
    ///
    /// Memory, process, progress, invalidated and unknown events are ignored.
    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
        match *event {
            Events::Initialized(_) => {
                debug_assert!(
                    false,
                    "Initialized event should have been handled in LocalMode"
                );
            }
            Events::Stopped(event) => self.handle_stopped_event(event, cx),
            Events::Continued(event) => {
                if event.all_threads_continued.unwrap_or_default() {
                    self.thread_states.continue_all_threads();
                    self.breakpoint_store.update(cx, |store, cx| {
                        store.remove_active_position(Some(self.session_id()), cx)
                    });
                } else {
                    self.thread_states
                        .continue_thread(ThreadId(event.thread_id));
                }
                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
                self.invalidate_generic();
            }
            Events::Exited(_event) => {
                self.clear_active_debug_line(cx);
            }
            Events::Terminated(_) => {
                // The shutdown task is detached; its completion is not awaited here.
                self.shutdown(cx).detach();
            }
            Events::Thread(event) => {
                let thread_id = ThreadId(event.thread_id);

                match event.reason {
                    dap::ThreadEventReason::Started => {
                        self.thread_states.continue_thread(thread_id);
                    }
                    dap::ThreadEventReason::Exited => {
                        self.thread_states.exit_thread(thread_id);
                    }
                    reason => {
                        log::error!("Unhandled thread event reason {:?}", reason);
                    }
                }
                // Drop the cached threads request so the next `threads` call re-fetches.
                self.invalidate_state(&ThreadsCommand.into());
                cx.notify();
            }
            Events::Output(event) => {
                // Telemetry output is dropped and never reaches the console buffer.
                if event
                    .category
                    .as_ref()
                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
                {
                    return;
                }

                self.push_output(event, cx);
                cx.notify();
            }
            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
            }),
            Events::Module(event) => {
                match event.reason {
                    dap::ModuleEventReason::New => {
                        self.modules.push(event.module);
                    }
                    dap::ModuleEventReason::Changed => {
                        // A change for a module id that is not tracked is ignored.
                        if let Some(module) = self
                            .modules
                            .iter_mut()
                            .find(|other| event.module.id == other.id)
                        {
                            *module = event.module;
                        }
                    }
                    dap::ModuleEventReason::Removed => {
                        self.modules.retain(|other| event.module.id != other.id);
                    }
                }

                // todo(debugger): We should only send the invalidate command to downstream clients.
                // self.invalidate_state(&ModulesCommand.into());
            }
            Events::LoadedSource(_) => {
                self.invalidate_state(&LoadedSourcesCommand.into());
            }
            Events::Capabilities(event) => {
                self.capabilities = self.capabilities.merge(event.capabilities);
                cx.notify();
            }
            Events::Memory(_) => {}
            Events::Process(_) => {}
            Events::ProgressEnd(_) => {}
            Events::ProgressStart(_) => {}
            Events::ProgressUpdate(_) => {}
            Events::Invalidated(_) => {}
            Events::Other(_) => {}
        }
    }
1312
    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
    ///
    /// Requests are tracked in `self.requests`, keyed first by the command's
    /// `TypeId` and then by the command value itself; a request is only sent
    /// when no entry exists for it yet. `process_result` is forwarded to
    /// `request_inner` together with the command.
    ///
    /// Nothing is sent when the session is terminated, or when no thread is
    /// stopped and the command is anything other than `ThreadsCommand`.
    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
        &mut self,
        request: T,
        process_result: impl FnOnce(
            &mut Self,
            Result<T::Response>,
            &mut Context<Self>,
        ) -> Option<T::Response>
        + 'static,
        cx: &mut Context<Self>,
    ) {
        // Compile-time guard: only commands with `CACHEABLE == true` may use `fetch`.
        const {
            assert!(
                T::CACHEABLE,
                "Only requests marked as cacheable should invoke `fetch`"
            );
        }

        // `&&` binds tighter than `||`, so this reads as
        // `(no stopped thread && not a threads request) || session terminated`.
        if !self.thread_states.any_stopped_thread()
            && request.type_id() != TypeId::of::<ThreadsCommand>()
            || self.is_session_terminated
        {
            return;
        }

        let request_map = self
            .requests
            .entry(std::any::TypeId::of::<T>())
            .or_default();

        // An occupied entry means this exact command was already requested; do nothing.
        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
            // Recover the typed command from the type-erased key stored in the slot.
            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();

            let task = Self::request_inner::<Arc<T>>(
                &self.capabilities,
                &self.mode,
                command,
                process_result,
                cx,
            );
            // Only success or failure is kept in the map; the response value is discarded.
            let task = cx
                .background_executor()
                .spawn(async move {
                    let _ = task.await?;
                    Some(())
                })
                .shared();

            vacant.insert(task);
            cx.notify();
        }
    }
1366
1367    pub async fn request2<T: DapCommand + PartialEq + Eq + Hash>(
1368        &self,
1369        request: T,
1370    ) -> Result<T::Response> {
1371        if !T::is_supported(&self.capabilities) {
1372            anyhow::bail!("DAP request {:?} is not supported", request);
1373        }
1374
1375        self.mode.request_dap(request).await
1376    }
1377
1378    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1379        capabilities: &Capabilities,
1380        mode: &Mode,
1381        request: T,
1382        process_result: impl FnOnce(
1383            &mut Self,
1384            Result<T::Response>,
1385            &mut Context<Self>,
1386        ) -> Option<T::Response>
1387        + 'static,
1388        cx: &mut Context<Self>,
1389    ) -> Task<Option<T::Response>> {
1390        if !T::is_supported(&capabilities) {
1391            log::warn!(
1392                "Attempted to send a DAP request that isn't supported: {:?}",
1393                request
1394            );
1395            let error = Err(anyhow::Error::msg(
1396                "Couldn't complete request because it's not supported",
1397            ));
1398            return cx.spawn(async move |this, cx| {
1399                this.update(cx, |this, cx| process_result(this, error, cx))
1400                    .log_err()
1401                    .flatten()
1402            });
1403        }
1404
1405        let request = mode.request_dap(request);
1406        cx.spawn(async move |this, cx| {
1407            let result = request.await;
1408            this.update(cx, |this, cx| process_result(this, result, cx))
1409                .log_err()
1410                .flatten()
1411        })
1412    }
1413
    /// Sends `request` using this session's capabilities and mode.
    ///
    /// Thin wrapper around `request_inner`; unlike `fetch`, it does not consult
    /// or update the `requests` cache.
    fn request<T: DapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(
            &mut Self,
            Result<T::Response>,
            &mut Context<Self>,
        ) -> Option<T::Response>
        + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
    }
1427
    /// Drops every cached request entry for the command type `Command`.
    fn invalidate_command_type<Command: DapCommand>(&mut self) {
        self.requests.remove(&std::any::TypeId::of::<Command>());
    }
1431
    /// Drops the cached modules, loaded-sources and threads requests.
    fn invalidate_generic(&mut self) {
        self.invalidate_command_type::<ModulesCommand>();
        self.invalidate_command_type::<LoadedSourcesCommand>();
        self.invalidate_command_type::<ThreadsCommand>();
    }
1437
    /// Drops the cached request entry for one specific command value.
    ///
    /// The outer map is looked up by the `TypeId` of the command wrapped in
    /// `key`; if no map exists for that type, nothing happens.
    fn invalidate_state(&mut self, key: &RequestSlot) {
        self.requests
            .entry((&*key.0 as &dyn Any).type_id())
            .and_modify(|request_map| {
                request_map.remove(&key);
            });
    }
1445
    /// Appends `event` to the output buffer, advances the output token by one
    /// and emits `SessionEvent::ConsoleOutput`.
    fn push_output(&mut self, event: OutputEvent, cx: &mut Context<Self>) {
        self.output.push_back(event);
        self.output_token.0 += 1;
        cx.emit(SessionEvent::ConsoleOutput);
    }
1451
    /// Returns whether `thread_states` reports at least one stopped thread.
    pub fn any_stopped_thread(&self) -> bool {
        self.thread_states.any_stopped_thread()
    }
1455
    /// Returns the status that `thread_states` reports for `thread_id`.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1459
1460    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1461        self.fetch(
1462            dap_command::ThreadsCommand,
1463            |this, result, cx| {
1464                let result = result.log_err()?;
1465
1466                this.threads = result
1467                    .iter()
1468                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1469                    .collect();
1470
1471                this.invalidate_command_type::<StackTraceCommand>();
1472                cx.emit(SessionEvent::Threads);
1473                cx.notify();
1474
1475                Some(result)
1476            },
1477            cx,
1478        );
1479
1480        self.threads
1481            .values()
1482            .map(|thread| {
1483                (
1484                    thread.dap.clone(),
1485                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1486                )
1487            })
1488            .collect()
1489    }
1490
1491    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1492        self.fetch(
1493            dap_command::ModulesCommand,
1494            |this, result, cx| {
1495                let result = result.log_err()?;
1496
1497                this.modules = result.iter().cloned().collect();
1498                cx.emit(SessionEvent::Modules);
1499                cx.notify();
1500
1501                Some(result)
1502            },
1503            cx,
1504        );
1505
1506        &self.modules
1507    }
1508
    /// Returns the session's `ignore_breakpoints` flag.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1512
1513    pub fn toggle_ignore_breakpoints(
1514        &mut self,
1515        cx: &mut App,
1516    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1517        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1518    }
1519
1520    pub(crate) fn set_ignore_breakpoints(
1521        &mut self,
1522        ignore: bool,
1523        cx: &mut App,
1524    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1525        if self.ignore_breakpoints == ignore {
1526            return Task::ready(HashMap::default());
1527        }
1528
1529        self.ignore_breakpoints = ignore;
1530
1531        if let Some(local) = self.as_local() {
1532            local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1533        } else {
1534            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1535            unimplemented!()
1536        }
1537    }
1538
    /// Iterates over the known exception breakpoint filters, each paired with
    /// its enabled flag.
    pub fn exception_breakpoints(
        &self,
    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
        self.exception_breakpoints.values()
    }
1544
1545    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1546        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1547            *is_enabled = !*is_enabled;
1548            self.send_exception_breakpoints(cx);
1549        }
1550    }
1551
1552    fn send_exception_breakpoints(&mut self, cx: &App) {
1553        if let Some(local) = self.as_local() {
1554            let exception_filters = self
1555                .exception_breakpoints
1556                .values()
1557                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1558                .collect();
1559
1560            let supports_exception_filters = self
1561                .capabilities
1562                .supports_exception_filter_options
1563                .unwrap_or_default();
1564            local
1565                .send_exception_breakpoints(exception_filters, supports_exception_filters)
1566                .detach_and_log_err(cx);
1567        } else {
1568            debug_assert!(false, "Not implemented");
1569        }
1570    }
1571
    /// Returns the session's `ignore_breakpoints` flag.
    ///
    // NOTE(review): the name suggests the opposite polarity of what is
    // returned (this yields the same value as `ignore_breakpoints()`), so it
    // looks inverted; confirm against callers before changing it.
    pub fn breakpoints_enabled(&self) -> bool {
        self.ignore_breakpoints
    }
1575
1576    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1577        self.fetch(
1578            dap_command::LoadedSourcesCommand,
1579            |this, result, cx| {
1580                let result = result.log_err()?;
1581                this.loaded_sources = result.iter().cloned().collect();
1582                cx.emit(SessionEvent::LoadedSources);
1583                cx.notify();
1584                Some(result)
1585            },
1586            cx,
1587        );
1588
1589        &self.loaded_sources
1590    }
1591
1592    fn fallback_to_manual_restart(
1593        &mut self,
1594        res: Result<()>,
1595        cx: &mut Context<Self>,
1596    ) -> Option<()> {
1597        if res.log_err().is_none() {
1598            cx.emit(SessionStateEvent::Restart);
1599            return None;
1600        }
1601        Some(())
1602    }
1603
1604    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1605        res.log_err()?;
1606        Some(())
1607    }
1608
1609    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1610        thread_id: ThreadId,
1611    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1612    {
1613        move |this, response, cx| match response.log_err() {
1614            Some(response) => {
1615                this.breakpoint_store.update(cx, |store, cx| {
1616                    store.remove_active_position(Some(this.session_id()), cx)
1617                });
1618                Some(response)
1619            }
1620            None => {
1621                this.thread_states.stop_thread(thread_id);
1622                cx.notify();
1623                None
1624            }
1625        }
1626    }
1627
1628    fn clear_active_debug_line_response(
1629        &mut self,
1630        response: Result<()>,
1631        cx: &mut Context<Session>,
1632    ) -> Option<()> {
1633        response.log_err()?;
1634        self.clear_active_debug_line(cx);
1635        Some(())
1636    }
1637
1638    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1639        self.breakpoint_store.update(cx, |store, cx| {
1640            store.remove_active_position(Some(self.id), cx)
1641        });
1642    }
1643
1644    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1645        self.request(
1646            PauseCommand {
1647                thread_id: thread_id.0,
1648            },
1649            Self::empty_response,
1650            cx,
1651        )
1652        .detach();
1653    }
1654
1655    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1656        self.request(
1657            RestartStackFrameCommand { stack_frame_id },
1658            Self::empty_response,
1659            cx,
1660        )
1661        .detach();
1662    }
1663
1664    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1665        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1666            self.request(
1667                RestartCommand {
1668                    raw: args.unwrap_or(Value::Null),
1669                },
1670                Self::fallback_to_manual_restart,
1671                cx,
1672            )
1673            .detach();
1674        } else {
1675            cx.emit(SessionStateEvent::Restart);
1676        }
1677    }
1678
1679    fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1680        let debug_adapter = self.adapter_client();
1681
1682        cx.background_spawn(async move {
1683            if let Some(client) = debug_adapter {
1684                client.shutdown().await.log_err();
1685            }
1686        })
1687    }
1688
1689    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1690        self.is_session_terminated = true;
1691        self.thread_states.exit_all_threads();
1692        cx.notify();
1693
1694        let task = if self
1695            .capabilities
1696            .supports_terminate_request
1697            .unwrap_or_default()
1698        {
1699            self.request(
1700                TerminateCommand {
1701                    restart: Some(false),
1702                },
1703                Self::clear_active_debug_line_response,
1704                cx,
1705            )
1706        } else {
1707            self.request(
1708                DisconnectCommand {
1709                    restart: Some(false),
1710                    terminate_debuggee: Some(true),
1711                    suspend_debuggee: Some(false),
1712                },
1713                Self::clear_active_debug_line_response,
1714                cx,
1715            )
1716        };
1717
1718        cx.emit(SessionStateEvent::Shutdown);
1719
1720        let debug_client = self.adapter_client();
1721
1722        cx.background_spawn(async move {
1723            let _ = task.await;
1724
1725            if let Some(client) = debug_client {
1726                client.shutdown().await.log_err();
1727            }
1728        })
1729    }
1730
1731    pub fn completions(
1732        &mut self,
1733        query: CompletionsQuery,
1734        cx: &mut Context<Self>,
1735    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1736        let task = self.request(query, |_, result, _| result.log_err(), cx);
1737
1738        cx.background_executor().spawn(async move {
1739            anyhow::Ok(
1740                task.await
1741                    .map(|response| response.targets)
1742                    .context("failed to fetch completions")?,
1743            )
1744        })
1745    }
1746
1747    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1748        self.thread_states.continue_thread(thread_id);
1749        self.request(
1750            ContinueCommand {
1751                args: ContinueArguments {
1752                    thread_id: thread_id.0,
1753                    single_thread: Some(true),
1754                },
1755            },
1756            Self::on_step_response::<ContinueCommand>(thread_id),
1757            cx,
1758        )
1759        .detach();
1760    }
1761
1762    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1763        match self.mode {
1764            Mode::Running(ref local) => Some(local.client.clone()),
1765            Mode::Building => None,
1766        }
1767    }
1768
1769    pub fn step_over(
1770        &mut self,
1771        thread_id: ThreadId,
1772        granularity: SteppingGranularity,
1773        cx: &mut Context<Self>,
1774    ) {
1775        let supports_single_thread_execution_requests =
1776            self.capabilities.supports_single_thread_execution_requests;
1777        let supports_stepping_granularity = self
1778            .capabilities
1779            .supports_stepping_granularity
1780            .unwrap_or_default();
1781
1782        let command = NextCommand {
1783            inner: StepCommand {
1784                thread_id: thread_id.0,
1785                granularity: supports_stepping_granularity.then(|| granularity),
1786                single_thread: supports_single_thread_execution_requests,
1787            },
1788        };
1789
1790        self.thread_states.process_step(thread_id);
1791        self.request(
1792            command,
1793            Self::on_step_response::<NextCommand>(thread_id),
1794            cx,
1795        )
1796        .detach();
1797    }
1798
1799    pub fn step_in(
1800        &mut self,
1801        thread_id: ThreadId,
1802        granularity: SteppingGranularity,
1803        cx: &mut Context<Self>,
1804    ) {
1805        let supports_single_thread_execution_requests =
1806            self.capabilities.supports_single_thread_execution_requests;
1807        let supports_stepping_granularity = self
1808            .capabilities
1809            .supports_stepping_granularity
1810            .unwrap_or_default();
1811
1812        let command = StepInCommand {
1813            inner: StepCommand {
1814                thread_id: thread_id.0,
1815                granularity: supports_stepping_granularity.then(|| granularity),
1816                single_thread: supports_single_thread_execution_requests,
1817            },
1818        };
1819
1820        self.thread_states.process_step(thread_id);
1821        self.request(
1822            command,
1823            Self::on_step_response::<StepInCommand>(thread_id),
1824            cx,
1825        )
1826        .detach();
1827    }
1828
1829    pub fn step_out(
1830        &mut self,
1831        thread_id: ThreadId,
1832        granularity: SteppingGranularity,
1833        cx: &mut Context<Self>,
1834    ) {
1835        let supports_single_thread_execution_requests =
1836            self.capabilities.supports_single_thread_execution_requests;
1837        let supports_stepping_granularity = self
1838            .capabilities
1839            .supports_stepping_granularity
1840            .unwrap_or_default();
1841
1842        let command = StepOutCommand {
1843            inner: StepCommand {
1844                thread_id: thread_id.0,
1845                granularity: supports_stepping_granularity.then(|| granularity),
1846                single_thread: supports_single_thread_execution_requests,
1847            },
1848        };
1849
1850        self.thread_states.process_step(thread_id);
1851        self.request(
1852            command,
1853            Self::on_step_response::<StepOutCommand>(thread_id),
1854            cx,
1855        )
1856        .detach();
1857    }
1858
1859    pub fn step_back(
1860        &mut self,
1861        thread_id: ThreadId,
1862        granularity: SteppingGranularity,
1863        cx: &mut Context<Self>,
1864    ) {
1865        let supports_single_thread_execution_requests =
1866            self.capabilities.supports_single_thread_execution_requests;
1867        let supports_stepping_granularity = self
1868            .capabilities
1869            .supports_stepping_granularity
1870            .unwrap_or_default();
1871
1872        let command = StepBackCommand {
1873            inner: StepCommand {
1874                thread_id: thread_id.0,
1875                granularity: supports_stepping_granularity.then(|| granularity),
1876                single_thread: supports_single_thread_execution_requests,
1877            },
1878        };
1879
1880        self.thread_states.process_step(thread_id);
1881
1882        self.request(
1883            command,
1884            Self::on_step_response::<StepBackCommand>(thread_id),
1885            cx,
1886        )
1887        .detach();
1888    }
1889
1890    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1891        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1892            && self.requests.contains_key(&ThreadsCommand.type_id())
1893            && self.threads.contains_key(&thread_id)
1894        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1895        // We could still be using an old thread id and have sent a new thread's request
1896        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1897        // But it very well could cause a minor bug in the future that is hard to track down
1898        {
1899            self.fetch(
1900                super::dap_command::StackTraceCommand {
1901                    thread_id: thread_id.0,
1902                    start_frame: None,
1903                    levels: None,
1904                },
1905                move |this, stack_frames, cx| {
1906                    let stack_frames = stack_frames.log_err()?;
1907
1908                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1909                        thread.stack_frame_ids =
1910                            stack_frames.iter().map(|frame| frame.id).collect();
1911                    });
1912                    debug_assert!(
1913                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1914                        "Sent request for thread_id that doesn't exist"
1915                    );
1916
1917                    this.stack_frames.extend(
1918                        stack_frames
1919                            .iter()
1920                            .cloned()
1921                            .map(|frame| (frame.id, StackFrame::from(frame))),
1922                    );
1923
1924                    this.invalidate_command_type::<ScopesCommand>();
1925                    this.invalidate_command_type::<VariablesCommand>();
1926
1927                    cx.emit(SessionEvent::StackTrace);
1928                    cx.notify();
1929                    Some(stack_frames)
1930                },
1931                cx,
1932            );
1933        }
1934
1935        self.threads
1936            .get(&thread_id)
1937            .map(|thread| {
1938                thread
1939                    .stack_frame_ids
1940                    .iter()
1941                    .filter_map(|id| self.stack_frames.get(id))
1942                    .cloned()
1943                    .collect()
1944            })
1945            .unwrap_or_default()
1946    }
1947
1948    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1949        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1950            && self
1951                .requests
1952                .contains_key(&TypeId::of::<StackTraceCommand>())
1953        {
1954            self.fetch(
1955                ScopesCommand { stack_frame_id },
1956                move |this, scopes, cx| {
1957                    let scopes = scopes.log_err()?;
1958
1959                    for scope in scopes .iter(){
1960                        this.variables(scope.variables_reference, cx);
1961                    }
1962
1963                    let entry = this
1964                        .stack_frames
1965                        .entry(stack_frame_id)
1966                        .and_modify(|stack_frame| {
1967                            stack_frame.scopes = scopes.clone();
1968                        });
1969
1970                    cx.emit(SessionEvent::Variables);
1971
1972                    debug_assert!(
1973                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1974                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1975                    );
1976
1977                    Some(scopes)
1978                },
1979                cx,
1980            );
1981        }
1982
1983        self.stack_frames
1984            .get(&stack_frame_id)
1985            .map(|frame| frame.scopes.as_slice())
1986            .unwrap_or_default()
1987    }
1988
1989    pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
1990        let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
1991            return Vec::new();
1992        };
1993
1994        stack_frame
1995            .scopes
1996            .iter()
1997            .filter_map(|scope| self.variables.get(&scope.variables_reference))
1998            .flatten()
1999            .cloned()
2000            .collect()
2001    }
2002
2003    pub fn variables(
2004        &mut self,
2005        variables_reference: VariableReference,
2006        cx: &mut Context<Self>,
2007    ) -> Vec<dap::Variable> {
2008        let command = VariablesCommand {
2009            variables_reference,
2010            filter: None,
2011            start: None,
2012            count: None,
2013            format: None,
2014        };
2015
2016        self.fetch(
2017            command,
2018            move |this, variables, cx| {
2019                let variables = variables.log_err()?;
2020                this.variables
2021                    .insert(variables_reference, variables.clone());
2022
2023                cx.emit(SessionEvent::Variables);
2024                cx.emit(SessionEvent::InvalidateInlineValue);
2025                Some(variables)
2026            },
2027            cx,
2028        );
2029
2030        self.variables
2031            .get(&variables_reference)
2032            .cloned()
2033            .unwrap_or_default()
2034    }
2035
2036    pub fn set_variable_value(
2037        &mut self,
2038        variables_reference: u64,
2039        name: String,
2040        value: String,
2041        cx: &mut Context<Self>,
2042    ) {
2043        if self.capabilities.supports_set_variable.unwrap_or_default() {
2044            self.request(
2045                SetVariableValueCommand {
2046                    name,
2047                    value,
2048                    variables_reference,
2049                },
2050                move |this, response, cx| {
2051                    let response = response.log_err()?;
2052                    this.invalidate_command_type::<VariablesCommand>();
2053                    cx.notify();
2054                    Some(response)
2055                },
2056                cx,
2057            )
2058            .detach()
2059        }
2060    }
2061
2062    pub fn evaluate(
2063        &mut self,
2064        expression: String,
2065        context: Option<EvaluateArgumentsContext>,
2066        frame_id: Option<u64>,
2067        source: Option<Source>,
2068        cx: &mut Context<Self>,
2069    ) -> Task<()> {
2070        let event = dap::OutputEvent {
2071            category: None,
2072            output: format!("> {expression}"),
2073            group: None,
2074            variables_reference: None,
2075            source: None,
2076            line: None,
2077            column: None,
2078            data: None,
2079            location_reference: None,
2080        };
2081        self.push_output(event, cx);
2082        let request = self.mode.request_dap(EvaluateCommand {
2083            expression,
2084            context,
2085            frame_id,
2086            source,
2087        });
2088        cx.spawn(async move |this, cx| {
2089            let response = request.await;
2090            this.update(cx, |this, cx| {
2091                match response {
2092                    Ok(response) => {
2093                        let event = dap::OutputEvent {
2094                            category: None,
2095                            output: format!("< {}", &response.result),
2096                            group: None,
2097                            variables_reference: Some(response.variables_reference),
2098                            source: None,
2099                            line: None,
2100                            column: None,
2101                            data: None,
2102                            location_reference: None,
2103                        };
2104                        this.push_output(event, cx);
2105                    }
2106                    Err(e) => {
2107                        let event = dap::OutputEvent {
2108                            category: None,
2109                            output: format!("{}", e),
2110                            group: None,
2111                            variables_reference: None,
2112                            source: None,
2113                            line: None,
2114                            column: None,
2115                            data: None,
2116                            location_reference: None,
2117                        };
2118                        this.push_output(event, cx);
2119                    }
2120                };
2121                this.invalidate_command_type::<ScopesCommand>();
2122                cx.notify();
2123            })
2124            .ok();
2125        })
2126    }
2127
2128    pub fn location(
2129        &mut self,
2130        reference: u64,
2131        cx: &mut Context<Self>,
2132    ) -> Option<dap::LocationsResponse> {
2133        self.fetch(
2134            LocationsCommand { reference },
2135            move |this, response, _| {
2136                let response = response.log_err()?;
2137                this.locations.insert(reference, response.clone());
2138                Some(response)
2139            },
2140            cx,
2141        );
2142        self.locations.get(&reference).cloned()
2143    }
2144
2145    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2146        let command = DisconnectCommand {
2147            restart: Some(false),
2148            terminate_debuggee: Some(true),
2149            suspend_debuggee: Some(false),
2150        };
2151
2152        self.request(command, Self::empty_response, cx).detach()
2153    }
2154
2155    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2156        if self
2157            .capabilities
2158            .supports_terminate_threads_request
2159            .unwrap_or_default()
2160        {
2161            self.request(
2162                TerminateThreadsCommand {
2163                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2164                },
2165                Self::clear_active_debug_line_response,
2166                cx,
2167            )
2168            .detach();
2169        } else {
2170            self.shutdown(cx).detach();
2171        }
2172    }
2173}