1use super::breakpoint_store::{
2 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
3};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
9 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
10 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
11};
12use anyhow::{Context as _, Result, anyhow};
13use collections::{HashMap, HashSet, IndexMap, IndexSet};
14use dap::adapters::DebugAdapterBinary;
15use dap::messages::Response;
16use dap::{
17 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
18 SteppingGranularity, StoppedEvent, VariableReference,
19 client::{DebugAdapterClient, SessionId},
20 messages::{Events, Message},
21};
22use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
23use futures::channel::oneshot;
24use futures::{FutureExt, future::Shared};
25use gpui::{
26 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
27 Task, WeakEntity,
28};
29use rpc::AnyProtoClient;
30use serde_json::{Value, json};
31use smol::stream::StreamExt;
32use std::any::TypeId;
33use std::collections::BTreeMap;
34use std::u64;
35use std::{
36 any::Any,
37 collections::hash_map::Entry,
38 hash::{Hash, Hasher},
39 path::Path,
40 sync::Arc,
41};
42use task::DebugTaskDefinition;
43use text::{PointUtf16, ToPointUtf16};
44use util::{ResultExt, merge_json_value_into};
45
/// Identifier of a debuggee thread, wrapping the raw `u64` id used in adapter events.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// Smallest representable thread id.
    pub const MIN: Self = Self(u64::MIN);
    /// Largest representable thread id.
    pub const MAX: Self = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw id without any validation.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
