session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::DebugAdapterBinary;
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    client::{DebugAdapterClient, SessionId},
  21    messages::{Events, Message},
  22};
  23use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
  24use futures::channel::oneshot;
  25use futures::{FutureExt, future::Shared};
  26use gpui::{
  27    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  28    Task, WeakEntity,
  29};
  30
  31use rpc::AnyProtoClient;
  32use serde_json::{Value, json};
  33use smol::stream::StreamExt;
  34use std::any::TypeId;
  35use std::collections::BTreeMap;
  36use std::u64;
  37use std::{
  38    any::Any,
  39    collections::hash_map::Entry,
  40    hash::{Hash, Hasher},
  41    path::Path,
  42    sync::Arc,
  43};
  44use task::DebugTaskDefinition;
  45use text::{PointUtf16, ToPointUtf16};
  46use util::{ResultExt, merge_json_value_into};
  47use worktree::Worktree;
  48
  49#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  50#[repr(transparent)]
  51pub struct ThreadId(pub u64);
  52
  53impl ThreadId {
  54    pub const MIN: ThreadId = ThreadId(u64::MIN);
  55    pub const MAX: ThreadId = ThreadId(u64::MAX);
  56}
  57
  58impl From<u64> for ThreadId {
  59    fn from(id: u64) -> Self {
  60        Self(id)
  61    }
  62}
  63
  64#[derive(Clone, Debug)]
  65pub struct StackFrame {
  66    pub dap: dap::StackFrame,
  67    pub scopes: Vec<dap::Scope>,
  68}
  69
  70impl From<dap::StackFrame> for StackFrame {
  71    fn from(stack_frame: dap::StackFrame) -> Self {
  72        Self {
  73            scopes: vec![],
  74            dap: stack_frame,
  75        }
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  80pub enum ThreadStatus {
  81    #[default]
  82    Running,
  83    Stopped,
  84    Stepping,
  85    Exited,
  86    Ended,
  87}
  88
  89impl ThreadStatus {
  90    pub fn label(&self) -> &'static str {
  91        match self {
  92            ThreadStatus::Running => "Running",
  93            ThreadStatus::Stopped => "Stopped",
  94            ThreadStatus::Stepping => "Stepping",
  95            ThreadStatus::Exited => "Exited",
  96            ThreadStatus::Ended => "Ended",
  97        }
  98    }
  99}
 100
 101#[derive(Debug)]
 102pub struct Thread {
 103    dap: dap::Thread,
 104    stack_frame_ids: IndexSet<StackFrameId>,
 105    _has_stopped: bool,
 106}
 107
 108impl From<dap::Thread> for Thread {
 109    fn from(dap: dap::Thread) -> Self {
 110        Self {
 111            dap,
 112            stack_frame_ids: Default::default(),
 113            _has_stopped: false,
 114        }
 115    }
 116}
 117
 118type UpstreamProjectId = u64;
 119
 120struct RemoteConnection {
 121    _client: AnyProtoClient,
 122    _upstream_project_id: UpstreamProjectId,
 123    _adapter_name: SharedString,
 124}
 125
 126impl RemoteConnection {
 127    fn send_proto_client_request<R: DapCommand>(
 128        &self,
 129        _request: R,
 130        _session_id: SessionId,
 131        cx: &mut App,
 132    ) -> Task<Result<R::Response>> {
 133        // let message = request.to_proto(session_id, self.upstream_project_id);
 134        // let upstream_client = self.client.clone();
 135        cx.background_executor().spawn(async move {
 136            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 137            // let response = upstream_client.request(message).await?;
 138            // request.response_from_proto(response)
 139            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 140        })
 141    }
 142
 143    fn request<R: DapCommand>(
 144        &self,
 145        request: R,
 146        session_id: SessionId,
 147        cx: &mut App,
 148    ) -> Task<Result<R::Response>>
 149    where
 150        <R::DapRequest as dap::requests::Request>::Response: 'static,
 151        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 152    {
 153        return self.send_proto_client_request::<R>(request, session_id, cx);
 154    }
 155}
 156
/// How a session reaches its debug adapter.
enum Mode {
    /// Requests are sent directly through a [`LocalMode`].
    Local(LocalMode),
    /// Requests are routed through a [`RemoteConnection`].
    Remote(RemoteConnection),
}
 161
/// State needed to drive a debug adapter from this process.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests to the debug adapter.
    client: Arc<DebugAdapterClient>,
    /// Task definition this session was created from (label, adapter, initialize args).
    definition: DebugTaskDefinition,
    /// Adapter binary; its `request_args` seed the launch/attach request.
    binary: DebugAdapterBinary,
    /// Root binary taken from the parent session; `None` when there is no parent.
    root_binary: Option<Arc<DebugAdapterBinary>>,
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// Extra breakpoint appended when sending breakpoints for a path
    /// (set by `Session::run_to_position`).
    tmp_breakpoint: Option<SourceBreakpoint>,
    worktree: WeakEntity<Worktree>,
}
 172
 173fn client_source(abs_path: &Path) -> dap::Source {
 174    dap::Source {
 175        name: abs_path
 176            .file_name()
 177            .map(|filename| filename.to_string_lossy().to_string()),
 178        path: Some(abs_path.to_string_lossy().to_string()),
 179        source_reference: None,
 180        presentation_hint: None,
 181        origin: None,
 182        sources: None,
 183        adapter_data: None,
 184        checksums: None,
 185    }
 186}
 187
 188impl LocalMode {
 189    fn new(
 190        session_id: SessionId,
 191        parent_session: Option<Entity<Session>>,
 192        worktree: WeakEntity<Worktree>,
 193        breakpoint_store: Entity<BreakpointStore>,
 194        config: DebugTaskDefinition,
 195        binary: DebugAdapterBinary,
 196        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 197        cx: AsyncApp,
 198    ) -> Task<Result<Self>> {
 199        cx.spawn(async move |cx| {
 200            let message_handler = Box::new(move |message| {
 201                messages_tx.unbounded_send(message).ok();
 202            });
 203
 204            let root_binary = if let Some(parent_session) = parent_session.as_ref() {
 205                Some(parent_session.read_with(cx, |session, _| session.root_binary().clone())?)
 206            } else {
 207                None
 208            };
 209
 210            let client = Arc::new(
 211                if let Some(client) = parent_session
 212                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 213                    .flatten()
 214                {
 215                    client
 216                        .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 217                        .await?
 218                } else {
 219                    DebugAdapterClient::start(
 220                        session_id,
 221                        binary.clone(),
 222                        message_handler,
 223                        cx.clone(),
 224                    )
 225                    .await
 226                    .with_context(|| "Failed to start communication with debug adapter")?
 227                },
 228            );
 229
 230            Ok(Self {
 231                client,
 232                breakpoint_store,
 233                worktree,
 234                tmp_breakpoint: None,
 235                definition: config,
 236                root_binary,
 237                binary,
 238            })
 239        })
 240    }
 241
    /// Returns the weak handle to the worktree this mode was created with.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 245
 246    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 247        let tasks: Vec<_> = paths
 248            .into_iter()
 249            .map(|path| {
 250                self.request(
 251                    dap_command::SetBreakpoints {
 252                        source: client_source(path),
 253                        source_modified: None,
 254                        breakpoints: vec![],
 255                    },
 256                    cx.background_executor().clone(),
 257                )
 258            })
 259            .collect();
 260
 261        cx.background_spawn(async move {
 262            futures::future::join_all(tasks)
 263                .await
 264                .iter()
 265                .for_each(|res| match res {
 266                    Ok(_) => {}
 267                    Err(err) => {
 268                        log::warn!("Set breakpoints request failed: {}", err);
 269                    }
 270                });
 271        })
 272    }
 273
 274    fn send_breakpoints_from_path(
 275        &self,
 276        abs_path: Arc<Path>,
 277        reason: BreakpointUpdatedReason,
 278        cx: &mut App,
 279    ) -> Task<()> {
 280        let breakpoints = self
 281            .breakpoint_store
 282            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 283            .into_iter()
 284            .filter(|bp| bp.state.is_enabled())
 285            .chain(self.tmp_breakpoint.clone())
 286            .map(Into::into)
 287            .collect();
 288
 289        let task = self.request(
 290            dap_command::SetBreakpoints {
 291                source: client_source(&abs_path),
 292                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 293                breakpoints,
 294            },
 295            cx.background_executor().clone(),
 296        );
 297
 298        cx.background_spawn(async move {
 299            match task.await {
 300                Ok(_) => {}
 301                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 302            }
 303        })
 304    }
 305
 306    fn send_exception_breakpoints(
 307        &self,
 308        filters: Vec<ExceptionBreakpointsFilter>,
 309        supports_filter_options: bool,
 310        cx: &App,
 311    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 312        let arg = if supports_filter_options {
 313            SetExceptionBreakpoints::WithOptions {
 314                filters: filters
 315                    .into_iter()
 316                    .map(|filter| ExceptionFilterOptions {
 317                        filter_id: filter.filter,
 318                        condition: None,
 319                        mode: None,
 320                    })
 321                    .collect(),
 322            }
 323        } else {
 324            SetExceptionBreakpoints::Plain {
 325                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 326            }
 327        };
 328        self.request(arg, cx.background_executor().clone())
 329    }
 330
 331    fn send_source_breakpoints(
 332        &self,
 333        ignore_breakpoints: bool,
 334        cx: &App,
 335    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 336        let mut breakpoint_tasks = Vec::new();
 337        let breakpoints = self
 338            .breakpoint_store
 339            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 340
 341        for (path, breakpoints) in breakpoints {
 342            let breakpoints = if ignore_breakpoints {
 343                vec![]
 344            } else {
 345                breakpoints
 346                    .into_iter()
 347                    .filter(|bp| bp.state.is_enabled())
 348                    .map(Into::into)
 349                    .collect()
 350            };
 351
 352            breakpoint_tasks.push(
 353                self.request(
 354                    dap_command::SetBreakpoints {
 355                        source: client_source(&path),
 356                        source_modified: Some(false),
 357                        breakpoints,
 358                    },
 359                    cx.background_executor().clone(),
 360                )
 361                .map(|result| result.map_err(|e| (path, e))),
 362            );
 363        }
 364
 365        cx.background_spawn(async move {
 366            futures::future::join_all(breakpoint_tasks)
 367                .await
 368                .into_iter()
 369                .filter_map(Result::err)
 370                .collect::<HashMap<_, _>>()
 371        })
 372    }
 373
    /// Returns a copy of the label from the task definition.
    pub fn label(&self) -> String {
        self.definition.label.clone()
    }
 377
 378    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
 379        let adapter_id = self.definition.adapter.clone();
 380
 381        self.request(Initialize { adapter_id }, cx.background_executor().clone())
 382    }
 383
    /// Runs the launch/attach request together with the configuration sequence.
    ///
    /// The launch (or attach) request is created immediately. A separate task
    /// waits for `initialized_rx`, then sends source breakpoints, exception
    /// breakpoints and, when the adapter supports it, `ConfigurationDone`.
    /// The returned task completes once both have succeeded and fails with the
    /// first error from either.
    ///
    /// Paths whose breakpoints could not be set are logged and reported through
    /// a `DapStoreEvent::Notification` emitted on `dap_store`.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        dap_store: WeakEntity<DapStore>,
        cx: &App,
    ) -> Task<Result<()>> {
        let mut raw = self.binary.request_args.clone();

        // Layer the task definition's initialize args on top of the binary's configuration.
        merge_json_value_into(
            self.definition.initialize_args.clone().unwrap_or(json!({})),
            &mut raw.configuration,
        );

        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = match raw.request {
            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
                Launch {
                    raw: raw.configuration,
                },
                cx.background_executor().clone(),
            ),
            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
                Attach {
                    raw: raw.configuration,
                },
                cx.background_executor().clone(),
            ),
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
        // Only the filters the adapter marks as enabled by default are sent.
        let exception_filters = capabilities
            .exception_breakpoint_filters
            .as_ref()
            .map(|exception_filters| {
                exception_filters
                    .iter()
                    .filter(|filter| filter.default == Some(true))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        let supports_exception_filters = capabilities
            .supports_exception_filter_options
            .unwrap_or_default();
        let configuration_sequence = cx.spawn({
            let this = self.clone();
            let worktree = self.worktree().clone();
            async move |cx| {
                // Nothing below is sent until the `initialized` signal arrives.
                initialized_rx.await?;
                let errors_by_path = cx
                    .update(|cx| this.send_source_breakpoints(false, cx))?
                    .await;

                dap_store.update(cx, |_, cx| {
                    // Without a live worktree nothing is logged or reported.
                    let Some(worktree) = worktree.upgrade() else {
                        return;
                    };

                    for (path, error) in &errors_by_path {
                        log::error!("failed to set breakpoints for {path:?}: {error}");
                    }

                    // Name one failed path (relative to the worktree when possible)
                    // and summarize the rest by count.
                    if let Some(failed_path) = errors_by_path.keys().next() {
                        let failed_path = failed_path
                            .strip_prefix(worktree.read(cx).abs_path())
                            .unwrap_or(failed_path)
                            .display();
                        let message = format!(
                            "Failed to set breakpoints for {failed_path}{}",
                            match errors_by_path.len() {
                                // Unreachable: `keys().next()` just returned an entry.
                                0 => unreachable!(),
                                1 => "".into(),
                                2 => " and 1 other path".into(),
                                n => format!(" and {} other paths", n - 1),
                            }
                        );
                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
                    }
                })?;

                // A failure to set exception breakpoints is ignored (`.ok()`).
                cx.update(|cx| {
                    this.send_exception_breakpoints(
                        exception_filters,
                        supports_exception_filters,
                        cx,
                    )
                })?
                .await
                .ok();
                let ret = if configuration_done_supported {
                    this.request(ConfigurationDone {}, cx.background_executor().clone())
                } else {
                    Task::ready(Ok(()))
                }
                .await;
                ret
            }
        });

        cx.background_spawn(async move {
            futures::future::try_join(launch, configuration_sequence).await?;
            Ok(())
        })
    }
 489
 490    fn request<R: LocalDapCommand>(
 491        &self,
 492        request: R,
 493        executor: BackgroundExecutor,
 494    ) -> Task<Result<R::Response>>
 495    where
 496        <R::DapRequest as dap::requests::Request>::Response: 'static,
 497        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 498    {
 499        let request = Arc::new(request);
 500
 501        let request_clone = request.clone();
 502        let connection = self.client.clone();
 503        let request_task = executor.spawn(async move {
 504            let args = request_clone.to_dap();
 505            connection.request::<R::DapRequest>(args).await
 506        });
 507
 508        executor.spawn(async move {
 509            let response = request.response_from_dap(request_task.await?);
 510            response
 511        })
 512    }
 513}
 514impl From<RemoteConnection> for Mode {
 515    fn from(value: RemoteConnection) -> Self {
 516        Self::Remote(value)
 517    }
 518}
 519
 520impl Mode {
 521    fn request_dap<R: DapCommand>(
 522        &self,
 523        session_id: SessionId,
 524        request: R,
 525        cx: &mut Context<Session>,
 526    ) -> Task<Result<R::Response>>
 527    where
 528        <R::DapRequest as dap::requests::Request>::Response: 'static,
 529        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 530    {
 531        match self {
 532            Mode::Local(debug_adapter_client) => {
 533                debug_adapter_client.request(request, cx.background_executor().clone())
 534            }
 535            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 536        }
 537    }
 538}
 539
 540#[derive(Default)]
 541struct ThreadStates {
 542    global_state: Option<ThreadStatus>,
 543    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 544}
 545
 546impl ThreadStates {
 547    fn stop_all_threads(&mut self) {
 548        self.global_state = Some(ThreadStatus::Stopped);
 549        self.known_thread_states.clear();
 550    }
 551
 552    fn exit_all_threads(&mut self) {
 553        self.global_state = Some(ThreadStatus::Exited);
 554        self.known_thread_states.clear();
 555    }
 556
 557    fn continue_all_threads(&mut self) {
 558        self.global_state = Some(ThreadStatus::Running);
 559        self.known_thread_states.clear();
 560    }
 561
 562    fn stop_thread(&mut self, thread_id: ThreadId) {
 563        self.known_thread_states
 564            .insert(thread_id, ThreadStatus::Stopped);
 565    }
 566
 567    fn continue_thread(&mut self, thread_id: ThreadId) {
 568        self.known_thread_states
 569            .insert(thread_id, ThreadStatus::Running);
 570    }
 571
 572    fn process_step(&mut self, thread_id: ThreadId) {
 573        self.known_thread_states
 574            .insert(thread_id, ThreadStatus::Stepping);
 575    }
 576
 577    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 578        self.thread_state(thread_id)
 579            .unwrap_or(ThreadStatus::Running)
 580    }
 581
 582    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 583        self.known_thread_states
 584            .get(&thread_id)
 585            .copied()
 586            .or(self.global_state)
 587    }
 588
 589    fn exit_thread(&mut self, thread_id: ThreadId) {
 590        self.known_thread_states
 591            .insert(thread_id, ThreadStatus::Exited);
 592    }
 593
 594    fn any_stopped_thread(&self) -> bool {
 595        self.global_state
 596            .is_some_and(|state| state == ThreadStatus::Stopped)
 597            || self
 598                .known_thread_states
 599                .values()
 600                .any(|status| *status == ThreadStatus::Stopped)
 601    }
 602}
