session.rs

   1use super::breakpoint_store::{
   2    BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
   3};
   4use super::dap_command::{
   5    self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
   6    EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
   7    ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
   8    ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
   9    StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
  10    TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
  11};
  12use super::dap_store::DapStore;
  13use anyhow::{Context as _, Result, anyhow};
  14use collections::{HashMap, HashSet, IndexMap, IndexSet};
  15use dap::adapters::DebugAdapterBinary;
  16use dap::messages::Response;
  17use dap::{
  18    Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
  19    SteppingGranularity, StoppedEvent, VariableReference,
  20    client::{DebugAdapterClient, SessionId},
  21    messages::{Events, Message},
  22};
  23use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
  24use futures::channel::oneshot;
  25use futures::{FutureExt, future::Shared};
  26use gpui::{
  27    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
  28    Task, WeakEntity,
  29};
  30
  31use rpc::AnyProtoClient;
  32use serde_json::{Value, json};
  33use smol::stream::StreamExt;
  34use std::any::TypeId;
  35use std::collections::BTreeMap;
  36use std::u64;
  37use std::{
  38    any::Any,
  39    collections::hash_map::Entry,
  40    hash::{Hash, Hasher},
  41    path::Path,
  42    sync::Arc,
  43};
  44use task::DebugTaskDefinition;
  45use text::{PointUtf16, ToPointUtf16};
  46use util::{ResultExt, merge_json_value_into};
  47use worktree::Worktree;
  48
  49#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
  50#[repr(transparent)]
  51pub struct ThreadId(pub u64);
  52
  53impl ThreadId {
  54    pub const MIN: ThreadId = ThreadId(u64::MIN);
  55    pub const MAX: ThreadId = ThreadId(u64::MAX);
  56}
  57
  58impl From<u64> for ThreadId {
  59    fn from(id: u64) -> Self {
  60        Self(id)
  61    }
  62}
  63
  64#[derive(Clone, Debug)]
  65pub struct StackFrame {
  66    pub dap: dap::StackFrame,
  67    pub scopes: Vec<dap::Scope>,
  68}
  69
  70impl From<dap::StackFrame> for StackFrame {
  71    fn from(stack_frame: dap::StackFrame) -> Self {
  72        Self {
  73            scopes: vec![],
  74            dap: stack_frame,
  75        }
  76    }
  77}
  78
  79#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
  80pub enum ThreadStatus {
  81    #[default]
  82    Running,
  83    Stopped,
  84    Stepping,
  85    Exited,
  86    Ended,
  87}
  88
  89impl ThreadStatus {
  90    pub fn label(&self) -> &'static str {
  91        match self {
  92            ThreadStatus::Running => "Running",
  93            ThreadStatus::Stopped => "Stopped",
  94            ThreadStatus::Stepping => "Stepping",
  95            ThreadStatus::Exited => "Exited",
  96            ThreadStatus::Ended => "Ended",
  97        }
  98    }
  99}
 100
 101#[derive(Debug)]
 102pub struct Thread {
 103    dap: dap::Thread,
 104    stack_frame_ids: IndexSet<StackFrameId>,
 105    _has_stopped: bool,
 106}
 107
 108impl From<dap::Thread> for Thread {
 109    fn from(dap: dap::Thread) -> Self {
 110        Self {
 111            dap,
 112            stack_frame_ids: Default::default(),
 113            _has_stopped: false,
 114        }
 115    }
 116}
 117
 118type UpstreamProjectId = u64;
 119
 120struct RemoteConnection {
 121    _client: AnyProtoClient,
 122    _upstream_project_id: UpstreamProjectId,
 123    _adapter_name: SharedString,
 124}
 125
 126impl RemoteConnection {
 127    fn send_proto_client_request<R: DapCommand>(
 128        &self,
 129        _request: R,
 130        _session_id: SessionId,
 131        cx: &mut App,
 132    ) -> Task<Result<R::Response>> {
 133        // let message = request.to_proto(session_id, self.upstream_project_id);
 134        // let upstream_client = self.client.clone();
 135        cx.background_executor().spawn(async move {
 136            // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
 137            // let response = upstream_client.request(message).await?;
 138            // request.response_from_proto(response)
 139            Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
 140        })
 141    }
 142
 143    fn request<R: DapCommand>(
 144        &self,
 145        request: R,
 146        session_id: SessionId,
 147        cx: &mut App,
 148    ) -> Task<Result<R::Response>>
 149    where
 150        <R::DapRequest as dap::requests::Request>::Response: 'static,
 151        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 152    {
 153        return self.send_proto_client_request::<R>(request, session_id, cx);
 154    }
 155}
 156
/// Backend a `Session` uses to issue DAP requests.
enum Mode {
    /// Requests are sent through the `DebugAdapterClient` held by a `LocalMode`.
    Local(LocalMode),
    /// Requests are routed through a `RemoteConnection`.
    Remote(RemoteConnection),
}
 161