60
61#[derive(Clone, Debug)]
62pub struct StackFrame {
63 pub dap: dap::StackFrame,
64 pub scopes: Vec<dap::Scope>,
65}
66
67impl From<dap::StackFrame> for StackFrame {
68 fn from(stack_frame: dap::StackFrame) -> Self {
69 Self {
70 scopes: vec![],
71 dap: stack_frame,
72 }
73 }
74}
75
/// Execution state the session tracks for a thread. Defaults to `Running`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the display name of this status.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
97
98#[derive(Debug)]
99pub struct Thread {
100 dap: dap::Thread,
101 stack_frame_ids: IndexSet<StackFrameId>,
102 _has_stopped: bool,
103}
104
105impl From<dap::Thread> for Thread {
106 fn from(dap: dap::Thread) -> Self {
107 Self {
108 dap,
109 stack_frame_ids: Default::default(),
110 _has_stopped: false,
111 }
112 }
113}
114
115type UpstreamProjectId = u64;
116
117struct RemoteConnection {
118 _client: AnyProtoClient,
119 _upstream_project_id: UpstreamProjectId,
120 _adapter_name: SharedString,
121}
122
123impl RemoteConnection {
124 fn send_proto_client_request<R: DapCommand>(
125 &self,
126 _request: R,
127 _session_id: SessionId,
128 cx: &mut App,
129 ) -> Task<Result<R::Response>> {
130 // let message = request.to_proto(session_id, self.upstream_project_id);
131 // let upstream_client = self.client.clone();
132 cx.background_executor().spawn(async move {
133 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
134 // let response = upstream_client.request(message).await?;
135 // request.response_from_proto(response)
136 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
137 })
138 }
139
140 fn request<R: DapCommand>(
141 &self,
142 request: R,
143 session_id: SessionId,
144 cx: &mut App,
145 ) -> Task<Result<R::Response>>
146 where
147 <R::DapRequest as dap::requests::Request>::Response: 'static,
148 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
149 {
150 return self.send_proto_client_request::<R>(request, session_id, cx);
151 }
152}
153
154enum Mode {
155 Local(LocalMode),
156 Remote(RemoteConnection),
157}
158
159#[derive(Clone)]
160pub struct LocalMode {
161 client: Arc<DebugAdapterClient>,
162 definition: DebugTaskDefinition,
163 binary: DebugAdapterBinary,
164 pub(crate) breakpoint_store: Entity<BreakpointStore>,
165 tmp_breakpoint: Option<SourceBreakpoint>,
166}
167
168fn client_source(abs_path: &Path) -> dap::Source {
169 dap::Source {
170 name: abs_path
171 .file_name()
172 .map(|filename| filename.to_string_lossy().to_string()),
173 path: Some(abs_path.to_string_lossy().to_string()),
174 source_reference: None,
175 presentation_hint: None,
176 origin: None,
177 sources: None,
178 adapter_data: None,
179 checksums: None,
180 }
181}
182
183impl LocalMode {
184 fn new(
185 session_id: SessionId,
186 parent_session: Option<Entity<Session>>,
187 breakpoint_store: Entity<BreakpointStore>,
188 config: DebugTaskDefinition,
189 binary: DebugAdapterBinary,
190 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
191 cx: AsyncApp,
192 ) -> Task<Result<Self>> {
193 Self::new_inner(
194 session_id,
195 parent_session,
196 breakpoint_store,
197 config,
198 binary,
199 messages_tx,
200 async |_, _| {},
201 cx,
202 )
203 }
204
205 fn new_inner(
206 session_id: SessionId,
207 parent_session: Option<Entity<Session>>,
208 breakpoint_store: Entity<BreakpointStore>,
209 config: DebugTaskDefinition,
210 binary: DebugAdapterBinary,
211 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
212 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
213 cx: AsyncApp,
214 ) -> Task<Result<Self>> {
215 cx.spawn(async move |cx| {
216 let message_handler = Box::new(move |message| {
217 messages_tx.unbounded_send(message).ok();
218 });
219
220 let client = Arc::new(
221 if let Some(client) = parent_session
222 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
223 .flatten()
224 {
225 client
226 .reconnect(session_id, binary.clone(), message_handler, cx.clone())
227 .await?
228 } else {
229 DebugAdapterClient::start(
230 session_id,
231 binary.clone(),
232 message_handler,
233 cx.clone(),
234 )
235 .await
236 .with_context(|| "Failed to start communication with debug adapter")?
237 },
238 );
239
240 let mut session = Self {
241 client,
242 breakpoint_store,
243 tmp_breakpoint: None,
244 definition: config,
245 binary,
246 };
247
248 on_initialized(&mut session, cx.clone()).await;
249
250 Ok(session)
251 })
252 }
253
254 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
255 let tasks: Vec<_> = paths
256 .into_iter()
257 .map(|path| {
258 self.request(
259 dap_command::SetBreakpoints {
260 source: client_source(path),
261 source_modified: None,
262 breakpoints: vec![],
263 },
264 cx.background_executor().clone(),
265 )
266 })
267 .collect();
268
269 cx.background_spawn(async move {
270 futures::future::join_all(tasks)
271 .await
272 .iter()
273 .for_each(|res| match res {
274 Ok(_) => {}
275 Err(err) => {
276 log::warn!("Set breakpoints request failed: {}", err);
277 }
278 });
279 })
280 }
281
282 fn send_breakpoints_from_path(
283 &self,
284 abs_path: Arc<Path>,
285 reason: BreakpointUpdatedReason,
286 cx: &mut App,
287 ) -> Task<()> {
288 let breakpoints = self
289 .breakpoint_store
290 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
291 .into_iter()
292 .filter(|bp| bp.state.is_enabled())
293 .chain(self.tmp_breakpoint.clone())
294 .map(Into::into)
295 .collect();
296
297 let task = self.request(
298 dap_command::SetBreakpoints {
299 source: client_source(&abs_path),
300 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
301 breakpoints,
302 },
303 cx.background_executor().clone(),
304 );
305
306 cx.background_spawn(async move {
307 match task.await {
308 Ok(_) => {}
309 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
310 }
311 })
312 }
313
314 fn send_exception_breakpoints(
315 &self,
316 filters: Vec<ExceptionBreakpointsFilter>,
317 supports_filter_options: bool,
318 cx: &App,
319 ) -> Task<Result<Vec<dap::Breakpoint>>> {
320 let arg = if supports_filter_options {
321 SetExceptionBreakpoints::WithOptions {
322 filters: filters
323 .into_iter()
324 .map(|filter| ExceptionFilterOptions {
325 filter_id: filter.filter,
326 condition: None,
327 mode: None,
328 })
329 .collect(),
330 }
331 } else {
332 SetExceptionBreakpoints::Plain {
333 filters: filters.into_iter().map(|filter| filter.filter).collect(),
334 }
335 };
336 self.request(arg, cx.background_executor().clone())
337 }
338 fn send_source_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
339 let mut breakpoint_tasks = Vec::new();
340 let breakpoints = self
341 .breakpoint_store
342 .read_with(cx, |store, cx| store.all_breakpoints(cx));
343
344 for (path, breakpoints) in breakpoints {
345 let breakpoints = if ignore_breakpoints {
346 vec![]
347 } else {
348 breakpoints
349 .into_iter()
350 .filter(|bp| bp.state.is_enabled())
351 .map(Into::into)
352 .collect()
353 };
354
355 breakpoint_tasks.push(self.request(
356 dap_command::SetBreakpoints {
357 source: client_source(&path),
358 source_modified: Some(false),
359 breakpoints,
360 },
361 cx.background_executor().clone(),
362 ));
363 }
364
365 cx.background_spawn(async move {
366 futures::future::join_all(breakpoint_tasks)
367 .await
368 .iter()
369 .for_each(|res| match res {
370 Ok(_) => {}
371 Err(err) => {
372 log::warn!("Set breakpoints request failed: {}", err);
373 }
374 });
375 })
376 }
377
378 pub fn label(&self) -> String {
379 self.definition.label.clone()
380 }
381
382 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
383 let adapter_id = self.binary.adapter_name.to_string();
384
385 self.request(Initialize { adapter_id }, cx.background_executor().clone())
386 }
387
388 fn initialize_sequence(
389 &self,
390 capabilities: &Capabilities,
391 initialized_rx: oneshot::Receiver<()>,
392 cx: &App,
393 ) -> Task<Result<()>> {
394 let mut raw = self.binary.request_args.clone();
395
396 merge_json_value_into(
397 self.definition.initialize_args.clone().unwrap_or(json!({})),
398 &mut raw.configuration,
399 );
400
401 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
402 let launch = match raw.request {
403 dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
404 Launch {
405 raw: raw.configuration,
406 },
407 cx.background_executor().clone(),
408 ),
409 dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
410 Attach {
411 raw: raw.configuration,
412 },
413 cx.background_executor().clone(),
414 ),
415 };
416
417 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
418 let exception_filters = capabilities
419 .exception_breakpoint_filters
420 .as_ref()
421 .map(|exception_filters| {
422 exception_filters
423 .iter()
424 .filter(|filter| filter.default == Some(true))
425 .cloned()
426 .collect::<Vec<_>>()
427 })
428 .unwrap_or_default();
429 let supports_exception_filters = capabilities
430 .supports_exception_filter_options
431 .unwrap_or_default();
432 let configuration_sequence = cx.spawn({
433 let this = self.clone();
434 async move |cx| {
435 initialized_rx.await?;
436 // todo(debugger) figure out if we want to handle a breakpoint response error
437 // This will probably consist of letting a user know that breakpoints failed to be set
438 cx.update(|cx| this.send_source_breakpoints(false, cx))?
439 .await;
440 cx.update(|cx| {
441 this.send_exception_breakpoints(
442 exception_filters,
443 supports_exception_filters,
444 cx,
445 )
446 })?
447 .await
448 .ok();
449 let ret = if configuration_done_supported {
450 this.request(ConfigurationDone {}, cx.background_executor().clone())
451 } else {
452 Task::ready(Ok(()))
453 }
454 .await;
455 ret
456 }
457 });
458
459 cx.background_spawn(async move {
460 futures::future::try_join(launch, configuration_sequence).await?;
461 Ok(())
462 })
463 }
464
465 fn request<R: LocalDapCommand>(
466 &self,
467 request: R,
468 executor: BackgroundExecutor,
469 ) -> Task<Result<R::Response>>
470 where
471 <R::DapRequest as dap::requests::Request>::Response: 'static,
472 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
473 {
474 let request = Arc::new(request);
475
476 let request_clone = request.clone();
477 let connection = self.client.clone();
478 let request_task = executor.spawn(async move {
479 let args = request_clone.to_dap();
480 connection.request::<R::DapRequest>(args).await
481 });
482
483 executor.spawn(async move {
484 let response = request.response_from_dap(request_task.await?);
485 response
486 })
487 }
488}
489impl From<RemoteConnection> for Mode {
490 fn from(value: RemoteConnection) -> Self {
491 Self::Remote(value)
492 }
493}
494
495impl Mode {
496 fn request_dap<R: DapCommand>(
497 &self,
498 session_id: SessionId,
499 request: R,
500 cx: &mut Context<Session>,
501 ) -> Task<Result<R::Response>>
502 where
503 <R::DapRequest as dap::requests::Request>::Response: 'static,
504 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
505 {
506 match self {
507 Mode::Local(debug_adapter_client) => {
508 debug_adapter_client.request(request, cx.background_executor().clone())
509 }
510 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
511 }
512 }
513}
514
515#[derive(Default)]
516struct ThreadStates {
517 global_state: Option<ThreadStatus>,
518 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
519}
520
521impl ThreadStates {
522 fn stop_all_threads(&mut self) {
523 self.global_state = Some(ThreadStatus::Stopped);
524 self.known_thread_states.clear();
525 }
526
527 fn exit_all_threads(&mut self) {
528 self.global_state = Some(ThreadStatus::Exited);
529 self.known_thread_states.clear();
530 }
531
532 fn continue_all_threads(&mut self) {
533 self.global_state = Some(ThreadStatus::Running);
534 self.known_thread_states.clear();
535 }
536
537 fn stop_thread(&mut self, thread_id: ThreadId) {
538 self.known_thread_states
539 .insert(thread_id, ThreadStatus::Stopped);
540 }
541
542 fn continue_thread(&mut self, thread_id: ThreadId) {
543 self.known_thread_states
544 .insert(thread_id, ThreadStatus::Running);
545 }
546
547 fn process_step(&mut self, thread_id: ThreadId) {
548 self.known_thread_states
549 .insert(thread_id, ThreadStatus::Stepping);
550 }
551
552 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
553 self.thread_state(thread_id)
554 .unwrap_or(ThreadStatus::Running)
555 }
556
557 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
558 self.known_thread_states
559 .get(&thread_id)
560 .copied()
561 .or(self.global_state)
562 }
563
564 fn exit_thread(&mut self, thread_id: ThreadId) {
565 self.known_thread_states
566 .insert(thread_id, ThreadStatus::Exited);
567 }
568
569 fn any_stopped_thread(&self) -> bool {
570 self.global_state
571 .is_some_and(|state| state == ThreadStatus::Stopped)
572 || self
573 .known_thread_states
574 .values()
575 .any(|status| *status == ThreadStatus::Stopped)
576 }
577}
/// Capacity of the circular buffer that stores a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5_000;