/// Size parameter of the circular buffer that stores `dap::OutputEvent`s in `Session::output`.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Enabled flag paired with each exception breakpoint filter in `Session::exception_breakpoints`.
type IsEnabled = bool;

/// Ordered `usize` newtype; `Session::output_token` holds one, starting at `OutputToken(0)`.
// NOTE(review): how the token advances is not visible in this part of the file.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Local or remote transport used to issue requests.
    mode: Mode,
    /// Adapter capabilities; replaced by the initialize response in `request_initialize`.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    /// Maintained through `add_child_session_id` / `remove_child_session_id`.
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Circular buffer of adapter output events, sized by `MAX_TRACKED_OUTPUT_EVENTS`.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Shared tasks keyed first by command `TypeId`, then by the command itself.
    // NOTE(review): presumably a cache of issued requests — confirm against its users.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Exception filters keyed by filter id, each with its enabled flag;
    /// seeded from the adapter's defaults in `request_initialize`.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    _background_tasks: Vec<Task<()>>,
}
 631
/// Object-safe equality and hashing for commands, so that values of different
/// concrete command types can sit behind one `Arc<dyn CacheableCommand>`.
trait CacheableCommand: Any + Send + Sync {
    /// Returns true when `rhs` has the same concrete type as `self` and compares equal.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds this command into `hasher`.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the `Arc` into an `Arc<dyn Any>`, e.g. for downcasting.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
 637
 638impl<T> CacheableCommand for T
 639where
 640    T: DapCommand + PartialEq + Eq + Hash,
 641{
 642    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 643        (rhs as &dyn Any)
 644            .downcast_ref::<Self>()
 645            .map_or(false, |rhs| self == rhs)
 646    }
 647
 648    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 649        T::hash(self, &mut hasher);
 650    }
 651
 652    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 653        self
 654    }
 655}
 656
 657pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 658
 659impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 660    fn from(request: T) -> Self {
 661        Self(Arc::new(request))
 662    }
 663}
 664
 665impl PartialEq for RequestSlot {
 666    fn eq(&self, other: &Self) -> bool {
 667        self.0.dyn_eq(other.0.as_ref())
 668    }
 669}
 670
 671impl Eq for RequestSlot {}
 672
 673impl Hash for RequestSlot {
 674    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 675        self.0.dyn_hash(state);
 676        (&*self.0 as &dyn Any).type_id().hash(state)
 677    }
 678}
 679
 680#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 681pub struct CompletionsQuery {
 682    pub query: String,
 683    pub column: u64,
 684    pub line: Option<u64>,
 685    pub frame_id: Option<u64>,
 686}
 687
 688impl CompletionsQuery {
 689    pub fn new(
 690        buffer: &language::Buffer,
 691        cursor_position: language::Anchor,
 692        frame_id: Option<u64>,
 693    ) -> Self {
 694        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 695        Self {
 696            query: buffer.text(),
 697            column: column as u64,
 698            frame_id,
 699            line: Some(row as u64),
 700        }
 701    }
 702}
 703