/// State of a session that talks to a debug adapter through its own client.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests to the debug adapter.
    client: Arc<DebugAdapterClient>,
    /// Task definition this session was created from.
    definition: DebugTaskDefinition,
    /// Adapter binary description; also supplies the launch/attach request arguments.
    binary: DebugAdapterBinary,
    /// Store the breakpoints sent to the adapter are read from.
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// Extra breakpoint that is appended when breakpoints for a path are sent.
    tmp_breakpoint: Option<SourceBreakpoint>,
    worktree: WeakEntity<Worktree>,
}
 171
 172fn client_source(abs_path: &Path) -> dap::Source {
 173    dap::Source {
 174        name: abs_path
 175            .file_name()
 176            .map(|filename| filename.to_string_lossy().to_string()),
 177        path: Some(abs_path.to_string_lossy().to_string()),
 178        source_reference: None,
 179        presentation_hint: None,
 180        origin: None,
 181        sources: None,
 182        adapter_data: None,
 183        checksums: None,
 184    }
 185}
 186
 187impl LocalMode {
    /// Creates a `LocalMode` by delegating to `new_inner` with a no-op
    /// `on_initialized` callback.
    ///
    /// Note that `new_inner` takes `breakpoint_store` before `worktree`, the
    /// reverse of this function's parameter order.
    fn new(
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        worktree: WeakEntity<Worktree>,
        breakpoint_store: Entity<BreakpointStore>,
        config: DebugTaskDefinition,
        binary: DebugAdapterBinary,
        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
        cx: AsyncApp,
    ) -> Task<Result<Self>> {
        Self::new_inner(
            session_id,
            parent_session,
            breakpoint_store,
            worktree,
            config,
            binary,
            messages_tx,
            async |_, _| {},
            cx,
        )
    }
 210
 211    fn new_inner(
 212        session_id: SessionId,
 213        parent_session: Option<Entity<Session>>,
 214        breakpoint_store: Entity<BreakpointStore>,
 215        worktree: WeakEntity<Worktree>,
 216        config: DebugTaskDefinition,
 217        binary: DebugAdapterBinary,
 218        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
 219        on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
 220        cx: AsyncApp,
 221    ) -> Task<Result<Self>> {
 222        cx.spawn(async move |cx| {
 223            let message_handler = Box::new(move |message| {
 224                messages_tx.unbounded_send(message).ok();
 225            });
 226
 227            let client = Arc::new(
 228                if let Some(client) = parent_session
 229                    .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
 230                    .flatten()
 231                {
 232                    client
 233                        .reconnect(session_id, binary.clone(), message_handler, cx.clone())
 234                        .await?
 235                } else {
 236                    DebugAdapterClient::start(
 237                        session_id,
 238                        binary.clone(),
 239                        message_handler,
 240                        cx.clone(),
 241                    )
 242                    .await
 243                    .with_context(|| "Failed to start communication with debug adapter")?
 244                },
 245            );
 246
 247            let mut session = Self {
 248                client,
 249                breakpoint_store,
 250                worktree,
 251                tmp_breakpoint: None,
 252                definition: config,
 253                binary,
 254            };
 255
 256            on_initialized(&mut session, cx.clone()).await;
 257
 258            Ok(session)
 259        })
 260    }
 261
    /// Returns the weak handle to the worktree stored in this mode.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
 265
 266    fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
 267        let tasks: Vec<_> = paths
 268            .into_iter()
 269            .map(|path| {
 270                self.request(
 271                    dap_command::SetBreakpoints {
 272                        source: client_source(path),
 273                        source_modified: None,
 274                        breakpoints: vec![],
 275                    },
 276                    cx.background_executor().clone(),
 277                )
 278            })
 279            .collect();
 280
 281        cx.background_spawn(async move {
 282            futures::future::join_all(tasks)
 283                .await
 284                .iter()
 285                .for_each(|res| match res {
 286                    Ok(_) => {}
 287                    Err(err) => {
 288                        log::warn!("Set breakpoints request failed: {}", err);
 289                    }
 290                });
 291        })
 292    }
 293
 294    fn send_breakpoints_from_path(
 295        &self,
 296        abs_path: Arc<Path>,
 297        reason: BreakpointUpdatedReason,
 298        cx: &mut App,
 299    ) -> Task<()> {
 300        let breakpoints = self
 301            .breakpoint_store
 302            .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
 303            .into_iter()
 304            .filter(|bp| bp.state.is_enabled())
 305            .chain(self.tmp_breakpoint.clone())
 306            .map(Into::into)
 307            .collect();
 308
 309        let task = self.request(
 310            dap_command::SetBreakpoints {
 311                source: client_source(&abs_path),
 312                source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
 313                breakpoints,
 314            },
 315            cx.background_executor().clone(),
 316        );
 317
 318        cx.background_spawn(async move {
 319            match task.await {
 320                Ok(_) => {}
 321                Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
 322            }
 323        })
 324    }
 325
 326    fn send_exception_breakpoints(
 327        &self,
 328        filters: Vec<ExceptionBreakpointsFilter>,
 329        supports_filter_options: bool,
 330        cx: &App,
 331    ) -> Task<Result<Vec<dap::Breakpoint>>> {
 332        let arg = if supports_filter_options {
 333            SetExceptionBreakpoints::WithOptions {
 334                filters: filters
 335                    .into_iter()
 336                    .map(|filter| ExceptionFilterOptions {
 337                        filter_id: filter.filter,
 338                        condition: None,
 339                        mode: None,
 340                    })
 341                    .collect(),
 342            }
 343        } else {
 344            SetExceptionBreakpoints::Plain {
 345                filters: filters.into_iter().map(|filter| filter.filter).collect(),
 346            }
 347        };
 348        self.request(arg, cx.background_executor().clone())
 349    }
 350
 351    fn send_source_breakpoints(
 352        &self,
 353        ignore_breakpoints: bool,
 354        cx: &App,
 355    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
 356        let mut breakpoint_tasks = Vec::new();
 357        let breakpoints = self
 358            .breakpoint_store
 359            .read_with(cx, |store, cx| store.all_breakpoints(cx));
 360
 361        for (path, breakpoints) in breakpoints {
 362            let breakpoints = if ignore_breakpoints {
 363                vec![]
 364            } else {
 365                breakpoints
 366                    .into_iter()
 367                    .filter(|bp| bp.state.is_enabled())
 368                    .map(Into::into)
 369                    .collect()
 370            };
 371
 372            breakpoint_tasks.push(
 373                self.request(
 374                    dap_command::SetBreakpoints {
 375                        source: client_source(&path),
 376                        source_modified: Some(false),
 377                        breakpoints,
 378                    },
 379                    cx.background_executor().clone(),
 380                )
 381                .map(|result| result.map_err(|e| (path, e))),
 382            );
 383        }
 384
 385        cx.background_spawn(async move {
 386            futures::future::join_all(breakpoint_tasks)
 387                .await
 388                .into_iter()
 389                .filter_map(Result::err)
 390                .collect::<HashMap<_, _>>()
 391        })
 392    }
 393
    /// Returns a copy of the label of the task definition this session was created from.
    pub fn label(&self) -> String {
        self.definition.label.clone()
    }
 397
 398    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
 399        let adapter_id = self.binary.adapter_name.to_string();
 400
 401        self.request(Initialize { adapter_id }, cx.background_executor().clone())
 402    }
 403
 404    fn initialize_sequence(
 405        &self,
 406        capabilities: &Capabilities,
 407        initialized_rx: oneshot::Receiver<()>,
 408        dap_store: WeakEntity<DapStore>,
 409        cx: &App,
 410    ) -> Task<Result<()>> {
 411        let mut raw = self.binary.request_args.clone();
 412
 413        merge_json_value_into(
 414            self.definition.initialize_args.clone().unwrap_or(json!({})),
 415            &mut raw.configuration,
 416        );
 417
 418        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
 419        let launch = match raw.request {
 420            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
 421                Launch {
 422                    raw: raw.configuration,
 423                },
 424                cx.background_executor().clone(),
 425            ),
 426            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
 427                Attach {
 428                    raw: raw.configuration,
 429                },
 430                cx.background_executor().clone(),
 431            ),
 432        };
 433
 434        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
 435        let exception_filters = capabilities
 436            .exception_breakpoint_filters
 437            .as_ref()
 438            .map(|exception_filters| {
 439                exception_filters
 440                    .iter()
 441                    .filter(|filter| filter.default == Some(true))
 442                    .cloned()
 443                    .collect::<Vec<_>>()
 444            })
 445            .unwrap_or_default();
 446        let supports_exception_filters = capabilities
 447            .supports_exception_filter_options
 448            .unwrap_or_default();
 449        let configuration_sequence = cx.spawn({
 450            let this = self.clone();
 451            let worktree = self.worktree().clone();
 452            async move |cx| {
 453                initialized_rx.await?;
 454                let errors_by_path = cx
 455                    .update(|cx| this.send_source_breakpoints(false, cx))?
 456                    .await;
 457
 458                dap_store.update(cx, |_, cx| {
 459                    let Some(worktree) = worktree.upgrade() else {
 460                        return;
 461                    };
 462
 463                    for (path, error) in &errors_by_path {
 464                        log::error!("failed to set breakpoints for {path:?}: {error}");
 465                    }
 466
 467                    if let Some(failed_path) = errors_by_path.keys().next() {
 468                        let failed_path = failed_path
 469                            .strip_prefix(worktree.read(cx).abs_path())
 470                            .unwrap_or(failed_path)
 471                            .display();
 472                        let message = format!(
 473                            "Failed to set breakpoints for {failed_path}{}",
 474                            match errors_by_path.len() {
 475                                0 => unreachable!(),
 476                                1 => "".into(),
 477                                2 => " and 1 other path".into(),
 478                                n => format!(" and {} other paths", n - 1),
 479                            }
 480                        );
 481                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
 482                    }
 483                })?;
 484
 485                cx.update(|cx| {
 486                    this.send_exception_breakpoints(
 487                        exception_filters,
 488                        supports_exception_filters,
 489                        cx,
 490                    )
 491                })?
 492                .await
 493                .ok();
 494                let ret = if configuration_done_supported {
 495                    this.request(ConfigurationDone {}, cx.background_executor().clone())
 496                } else {
 497                    Task::ready(Ok(()))
 498                }
 499                .await;
 500                ret
 501            }
 502        });
 503
 504        cx.background_spawn(async move {
 505            futures::future::try_join(launch, configuration_sequence).await?;
 506            Ok(())
 507        })
 508    }
 509
 510    fn request<R: LocalDapCommand>(
 511        &self,
 512        request: R,
 513        executor: BackgroundExecutor,
 514    ) -> Task<Result<R::Response>>
 515    where
 516        <R::DapRequest as dap::requests::Request>::Response: 'static,
 517        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 518    {
 519        let request = Arc::new(request);
 520
 521        let request_clone = request.clone();
 522        let connection = self.client.clone();
 523        let request_task = executor.spawn(async move {
 524            let args = request_clone.to_dap();
 525            connection.request::<R::DapRequest>(args).await
 526        });
 527
 528        executor.spawn(async move {
 529            let response = request.response_from_dap(request_task.await?);
 530            response
 531        })
 532    }
 533}
 534impl From<RemoteConnection> for Mode {
 535    fn from(value: RemoteConnection) -> Self {
 536        Self::Remote(value)
 537    }
 538}
 539
 540impl Mode {
 541    fn request_dap<R: DapCommand>(
 542        &self,
 543        session_id: SessionId,
 544        request: R,
 545        cx: &mut Context<Session>,
 546    ) -> Task<Result<R::Response>>
 547    where
 548        <R::DapRequest as dap::requests::Request>::Response: 'static,
 549        <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
 550    {
 551        match self {
 552            Mode::Local(debug_adapter_client) => {
 553                debug_adapter_client.request(request, cx.background_executor().clone())
 554            }
 555            Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
 556        }
 557    }
 558}
 559
 560#[derive(Default)]
 561struct ThreadStates {
 562    global_state: Option<ThreadStatus>,
 563    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
 564}
 565
 566impl ThreadStates {
 567    fn stop_all_threads(&mut self) {
 568        self.global_state = Some(ThreadStatus::Stopped);
 569        self.known_thread_states.clear();
 570    }
 571
 572    fn exit_all_threads(&mut self) {
 573        self.global_state = Some(ThreadStatus::Exited);
 574        self.known_thread_states.clear();
 575    }
 576
 577    fn continue_all_threads(&mut self) {
 578        self.global_state = Some(ThreadStatus::Running);
 579        self.known_thread_states.clear();
 580    }
 581
 582    fn stop_thread(&mut self, thread_id: ThreadId) {
 583        self.known_thread_states
 584            .insert(thread_id, ThreadStatus::Stopped);
 585    }
 586
 587    fn continue_thread(&mut self, thread_id: ThreadId) {
 588        self.known_thread_states
 589            .insert(thread_id, ThreadStatus::Running);
 590    }
 591
 592    fn process_step(&mut self, thread_id: ThreadId) {
 593        self.known_thread_states
 594            .insert(thread_id, ThreadStatus::Stepping);
 595    }
 596
 597    fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
 598        self.thread_state(thread_id)
 599            .unwrap_or(ThreadStatus::Running)
 600    }
 601
 602    fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
 603        self.known_thread_states
 604            .get(&thread_id)
 605            .copied()
 606            .or(self.global_state)
 607    }
 608
 609    fn exit_thread(&mut self, thread_id: ThreadId) {
 610        self.known_thread_states
 611            .insert(thread_id, ThreadStatus::Exited);
 612    }
 613
 614    fn any_stopped_thread(&self) -> bool {
 615        self.global_state
 616            .is_some_and(|state| state == ThreadStatus::Stopped)
 617            || self
 618                .known_thread_states
 619                .values()
 620                .any(|status| *status == ThreadStatus::Stopped)
 621    }
 622}