/// Whether an exception breakpoint filter is enabled.
type IsEnabled = bool;

/// Counter value identifying a position in a session's output stream.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct OutputToken(pub usize);
584/// Represents a current state of a single debug adapter and provides ways to mutate it.
585pub struct Session {
586 mode: Mode,
587 pub(super) capabilities: Capabilities,
588 id: SessionId,
589 child_session_ids: HashSet<SessionId>,
590 parent_id: Option<SessionId>,
591 ignore_breakpoints: bool,
592 modules: Vec<dap::Module>,
593 loaded_sources: Vec<dap::Source>,
594 output_token: OutputToken,
595 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
596 threads: IndexMap<ThreadId, Thread>,
597 thread_states: ThreadStates,
598 variables: HashMap<VariableReference, Vec<dap::Variable>>,
599 stack_frames: IndexMap<StackFrameId, StackFrame>,
600 locations: HashMap<u64, dap::LocationsResponse>,
601 is_session_terminated: bool,
602 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
603 exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
604 _background_tasks: Vec<Task<()>>,
605}
606
607trait CacheableCommand: Any + Send + Sync {
608 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
609 fn dyn_hash(&self, hasher: &mut dyn Hasher);
610 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
611}
612
613impl<T> CacheableCommand for T
614where
615 T: DapCommand + PartialEq + Eq + Hash,
616{
617 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
618 (rhs as &dyn Any)
619 .downcast_ref::<Self>()
620 .map_or(false, |rhs| self == rhs)
621 }
622
623 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
624 T::hash(self, &mut hasher);
625 }
626
627 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
628 self
629 }
630}
631
632pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
633
634impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
635 fn from(request: T) -> Self {
636 Self(Arc::new(request))
637 }
638}
639
640impl PartialEq for RequestSlot {
641 fn eq(&self, other: &Self) -> bool {
642 self.0.dyn_eq(other.0.as_ref())
643 }
644}
645
646impl Eq for RequestSlot {}
647
648impl Hash for RequestSlot {
649 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
650 self.0.dyn_hash(state);
651 (&*self.0 as &dyn Any).type_id().hash(state)
652 }
653}
654
655#[derive(Debug, Clone, Hash, PartialEq, Eq)]
656pub struct CompletionsQuery {
657 pub query: String,
658 pub column: u64,
659 pub line: Option<u64>,
660 pub frame_id: Option<u64>,
661}
662
663impl CompletionsQuery {
664 pub fn new(
665 buffer: &language::Buffer,
666 cursor_position: language::Anchor,
667 frame_id: Option<u64>,
668 ) -> Self {
669 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
670 Self {
671 query: buffer.text(),
672 column: column as u64,
673 frame_id,
674 line: Some(row as u64),
675 }
676 }
677}
678
679pub enum SessionEvent {
680 Modules,
681 LoadedSources,
682 Stopped(Option<ThreadId>),
683 StackTrace,
684 Variables,
685 Threads,
686}
687
688pub(super) enum SessionStateEvent {
689 Shutdown,
690 Restart,
691}
692
693impl EventEmitter<SessionEvent> for Session {}
694impl EventEmitter<SessionStateEvent> for Session {}
695
696// local session will send breakpoint updates to DAP for all new breakpoints
697// remote side will only send breakpoint updates when it is a breakpoint created by that peer
698// BreakpointStore notifies session on breakpoint changes
699impl Session {
700 pub(crate) fn local(
701 breakpoint_store: Entity<BreakpointStore>,
702 session_id: SessionId,
703 parent_session: Option<Entity<Session>>,
704 binary: DebugAdapterBinary,
705 config: DebugTaskDefinition,
706 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
707 initialized_tx: oneshot::Sender<()>,
708 cx: &mut App,
709 ) -> Task<Result<Entity<Self>>> {
710 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
711
712 cx.spawn(async move |cx| {
713 let mode = LocalMode::new(
714 session_id,
715 parent_session.clone(),
716 breakpoint_store.clone(),
717 config.clone(),
718 binary,
719 message_tx,
720 cx.clone(),
721 )
722 .await?;
723
724 cx.new(|cx| {
725 create_local_session(
726 breakpoint_store,
727 session_id,
728 parent_session,
729 start_debugging_requests_tx,
730 initialized_tx,
731 message_rx,
732 mode,
733 cx,
734 )
735 })
736 })
737 }
738
739 pub(crate) fn remote(
740 session_id: SessionId,
741 client: AnyProtoClient,
742 upstream_project_id: u64,
743 ignore_breakpoints: bool,
744 ) -> Self {
745 Self {
746 mode: Mode::Remote(RemoteConnection {
747 _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
748 _client: client,
749 _upstream_project_id: upstream_project_id,
750 }),
751 id: session_id,
752 child_session_ids: HashSet::default(),
753 parent_id: None,
754 capabilities: Capabilities::default(),
755 ignore_breakpoints,
756 variables: Default::default(),
757 stack_frames: Default::default(),
758 thread_states: ThreadStates::default(),
759 output_token: OutputToken(0),
760 output: circular_buffer::CircularBuffer::boxed(),
761 requests: HashMap::default(),
762 modules: Vec::default(),
763 loaded_sources: Vec::default(),
764 threads: IndexMap::default(),
765 _background_tasks: Vec::default(),
766 locations: Default::default(),
767 is_session_terminated: false,
768 exception_breakpoints: Default::default(),
769 }
770 }
771
772 pub fn session_id(&self) -> SessionId {
773 self.id
774 }
775
776 pub fn child_session_ids(&self) -> HashSet<SessionId> {
777 self.child_session_ids.clone()
778 }
779
780 pub fn add_child_session_id(&mut self, session_id: SessionId) {
781 self.child_session_ids.insert(session_id);
782 }
783
784 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
785 self.child_session_ids.remove(&session_id);
786 }
787
788 pub fn parent_id(&self) -> Option<SessionId> {
789 self.parent_id
790 }
791
792 pub fn capabilities(&self) -> &Capabilities {
793 &self.capabilities
794 }
795
796 pub fn binary(&self) -> &DebugAdapterBinary {
797 let Mode::Local(local_mode) = &self.mode else {
798 panic!("Session is not local");
799 };
800 &local_mode.binary
801 }
802
803 pub fn adapter_name(&self) -> SharedString {
804 match &self.mode {
805 Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(),
806 Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
807 }
808 }
809
810 pub fn configuration(&self) -> Option<DebugTaskDefinition> {
811 if let Mode::Local(local_mode) = &self.mode {
812 Some(local_mode.definition.clone())
813 } else {
814 None
815 }
816 }
817
818 pub fn is_terminated(&self) -> bool {
819 self.is_session_terminated
820 }
821
822 pub fn is_local(&self) -> bool {
823 matches!(self.mode, Mode::Local(_))
824 }
825
826 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
827 match &mut self.mode {
828 Mode::Local(local_mode) => Some(local_mode),
829 Mode::Remote(_) => None,
830 }
831 }
832
833 pub fn as_local(&self) -> Option<&LocalMode> {
834 match &self.mode {
835 Mode::Local(local_mode) => Some(local_mode),
836 Mode::Remote(_) => None,
837 }
838 }
839
840 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
841 match &self.mode {
842 Mode::Local(local_mode) => {
843 let capabilities = local_mode.clone().request_initialization(cx);
844
845 cx.spawn(async move |this, cx| {
846 let capabilities = capabilities.await?;
847 this.update(cx, |session, _| {
848 session.capabilities = capabilities;
849 let filters = session
850 .capabilities
851 .exception_breakpoint_filters
852 .clone()
853 .unwrap_or_default();
854 for filter in filters {
855 let default = filter.default.unwrap_or_default();
856 session
857 .exception_breakpoints
858 .entry(filter.filter.clone())
859 .or_insert_with(|| (filter, default));
860 }
861 })?;
862 Ok(())
863 })
864 }
865 Mode::Remote(_) => Task::ready(Err(anyhow!(
866 "Cannot send initialize request from remote session"
867 ))),
868 }
869 }
870
871 pub(super) fn initialize_sequence(
872 &mut self,
873 initialize_rx: oneshot::Receiver<()>,
874 cx: &mut Context<Self>,
875 ) -> Task<Result<()>> {
876 match &self.mode {
877 Mode::Local(local_mode) => {
878 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
879 }
880 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
881 }
882 }
883
884 pub fn run_to_position(
885 &mut self,
886 breakpoint: SourceBreakpoint,
887 active_thread_id: ThreadId,
888 cx: &mut Context<Self>,
889 ) {
890 match &mut self.mode {
891 Mode::Local(local_mode) => {
892 if !matches!(
893 self.thread_states.thread_state(active_thread_id),
894 Some(ThreadStatus::Stopped)
895 ) {
896 return;
897 };
898 let path = breakpoint.path.clone();
899 local_mode.tmp_breakpoint = Some(breakpoint);
900 let task = local_mode.send_breakpoints_from_path(
901 path,
902 BreakpointUpdatedReason::Toggled,
903 cx,
904 );
905
906 cx.spawn(async move |this, cx| {
907 task.await;
908 this.update(cx, |this, cx| {
909 this.continue_thread(active_thread_id, cx);
910 })
911 })
912 .detach();
913 }
914 Mode::Remote(_) => {}
915 }
916 }
917
918 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
919 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
920 }
921
922 pub fn output(
923 &self,
924 since: OutputToken,
925 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
926 if self.output_token.0 == 0 {
927 return (self.output.range(0..0), OutputToken(0));
928 };
929
930 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
931
932 let clamped_events_since = events_since.clamp(0, self.output.len());
933 (
934 self.output
935 .range(self.output.len() - clamped_events_since..),
936 self.output_token,
937 )
938 }
939
940 pub fn respond_to_client(
941 &self,
942 request_seq: u64,
943 success: bool,
944 command: String,
945 body: Option<serde_json::Value>,
946 cx: &mut Context<Self>,
947 ) -> Task<Result<()>> {
948 let Some(local_session) = self.as_local().cloned() else {
949 unreachable!("Cannot respond to remote client");
950 };
951
952 cx.background_spawn(async move {
953 local_session
954 .client
955 .send_message(Message::Response(Response {
956 body,
957 success,
958 command,
959 seq: request_seq + 1,
960 request_seq,
961 message: None,
962 }))
963 .await
964 })
965 }
966
967 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
968 if let Some((local, path)) = self.as_local_mut().and_then(|local| {
969 let breakpoint = local.tmp_breakpoint.take()?;
970 let path = breakpoint.path.clone();
971 Some((local, path))
972 }) {
973 local
974 .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
975 .detach();
976 };
977
978 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
979 self.thread_states.stop_all_threads();
980
981 self.invalidate_command_type::<StackTraceCommand>();
982 }
983
984 // Event if we stopped all threads we still need to insert the thread_id
985 // to our own data
986 if let Some(thread_id) = event.thread_id {
987 self.thread_states.stop_thread(ThreadId(thread_id));
988
989 self.invalidate_state(
990 &StackTraceCommand {
991 thread_id,
992 start_frame: None,
993 levels: None,
994 }
995 .into(),
996 );
997 }
998
999 self.invalidate_generic();
1000 self.threads.clear();
1001 self.variables.clear();
1002 cx.emit(SessionEvent::Stopped(
1003 event
1004 .thread_id
1005 .map(Into::into)
1006 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1007 ));
1008 cx.notify();
1009 }
1010
1011 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1012 match *event {
1013 Events::Initialized(_) => {
1014 debug_assert!(
1015 false,
1016 "Initialized event should have been handled in LocalMode"
1017 );
1018 }
1019 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1020 Events::Continued(event) => {
1021 if event.all_threads_continued.unwrap_or_default() {
1022 self.thread_states.continue_all_threads();
1023 } else {
1024 self.thread_states
1025 .continue_thread(ThreadId(event.thread_id));
1026 }
1027 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1028 self.invalidate_generic();
1029 }
1030 Events::Exited(_event) => {
1031 self.clear_active_debug_line(cx);
1032 }
1033 Events::Terminated(_) => {
1034 self.is_session_terminated = true;
1035 self.clear_active_debug_line(cx);
1036 }
1037 Events::Thread(event) => {
1038 let thread_id = ThreadId(event.thread_id);
1039
1040 match event.reason {
1041 dap::ThreadEventReason::Started => {
1042 self.thread_states.continue_thread(thread_id);
1043 }
1044 dap::ThreadEventReason::Exited => {
1045 self.thread_states.exit_thread(thread_id);
1046 }
1047 reason => {
1048 log::error!("Unhandled thread event reason {:?}", reason);
1049 }
1050 }
1051 self.invalidate_state(&ThreadsCommand.into());
1052 cx.notify();
1053 }
1054 Events::Output(event) => {
1055 if event
1056 .category
1057 .as_ref()
1058 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1059 {
1060 return;
1061 }
1062
1063 self.output.push_back(event);
1064 self.output_token.0 += 1;
1065 cx.notify();
1066 }
1067 Events::Breakpoint(_) => {}
1068 Events::Module(event) => {
1069 match event.reason {
1070 dap::ModuleEventReason::New => {
1071 self.modules.push(event.module);
1072 }
1073 dap::ModuleEventReason::Changed => {
1074 if let Some(module) = self
1075 .modules
1076 .iter_mut()
1077 .find(|other| event.module.id == other.id)
1078 {
1079 *module = event.module;
1080 }
1081 }
1082 dap::ModuleEventReason::Removed => {
1083 self.modules.retain(|other| event.module.id != other.id);
1084 }
1085 }
1086
1087 // todo(debugger): We should only send the invalidate command to downstream clients.
1088 // self.invalidate_state(&ModulesCommand.into());
1089 }
1090 Events::LoadedSource(_) => {
1091 self.invalidate_state(&LoadedSourcesCommand.into());
1092 }
1093 Events::Capabilities(event) => {
1094 self.capabilities = self.capabilities.merge(event.capabilities);
1095 cx.notify();
1096 }
1097 Events::Memory(_) => {}
1098 Events::Process(_) => {}
1099 Events::ProgressEnd(_) => {}
1100 Events::ProgressStart(_) => {}
1101 Events::ProgressUpdate(_) => {}
1102 Events::Invalidated(_) => {}
1103 Events::Other(_) => {}
1104 }
1105 }
1106
1107 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1108 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1109 &mut self,
1110 request: T,
1111 process_result: impl FnOnce(
1112 &mut Self,
1113 Result<T::Response>,
1114 &mut Context<Self>,
1115 ) -> Option<T::Response>
1116 + 'static,
1117 cx: &mut Context<Self>,
1118 ) {
1119 const {
1120 assert!(
1121 T::CACHEABLE,
1122 "Only requests marked as cacheable should invoke `fetch`"
1123 );
1124 }
1125
1126 if !self.thread_states.any_stopped_thread()
1127 && request.type_id() != TypeId::of::<ThreadsCommand>()
1128 || self.is_session_terminated
1129 {
1130 return;
1131 }
1132
1133 let request_map = self
1134 .requests
1135 .entry(std::any::TypeId::of::<T>())
1136 .or_default();
1137
1138 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1139 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1140
1141 let task = Self::request_inner::<Arc<T>>(
1142 &self.capabilities,
1143 self.id,
1144 &self.mode,
1145 command,
1146 process_result,
1147 cx,
1148 );
1149 let task = cx
1150 .background_executor()
1151 .spawn(async move {
1152 let _ = task.await?;
1153 Some(())
1154 })
1155 .shared();
1156
1157 vacant.insert(task);
1158 cx.notify();
1159 }
1160 }
1161
1162 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1163 capabilities: &Capabilities,
1164 session_id: SessionId,
1165 mode: &Mode,
1166 request: T,
1167 process_result: impl FnOnce(
1168 &mut Self,
1169 Result<T::Response>,
1170 &mut Context<Self>,
1171 ) -> Option<T::Response>
1172 + 'static,
1173 cx: &mut Context<Self>,
1174 ) -> Task<Option<T::Response>> {
1175 if !T::is_supported(&capabilities) {
1176 log::warn!(
1177 "Attempted to send a DAP request that isn't supported: {:?}",
1178 request
1179 );
1180 let error = Err(anyhow::Error::msg(
1181 "Couldn't complete request because it's not supported",
1182 ));
1183 return cx.spawn(async move |this, cx| {
1184 this.update(cx, |this, cx| process_result(this, error, cx))
1185 .log_err()
1186 .flatten()
1187 });
1188 }
1189
1190 let request = mode.request_dap(session_id, request, cx);
1191 cx.spawn(async move |this, cx| {
1192 let result = request.await;
1193 this.update(cx, |this, cx| process_result(this, result, cx))
1194 .log_err()
1195 .flatten()
1196 })
1197 }
1198
1199 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1200 &self,
1201 request: T,
1202 process_result: impl FnOnce(
1203 &mut Self,
1204 Result<T::Response>,
1205 &mut Context<Self>,
1206 ) -> Option<T::Response>
1207 + 'static,
1208 cx: &mut Context<Self>,
1209 ) -> Task<Option<T::Response>> {
1210 Self::request_inner(
1211 &self.capabilities,
1212 self.id,
1213 &self.mode,
1214 request,
1215 process_result,
1216 cx,
1217 )
1218 }
1219
1220 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1221 self.requests.remove(&std::any::TypeId::of::<Command>());
1222 }
1223
1224 fn invalidate_generic(&mut self) {
1225 self.invalidate_command_type::<ModulesCommand>();
1226 self.invalidate_command_type::<LoadedSourcesCommand>();
1227 self.invalidate_command_type::<ThreadsCommand>();
1228 }
1229
1230 fn invalidate_state(&mut self, key: &RequestSlot) {
1231 self.requests
1232 .entry((&*key.0 as &dyn Any).type_id())
1233 .and_modify(|request_map| {
1234 request_map.remove(&key);
1235 });
1236 }
1237
1238 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1239 self.thread_states.thread_status(thread_id)
1240 }
1241
1242 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1243 self.fetch(
1244 dap_command::ThreadsCommand,
1245 |this, result, cx| {
1246 let result = result.log_err()?;
1247
1248 this.threads = result
1249 .iter()
1250 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1251 .collect();
1252
1253 this.invalidate_command_type::<StackTraceCommand>();
1254 cx.emit(SessionEvent::Threads);
1255 cx.notify();
1256
1257 Some(result)
1258 },
1259 cx,
1260 );
1261
1262 self.threads
1263 .values()
1264 .map(|thread| {
1265 (
1266 thread.dap.clone(),
1267 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1268 )
1269 })
1270 .collect()
1271 }
1272
1273 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1274 self.fetch(
1275 dap_command::ModulesCommand,
1276 |this, result, cx| {
1277 let result = result.log_err()?;
1278
1279 this.modules = result.iter().cloned().collect();
1280 cx.emit(SessionEvent::Modules);
1281 cx.notify();
1282
1283 Some(result)
1284 },
1285 cx,
1286 );
1287
1288 &self.modules
1289 }
1290
1291 pub fn ignore_breakpoints(&self) -> bool {
1292 self.ignore_breakpoints
1293 }
1294
1295 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1296 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1297 }
1298
1299 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1300 if self.ignore_breakpoints == ignore {
1301 return Task::ready(());
1302 }
1303
1304 self.ignore_breakpoints = ignore;
1305
1306 if let Some(local) = self.as_local() {
1307 local.send_source_breakpoints(ignore, cx)
1308 } else {
1309 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1310 unimplemented!()
1311 }
1312 }
1313
1314 pub fn exception_breakpoints(
1315 &self,
1316 ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1317 self.exception_breakpoints.values()
1318 }
1319
1320 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1321 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1322 *is_enabled = !*is_enabled;
1323 self.send_exception_breakpoints(cx);
1324 }
1325 }
1326
1327 fn send_exception_breakpoints(&mut self, cx: &App) {
1328 if let Some(local) = self.as_local() {
1329 let exception_filters = self
1330 .exception_breakpoints
1331 .values()
1332 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1333 .collect();
1334
1335 let supports_exception_filters = self
1336 .capabilities
1337 .supports_exception_filter_options
1338 .unwrap_or_default();
1339 local
1340 .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1341 .detach_and_log_err(cx);
1342 } else {
1343 debug_assert!(false, "Not implemented");
1344 }
1345 }
1346
1347 pub fn breakpoints_enabled(&self) -> bool {
1348 self.ignore_breakpoints
1349 }
1350
1351 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1352 self.fetch(
1353 dap_command::LoadedSourcesCommand,
1354 |this, result, cx| {
1355 let result = result.log_err()?;
1356 this.loaded_sources = result.iter().cloned().collect();
1357 cx.emit(SessionEvent::LoadedSources);
1358 cx.notify();
1359 Some(result)
1360 },
1361 cx,
1362 );
1363
1364 &self.loaded_sources
1365 }
1366
1367 fn fallback_to_manual_restart(
1368 &mut self,
1369 res: Result<()>,
1370 cx: &mut Context<Self>,
1371 ) -> Option<()> {
1372 if res.log_err().is_none() {
1373 cx.emit(SessionStateEvent::Restart);
1374 return None;
1375 }
1376 Some(())
1377 }
1378
1379 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1380 res.log_err()?;
1381 Some(())
1382 }
1383
1384 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1385 thread_id: ThreadId,
1386 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1387 {
1388 move |this, response, cx| match response.log_err() {
1389 Some(response) => Some(response),
1390 None => {
1391 this.thread_states.stop_thread(thread_id);
1392 cx.notify();
1393 None
1394 }
1395 }
1396 }
1397
1398 fn clear_active_debug_line_response(
1399 &mut self,
1400 response: Result<()>,
1401 cx: &mut Context<Session>,
1402 ) -> Option<()> {
1403 response.log_err()?;
1404 self.clear_active_debug_line(cx);
1405 Some(())
1406 }
1407
1408 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1409 self.as_local()
1410 .expect("Message handler will only run in local mode")
1411 .breakpoint_store
1412 .update(cx, |store, cx| {
1413 store.remove_active_position(Some(self.id), cx)
1414 });
1415 }
1416
1417 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1418 self.request(
1419 PauseCommand {
1420 thread_id: thread_id.0,
1421 },
1422 Self::empty_response,
1423 cx,
1424 )
1425 .detach();
1426 }
1427
1428 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1429 self.request(
1430 RestartStackFrameCommand { stack_frame_id },
1431 Self::empty_response,
1432 cx,
1433 )
1434 .detach();
1435 }
1436
1437 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1438 if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1439 self.request(
1440 RestartCommand {
1441 raw: args.unwrap_or(Value::Null),
1442 },
1443 Self::fallback_to_manual_restart,
1444 cx,
1445 )
1446 .detach();
1447 } else {
1448 cx.emit(SessionStateEvent::Restart);
1449 }
1450 }
1451
1452 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1453 self.is_session_terminated = true;
1454 self.thread_states.exit_all_threads();
1455 cx.notify();
1456
1457 let task = if self
1458 .capabilities
1459 .supports_terminate_request
1460 .unwrap_or_default()
1461 {
1462 self.request(
1463 TerminateCommand {
1464 restart: Some(false),
1465 },
1466 Self::clear_active_debug_line_response,
1467 cx,
1468 )
1469 } else {
1470 self.request(
1471 DisconnectCommand {
1472 restart: Some(false),
1473 terminate_debuggee: Some(true),
1474 suspend_debuggee: Some(false),
1475 },
1476 Self::clear_active_debug_line_response,
1477 cx,
1478 )
1479 };
1480
1481 cx.emit(SessionStateEvent::Shutdown);
1482
1483 let debug_client = self.adapter_client();
1484
1485 cx.background_spawn(async move {
1486 let _ = task.await;
1487
1488 if let Some(client) = debug_client {
1489 client.shutdown().await.log_err();
1490 }
1491 })
1492 }
1493
1494 pub fn completions(
1495 &mut self,
1496 query: CompletionsQuery,
1497 cx: &mut Context<Self>,
1498 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1499 let task = self.request(query, |_, result, _| result.log_err(), cx);
1500
1501 cx.background_executor().spawn(async move {
1502 anyhow::Ok(
1503 task.await
1504 .map(|response| response.targets)
1505 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1506 )
1507 })
1508 }
1509
1510 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1511 self.thread_states.continue_thread(thread_id);
1512 self.request(
1513 ContinueCommand {
1514 args: ContinueArguments {
1515 thread_id: thread_id.0,
1516 single_thread: Some(true),
1517 },
1518 },
1519 Self::on_step_response::<ContinueCommand>(thread_id),
1520 cx,
1521 )
1522 .detach();
1523 }
1524
1525 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1526 match self.mode {
1527 Mode::Local(ref local) => Some(local.client.clone()),
1528 Mode::Remote(_) => None,
1529 }
1530 }
1531
1532 pub fn step_over(
1533 &mut self,
1534 thread_id: ThreadId,
1535 granularity: SteppingGranularity,
1536 cx: &mut Context<Self>,
1537 ) {
1538 let supports_single_thread_execution_requests =
1539 self.capabilities.supports_single_thread_execution_requests;
1540 let supports_stepping_granularity = self
1541 .capabilities
1542 .supports_stepping_granularity
1543 .unwrap_or_default();
1544
1545 let command = NextCommand {
1546 inner: StepCommand {
1547 thread_id: thread_id.0,
1548 granularity: supports_stepping_granularity.then(|| granularity),
1549 single_thread: supports_single_thread_execution_requests,
1550 },
1551 };
1552
1553 self.thread_states.process_step(thread_id);
1554 self.request(
1555 command,
1556 Self::on_step_response::<NextCommand>(thread_id),
1557 cx,
1558 )
1559 .detach();
1560 }
1561
1562 pub fn step_in(
1563 &mut self,
1564 thread_id: ThreadId,
1565 granularity: SteppingGranularity,
1566 cx: &mut Context<Self>,
1567 ) {
1568 let supports_single_thread_execution_requests =
1569 self.capabilities.supports_single_thread_execution_requests;
1570 let supports_stepping_granularity = self
1571 .capabilities
1572 .supports_stepping_granularity
1573 .unwrap_or_default();
1574
1575 let command = StepInCommand {
1576 inner: StepCommand {
1577 thread_id: thread_id.0,
1578 granularity: supports_stepping_granularity.then(|| granularity),
1579 single_thread: supports_single_thread_execution_requests,
1580 },
1581 };
1582
1583 self.thread_states.process_step(thread_id);
1584 self.request(
1585 command,
1586 Self::on_step_response::<StepInCommand>(thread_id),
1587 cx,
1588 )
1589 .detach();
1590 }
1591
1592 pub fn step_out(
1593 &mut self,
1594 thread_id: ThreadId,
1595 granularity: SteppingGranularity,
1596 cx: &mut Context<Self>,
1597 ) {
1598 let supports_single_thread_execution_requests =
1599 self.capabilities.supports_single_thread_execution_requests;
1600 let supports_stepping_granularity = self
1601 .capabilities
1602 .supports_stepping_granularity
1603 .unwrap_or_default();
1604
1605 let command = StepOutCommand {
1606 inner: StepCommand {
1607 thread_id: thread_id.0,
1608 granularity: supports_stepping_granularity.then(|| granularity),
1609 single_thread: supports_single_thread_execution_requests,
1610 },
1611 };
1612
1613 self.thread_states.process_step(thread_id);
1614 self.request(
1615 command,
1616 Self::on_step_response::<StepOutCommand>(thread_id),
1617 cx,
1618 )
1619 .detach();
1620 }
1621
1622 pub fn step_back(
1623 &mut self,
1624 thread_id: ThreadId,
1625 granularity: SteppingGranularity,
1626 cx: &mut Context<Self>,
1627 ) {
1628 let supports_single_thread_execution_requests =
1629 self.capabilities.supports_single_thread_execution_requests;
1630 let supports_stepping_granularity = self
1631 .capabilities
1632 .supports_stepping_granularity
1633 .unwrap_or_default();
1634
1635 let command = StepBackCommand {
1636 inner: StepCommand {
1637 thread_id: thread_id.0,
1638 granularity: supports_stepping_granularity.then(|| granularity),
1639 single_thread: supports_single_thread_execution_requests,
1640 },
1641 };
1642
1643 self.thread_states.process_step(thread_id);
1644
1645 self.request(
1646 command,
1647 Self::on_step_response::<StepBackCommand>(thread_id),
1648 cx,
1649 )
1650 .detach();
1651 }
1652
1653 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1654 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1655 && self.requests.contains_key(&ThreadsCommand.type_id())
1656 && self.threads.contains_key(&thread_id)
1657 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1658 // We could still be using an old thread id and have sent a new thread's request
1659 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1660 // But it very well could cause a minor bug in the future that is hard to track down
1661 {
1662 self.fetch(
1663 super::dap_command::StackTraceCommand {
1664 thread_id: thread_id.0,
1665 start_frame: None,
1666 levels: None,
1667 },
1668 move |this, stack_frames, cx| {
1669 let stack_frames = stack_frames.log_err()?;
1670
1671 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1672 thread.stack_frame_ids =
1673 stack_frames.iter().map(|frame| frame.id).collect();
1674 });
1675 debug_assert!(
1676 matches!(entry, indexmap::map::Entry::Occupied(_)),
1677 "Sent request for thread_id that doesn't exist"
1678 );
1679
1680 this.stack_frames.extend(
1681 stack_frames
1682 .iter()
1683 .cloned()
1684 .map(|frame| (frame.id, StackFrame::from(frame))),
1685 );
1686
1687 this.invalidate_command_type::<ScopesCommand>();
1688 this.invalidate_command_type::<VariablesCommand>();
1689
1690 cx.emit(SessionEvent::StackTrace);
1691 cx.notify();
1692 Some(stack_frames)
1693 },
1694 cx,
1695 );
1696 }
1697
1698 self.threads
1699 .get(&thread_id)
1700 .map(|thread| {
1701 thread
1702 .stack_frame_ids
1703 .iter()
1704 .filter_map(|id| self.stack_frames.get(id))
1705 .cloned()
1706 .collect()
1707 })
1708 .unwrap_or_default()
1709 }
1710
1711 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1712 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1713 && self
1714 .requests
1715 .contains_key(&TypeId::of::<StackTraceCommand>())
1716 {
1717 self.fetch(
1718 ScopesCommand { stack_frame_id },
1719 move |this, scopes, cx| {
1720 let scopes = scopes.log_err()?;
1721
1722 for scope in scopes .iter(){
1723 this.variables(scope.variables_reference, cx);
1724 }
1725
1726 let entry = this
1727 .stack_frames
1728 .entry(stack_frame_id)
1729 .and_modify(|stack_frame| {
1730 stack_frame.scopes = scopes.clone();
1731 });
1732
1733 cx.emit(SessionEvent::Variables);
1734
1735 debug_assert!(
1736 matches!(entry, indexmap::map::Entry::Occupied(_)),
1737 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1738 );
1739
1740 Some(scopes)
1741 },
1742 cx,
1743 );
1744 }
1745
1746 self.stack_frames
1747 .get(&stack_frame_id)
1748 .map(|frame| frame.scopes.as_slice())
1749 .unwrap_or_default()
1750 }
1751
1752 pub fn variables(
1753 &mut self,
1754 variables_reference: VariableReference,
1755 cx: &mut Context<Self>,
1756 ) -> Vec<dap::Variable> {
1757 let command = VariablesCommand {
1758 variables_reference,
1759 filter: None,
1760 start: None,
1761 count: None,
1762 format: None,
1763 };
1764
1765 self.fetch(
1766 command,
1767 move |this, variables, cx| {
1768 let variables = variables.log_err()?;
1769 this.variables
1770 .insert(variables_reference, variables.clone());
1771
1772 cx.emit(SessionEvent::Variables);
1773 Some(variables)
1774 },
1775 cx,
1776 );
1777
1778 self.variables
1779 .get(&variables_reference)
1780 .cloned()
1781 .unwrap_or_default()
1782 }
1783
1784 pub fn set_variable_value(
1785 &mut self,
1786 variables_reference: u64,
1787 name: String,
1788 value: String,
1789 cx: &mut Context<Self>,
1790 ) {
1791 if self.capabilities.supports_set_variable.unwrap_or_default() {
1792 self.request(
1793 SetVariableValueCommand {
1794 name,
1795 value,
1796 variables_reference,
1797 },
1798 move |this, response, cx| {
1799 let response = response.log_err()?;
1800 this.invalidate_command_type::<VariablesCommand>();
1801 cx.notify();
1802 Some(response)
1803 },
1804 cx,
1805 )
1806 .detach()
1807 }
1808 }
1809
1810 pub fn evaluate(
1811 &mut self,
1812 expression: String,
1813 context: Option<EvaluateArgumentsContext>,
1814 frame_id: Option<u64>,
1815 source: Option<Source>,
1816 cx: &mut Context<Self>,
1817 ) {
1818 self.request(
1819 EvaluateCommand {
1820 expression,
1821 context,
1822 frame_id,
1823 source,
1824 },
1825 |this, response, cx| {
1826 let response = response.log_err()?;
1827 this.output_token.0 += 1;
1828 this.output.push_back(dap::OutputEvent {
1829 category: None,
1830 output: response.result.clone(),
1831 group: None,
1832 variables_reference: Some(response.variables_reference),
1833 source: None,
1834 line: None,
1835 column: None,
1836 data: None,
1837 location_reference: None,
1838 });
1839
1840 this.invalidate_command_type::<ScopesCommand>();
1841 cx.notify();
1842 Some(response)
1843 },
1844 cx,
1845 )
1846 .detach();
1847 }
1848
1849 pub fn location(
1850 &mut self,
1851 reference: u64,
1852 cx: &mut Context<Self>,
1853 ) -> Option<dap::LocationsResponse> {
1854 self.fetch(
1855 LocationsCommand { reference },
1856 move |this, response, _| {
1857 let response = response.log_err()?;
1858 this.locations.insert(reference, response.clone());
1859 Some(response)
1860 },
1861 cx,
1862 );
1863 self.locations.get(&reference).cloned()
1864 }
1865 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1866 let command = DisconnectCommand {
1867 restart: Some(false),
1868 terminate_debuggee: Some(true),
1869 suspend_debuggee: Some(false),
1870 };
1871
1872 self.request(command, Self::empty_response, cx).detach()
1873 }
1874
1875 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1876 if self
1877 .capabilities
1878 .supports_terminate_threads_request
1879 .unwrap_or_default()
1880 {
1881 self.request(
1882 TerminateThreadsCommand {
1883 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1884 },
1885 Self::clear_active_debug_line_response,
1886 cx,
1887 )
1888 .detach();
1889 } else {
1890 self.shutdown(cx).detach();
1891 }
1892 }
1893}
1894
1895fn create_local_session(
1896 breakpoint_store: Entity<BreakpointStore>,
1897 session_id: SessionId,
1898 parent_session: Option<Entity<Session>>,
1899 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1900 initialized_tx: oneshot::Sender<()>,
1901 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1902 mode: LocalMode,
1903 cx: &mut Context<Session>,
1904) -> Session {
1905 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1906 let mut initialized_tx = Some(initialized_tx);
1907 while let Some(message) = message_rx.next().await {
1908 if let Message::Event(event) = message {
1909 if let Events::Initialized(_) = *event {
1910 if let Some(tx) = initialized_tx.take() {
1911 tx.send(()).ok();
1912 }
1913 } else {
1914 let Ok(_) = this.update(cx, |session, cx| {
1915 session.handle_dap_event(event, cx);
1916 }) else {
1917 break;
1918 };
1919 }
1920 } else {
1921 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1922 else {
1923 break;
1924 };
1925 }
1926 }
1927 })];
1928
1929 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1930 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1931 if let Some(local) = (!this.ignore_breakpoints)
1932 .then(|| this.as_local_mut())
1933 .flatten()
1934 {
1935 local
1936 .send_breakpoints_from_path(path.clone(), *reason, cx)
1937 .detach();
1938 };
1939 }
1940 BreakpointStoreEvent::BreakpointsCleared(paths) => {
1941 if let Some(local) = (!this.ignore_breakpoints)
1942 .then(|| this.as_local_mut())
1943 .flatten()
1944 {
1945 local.unset_breakpoints_from_paths(paths, cx).detach();
1946 }
1947 }
1948 BreakpointStoreEvent::ActiveDebugLineChanged => {}
1949 })
1950 .detach();
1951
1952 Session {
1953 mode: Mode::Local(mode),
1954 id: session_id,
1955 child_session_ids: HashSet::default(),
1956 parent_id: parent_session.map(|session| session.read(cx).id),
1957 variables: Default::default(),
1958 capabilities: Capabilities::default(),
1959 thread_states: ThreadStates::default(),
1960 output_token: OutputToken(0),
1961 ignore_breakpoints: false,
1962 output: circular_buffer::CircularBuffer::boxed(),
1963 requests: HashMap::default(),
1964 modules: Vec::default(),
1965 loaded_sources: Vec::default(),
1966 threads: IndexMap::default(),
1967 stack_frames: IndexMap::default(),
1968 locations: Default::default(),
1969 exception_breakpoints: Default::default(),
1970 _background_tasks,
1971 is_session_terminated: false,
1972 }
1973}