/// Events a [`Session`] can emit to its observers.
// NOTE(review): the emit sites are outside this part of the file; the variant
// names suggest which piece of session state changed — confirm there.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

/// Lifecycle events of a [`Session`], visible only to the parent module.
pub(super) enum SessionStateEvent {
    Shutdown,
    Restart,
}

impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 720
// A local session sends breakpoint updates to the DAP for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session on breakpoint changes.
 724impl Session {
 725    pub(crate) fn local(
 726        breakpoint_store: Entity<BreakpointStore>,
 727        worktree: WeakEntity<Worktree>,
 728        session_id: SessionId,
 729        parent_session: Option<Entity<Session>>,
 730        binary: DebugAdapterBinary,
 731        config: DebugTaskDefinition,
 732        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 733        initialized_tx: oneshot::Sender<()>,
 734        cx: &mut App,
 735    ) -> Task<Result<Entity<Self>>> {
 736        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 737
 738        cx.spawn(async move |cx| {
 739            let mode = LocalMode::new(
 740                session_id,
 741                parent_session.clone(),
 742                worktree,
 743                breakpoint_store.clone(),
 744                config.clone(),
 745                binary,
 746                message_tx,
 747                cx.clone(),
 748            )
 749            .await?;
 750
 751            cx.new(|cx| {
 752                create_local_session(
 753                    breakpoint_store,
 754                    session_id,
 755                    parent_session,
 756                    start_debugging_requests_tx,
 757                    initialized_tx,
 758                    message_rx,
 759                    mode,
 760                    cx,
 761                )
 762            })
 763        })
 764    }
 765
 766    pub(crate) fn remote(
 767        session_id: SessionId,
 768        client: AnyProtoClient,
 769        upstream_project_id: u64,
 770        ignore_breakpoints: bool,
 771    ) -> Self {
 772        Self {
 773            mode: Mode::Remote(RemoteConnection {
 774                _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
 775                _client: client,
 776                _upstream_project_id: upstream_project_id,
 777            }),
 778            id: session_id,
 779            child_session_ids: HashSet::default(),
 780            parent_id: None,
 781            capabilities: Capabilities::default(),
 782            ignore_breakpoints,
 783            variables: Default::default(),
 784            stack_frames: Default::default(),
 785            thread_states: ThreadStates::default(),
 786            output_token: OutputToken(0),
 787            output: circular_buffer::CircularBuffer::boxed(),
 788            requests: HashMap::default(),
 789            modules: Vec::default(),
 790            loaded_sources: Vec::default(),
 791            threads: IndexMap::default(),
 792            _background_tasks: Vec::default(),
 793            locations: Default::default(),
 794            is_session_terminated: false,
 795            exception_breakpoints: Default::default(),
 796        }
 797    }
 798
    /// Returns the id of this session.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 802
    /// Returns a copy of the set of child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 806
    /// Registers `session_id` as a child of this session; a no-op if already present.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 810
    /// Removes `session_id` from the set of children; a no-op if it is not present.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 814
    /// Returns the id of the parent session, if any.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
 818
    /// Returns the capabilities currently stored for this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 822
 823    pub(crate) fn root_binary(&self) -> Arc<DebugAdapterBinary> {
 824        let Mode::Local(local_mode) = &self.mode else {
 825            panic!("Session is not local");
 826        };
 827        local_mode
 828            .root_binary
 829            .clone()
 830            .unwrap_or_else(|| Arc::new(local_mode.binary.clone()))
 831    }
 832
 833    pub fn binary(&self) -> &DebugAdapterBinary {
 834        let Mode::Local(local_mode) = &self.mode else {
 835            panic!("Session is not local");
 836        };
 837        &local_mode.binary
 838    }
 839
 840    pub fn adapter_name(&self) -> SharedString {
 841        match &self.mode {
 842            Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(),
 843            Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
 844        }
 845    }
 846
 847    pub fn configuration(&self) -> Option<DebugTaskDefinition> {
 848        if let Mode::Local(local_mode) = &self.mode {
 849            Some(local_mode.definition.clone())
 850        } else {
 851            None
 852        }
 853    }
 854
    /// Returns whether the session has been marked as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 858
    /// Returns true when the session runs in [`Mode::Local`].
    pub fn is_local(&self) -> bool {
        matches!(self.mode, Mode::Local(_))
    }
 862
 863    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 864        match &mut self.mode {
 865            Mode::Local(local_mode) => Some(local_mode),
 866            Mode::Remote(_) => None,
 867        }
 868    }
 869
 870    pub fn as_local(&self) -> Option<&LocalMode> {
 871        match &self.mode {
 872            Mode::Local(local_mode) => Some(local_mode),
 873            Mode::Remote(_) => None,
 874        }
 875    }
 876
 877    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 878        match &self.mode {
 879            Mode::Local(local_mode) => {
 880                let capabilities = local_mode.clone().request_initialization(cx);
 881
 882                cx.spawn(async move |this, cx| {
 883                    let capabilities = capabilities.await?;
 884                    this.update(cx, |session, _| {
 885                        session.capabilities = capabilities;
 886                        let filters = session
 887                            .capabilities
 888                            .exception_breakpoint_filters
 889                            .clone()
 890                            .unwrap_or_default();
 891                        for filter in filters {
 892                            let default = filter.default.unwrap_or_default();
 893                            session
 894                                .exception_breakpoints
 895                                .entry(filter.filter.clone())
 896                                .or_insert_with(|| (filter, default));
 897                        }
 898                    })?;
 899                    Ok(())
 900                })
 901            }
 902            Mode::Remote(_) => Task::ready(Err(anyhow!(
 903                "Cannot send initialize request from remote session"
 904            ))),
 905        }
 906    }
 907
 908    pub(super) fn initialize_sequence(
 909        &mut self,
 910        initialize_rx: oneshot::Receiver<()>,
 911        dap_store: WeakEntity<DapStore>,
 912        cx: &mut Context<Self>,
 913    ) -> Task<Result<()>> {
 914        match &self.mode {
 915            Mode::Local(local_mode) => {
 916                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
 917            }
 918            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
 919        }
 920    }
 921
 922    pub fn run_to_position(
 923        &mut self,
 924        breakpoint: SourceBreakpoint,
 925        active_thread_id: ThreadId,
 926        cx: &mut Context<Self>,
 927    ) {
 928        match &mut self.mode {
 929            Mode::Local(local_mode) => {
 930                if !matches!(
 931                    self.thread_states.thread_state(active_thread_id),
 932                    Some(ThreadStatus::Stopped)
 933                ) {
 934                    return;
 935                };
 936                let path = breakpoint.path.clone();
 937                local_mode.tmp_breakpoint = Some(breakpoint);
 938                let task = local_mode.send_breakpoints_from_path(
 939                    path,
 940                    BreakpointUpdatedReason::Toggled,
 941                    cx,
 942                );
 943
 944                cx.spawn(async move |this, cx| {
 945                    task.await;
 946                    this.update(cx, |this, cx| {
 947                        this.continue_thread(active_thread_id, cx);
 948                    })
 949                })
 950                .detach();
 951            }
 952            Mode::Remote(_) => {}
 953        }
 954    }
 955
 956    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
 957        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
 958    }
 959
 960    pub fn output(
 961        &self,
 962        since: OutputToken,
 963    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 964        if self.output_token.0 == 0 {
 965            return (self.output.range(0..0), OutputToken(0));
 966        };
 967
 968        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 969
 970        let clamped_events_since = events_since.clamp(0, self.output.len());
 971        (
 972            self.output
 973                .range(self.output.len() - clamped_events_since..),
 974            self.output_token,
 975        )
 976    }
 977
 978    pub fn respond_to_client(
 979        &self,
 980        request_seq: u64,
 981        success: bool,
 982        command: String,
 983        body: Option<serde_json::Value>,
 984        cx: &mut Context<Self>,
 985    ) -> Task<Result<()>> {
 986        let Some(local_session) = self.as_local().cloned() else {
 987            unreachable!("Cannot respond to remote client");
 988        };
 989
 990        cx.background_spawn(async move {
 991            local_session
 992                .client
 993                .send_message(Message::Response(Response {
 994                    body,
 995                    success,
 996                    command,
 997                    seq: request_seq + 1,
 998                    request_seq,
 999                    message: None,
1000                }))
1001                .await
1002        })
1003    }
1004
1005    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1006        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1007            let breakpoint = local.tmp_breakpoint.take()?;
1008            let path = breakpoint.path.clone();
1009            Some((local, path))
1010        }) {
1011            local
1012                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1013                .detach();
1014        };
1015
1016        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1017            self.thread_states.stop_all_threads();
1018
1019            self.invalidate_command_type::<StackTraceCommand>();
1020        }
1021
1022        // Event if we stopped all threads we still need to insert the thread_id
1023        // to our own data
1024        if let Some(thread_id) = event.thread_id {
1025            self.thread_states.stop_thread(ThreadId(thread_id));
1026
1027            self.invalidate_state(
1028                &StackTraceCommand {
1029                    thread_id,
1030                    start_frame: None,
1031                    levels: None,
1032                }
1033                .into(),
1034            );
1035        }
1036
1037        self.invalidate_generic();
1038        self.threads.clear();
1039        self.variables.clear();
1040        cx.emit(SessionEvent::Stopped(
1041            event
1042                .thread_id
1043                .map(Into::into)
1044                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1045        ));
1046        cx.notify();
1047    }
1048
1049    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1050        match *event {
1051            Events::Initialized(_) => {
1052                debug_assert!(
1053                    false,
1054                    "Initialized event should have been handled in LocalMode"
1055                );
1056            }
1057            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1058            Events::Continued(event) => {
1059                if event.all_threads_continued.unwrap_or_default() {
1060                    self.thread_states.continue_all_threads();
1061                } else {
1062                    self.thread_states
1063                        .continue_thread(ThreadId(event.thread_id));
1064                }
1065                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1066                self.invalidate_generic();
1067            }
1068            Events::Exited(_event) => {
1069                self.clear_active_debug_line(cx);
1070            }
1071            Events::Terminated(_) => {
1072                self.is_session_terminated = true;
1073                self.clear_active_debug_line(cx);
1074            }
1075            Events::Thread(event) => {
1076                let thread_id = ThreadId(event.thread_id);
1077
1078                match event.reason {
1079                    dap::ThreadEventReason::Started => {
1080                        self.thread_states.continue_thread(thread_id);
1081                    }
1082                    dap::ThreadEventReason::Exited => {
1083                        self.thread_states.exit_thread(thread_id);
1084                    }
1085                    reason => {
1086                        log::error!("Unhandled thread event reason {:?}", reason);
1087                    }
1088                }
1089                self.invalidate_state(&ThreadsCommand.into());
1090                cx.notify();
1091            }
1092            Events::Output(event) => {
1093                if event
1094                    .category
1095                    .as_ref()
1096                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1097                {
1098                    return;
1099                }
1100
1101                self.output.push_back(event);
1102                self.output_token.0 += 1;
1103                cx.notify();
1104            }
1105            Events::Breakpoint(_) => {}
1106            Events::Module(event) => {
1107                match event.reason {
1108                    dap::ModuleEventReason::New => {
1109                        self.modules.push(event.module);
1110                    }
1111                    dap::ModuleEventReason::Changed => {
1112                        if let Some(module) = self
1113                            .modules
1114                            .iter_mut()
1115                            .find(|other| event.module.id == other.id)
1116                        {
1117                            *module = event.module;
1118                        }
1119                    }
1120                    dap::ModuleEventReason::Removed => {
1121                        self.modules.retain(|other| event.module.id != other.id);
1122                    }
1123                }
1124
1125                // todo(debugger): We should only send the invalidate command to downstream clients.
1126                // self.invalidate_state(&ModulesCommand.into());
1127            }
1128            Events::LoadedSource(_) => {
1129                self.invalidate_state(&LoadedSourcesCommand.into());
1130            }
1131            Events::Capabilities(event) => {
1132                self.capabilities = self.capabilities.merge(event.capabilities);
1133                cx.notify();
1134            }
1135            Events::Memory(_) => {}
1136            Events::Process(_) => {}
1137            Events::ProgressEnd(_) => {}
1138            Events::ProgressStart(_) => {}
1139            Events::ProgressUpdate(_) => {}
1140            Events::Invalidated(_) => {}
1141            Events::Other(_) => {}
1142        }
1143    }
1144
1145    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1146    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1147        &mut self,
1148        request: T,
1149        process_result: impl FnOnce(
1150            &mut Self,
1151            Result<T::Response>,
1152            &mut Context<Self>,
1153        ) -> Option<T::Response>
1154        + 'static,
1155        cx: &mut Context<Self>,
1156    ) {
1157        const {
1158            assert!(
1159                T::CACHEABLE,
1160                "Only requests marked as cacheable should invoke `fetch`"
1161            );
1162        }
1163
1164        if !self.thread_states.any_stopped_thread()
1165            && request.type_id() != TypeId::of::<ThreadsCommand>()
1166            || self.is_session_terminated
1167        {
1168            return;
1169        }
1170
1171        let request_map = self
1172            .requests
1173            .entry(std::any::TypeId::of::<T>())
1174            .or_default();
1175
1176        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1177            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1178
1179            let task = Self::request_inner::<Arc<T>>(
1180                &self.capabilities,
1181                self.id,
1182                &self.mode,
1183                command,
1184                process_result,
1185                cx,
1186            );
1187            let task = cx
1188                .background_executor()
1189                .spawn(async move {
1190                    let _ = task.await?;
1191                    Some(())
1192                })
1193                .shared();
1194
1195            vacant.insert(task);
1196            cx.notify();
1197        }
1198    }
1199
1200    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1201        capabilities: &Capabilities,
1202        session_id: SessionId,
1203        mode: &Mode,
1204        request: T,
1205        process_result: impl FnOnce(
1206            &mut Self,
1207            Result<T::Response>,
1208            &mut Context<Self>,
1209        ) -> Option<T::Response>
1210        + 'static,
1211        cx: &mut Context<Self>,
1212    ) -> Task<Option<T::Response>> {
1213        if !T::is_supported(&capabilities) {
1214            log::warn!(
1215                "Attempted to send a DAP request that isn't supported: {:?}",
1216                request
1217            );
1218            let error = Err(anyhow::Error::msg(
1219                "Couldn't complete request because it's not supported",
1220            ));
1221            return cx.spawn(async move |this, cx| {
1222                this.update(cx, |this, cx| process_result(this, error, cx))
1223                    .log_err()
1224                    .flatten()
1225            });
1226        }
1227
1228        let request = mode.request_dap(session_id, request, cx);
1229        cx.spawn(async move |this, cx| {
1230            let result = request.await;
1231            this.update(cx, |this, cx| process_result(this, result, cx))
1232                .log_err()
1233                .flatten()
1234        })
1235    }
1236
1237    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1238        &self,
1239        request: T,
1240        process_result: impl FnOnce(
1241            &mut Self,
1242            Result<T::Response>,
1243            &mut Context<Self>,
1244        ) -> Option<T::Response>
1245        + 'static,
1246        cx: &mut Context<Self>,
1247    ) -> Task<Option<T::Response>> {
1248        Self::request_inner(
1249            &self.capabilities,
1250            self.id,
1251            &self.mode,
1252            request,
1253            process_result,
1254            cx,
1255        )
1256    }
1257
1258    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1259        self.requests.remove(&std::any::TypeId::of::<Command>());
1260    }
1261
1262    fn invalidate_generic(&mut self) {
1263        self.invalidate_command_type::<ModulesCommand>();
1264        self.invalidate_command_type::<LoadedSourcesCommand>();
1265        self.invalidate_command_type::<ThreadsCommand>();
1266    }
1267
1268    fn invalidate_state(&mut self, key: &RequestSlot) {
1269        self.requests
1270            .entry((&*key.0 as &dyn Any).type_id())
1271            .and_modify(|request_map| {
1272                request_map.remove(&key);
1273            });
1274    }
1275
1276    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1277        self.thread_states.thread_status(thread_id)
1278    }
1279
1280    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1281        self.fetch(
1282            dap_command::ThreadsCommand,
1283            |this, result, cx| {
1284                let result = result.log_err()?;
1285
1286                this.threads = result
1287                    .iter()
1288                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1289                    .collect();
1290
1291                this.invalidate_command_type::<StackTraceCommand>();
1292                cx.emit(SessionEvent::Threads);
1293                cx.notify();
1294
1295                Some(result)
1296            },
1297            cx,
1298        );
1299
1300        self.threads
1301            .values()
1302            .map(|thread| {
1303                (
1304                    thread.dap.clone(),
1305                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1306                )
1307            })
1308            .collect()
1309    }
1310
1311    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1312        self.fetch(
1313            dap_command::ModulesCommand,
1314            |this, result, cx| {
1315                let result = result.log_err()?;
1316
1317                this.modules = result.iter().cloned().collect();
1318                cx.emit(SessionEvent::Modules);
1319                cx.notify();
1320
1321                Some(result)
1322            },
1323            cx,
1324        );
1325
1326        &self.modules
1327    }
1328
1329    pub fn ignore_breakpoints(&self) -> bool {
1330        self.ignore_breakpoints
1331    }
1332
1333    pub fn toggle_ignore_breakpoints(
1334        &mut self,
1335        cx: &mut App,
1336    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1337        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1338    }
1339
1340    pub(crate) fn set_ignore_breakpoints(
1341        &mut self,
1342        ignore: bool,
1343        cx: &mut App,
1344    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1345        if self.ignore_breakpoints == ignore {
1346            return Task::ready(HashMap::default());
1347        }
1348
1349        self.ignore_breakpoints = ignore;
1350
1351        if let Some(local) = self.as_local() {
1352            local.send_source_breakpoints(ignore, cx)
1353        } else {
1354            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1355            unimplemented!()
1356        }
1357    }
1358
1359    pub fn exception_breakpoints(
1360        &self,
1361    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1362        self.exception_breakpoints.values()
1363    }
1364
1365    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1366        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1367            *is_enabled = !*is_enabled;
1368            self.send_exception_breakpoints(cx);
1369        }
1370    }
1371
1372    fn send_exception_breakpoints(&mut self, cx: &App) {
1373        if let Some(local) = self.as_local() {
1374            let exception_filters = self
1375                .exception_breakpoints
1376                .values()
1377                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1378                .collect();
1379
1380            let supports_exception_filters = self
1381                .capabilities
1382                .supports_exception_filter_options
1383                .unwrap_or_default();
1384            local
1385                .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1386                .detach_and_log_err(cx);
1387        } else {
1388            debug_assert!(false, "Not implemented");
1389        }
1390    }
1391
1392    pub fn breakpoints_enabled(&self) -> bool {
1393        self.ignore_breakpoints
1394    }
1395
1396    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1397        self.fetch(
1398            dap_command::LoadedSourcesCommand,
1399            |this, result, cx| {
1400                let result = result.log_err()?;
1401                this.loaded_sources = result.iter().cloned().collect();
1402                cx.emit(SessionEvent::LoadedSources);
1403                cx.notify();
1404                Some(result)
1405            },
1406            cx,
1407        );
1408
1409        &self.loaded_sources
1410    }
1411
1412    fn fallback_to_manual_restart(
1413        &mut self,
1414        res: Result<()>,
1415        cx: &mut Context<Self>,
1416    ) -> Option<()> {
1417        if res.log_err().is_none() {
1418            cx.emit(SessionStateEvent::Restart);
1419            return None;
1420        }
1421        Some(())
1422    }
1423
1424    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1425        res.log_err()?;
1426        Some(())
1427    }
1428
1429    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1430        thread_id: ThreadId,
1431    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1432    {
1433        move |this, response, cx| match response.log_err() {
1434            Some(response) => Some(response),
1435            None => {
1436                this.thread_states.stop_thread(thread_id);
1437                cx.notify();
1438                None
1439            }
1440        }
1441    }
1442
1443    fn clear_active_debug_line_response(
1444        &mut self,
1445        response: Result<()>,
1446        cx: &mut Context<Session>,
1447    ) -> Option<()> {
1448        response.log_err()?;
1449        self.clear_active_debug_line(cx);
1450        Some(())
1451    }
1452
1453    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1454        self.as_local()
1455            .expect("Message handler will only run in local mode")
1456            .breakpoint_store
1457            .update(cx, |store, cx| {
1458                store.remove_active_position(Some(self.id), cx)
1459            });
1460    }
1461
1462    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1463        self.request(
1464            PauseCommand {
1465                thread_id: thread_id.0,
1466            },
1467            Self::empty_response,
1468            cx,
1469        )
1470        .detach();
1471    }
1472
1473    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1474        self.request(
1475            RestartStackFrameCommand { stack_frame_id },
1476            Self::empty_response,
1477            cx,
1478        )
1479        .detach();
1480    }
1481
1482    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1483        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1484            self.request(
1485                RestartCommand {
1486                    raw: args.unwrap_or(Value::Null),
1487                },
1488                Self::fallback_to_manual_restart,
1489                cx,
1490            )
1491            .detach();
1492        } else {
1493            cx.emit(SessionStateEvent::Restart);
1494        }
1495    }
1496
1497    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1498        self.is_session_terminated = true;
1499        self.thread_states.exit_all_threads();
1500        cx.notify();
1501
1502        let task = if self
1503            .capabilities
1504            .supports_terminate_request
1505            .unwrap_or_default()
1506        {
1507            self.request(
1508                TerminateCommand {
1509                    restart: Some(false),
1510                },
1511                Self::clear_active_debug_line_response,
1512                cx,
1513            )
1514        } else {
1515            self.request(
1516                DisconnectCommand {
1517                    restart: Some(false),
1518                    terminate_debuggee: Some(true),
1519                    suspend_debuggee: Some(false),
1520                },
1521                Self::clear_active_debug_line_response,
1522                cx,
1523            )
1524        };
1525
1526        cx.emit(SessionStateEvent::Shutdown);
1527
1528        let debug_client = self.adapter_client();
1529
1530        cx.background_spawn(async move {
1531            let _ = task.await;
1532
1533            if let Some(client) = debug_client {
1534                client.shutdown().await.log_err();
1535            }
1536        })
1537    }
1538
1539    pub fn completions(
1540        &mut self,
1541        query: CompletionsQuery,
1542        cx: &mut Context<Self>,
1543    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1544        let task = self.request(query, |_, result, _| result.log_err(), cx);
1545
1546        cx.background_executor().spawn(async move {
1547            anyhow::Ok(
1548                task.await
1549                    .map(|response| response.targets)
1550                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1551            )
1552        })
1553    }
1554
1555    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1556        self.thread_states.continue_thread(thread_id);
1557        self.request(
1558            ContinueCommand {
1559                args: ContinueArguments {
1560                    thread_id: thread_id.0,
1561                    single_thread: Some(true),
1562                },
1563            },
1564            Self::on_step_response::<ContinueCommand>(thread_id),
1565            cx,
1566        )
1567        .detach();
1568    }
1569
1570    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1571        match self.mode {
1572            Mode::Local(ref local) => Some(local.client.clone()),
1573            Mode::Remote(_) => None,
1574        }
1575    }
1576
1577    pub fn step_over(
1578        &mut self,
1579        thread_id: ThreadId,
1580        granularity: SteppingGranularity,
1581        cx: &mut Context<Self>,
1582    ) {
1583        let supports_single_thread_execution_requests =
1584            self.capabilities.supports_single_thread_execution_requests;
1585        let supports_stepping_granularity = self
1586            .capabilities
1587            .supports_stepping_granularity
1588            .unwrap_or_default();
1589
1590        let command = NextCommand {
1591            inner: StepCommand {
1592                thread_id: thread_id.0,
1593                granularity: supports_stepping_granularity.then(|| granularity),
1594                single_thread: supports_single_thread_execution_requests,
1595            },
1596        };
1597
1598        self.thread_states.process_step(thread_id);
1599        self.request(
1600            command,
1601            Self::on_step_response::<NextCommand>(thread_id),
1602            cx,
1603        )
1604        .detach();
1605    }
1606
1607    pub fn step_in(
1608        &mut self,
1609        thread_id: ThreadId,
1610        granularity: SteppingGranularity,
1611        cx: &mut Context<Self>,
1612    ) {
1613        let supports_single_thread_execution_requests =
1614            self.capabilities.supports_single_thread_execution_requests;
1615        let supports_stepping_granularity = self
1616            .capabilities
1617            .supports_stepping_granularity
1618            .unwrap_or_default();
1619
1620        let command = StepInCommand {
1621            inner: StepCommand {
1622                thread_id: thread_id.0,
1623                granularity: supports_stepping_granularity.then(|| granularity),
1624                single_thread: supports_single_thread_execution_requests,
1625            },
1626        };
1627
1628        self.thread_states.process_step(thread_id);
1629        self.request(
1630            command,
1631            Self::on_step_response::<StepInCommand>(thread_id),
1632            cx,
1633        )
1634        .detach();
1635    }
1636
1637    pub fn step_out(
1638        &mut self,
1639        thread_id: ThreadId,
1640        granularity: SteppingGranularity,
1641        cx: &mut Context<Self>,
1642    ) {
1643        let supports_single_thread_execution_requests =
1644            self.capabilities.supports_single_thread_execution_requests;
1645        let supports_stepping_granularity = self
1646            .capabilities
1647            .supports_stepping_granularity
1648            .unwrap_or_default();
1649
1650        let command = StepOutCommand {
1651            inner: StepCommand {
1652                thread_id: thread_id.0,
1653                granularity: supports_stepping_granularity.then(|| granularity),
1654                single_thread: supports_single_thread_execution_requests,
1655            },
1656        };
1657
1658        self.thread_states.process_step(thread_id);
1659        self.request(
1660            command,
1661            Self::on_step_response::<StepOutCommand>(thread_id),
1662            cx,
1663        )
1664        .detach();
1665    }
1666
1667    pub fn step_back(
1668        &mut self,
1669        thread_id: ThreadId,
1670        granularity: SteppingGranularity,
1671        cx: &mut Context<Self>,
1672    ) {
1673        let supports_single_thread_execution_requests =
1674            self.capabilities.supports_single_thread_execution_requests;
1675        let supports_stepping_granularity = self
1676            .capabilities
1677            .supports_stepping_granularity
1678            .unwrap_or_default();
1679
1680        let command = StepBackCommand {
1681            inner: StepCommand {
1682                thread_id: thread_id.0,
1683                granularity: supports_stepping_granularity.then(|| granularity),
1684                single_thread: supports_single_thread_execution_requests,
1685            },
1686        };
1687
1688        self.thread_states.process_step(thread_id);
1689
1690        self.request(
1691            command,
1692            Self::on_step_response::<StepBackCommand>(thread_id),
1693            cx,
1694        )
1695        .detach();
1696    }
1697
1698    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1699        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1700            && self.requests.contains_key(&ThreadsCommand.type_id())
1701            && self.threads.contains_key(&thread_id)
1702        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1703        // We could still be using an old thread id and have sent a new thread's request
1704        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1705        // But it very well could cause a minor bug in the future that is hard to track down
1706        {
1707            self.fetch(
1708                super::dap_command::StackTraceCommand {
1709                    thread_id: thread_id.0,
1710                    start_frame: None,
1711                    levels: None,
1712                },
1713                move |this, stack_frames, cx| {
1714                    let stack_frames = stack_frames.log_err()?;
1715
1716                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1717                        thread.stack_frame_ids =
1718                            stack_frames.iter().map(|frame| frame.id).collect();
1719                    });
1720                    debug_assert!(
1721                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1722                        "Sent request for thread_id that doesn't exist"
1723                    );
1724
1725                    this.stack_frames.extend(
1726                        stack_frames
1727                            .iter()
1728                            .cloned()
1729                            .map(|frame| (frame.id, StackFrame::from(frame))),
1730                    );
1731
1732                    this.invalidate_command_type::<ScopesCommand>();
1733                    this.invalidate_command_type::<VariablesCommand>();
1734
1735                    cx.emit(SessionEvent::StackTrace);
1736                    cx.notify();
1737                    Some(stack_frames)
1738                },
1739                cx,
1740            );
1741        }
1742
1743        self.threads
1744            .get(&thread_id)
1745            .map(|thread| {
1746                thread
1747                    .stack_frame_ids
1748                    .iter()
1749                    .filter_map(|id| self.stack_frames.get(id))
1750                    .cloned()
1751                    .collect()
1752            })
1753            .unwrap_or_default()
1754    }
1755
1756    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1757        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1758            && self
1759                .requests
1760                .contains_key(&TypeId::of::<StackTraceCommand>())
1761        {
1762            self.fetch(
1763                ScopesCommand { stack_frame_id },
1764                move |this, scopes, cx| {
1765                    let scopes = scopes.log_err()?;
1766
1767                    for scope in scopes .iter(){
1768                        this.variables(scope.variables_reference, cx);
1769                    }
1770
1771                    let entry = this
1772                        .stack_frames
1773                        .entry(stack_frame_id)
1774                        .and_modify(|stack_frame| {
1775                            stack_frame.scopes = scopes.clone();
1776                        });
1777
1778                    cx.emit(SessionEvent::Variables);
1779
1780                    debug_assert!(
1781                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1782                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1783                    );
1784
1785                    Some(scopes)
1786                },
1787                cx,
1788            );
1789        }
1790
1791        self.stack_frames
1792            .get(&stack_frame_id)
1793            .map(|frame| frame.scopes.as_slice())
1794            .unwrap_or_default()
1795    }
1796
1797    pub fn variables(
1798        &mut self,
1799        variables_reference: VariableReference,
1800        cx: &mut Context<Self>,
1801    ) -> Vec<dap::Variable> {
1802        let command = VariablesCommand {
1803            variables_reference,
1804            filter: None,
1805            start: None,
1806            count: None,
1807            format: None,
1808        };
1809
1810        self.fetch(
1811            command,
1812            move |this, variables, cx| {
1813                let variables = variables.log_err()?;
1814                this.variables
1815                    .insert(variables_reference, variables.clone());
1816
1817                cx.emit(SessionEvent::Variables);
1818                Some(variables)
1819            },
1820            cx,
1821        );
1822
1823        self.variables
1824            .get(&variables_reference)
1825            .cloned()
1826            .unwrap_or_default()
1827    }
1828
1829    pub fn set_variable_value(
1830        &mut self,
1831        variables_reference: u64,
1832        name: String,
1833        value: String,
1834        cx: &mut Context<Self>,
1835    ) {
1836        if self.capabilities.supports_set_variable.unwrap_or_default() {
1837            self.request(
1838                SetVariableValueCommand {
1839                    name,
1840                    value,
1841                    variables_reference,
1842                },
1843                move |this, response, cx| {
1844                    let response = response.log_err()?;
1845                    this.invalidate_command_type::<VariablesCommand>();
1846                    cx.notify();
1847                    Some(response)
1848                },
1849                cx,
1850            )
1851            .detach()
1852        }
1853    }
1854
1855    pub fn evaluate(
1856        &mut self,
1857        expression: String,
1858        context: Option<EvaluateArgumentsContext>,
1859        frame_id: Option<u64>,
1860        source: Option<Source>,
1861        cx: &mut Context<Self>,
1862    ) {
1863        self.request(
1864            EvaluateCommand {
1865                expression,
1866                context,
1867                frame_id,
1868                source,
1869            },
1870            |this, response, cx| {
1871                let response = response.log_err()?;
1872                this.output_token.0 += 1;
1873                this.output.push_back(dap::OutputEvent {
1874                    category: None,
1875                    output: response.result.clone(),
1876                    group: None,
1877                    variables_reference: Some(response.variables_reference),
1878                    source: None,
1879                    line: None,
1880                    column: None,
1881                    data: None,
1882                    location_reference: None,
1883                });
1884
1885                this.invalidate_command_type::<ScopesCommand>();
1886                cx.notify();
1887                Some(response)
1888            },
1889            cx,
1890        )
1891        .detach();
1892    }
1893
1894    pub fn location(
1895        &mut self,
1896        reference: u64,
1897        cx: &mut Context<Self>,
1898    ) -> Option<dap::LocationsResponse> {
1899        self.fetch(
1900            LocationsCommand { reference },
1901            move |this, response, _| {
1902                let response = response.log_err()?;
1903                this.locations.insert(reference, response.clone());
1904                Some(response)
1905            },
1906            cx,
1907        );
1908        self.locations.get(&reference).cloned()
1909    }
1910    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1911        let command = DisconnectCommand {
1912            restart: Some(false),
1913            terminate_debuggee: Some(true),
1914            suspend_debuggee: Some(false),
1915        };
1916
1917        self.request(command, Self::empty_response, cx).detach()
1918    }
1919
1920    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1921        if self
1922            .capabilities
1923            .supports_terminate_threads_request
1924            .unwrap_or_default()
1925        {
1926            self.request(
1927                TerminateThreadsCommand {
1928                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1929                },
1930                Self::clear_active_debug_line_response,
1931                cx,
1932            )
1933            .detach();
1934        } else {
1935            self.shutdown(cx).detach();
1936        }
1937    }
1938}
1939
1940fn create_local_session(
1941    breakpoint_store: Entity<BreakpointStore>,
1942    session_id: SessionId,
1943    parent_session: Option<Entity<Session>>,
1944    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1945    initialized_tx: oneshot::Sender<()>,
1946    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1947    mode: LocalMode,
1948    cx: &mut Context<Session>,
1949) -> Session {
1950    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1951        let mut initialized_tx = Some(initialized_tx);
1952        while let Some(message) = message_rx.next().await {
1953            if let Message::Event(event) = message {
1954                if let Events::Initialized(_) = *event {
1955                    if let Some(tx) = initialized_tx.take() {
1956                        tx.send(()).ok();
1957                    }
1958                } else {
1959                    let Ok(_) = this.update(cx, |session, cx| {
1960                        session.handle_dap_event(event, cx);
1961                    }) else {
1962                        break;
1963                    };
1964                }
1965            } else {
1966                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1967                else {
1968                    break;
1969                };
1970            }
1971        }
1972    })];
1973
1974    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1975        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1976            if let Some(local) = (!this.ignore_breakpoints)
1977                .then(|| this.as_local_mut())
1978                .flatten()
1979            {
1980                local
1981                    .send_breakpoints_from_path(path.clone(), *reason, cx)
1982                    .detach();
1983            };
1984        }
1985        BreakpointStoreEvent::BreakpointsCleared(paths) => {
1986            if let Some(local) = (!this.ignore_breakpoints)
1987                .then(|| this.as_local_mut())
1988                .flatten()
1989            {
1990                local.unset_breakpoints_from_paths(paths, cx).detach();
1991            }
1992        }
1993        BreakpointStoreEvent::ActiveDebugLineChanged => {}
1994    })
1995    .detach();
1996
1997    Session {
1998        mode: Mode::Local(mode),
1999        id: session_id,
2000        child_session_ids: HashSet::default(),
2001        parent_id: parent_session.map(|session| session.read(cx).id),
2002        variables: Default::default(),
2003        capabilities: Capabilities::default(),
2004        thread_states: ThreadStates::default(),
2005        output_token: OutputToken(0),
2006        ignore_breakpoints: false,
2007        output: circular_buffer::CircularBuffer::boxed(),
2008        requests: HashMap::default(),
2009        modules: Vec::default(),
2010        loaded_sources: Vec::default(),
2011        threads: IndexMap::default(),
2012        stack_frames: IndexMap::default(),
2013        locations: Default::default(),
2014        exception_breakpoints: Default::default(),
2015        _background_tasks,
2016        is_session_terminated: false,
2017    }
2018}