/// Capacity of the circular buffer that holds a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Flag stored next to each exception breakpoint filter in `Session::exception_breakpoints`.
type IsEnabled = bool;

/// Token stored on the session as `output_token`.
// NOTE(review): presumably marks a position in the session's output stream — confirm.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Backend used to issue requests: a local adapter client or a remote connection.
    mode: Mode,
    /// Adapter capabilities; replaced with the response of the initialize request.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Output events, bounded to `MAX_TRACKED_OUTPUT_EVENTS` entries.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Shared request tasks, keyed first by `TypeId` and then by the request itself.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Exception breakpoint filters keyed by filter id, each paired with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    _background_tasks: Vec<Task<()>>,
}
 651
 652trait CacheableCommand: Any + Send + Sync {
 653    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
 654    fn dyn_hash(&self, hasher: &mut dyn Hasher);
 655    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
 656}
 657
 658impl<T> CacheableCommand for T
 659where
 660    T: DapCommand + PartialEq + Eq + Hash,
 661{
 662    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
 663        (rhs as &dyn Any)
 664            .downcast_ref::<Self>()
 665            .map_or(false, |rhs| self == rhs)
 666    }
 667
 668    fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
 669        T::hash(self, &mut hasher);
 670    }
 671
 672    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
 673        self
 674    }
 675}
 676
 677pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
 678
 679impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
 680    fn from(request: T) -> Self {
 681        Self(Arc::new(request))
 682    }
 683}
 684
 685impl PartialEq for RequestSlot {
 686    fn eq(&self, other: &Self) -> bool {
 687        self.0.dyn_eq(other.0.as_ref())
 688    }
 689}
 690
 691impl Eq for RequestSlot {}
 692
 693impl Hash for RequestSlot {
 694    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
 695        self.0.dyn_hash(state);
 696        (&*self.0 as &dyn Any).type_id().hash(state)
 697    }
 698}
 699
 700#[derive(Debug, Clone, Hash, PartialEq, Eq)]
 701pub struct CompletionsQuery {
 702    pub query: String,
 703    pub column: u64,
 704    pub line: Option<u64>,
 705    pub frame_id: Option<u64>,
 706}
 707
 708impl CompletionsQuery {
 709    pub fn new(
 710        buffer: &language::Buffer,
 711        cursor_position: language::Anchor,
 712        frame_id: Option<u64>,
 713    ) -> Self {
 714        let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
 715        Self {
 716            query: buffer.text(),
 717            column: column as u64,
 718            frame_id,
 719            line: Some(row as u64),
 720        }
 721    }
 722}
 723
/// Events a `Session` emits to its observers.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

/// Session state events, visible only to the parent module.
pub(super) enum SessionStateEvent {
    Shutdown,
    Restart,
}

// `Session` emits both event types.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
 740
// A local session sends breakpoint updates to DAP for all new breakpoints.
// The remote side only sends a breakpoint update when the breakpoint was created by that peer.
// The BreakpointStore notifies the session on breakpoint changes.
 744impl Session {
 745    pub(crate) fn local(
 746        breakpoint_store: Entity<BreakpointStore>,
 747        worktree: WeakEntity<Worktree>,
 748        session_id: SessionId,
 749        parent_session: Option<Entity<Session>>,
 750        binary: DebugAdapterBinary,
 751        config: DebugTaskDefinition,
 752        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 753        initialized_tx: oneshot::Sender<()>,
 754        cx: &mut App,
 755    ) -> Task<Result<Entity<Self>>> {
 756        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
 757
 758        cx.spawn(async move |cx| {
 759            let mode = LocalMode::new(
 760                session_id,
 761                parent_session.clone(),
 762                worktree,
 763                breakpoint_store.clone(),
 764                config.clone(),
 765                binary,
 766                message_tx,
 767                cx.clone(),
 768            )
 769            .await?;
 770
 771            cx.new(|cx| {
 772                create_local_session(
 773                    breakpoint_store,
 774                    session_id,
 775                    parent_session,
 776                    start_debugging_requests_tx,
 777                    initialized_tx,
 778                    message_rx,
 779                    mode,
 780                    cx,
 781                )
 782            })
 783        })
 784    }
 785
 786    pub(crate) fn remote(
 787        session_id: SessionId,
 788        client: AnyProtoClient,
 789        upstream_project_id: u64,
 790        ignore_breakpoints: bool,
 791    ) -> Self {
 792        Self {
 793            mode: Mode::Remote(RemoteConnection {
 794                _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
 795                _client: client,
 796                _upstream_project_id: upstream_project_id,
 797            }),
 798            id: session_id,
 799            child_session_ids: HashSet::default(),
 800            parent_id: None,
 801            capabilities: Capabilities::default(),
 802            ignore_breakpoints,
 803            variables: Default::default(),
 804            stack_frames: Default::default(),
 805            thread_states: ThreadStates::default(),
 806            output_token: OutputToken(0),
 807            output: circular_buffer::CircularBuffer::boxed(),
 808            requests: HashMap::default(),
 809            modules: Vec::default(),
 810            loaded_sources: Vec::default(),
 811            threads: IndexMap::default(),
 812            _background_tasks: Vec::default(),
 813            locations: Default::default(),
 814            is_session_terminated: false,
 815            exception_breakpoints: Default::default(),
 816        }
 817    }
 818
    /// Returns the id of this session.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
 822
    /// Returns a copy of the set of child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
 826
    /// Records `session_id` as a child of this session; adding an id twice has no further effect.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
 830
    /// Removes `session_id` from the set of children; unknown ids are ignored.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
 834
    /// Returns the id of the parent session, if this session has one.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
 838
    /// Returns the capabilities currently stored for the debug adapter.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
 842
 843    pub fn binary(&self) -> &DebugAdapterBinary {
 844        let Mode::Local(local_mode) = &self.mode else {
 845            panic!("Session is not local");
 846        };
 847        &local_mode.binary
 848    }
 849
 850    pub fn adapter_name(&self) -> SharedString {
 851        match &self.mode {
 852            Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(),
 853            Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
 854        }
 855    }
 856
 857    pub fn configuration(&self) -> Option<DebugTaskDefinition> {
 858        if let Mode::Local(local_mode) = &self.mode {
 859            Some(local_mode.definition.clone())
 860        } else {
 861            None
 862        }
 863    }
 864
    /// Returns the session's terminated flag.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
 868
    /// Returns true when this session runs in local mode.
    pub fn is_local(&self) -> bool {
        matches!(self.mode, Mode::Local(_))
    }
 872
 873    pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
 874        match &mut self.mode {
 875            Mode::Local(local_mode) => Some(local_mode),
 876            Mode::Remote(_) => None,
 877        }
 878    }
 879
 880    pub fn as_local(&self) -> Option<&LocalMode> {
 881        match &self.mode {
 882            Mode::Local(local_mode) => Some(local_mode),
 883            Mode::Remote(_) => None,
 884        }
 885    }
 886
 887    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
 888        match &self.mode {
 889            Mode::Local(local_mode) => {
 890                let capabilities = local_mode.clone().request_initialization(cx);
 891
 892                cx.spawn(async move |this, cx| {
 893                    let capabilities = capabilities.await?;
 894                    this.update(cx, |session, _| {
 895                        session.capabilities = capabilities;
 896                        let filters = session
 897                            .capabilities
 898                            .exception_breakpoint_filters
 899                            .clone()
 900                            .unwrap_or_default();
 901                        for filter in filters {
 902                            let default = filter.default.unwrap_or_default();
 903                            session
 904                                .exception_breakpoints
 905                                .entry(filter.filter.clone())
 906                                .or_insert_with(|| (filter, default));
 907                        }
 908                    })?;
 909                    Ok(())
 910                })
 911            }
 912            Mode::Remote(_) => Task::ready(Err(anyhow!(
 913                "Cannot send initialize request from remote session"
 914            ))),
 915        }
 916    }
 917
 918    pub(super) fn initialize_sequence(
 919        &mut self,
 920        initialize_rx: oneshot::Receiver<()>,
 921        dap_store: WeakEntity<DapStore>,
 922        cx: &mut Context<Self>,
 923    ) -> Task<Result<()>> {
 924        match &self.mode {
 925            Mode::Local(local_mode) => {
 926                local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
 927            }
 928            Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
 929        }
 930    }
 931
 932    pub fn run_to_position(
 933        &mut self,
 934        breakpoint: SourceBreakpoint,
 935        active_thread_id: ThreadId,
 936        cx: &mut Context<Self>,
 937    ) {
 938        match &mut self.mode {
 939            Mode::Local(local_mode) => {
 940                if !matches!(
 941                    self.thread_states.thread_state(active_thread_id),
 942                    Some(ThreadStatus::Stopped)
 943                ) {
 944                    return;
 945                };
 946                let path = breakpoint.path.clone();
 947                local_mode.tmp_breakpoint = Some(breakpoint);
 948                let task = local_mode.send_breakpoints_from_path(
 949                    path,
 950                    BreakpointUpdatedReason::Toggled,
 951                    cx,
 952                );
 953
 954                cx.spawn(async move |this, cx| {
 955                    task.await;
 956                    this.update(cx, |this, cx| {
 957                        this.continue_thread(active_thread_id, cx);
 958                    })
 959                })
 960                .detach();
 961            }
 962            Mode::Remote(_) => {}
 963        }
 964    }
 965
 966    pub fn has_new_output(&self, last_update: OutputToken) -> bool {
 967        self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
 968    }
 969
 970    pub fn output(
 971        &self,
 972        since: OutputToken,
 973    ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
 974        if self.output_token.0 == 0 {
 975            return (self.output.range(0..0), OutputToken(0));
 976        };
 977
 978        let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
 979
 980        let clamped_events_since = events_since.clamp(0, self.output.len());
 981        (
 982            self.output
 983                .range(self.output.len() - clamped_events_since..),
 984            self.output_token,
 985        )
 986    }
 987
 988    pub fn respond_to_client(
 989        &self,
 990        request_seq: u64,
 991        success: bool,
 992        command: String,
 993        body: Option<serde_json::Value>,
 994        cx: &mut Context<Self>,
 995    ) -> Task<Result<()>> {
 996        let Some(local_session) = self.as_local().cloned() else {
 997            unreachable!("Cannot respond to remote client");
 998        };
 999
1000        cx.background_spawn(async move {
1001            local_session
1002                .client
1003                .send_message(Message::Response(Response {
1004                    body,
1005                    success,
1006                    command,
1007                    seq: request_seq + 1,
1008                    request_seq,
1009                    message: None,
1010                }))
1011                .await
1012        })
1013    }
1014
1015    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1016        if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1017            let breakpoint = local.tmp_breakpoint.take()?;
1018            let path = breakpoint.path.clone();
1019            Some((local, path))
1020        }) {
1021            local
1022                .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1023                .detach();
1024        };
1025
1026        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1027            self.thread_states.stop_all_threads();
1028
1029            self.invalidate_command_type::<StackTraceCommand>();
1030        }
1031
1032        // Event if we stopped all threads we still need to insert the thread_id
1033        // to our own data
1034        if let Some(thread_id) = event.thread_id {
1035            self.thread_states.stop_thread(ThreadId(thread_id));
1036
1037            self.invalidate_state(
1038                &StackTraceCommand {
1039                    thread_id,
1040                    start_frame: None,
1041                    levels: None,
1042                }
1043                .into(),
1044            );
1045        }
1046
1047        self.invalidate_generic();
1048        self.threads.clear();
1049        self.variables.clear();
1050        cx.emit(SessionEvent::Stopped(
1051            event
1052                .thread_id
1053                .map(Into::into)
1054                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1055        ));
1056        cx.notify();
1057    }
1058
1059    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1060        match *event {
1061            Events::Initialized(_) => {
1062                debug_assert!(
1063                    false,
1064                    "Initialized event should have been handled in LocalMode"
1065                );
1066            }
1067            Events::Stopped(event) => self.handle_stopped_event(event, cx),
1068            Events::Continued(event) => {
1069                if event.all_threads_continued.unwrap_or_default() {
1070                    self.thread_states.continue_all_threads();
1071                } else {
1072                    self.thread_states
1073                        .continue_thread(ThreadId(event.thread_id));
1074                }
1075                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1076                self.invalidate_generic();
1077            }
1078            Events::Exited(_event) => {
1079                self.clear_active_debug_line(cx);
1080            }
1081            Events::Terminated(_) => {
1082                self.is_session_terminated = true;
1083                self.clear_active_debug_line(cx);
1084            }
1085            Events::Thread(event) => {
1086                let thread_id = ThreadId(event.thread_id);
1087
1088                match event.reason {
1089                    dap::ThreadEventReason::Started => {
1090                        self.thread_states.continue_thread(thread_id);
1091                    }
1092                    dap::ThreadEventReason::Exited => {
1093                        self.thread_states.exit_thread(thread_id);
1094                    }
1095                    reason => {
1096                        log::error!("Unhandled thread event reason {:?}", reason);
1097                    }
1098                }
1099                self.invalidate_state(&ThreadsCommand.into());
1100                cx.notify();
1101            }
1102            Events::Output(event) => {
1103                if event
1104                    .category
1105                    .as_ref()
1106                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1107                {
1108                    return;
1109                }
1110
1111                self.output.push_back(event);
1112                self.output_token.0 += 1;
1113                cx.notify();
1114            }
1115            Events::Breakpoint(_) => {}
1116            Events::Module(event) => {
1117                match event.reason {
1118                    dap::ModuleEventReason::New => {
1119                        self.modules.push(event.module);
1120                    }
1121                    dap::ModuleEventReason::Changed => {
1122                        if let Some(module) = self
1123                            .modules
1124                            .iter_mut()
1125                            .find(|other| event.module.id == other.id)
1126                        {
1127                            *module = event.module;
1128                        }
1129                    }
1130                    dap::ModuleEventReason::Removed => {
1131                        self.modules.retain(|other| event.module.id != other.id);
1132                    }
1133                }
1134
1135                // todo(debugger): We should only send the invalidate command to downstream clients.
1136                // self.invalidate_state(&ModulesCommand.into());
1137            }
1138            Events::LoadedSource(_) => {
1139                self.invalidate_state(&LoadedSourcesCommand.into());
1140            }
1141            Events::Capabilities(event) => {
1142                self.capabilities = self.capabilities.merge(event.capabilities);
1143                cx.notify();
1144            }
1145            Events::Memory(_) => {}
1146            Events::Process(_) => {}
1147            Events::ProgressEnd(_) => {}
1148            Events::ProgressStart(_) => {}
1149            Events::ProgressUpdate(_) => {}
1150            Events::Invalidated(_) => {}
1151            Events::Other(_) => {}
1152        }
1153    }
1154
1155    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1156    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1157        &mut self,
1158        request: T,
1159        process_result: impl FnOnce(
1160            &mut Self,
1161            Result<T::Response>,
1162            &mut Context<Self>,
1163        ) -> Option<T::Response>
1164        + 'static,
1165        cx: &mut Context<Self>,
1166    ) {
1167        const {
1168            assert!(
1169                T::CACHEABLE,
1170                "Only requests marked as cacheable should invoke `fetch`"
1171            );
1172        }
1173
1174        if !self.thread_states.any_stopped_thread()
1175            && request.type_id() != TypeId::of::<ThreadsCommand>()
1176            || self.is_session_terminated
1177        {
1178            return;
1179        }
1180
1181        let request_map = self
1182            .requests
1183            .entry(std::any::TypeId::of::<T>())
1184            .or_default();
1185
1186        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1187            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1188
1189            let task = Self::request_inner::<Arc<T>>(
1190                &self.capabilities,
1191                self.id,
1192                &self.mode,
1193                command,
1194                process_result,
1195                cx,
1196            );
1197            let task = cx
1198                .background_executor()
1199                .spawn(async move {
1200                    let _ = task.await?;
1201                    Some(())
1202                })
1203                .shared();
1204
1205            vacant.insert(task);
1206            cx.notify();
1207        }
1208    }
1209
1210    fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1211        capabilities: &Capabilities,
1212        session_id: SessionId,
1213        mode: &Mode,
1214        request: T,
1215        process_result: impl FnOnce(
1216            &mut Self,
1217            Result<T::Response>,
1218            &mut Context<Self>,
1219        ) -> Option<T::Response>
1220        + 'static,
1221        cx: &mut Context<Self>,
1222    ) -> Task<Option<T::Response>> {
1223        if !T::is_supported(&capabilities) {
1224            log::warn!(
1225                "Attempted to send a DAP request that isn't supported: {:?}",
1226                request
1227            );
1228            let error = Err(anyhow::Error::msg(
1229                "Couldn't complete request because it's not supported",
1230            ));
1231            return cx.spawn(async move |this, cx| {
1232                this.update(cx, |this, cx| process_result(this, error, cx))
1233                    .log_err()
1234                    .flatten()
1235            });
1236        }
1237
1238        let request = mode.request_dap(session_id, request, cx);
1239        cx.spawn(async move |this, cx| {
1240            let result = request.await;
1241            this.update(cx, |this, cx| process_result(this, result, cx))
1242                .log_err()
1243                .flatten()
1244        })
1245    }
1246
1247    fn request<T: DapCommand + PartialEq + Eq + Hash>(
1248        &self,
1249        request: T,
1250        process_result: impl FnOnce(
1251            &mut Self,
1252            Result<T::Response>,
1253            &mut Context<Self>,
1254        ) -> Option<T::Response>
1255        + 'static,
1256        cx: &mut Context<Self>,
1257    ) -> Task<Option<T::Response>> {
1258        Self::request_inner(
1259            &self.capabilities,
1260            self.id,
1261            &self.mode,
1262            request,
1263            process_result,
1264            cx,
1265        )
1266    }
1267
1268    fn invalidate_command_type<Command: DapCommand>(&mut self) {
1269        self.requests.remove(&std::any::TypeId::of::<Command>());
1270    }
1271
1272    fn invalidate_generic(&mut self) {
1273        self.invalidate_command_type::<ModulesCommand>();
1274        self.invalidate_command_type::<LoadedSourcesCommand>();
1275        self.invalidate_command_type::<ThreadsCommand>();
1276    }
1277
1278    fn invalidate_state(&mut self, key: &RequestSlot) {
1279        self.requests
1280            .entry((&*key.0 as &dyn Any).type_id())
1281            .and_modify(|request_map| {
1282                request_map.remove(&key);
1283            });
1284    }
1285
1286    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1287        self.thread_states.thread_status(thread_id)
1288    }
1289
1290    pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1291        self.fetch(
1292            dap_command::ThreadsCommand,
1293            |this, result, cx| {
1294                let result = result.log_err()?;
1295
1296                this.threads = result
1297                    .iter()
1298                    .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1299                    .collect();
1300
1301                this.invalidate_command_type::<StackTraceCommand>();
1302                cx.emit(SessionEvent::Threads);
1303                cx.notify();
1304
1305                Some(result)
1306            },
1307            cx,
1308        );
1309
1310        self.threads
1311            .values()
1312            .map(|thread| {
1313                (
1314                    thread.dap.clone(),
1315                    self.thread_states.thread_status(ThreadId(thread.dap.id)),
1316                )
1317            })
1318            .collect()
1319    }
1320
1321    pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1322        self.fetch(
1323            dap_command::ModulesCommand,
1324            |this, result, cx| {
1325                let result = result.log_err()?;
1326
1327                this.modules = result.iter().cloned().collect();
1328                cx.emit(SessionEvent::Modules);
1329                cx.notify();
1330
1331                Some(result)
1332            },
1333            cx,
1334        );
1335
1336        &self.modules
1337    }
1338
1339    pub fn ignore_breakpoints(&self) -> bool {
1340        self.ignore_breakpoints
1341    }
1342
1343    pub fn toggle_ignore_breakpoints(
1344        &mut self,
1345        cx: &mut App,
1346    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1347        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1348    }
1349
1350    pub(crate) fn set_ignore_breakpoints(
1351        &mut self,
1352        ignore: bool,
1353        cx: &mut App,
1354    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1355        if self.ignore_breakpoints == ignore {
1356            return Task::ready(HashMap::default());
1357        }
1358
1359        self.ignore_breakpoints = ignore;
1360
1361        if let Some(local) = self.as_local() {
1362            local.send_source_breakpoints(ignore, cx)
1363        } else {
1364            // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1365            unimplemented!()
1366        }
1367    }
1368
1369    pub fn exception_breakpoints(
1370        &self,
1371    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1372        self.exception_breakpoints.values()
1373    }
1374
1375    pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1376        if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1377            *is_enabled = !*is_enabled;
1378            self.send_exception_breakpoints(cx);
1379        }
1380    }
1381
1382    fn send_exception_breakpoints(&mut self, cx: &App) {
1383        if let Some(local) = self.as_local() {
1384            let exception_filters = self
1385                .exception_breakpoints
1386                .values()
1387                .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1388                .collect();
1389
1390            let supports_exception_filters = self
1391                .capabilities
1392                .supports_exception_filter_options
1393                .unwrap_or_default();
1394            local
1395                .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1396                .detach_and_log_err(cx);
1397        } else {
1398            debug_assert!(false, "Not implemented");
1399        }
1400    }
1401
1402    pub fn breakpoints_enabled(&self) -> bool {
1403        self.ignore_breakpoints
1404    }
1405
1406    pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1407        self.fetch(
1408            dap_command::LoadedSourcesCommand,
1409            |this, result, cx| {
1410                let result = result.log_err()?;
1411                this.loaded_sources = result.iter().cloned().collect();
1412                cx.emit(SessionEvent::LoadedSources);
1413                cx.notify();
1414                Some(result)
1415            },
1416            cx,
1417        );
1418
1419        &self.loaded_sources
1420    }
1421
1422    fn fallback_to_manual_restart(
1423        &mut self,
1424        res: Result<()>,
1425        cx: &mut Context<Self>,
1426    ) -> Option<()> {
1427        if res.log_err().is_none() {
1428            cx.emit(SessionStateEvent::Restart);
1429            return None;
1430        }
1431        Some(())
1432    }
1433
1434    fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1435        res.log_err()?;
1436        Some(())
1437    }
1438
1439    fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1440        thread_id: ThreadId,
1441    ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1442    {
1443        move |this, response, cx| match response.log_err() {
1444            Some(response) => Some(response),
1445            None => {
1446                this.thread_states.stop_thread(thread_id);
1447                cx.notify();
1448                None
1449            }
1450        }
1451    }
1452
1453    fn clear_active_debug_line_response(
1454        &mut self,
1455        response: Result<()>,
1456        cx: &mut Context<Session>,
1457    ) -> Option<()> {
1458        response.log_err()?;
1459        self.clear_active_debug_line(cx);
1460        Some(())
1461    }
1462
1463    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1464        self.as_local()
1465            .expect("Message handler will only run in local mode")
1466            .breakpoint_store
1467            .update(cx, |store, cx| {
1468                store.remove_active_position(Some(self.id), cx)
1469            });
1470    }
1471
1472    pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1473        self.request(
1474            PauseCommand {
1475                thread_id: thread_id.0,
1476            },
1477            Self::empty_response,
1478            cx,
1479        )
1480        .detach();
1481    }
1482
1483    pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1484        self.request(
1485            RestartStackFrameCommand { stack_frame_id },
1486            Self::empty_response,
1487            cx,
1488        )
1489        .detach();
1490    }
1491
1492    pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1493        if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1494            self.request(
1495                RestartCommand {
1496                    raw: args.unwrap_or(Value::Null),
1497                },
1498                Self::fallback_to_manual_restart,
1499                cx,
1500            )
1501            .detach();
1502        } else {
1503            cx.emit(SessionStateEvent::Restart);
1504        }
1505    }
1506
1507    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1508        self.is_session_terminated = true;
1509        self.thread_states.exit_all_threads();
1510        cx.notify();
1511
1512        let task = if self
1513            .capabilities
1514            .supports_terminate_request
1515            .unwrap_or_default()
1516        {
1517            self.request(
1518                TerminateCommand {
1519                    restart: Some(false),
1520                },
1521                Self::clear_active_debug_line_response,
1522                cx,
1523            )
1524        } else {
1525            self.request(
1526                DisconnectCommand {
1527                    restart: Some(false),
1528                    terminate_debuggee: Some(true),
1529                    suspend_debuggee: Some(false),
1530                },
1531                Self::clear_active_debug_line_response,
1532                cx,
1533            )
1534        };
1535
1536        cx.emit(SessionStateEvent::Shutdown);
1537
1538        let debug_client = self.adapter_client();
1539
1540        cx.background_spawn(async move {
1541            let _ = task.await;
1542
1543            if let Some(client) = debug_client {
1544                client.shutdown().await.log_err();
1545            }
1546        })
1547    }
1548
1549    pub fn completions(
1550        &mut self,
1551        query: CompletionsQuery,
1552        cx: &mut Context<Self>,
1553    ) -> Task<Result<Vec<dap::CompletionItem>>> {
1554        let task = self.request(query, |_, result, _| result.log_err(), cx);
1555
1556        cx.background_executor().spawn(async move {
1557            anyhow::Ok(
1558                task.await
1559                    .map(|response| response.targets)
1560                    .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1561            )
1562        })
1563    }
1564
1565    pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1566        self.thread_states.continue_thread(thread_id);
1567        self.request(
1568            ContinueCommand {
1569                args: ContinueArguments {
1570                    thread_id: thread_id.0,
1571                    single_thread: Some(true),
1572                },
1573            },
1574            Self::on_step_response::<ContinueCommand>(thread_id),
1575            cx,
1576        )
1577        .detach();
1578    }
1579
1580    pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1581        match self.mode {
1582            Mode::Local(ref local) => Some(local.client.clone()),
1583            Mode::Remote(_) => None,
1584        }
1585    }
1586
1587    pub fn step_over(
1588        &mut self,
1589        thread_id: ThreadId,
1590        granularity: SteppingGranularity,
1591        cx: &mut Context<Self>,
1592    ) {
1593        let supports_single_thread_execution_requests =
1594            self.capabilities.supports_single_thread_execution_requests;
1595        let supports_stepping_granularity = self
1596            .capabilities
1597            .supports_stepping_granularity
1598            .unwrap_or_default();
1599
1600        let command = NextCommand {
1601            inner: StepCommand {
1602                thread_id: thread_id.0,
1603                granularity: supports_stepping_granularity.then(|| granularity),
1604                single_thread: supports_single_thread_execution_requests,
1605            },
1606        };
1607
1608        self.thread_states.process_step(thread_id);
1609        self.request(
1610            command,
1611            Self::on_step_response::<NextCommand>(thread_id),
1612            cx,
1613        )
1614        .detach();
1615    }
1616
1617    pub fn step_in(
1618        &mut self,
1619        thread_id: ThreadId,
1620        granularity: SteppingGranularity,
1621        cx: &mut Context<Self>,
1622    ) {
1623        let supports_single_thread_execution_requests =
1624            self.capabilities.supports_single_thread_execution_requests;
1625        let supports_stepping_granularity = self
1626            .capabilities
1627            .supports_stepping_granularity
1628            .unwrap_or_default();
1629
1630        let command = StepInCommand {
1631            inner: StepCommand {
1632                thread_id: thread_id.0,
1633                granularity: supports_stepping_granularity.then(|| granularity),
1634                single_thread: supports_single_thread_execution_requests,
1635            },
1636        };
1637
1638        self.thread_states.process_step(thread_id);
1639        self.request(
1640            command,
1641            Self::on_step_response::<StepInCommand>(thread_id),
1642            cx,
1643        )
1644        .detach();
1645    }
1646
1647    pub fn step_out(
1648        &mut self,
1649        thread_id: ThreadId,
1650        granularity: SteppingGranularity,
1651        cx: &mut Context<Self>,
1652    ) {
1653        let supports_single_thread_execution_requests =
1654            self.capabilities.supports_single_thread_execution_requests;
1655        let supports_stepping_granularity = self
1656            .capabilities
1657            .supports_stepping_granularity
1658            .unwrap_or_default();
1659
1660        let command = StepOutCommand {
1661            inner: StepCommand {
1662                thread_id: thread_id.0,
1663                granularity: supports_stepping_granularity.then(|| granularity),
1664                single_thread: supports_single_thread_execution_requests,
1665            },
1666        };
1667
1668        self.thread_states.process_step(thread_id);
1669        self.request(
1670            command,
1671            Self::on_step_response::<StepOutCommand>(thread_id),
1672            cx,
1673        )
1674        .detach();
1675    }
1676
1677    pub fn step_back(
1678        &mut self,
1679        thread_id: ThreadId,
1680        granularity: SteppingGranularity,
1681        cx: &mut Context<Self>,
1682    ) {
1683        let supports_single_thread_execution_requests =
1684            self.capabilities.supports_single_thread_execution_requests;
1685        let supports_stepping_granularity = self
1686            .capabilities
1687            .supports_stepping_granularity
1688            .unwrap_or_default();
1689
1690        let command = StepBackCommand {
1691            inner: StepCommand {
1692                thread_id: thread_id.0,
1693                granularity: supports_stepping_granularity.then(|| granularity),
1694                single_thread: supports_single_thread_execution_requests,
1695            },
1696        };
1697
1698        self.thread_states.process_step(thread_id);
1699
1700        self.request(
1701            command,
1702            Self::on_step_response::<StepBackCommand>(thread_id),
1703            cx,
1704        )
1705        .detach();
1706    }
1707
1708    pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1709        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1710            && self.requests.contains_key(&ThreadsCommand.type_id())
1711            && self.threads.contains_key(&thread_id)
1712        // ^ todo(debugger): We need a better way to check that we're not querying stale data
1713        // We could still be using an old thread id and have sent a new thread's request
1714        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1715        // But it very well could cause a minor bug in the future that is hard to track down
1716        {
1717            self.fetch(
1718                super::dap_command::StackTraceCommand {
1719                    thread_id: thread_id.0,
1720                    start_frame: None,
1721                    levels: None,
1722                },
1723                move |this, stack_frames, cx| {
1724                    let stack_frames = stack_frames.log_err()?;
1725
1726                    let entry = this.threads.entry(thread_id).and_modify(|thread| {
1727                        thread.stack_frame_ids =
1728                            stack_frames.iter().map(|frame| frame.id).collect();
1729                    });
1730                    debug_assert!(
1731                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1732                        "Sent request for thread_id that doesn't exist"
1733                    );
1734
1735                    this.stack_frames.extend(
1736                        stack_frames
1737                            .iter()
1738                            .cloned()
1739                            .map(|frame| (frame.id, StackFrame::from(frame))),
1740                    );
1741
1742                    this.invalidate_command_type::<ScopesCommand>();
1743                    this.invalidate_command_type::<VariablesCommand>();
1744
1745                    cx.emit(SessionEvent::StackTrace);
1746                    cx.notify();
1747                    Some(stack_frames)
1748                },
1749                cx,
1750            );
1751        }
1752
1753        self.threads
1754            .get(&thread_id)
1755            .map(|thread| {
1756                thread
1757                    .stack_frame_ids
1758                    .iter()
1759                    .filter_map(|id| self.stack_frames.get(id))
1760                    .cloned()
1761                    .collect()
1762            })
1763            .unwrap_or_default()
1764    }
1765
1766    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1767        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1768            && self
1769                .requests
1770                .contains_key(&TypeId::of::<StackTraceCommand>())
1771        {
1772            self.fetch(
1773                ScopesCommand { stack_frame_id },
1774                move |this, scopes, cx| {
1775                    let scopes = scopes.log_err()?;
1776
1777                    for scope in scopes .iter(){
1778                        this.variables(scope.variables_reference, cx);
1779                    }
1780
1781                    let entry = this
1782                        .stack_frames
1783                        .entry(stack_frame_id)
1784                        .and_modify(|stack_frame| {
1785                            stack_frame.scopes = scopes.clone();
1786                        });
1787
1788                    cx.emit(SessionEvent::Variables);
1789
1790                    debug_assert!(
1791                        matches!(entry, indexmap::map::Entry::Occupied(_)),
1792                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1793                    );
1794
1795                    Some(scopes)
1796                },
1797                cx,
1798            );
1799        }
1800
1801        self.stack_frames
1802            .get(&stack_frame_id)
1803            .map(|frame| frame.scopes.as_slice())
1804            .unwrap_or_default()
1805    }
1806
1807    pub fn variables(
1808        &mut self,
1809        variables_reference: VariableReference,
1810        cx: &mut Context<Self>,
1811    ) -> Vec<dap::Variable> {
1812        let command = VariablesCommand {
1813            variables_reference,
1814            filter: None,
1815            start: None,
1816            count: None,
1817            format: None,
1818        };
1819
1820        self.fetch(
1821            command,
1822            move |this, variables, cx| {
1823                let variables = variables.log_err()?;
1824                this.variables
1825                    .insert(variables_reference, variables.clone());
1826
1827                cx.emit(SessionEvent::Variables);
1828                Some(variables)
1829            },
1830            cx,
1831        );
1832
1833        self.variables
1834            .get(&variables_reference)
1835            .cloned()
1836            .unwrap_or_default()
1837    }
1838
1839    pub fn set_variable_value(
1840        &mut self,
1841        variables_reference: u64,
1842        name: String,
1843        value: String,
1844        cx: &mut Context<Self>,
1845    ) {
1846        if self.capabilities.supports_set_variable.unwrap_or_default() {
1847            self.request(
1848                SetVariableValueCommand {
1849                    name,
1850                    value,
1851                    variables_reference,
1852                },
1853                move |this, response, cx| {
1854                    let response = response.log_err()?;
1855                    this.invalidate_command_type::<VariablesCommand>();
1856                    cx.notify();
1857                    Some(response)
1858                },
1859                cx,
1860            )
1861            .detach()
1862        }
1863    }
1864
1865    pub fn evaluate(
1866        &mut self,
1867        expression: String,
1868        context: Option<EvaluateArgumentsContext>,
1869        frame_id: Option<u64>,
1870        source: Option<Source>,
1871        cx: &mut Context<Self>,
1872    ) {
1873        self.request(
1874            EvaluateCommand {
1875                expression,
1876                context,
1877                frame_id,
1878                source,
1879            },
1880            |this, response, cx| {
1881                let response = response.log_err()?;
1882                this.output_token.0 += 1;
1883                this.output.push_back(dap::OutputEvent {
1884                    category: None,
1885                    output: response.result.clone(),
1886                    group: None,
1887                    variables_reference: Some(response.variables_reference),
1888                    source: None,
1889                    line: None,
1890                    column: None,
1891                    data: None,
1892                    location_reference: None,
1893                });
1894
1895                this.invalidate_command_type::<ScopesCommand>();
1896                cx.notify();
1897                Some(response)
1898            },
1899            cx,
1900        )
1901        .detach();
1902    }
1903
1904    pub fn location(
1905        &mut self,
1906        reference: u64,
1907        cx: &mut Context<Self>,
1908    ) -> Option<dap::LocationsResponse> {
1909        self.fetch(
1910            LocationsCommand { reference },
1911            move |this, response, _| {
1912                let response = response.log_err()?;
1913                this.locations.insert(reference, response.clone());
1914                Some(response)
1915            },
1916            cx,
1917        );
1918        self.locations.get(&reference).cloned()
1919    }
1920    pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1921        let command = DisconnectCommand {
1922            restart: Some(false),
1923            terminate_debuggee: Some(true),
1924            suspend_debuggee: Some(false),
1925        };
1926
1927        self.request(command, Self::empty_response, cx).detach()
1928    }
1929
1930    pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1931        if self
1932            .capabilities
1933            .supports_terminate_threads_request
1934            .unwrap_or_default()
1935        {
1936            self.request(
1937                TerminateThreadsCommand {
1938                    thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1939                },
1940                Self::clear_active_debug_line_response,
1941                cx,
1942            )
1943            .detach();
1944        } else {
1945            self.shutdown(cx).detach();
1946        }
1947    }
1948}
1949
1950fn create_local_session(
1951    breakpoint_store: Entity<BreakpointStore>,
1952    session_id: SessionId,
1953    parent_session: Option<Entity<Session>>,
1954    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1955    initialized_tx: oneshot::Sender<()>,
1956    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1957    mode: LocalMode,
1958    cx: &mut Context<Session>,
1959) -> Session {
1960    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1961        let mut initialized_tx = Some(initialized_tx);
1962        while let Some(message) = message_rx.next().await {
1963            if let Message::Event(event) = message {
1964                if let Events::Initialized(_) = *event {
1965                    if let Some(tx) = initialized_tx.take() {
1966                        tx.send(()).ok();
1967                    }
1968                } else {
1969                    let Ok(_) = this.update(cx, |session, cx| {
1970                        session.handle_dap_event(event, cx);
1971                    }) else {
1972                        break;
1973                    };
1974                }
1975            } else {
1976                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1977                else {
1978                    break;
1979                };
1980            }
1981        }
1982    })];
1983
1984    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1985        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1986            if let Some(local) = (!this.ignore_breakpoints)
1987                .then(|| this.as_local_mut())
1988                .flatten()
1989            {
1990                local
1991                    .send_breakpoints_from_path(path.clone(), *reason, cx)
1992                    .detach();
1993            };
1994        }
1995        BreakpointStoreEvent::BreakpointsCleared(paths) => {
1996            if let Some(local) = (!this.ignore_breakpoints)
1997                .then(|| this.as_local_mut())
1998                .flatten()
1999            {
2000                local.unset_breakpoints_from_paths(paths, cx).detach();
2001            }
2002        }
2003        BreakpointStoreEvent::ActiveDebugLineChanged => {}
2004    })
2005    .detach();
2006
2007    Session {
2008        mode: Mode::Local(mode),
2009        id: session_id,
2010        child_session_ids: HashSet::default(),
2011        parent_id: parent_session.map(|session| session.read(cx).id),
2012        variables: Default::default(),
2013        capabilities: Capabilities::default(),
2014        thread_states: ThreadStates::default(),
2015        output_token: OutputToken(0),
2016        ignore_breakpoints: false,
2017        output: circular_buffer::CircularBuffer::boxed(),
2018        requests: HashMap::default(),
2019        modules: Vec::default(),
2020        loaded_sources: Vec::default(),
2021        threads: IndexMap::default(),
2022        stack_frames: IndexMap::default(),
2023        locations: Default::default(),
2024        exception_breakpoints: Default::default(),
2025        _background_tasks,
2026        is_session_terminated: false,
2027    }
2028}