1use super::breakpoint_store::{
2 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
3};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
9 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
10 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
11};
12use anyhow::{Context as _, Result, anyhow};
13use collections::{HashMap, HashSet, IndexMap, IndexSet};
14use dap::adapters::DebugAdapterBinary;
15use dap::messages::Response;
16use dap::{
17 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
18 SteppingGranularity, StoppedEvent, VariableReference,
19 client::{DebugAdapterClient, SessionId},
20 messages::{Events, Message},
21};
22use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
23use futures::channel::oneshot;
24use futures::{FutureExt, future::Shared};
25use gpui::{
26 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
27 Task, WeakEntity,
28};
29use rpc::AnyProtoClient;
30use serde_json::{Value, json};
31use smol::stream::StreamExt;
32use std::any::TypeId;
33use std::collections::BTreeMap;
34use std::u64;
35use std::{
36 any::Any,
37 collections::hash_map::Entry,
38 hash::{Hash, Hasher},
39 path::Path,
40 sync::Arc,
41};
42use task::DebugTaskDefinition;
43use text::{PointUtf16, ToPointUtf16};
44use util::{ResultExt, merge_json_value_into};
45
46#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
47#[repr(transparent)]
48pub struct ThreadId(pub u64);
49
50impl ThreadId {
51 pub const MIN: ThreadId = ThreadId(u64::MIN);
52 pub const MAX: ThreadId = ThreadId(u64::MAX);
53}
54
55impl From<u64> for ThreadId {
56 fn from(id: u64) -> Self {
57 Self(id)
58 }
59}
60
61#[derive(Clone, Debug)]
62pub struct StackFrame {
63 pub dap: dap::StackFrame,
64 pub scopes: Vec<dap::Scope>,
65}
66
67impl From<dap::StackFrame> for StackFrame {
68 fn from(stack_frame: dap::StackFrame) -> Self {
69 Self {
70 scopes: vec![],
71 dap: stack_frame,
72 }
73 }
74}
75
76#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
77pub enum ThreadStatus {
78 #[default]
79 Running,
80 Stopped,
81 Stepping,
82 Exited,
83 Ended,
84}
85
86impl ThreadStatus {
87 pub fn label(&self) -> &'static str {
88 match self {
89 ThreadStatus::Running => "Running",
90 ThreadStatus::Stopped => "Stopped",
91 ThreadStatus::Stepping => "Stepping",
92 ThreadStatus::Exited => "Exited",
93 ThreadStatus::Ended => "Ended",
94 }
95 }
96}
97
98#[derive(Debug)]
99pub struct Thread {
100 dap: dap::Thread,
101 stack_frame_ids: IndexSet<StackFrameId>,
102 _has_stopped: bool,
103}
104
105impl From<dap::Thread> for Thread {
106 fn from(dap: dap::Thread) -> Self {
107 Self {
108 dap,
109 stack_frame_ids: Default::default(),
110 _has_stopped: false,
111 }
112 }
113}
114
115type UpstreamProjectId = u64;
116
117struct RemoteConnection {
118 _client: AnyProtoClient,
119 _upstream_project_id: UpstreamProjectId,
120 _adapter_name: SharedString,
121}
122
123impl RemoteConnection {
124 fn send_proto_client_request<R: DapCommand>(
125 &self,
126 _request: R,
127 _session_id: SessionId,
128 cx: &mut App,
129 ) -> Task<Result<R::Response>> {
130 // let message = request.to_proto(session_id, self.upstream_project_id);
131 // let upstream_client = self.client.clone();
132 cx.background_executor().spawn(async move {
133 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
134 // let response = upstream_client.request(message).await?;
135 // request.response_from_proto(response)
136 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
137 })
138 }
139
140 fn request<R: DapCommand>(
141 &self,
142 request: R,
143 session_id: SessionId,
144 cx: &mut App,
145 ) -> Task<Result<R::Response>>
146 where
147 <R::DapRequest as dap::requests::Request>::Response: 'static,
148 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
149 {
150 return self.send_proto_client_request::<R>(request, session_id, cx);
151 }
152}
153
154enum Mode {
155 Local(LocalMode),
156 Remote(RemoteConnection),
157}
158
159#[derive(Clone)]
160pub struct LocalMode {
161 client: Arc<DebugAdapterClient>,
162 definition: DebugTaskDefinition,
163 binary: DebugAdapterBinary,
164 pub(crate) breakpoint_store: Entity<BreakpointStore>,
165 tmp_breakpoint: Option<SourceBreakpoint>,
166}
167
168fn client_source(abs_path: &Path) -> dap::Source {
169 dap::Source {
170 name: abs_path
171 .file_name()
172 .map(|filename| filename.to_string_lossy().to_string()),
173 path: Some(abs_path.to_string_lossy().to_string()),
174 source_reference: None,
175 presentation_hint: None,
176 origin: None,
177 sources: None,
178 adapter_data: None,
179 checksums: None,
180 }
181}
182
183impl LocalMode {
184 fn new(
185 session_id: SessionId,
186 parent_session: Option<Entity<Session>>,
187 breakpoint_store: Entity<BreakpointStore>,
188 config: DebugTaskDefinition,
189 binary: DebugAdapterBinary,
190 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
191 cx: AsyncApp,
192 ) -> Task<Result<Self>> {
193 Self::new_inner(
194 session_id,
195 parent_session,
196 breakpoint_store,
197 config,
198 binary,
199 messages_tx,
200 async |_, _| {},
201 cx,
202 )
203 }
204
205 fn new_inner(
206 session_id: SessionId,
207 parent_session: Option<Entity<Session>>,
208 breakpoint_store: Entity<BreakpointStore>,
209 config: DebugTaskDefinition,
210 binary: DebugAdapterBinary,
211 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
212 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
213 cx: AsyncApp,
214 ) -> Task<Result<Self>> {
215 cx.spawn(async move |cx| {
216 let message_handler = Box::new(move |message| {
217 messages_tx.unbounded_send(message).ok();
218 });
219
220 let client = Arc::new(
221 if let Some(client) = parent_session
222 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
223 .flatten()
224 {
225 client
226 .reconnect(session_id, binary.clone(), message_handler, cx.clone())
227 .await?
228 } else {
229 DebugAdapterClient::start(
230 session_id,
231 binary.clone(),
232 message_handler,
233 cx.clone(),
234 )
235 .await
236 .with_context(|| "Failed to start communication with debug adapter")?
237 },
238 );
239
240 let mut session = Self {
241 client,
242 breakpoint_store,
243 tmp_breakpoint: None,
244 definition: config,
245 binary,
246 };
247
248 on_initialized(&mut session, cx.clone()).await;
249
250 Ok(session)
251 })
252 }
253
254 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
255 let tasks: Vec<_> = paths
256 .into_iter()
257 .map(|path| {
258 self.request(
259 dap_command::SetBreakpoints {
260 source: client_source(path),
261 source_modified: None,
262 breakpoints: vec![],
263 },
264 cx.background_executor().clone(),
265 )
266 })
267 .collect();
268
269 cx.background_spawn(async move {
270 futures::future::join_all(tasks)
271 .await
272 .iter()
273 .for_each(|res| match res {
274 Ok(_) => {}
275 Err(err) => {
276 log::warn!("Set breakpoints request failed: {}", err);
277 }
278 });
279 })
280 }
281
282 fn send_breakpoints_from_path(
283 &self,
284 abs_path: Arc<Path>,
285 reason: BreakpointUpdatedReason,
286 cx: &mut App,
287 ) -> Task<()> {
288 let breakpoints = self
289 .breakpoint_store
290 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
291 .into_iter()
292 .filter(|bp| bp.state.is_enabled())
293 .chain(self.tmp_breakpoint.clone())
294 .map(Into::into)
295 .collect();
296
297 let task = self.request(
298 dap_command::SetBreakpoints {
299 source: client_source(&abs_path),
300 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
301 breakpoints,
302 },
303 cx.background_executor().clone(),
304 );
305
306 cx.background_spawn(async move {
307 match task.await {
308 Ok(_) => {}
309 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
310 }
311 })
312 }
313
314 fn send_exception_breakpoints(
315 &self,
316 filters: Vec<ExceptionBreakpointsFilter>,
317 supports_filter_options: bool,
318 cx: &App,
319 ) -> Task<Result<Vec<dap::Breakpoint>>> {
320 let arg = if supports_filter_options {
321 SetExceptionBreakpoints::WithOptions {
322 filters: filters
323 .into_iter()
324 .map(|filter| ExceptionFilterOptions {
325 filter_id: filter.filter,
326 condition: None,
327 mode: None,
328 })
329 .collect(),
330 }
331 } else {
332 SetExceptionBreakpoints::Plain {
333 filters: filters.into_iter().map(|filter| filter.filter).collect(),
334 }
335 };
336 self.request(arg, cx.background_executor().clone())
337 }
338 fn send_source_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
339 let mut breakpoint_tasks = Vec::new();
340 let breakpoints = self
341 .breakpoint_store
342 .read_with(cx, |store, cx| store.all_breakpoints(cx));
343
344 for (path, breakpoints) in breakpoints {
345 let breakpoints = if ignore_breakpoints {
346 vec![]
347 } else {
348 breakpoints
349 .into_iter()
350 .filter(|bp| bp.state.is_enabled())
351 .map(Into::into)
352 .collect()
353 };
354
355 breakpoint_tasks.push(self.request(
356 dap_command::SetBreakpoints {
357 source: client_source(&path),
358 source_modified: Some(false),
359 breakpoints,
360 },
361 cx.background_executor().clone(),
362 ));
363 }
364
365 cx.background_spawn(async move {
366 futures::future::join_all(breakpoint_tasks)
367 .await
368 .iter()
369 .for_each(|res| match res {
370 Ok(_) => {}
371 Err(err) => {
372 log::warn!("Set breakpoints request failed: {}", err);
373 }
374 });
375 })
376 }
377
378 pub fn label(&self) -> String {
379 self.definition.label.clone()
380 }
381
382 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
383 let adapter_id = self.binary.adapter_name.to_string();
384
385 self.request(Initialize { adapter_id }, cx.background_executor().clone())
386 }
387
388 fn initialize_sequence(
389 &self,
390 capabilities: &Capabilities,
391 initialized_rx: oneshot::Receiver<()>,
392 cx: &App,
393 ) -> Task<Result<()>> {
394 let mut raw = self.binary.request_args.clone();
395
396 merge_json_value_into(
397 self.definition.initialize_args.clone().unwrap_or(json!({})),
398 &mut raw.configuration,
399 );
400 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
401 let launch = match raw.request {
402 dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
403 Launch {
404 raw: raw.configuration,
405 },
406 cx.background_executor().clone(),
407 ),
408 dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
409 Attach {
410 raw: raw.configuration,
411 },
412 cx.background_executor().clone(),
413 ),
414 };
415
416 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
417 let exception_filters = capabilities
418 .exception_breakpoint_filters
419 .as_ref()
420 .map(|exception_filters| {
421 exception_filters
422 .iter()
423 .filter(|filter| filter.default == Some(true))
424 .cloned()
425 .collect::<Vec<_>>()
426 })
427 .unwrap_or_default();
428 let supports_exception_filters = capabilities
429 .supports_exception_filter_options
430 .unwrap_or_default();
431 let configuration_sequence = cx.spawn({
432 let this = self.clone();
433 async move |cx| {
434 initialized_rx.await?;
435 // todo(debugger) figure out if we want to handle a breakpoint response error
436 // This will probably consist of letting a user know that breakpoints failed to be set
437 cx.update(|cx| this.send_source_breakpoints(false, cx))?
438 .await;
439 cx.update(|cx| {
440 this.send_exception_breakpoints(
441 exception_filters,
442 supports_exception_filters,
443 cx,
444 )
445 })?
446 .await
447 .ok();
448 let ret = if configuration_done_supported {
449 this.request(ConfigurationDone {}, cx.background_executor().clone())
450 } else {
451 Task::ready(Ok(()))
452 }
453 .await;
454 ret
455 }
456 });
457
458 cx.background_spawn(async move {
459 futures::future::try_join(launch, configuration_sequence).await?;
460 Ok(())
461 })
462 }
463
464 fn request<R: LocalDapCommand>(
465 &self,
466 request: R,
467 executor: BackgroundExecutor,
468 ) -> Task<Result<R::Response>>
469 where
470 <R::DapRequest as dap::requests::Request>::Response: 'static,
471 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
472 {
473 let request = Arc::new(request);
474
475 let request_clone = request.clone();
476 let connection = self.client.clone();
477 let request_task = executor.spawn(async move {
478 let args = request_clone.to_dap();
479 connection.request::<R::DapRequest>(args).await
480 });
481
482 executor.spawn(async move {
483 let response = request.response_from_dap(request_task.await?);
484 response
485 })
486 }
487}
488impl From<RemoteConnection> for Mode {
489 fn from(value: RemoteConnection) -> Self {
490 Self::Remote(value)
491 }
492}
493
494impl Mode {
495 fn request_dap<R: DapCommand>(
496 &self,
497 session_id: SessionId,
498 request: R,
499 cx: &mut Context<Session>,
500 ) -> Task<Result<R::Response>>
501 where
502 <R::DapRequest as dap::requests::Request>::Response: 'static,
503 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
504 {
505 match self {
506 Mode::Local(debug_adapter_client) => {
507 debug_adapter_client.request(request, cx.background_executor().clone())
508 }
509 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
510 }
511 }
512}
513
514#[derive(Default)]
515struct ThreadStates {
516 global_state: Option<ThreadStatus>,
517 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
518}
519
520impl ThreadStates {
521 fn stop_all_threads(&mut self) {
522 self.global_state = Some(ThreadStatus::Stopped);
523 self.known_thread_states.clear();
524 }
525
526 fn exit_all_threads(&mut self) {
527 self.global_state = Some(ThreadStatus::Exited);
528 self.known_thread_states.clear();
529 }
530
531 fn continue_all_threads(&mut self) {
532 self.global_state = Some(ThreadStatus::Running);
533 self.known_thread_states.clear();
534 }
535
536 fn stop_thread(&mut self, thread_id: ThreadId) {
537 self.known_thread_states
538 .insert(thread_id, ThreadStatus::Stopped);
539 }
540
541 fn continue_thread(&mut self, thread_id: ThreadId) {
542 self.known_thread_states
543 .insert(thread_id, ThreadStatus::Running);
544 }
545
546 fn process_step(&mut self, thread_id: ThreadId) {
547 self.known_thread_states
548 .insert(thread_id, ThreadStatus::Stepping);
549 }
550
551 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
552 self.thread_state(thread_id)
553 .unwrap_or(ThreadStatus::Running)
554 }
555
556 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
557 self.known_thread_states
558 .get(&thread_id)
559 .copied()
560 .or(self.global_state)
561 }
562
563 fn exit_thread(&mut self, thread_id: ThreadId) {
564 self.known_thread_states
565 .insert(thread_id, ThreadStatus::Exited);
566 }
567
568 fn any_stopped_thread(&self) -> bool {
569 self.global_state
570 .is_some_and(|state| state == ThreadStatus::Stopped)
571 || self
572 .known_thread_states
573 .values()
574 .any(|status| *status == ThreadStatus::Stopped)
575 }
576}
577const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
578
579type IsEnabled = bool;
580
581#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
582pub struct OutputToken(pub usize);
583/// Represents a current state of a single debug adapter and provides ways to mutate it.
584pub struct Session {
585 mode: Mode,
586 pub(super) capabilities: Capabilities,
587 id: SessionId,
588 child_session_ids: HashSet<SessionId>,
589 parent_id: Option<SessionId>,
590 ignore_breakpoints: bool,
591 modules: Vec<dap::Module>,
592 loaded_sources: Vec<dap::Source>,
593 output_token: OutputToken,
594 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
595 threads: IndexMap<ThreadId, Thread>,
596 thread_states: ThreadStates,
597 variables: HashMap<VariableReference, Vec<dap::Variable>>,
598 stack_frames: IndexMap<StackFrameId, StackFrame>,
599 locations: HashMap<u64, dap::LocationsResponse>,
600 is_session_terminated: bool,
601 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
602 exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
603 _background_tasks: Vec<Task<()>>,
604}
605
606trait CacheableCommand: Any + Send + Sync {
607 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
608 fn dyn_hash(&self, hasher: &mut dyn Hasher);
609 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
610}
611
612impl<T> CacheableCommand for T
613where
614 T: DapCommand + PartialEq + Eq + Hash,
615{
616 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
617 (rhs as &dyn Any)
618 .downcast_ref::<Self>()
619 .map_or(false, |rhs| self == rhs)
620 }
621
622 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
623 T::hash(self, &mut hasher);
624 }
625
626 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
627 self
628 }
629}
630
631pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
632
633impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
634 fn from(request: T) -> Self {
635 Self(Arc::new(request))
636 }
637}
638
639impl PartialEq for RequestSlot {
640 fn eq(&self, other: &Self) -> bool {
641 self.0.dyn_eq(other.0.as_ref())
642 }
643}
644
645impl Eq for RequestSlot {}
646
647impl Hash for RequestSlot {
648 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
649 self.0.dyn_hash(state);
650 (&*self.0 as &dyn Any).type_id().hash(state)
651 }
652}
653
654#[derive(Debug, Clone, Hash, PartialEq, Eq)]
655pub struct CompletionsQuery {
656 pub query: String,
657 pub column: u64,
658 pub line: Option<u64>,
659 pub frame_id: Option<u64>,
660}
661
662impl CompletionsQuery {
663 pub fn new(
664 buffer: &language::Buffer,
665 cursor_position: language::Anchor,
666 frame_id: Option<u64>,
667 ) -> Self {
668 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
669 Self {
670 query: buffer.text(),
671 column: column as u64,
672 frame_id,
673 line: Some(row as u64),
674 }
675 }
676}
677
678pub enum SessionEvent {
679 Modules,
680 LoadedSources,
681 Stopped(Option<ThreadId>),
682 StackTrace,
683 Variables,
684 Threads,
685}
686
687pub(crate) enum SessionStateEvent {
688 Shutdown,
689}
690
691impl EventEmitter<SessionEvent> for Session {}
692impl EventEmitter<SessionStateEvent> for Session {}
693
694// local session will send breakpoint updates to DAP for all new breakpoints
695// remote side will only send breakpoint updates when it is a breakpoint created by that peer
696// BreakpointStore notifies session on breakpoint changes
697impl Session {
698 pub(crate) fn local(
699 breakpoint_store: Entity<BreakpointStore>,
700 session_id: SessionId,
701 parent_session: Option<Entity<Session>>,
702 binary: DebugAdapterBinary,
703 config: DebugTaskDefinition,
704 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
705 initialized_tx: oneshot::Sender<()>,
706 cx: &mut App,
707 ) -> Task<Result<Entity<Self>>> {
708 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
709
710 cx.spawn(async move |cx| {
711 let mode = LocalMode::new(
712 session_id,
713 parent_session.clone(),
714 breakpoint_store.clone(),
715 config.clone(),
716 binary,
717 message_tx,
718 cx.clone(),
719 )
720 .await?;
721
722 cx.new(|cx| {
723 create_local_session(
724 breakpoint_store,
725 session_id,
726 parent_session,
727 start_debugging_requests_tx,
728 initialized_tx,
729 message_rx,
730 mode,
731 cx,
732 )
733 })
734 })
735 }
736
737 pub(crate) fn remote(
738 session_id: SessionId,
739 client: AnyProtoClient,
740 upstream_project_id: u64,
741 ignore_breakpoints: bool,
742 ) -> Self {
743 Self {
744 mode: Mode::Remote(RemoteConnection {
745 _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
746 _client: client,
747 _upstream_project_id: upstream_project_id,
748 }),
749 id: session_id,
750 child_session_ids: HashSet::default(),
751 parent_id: None,
752 capabilities: Capabilities::default(),
753 ignore_breakpoints,
754 variables: Default::default(),
755 stack_frames: Default::default(),
756 thread_states: ThreadStates::default(),
757 output_token: OutputToken(0),
758 output: circular_buffer::CircularBuffer::boxed(),
759 requests: HashMap::default(),
760 modules: Vec::default(),
761 loaded_sources: Vec::default(),
762 threads: IndexMap::default(),
763 _background_tasks: Vec::default(),
764 locations: Default::default(),
765 is_session_terminated: false,
766 exception_breakpoints: Default::default(),
767 }
768 }
769
770 pub fn session_id(&self) -> SessionId {
771 self.id
772 }
773
774 pub fn child_session_ids(&self) -> HashSet<SessionId> {
775 self.child_session_ids.clone()
776 }
777
778 pub fn add_child_session_id(&mut self, session_id: SessionId) {
779 self.child_session_ids.insert(session_id);
780 }
781
782 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
783 self.child_session_ids.remove(&session_id);
784 }
785
786 pub fn parent_id(&self) -> Option<SessionId> {
787 self.parent_id
788 }
789
790 pub fn capabilities(&self) -> &Capabilities {
791 &self.capabilities
792 }
793
794 pub fn binary(&self) -> &DebugAdapterBinary {
795 let Mode::Local(local_mode) = &self.mode else {
796 panic!("Session is not local");
797 };
798 &local_mode.binary
799 }
800
801 pub fn adapter_name(&self) -> SharedString {
802 match &self.mode {
803 Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(),
804 Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
805 }
806 }
807
808 pub fn configuration(&self) -> Option<DebugTaskDefinition> {
809 if let Mode::Local(local_mode) = &self.mode {
810 Some(local_mode.definition.clone())
811 } else {
812 None
813 }
814 }
815
816 pub fn is_terminated(&self) -> bool {
817 self.is_session_terminated
818 }
819
820 pub fn is_local(&self) -> bool {
821 matches!(self.mode, Mode::Local(_))
822 }
823
824 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
825 match &mut self.mode {
826 Mode::Local(local_mode) => Some(local_mode),
827 Mode::Remote(_) => None,
828 }
829 }
830
831 pub fn as_local(&self) -> Option<&LocalMode> {
832 match &self.mode {
833 Mode::Local(local_mode) => Some(local_mode),
834 Mode::Remote(_) => None,
835 }
836 }
837
838 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
839 match &self.mode {
840 Mode::Local(local_mode) => {
841 let capabilities = local_mode.clone().request_initialization(cx);
842
843 cx.spawn(async move |this, cx| {
844 let capabilities = capabilities.await?;
845 this.update(cx, |session, _| {
846 session.capabilities = capabilities;
847 let filters = session
848 .capabilities
849 .exception_breakpoint_filters
850 .clone()
851 .unwrap_or_default();
852 for filter in filters {
853 let default = filter.default.unwrap_or_default();
854 session
855 .exception_breakpoints
856 .entry(filter.filter.clone())
857 .or_insert_with(|| (filter, default));
858 }
859 })?;
860 Ok(())
861 })
862 }
863 Mode::Remote(_) => Task::ready(Err(anyhow!(
864 "Cannot send initialize request from remote session"
865 ))),
866 }
867 }
868
869 pub(super) fn initialize_sequence(
870 &mut self,
871 initialize_rx: oneshot::Receiver<()>,
872 cx: &mut Context<Self>,
873 ) -> Task<Result<()>> {
874 match &self.mode {
875 Mode::Local(local_mode) => {
876 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
877 }
878 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
879 }
880 }
881
882 pub fn run_to_position(
883 &mut self,
884 breakpoint: SourceBreakpoint,
885 active_thread_id: ThreadId,
886 cx: &mut Context<Self>,
887 ) {
888 match &mut self.mode {
889 Mode::Local(local_mode) => {
890 if !matches!(
891 self.thread_states.thread_state(active_thread_id),
892 Some(ThreadStatus::Stopped)
893 ) {
894 return;
895 };
896 let path = breakpoint.path.clone();
897 local_mode.tmp_breakpoint = Some(breakpoint);
898 let task = local_mode.send_breakpoints_from_path(
899 path,
900 BreakpointUpdatedReason::Toggled,
901 cx,
902 );
903
904 cx.spawn(async move |this, cx| {
905 task.await;
906 this.update(cx, |this, cx| {
907 this.continue_thread(active_thread_id, cx);
908 })
909 })
910 .detach();
911 }
912 Mode::Remote(_) => {}
913 }
914 }
915
916 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
917 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
918 }
919
920 pub fn output(
921 &self,
922 since: OutputToken,
923 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
924 if self.output_token.0 == 0 {
925 return (self.output.range(0..0), OutputToken(0));
926 };
927
928 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
929
930 let clamped_events_since = events_since.clamp(0, self.output.len());
931 (
932 self.output
933 .range(self.output.len() - clamped_events_since..),
934 self.output_token,
935 )
936 }
937
938 pub fn respond_to_client(
939 &self,
940 request_seq: u64,
941 success: bool,
942 command: String,
943 body: Option<serde_json::Value>,
944 cx: &mut Context<Self>,
945 ) -> Task<Result<()>> {
946 let Some(local_session) = self.as_local().cloned() else {
947 unreachable!("Cannot respond to remote client");
948 };
949
950 cx.background_spawn(async move {
951 local_session
952 .client
953 .send_message(Message::Response(Response {
954 body,
955 success,
956 command,
957 seq: request_seq + 1,
958 request_seq,
959 message: None,
960 }))
961 .await
962 })
963 }
964
965 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
966 if let Some((local, path)) = self.as_local_mut().and_then(|local| {
967 let breakpoint = local.tmp_breakpoint.take()?;
968 let path = breakpoint.path.clone();
969 Some((local, path))
970 }) {
971 local
972 .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
973 .detach();
974 };
975
976 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
977 self.thread_states.stop_all_threads();
978
979 self.invalidate_command_type::<StackTraceCommand>();
980 }
981
982 // Event if we stopped all threads we still need to insert the thread_id
983 // to our own data
984 if let Some(thread_id) = event.thread_id {
985 self.thread_states.stop_thread(ThreadId(thread_id));
986
987 self.invalidate_state(
988 &StackTraceCommand {
989 thread_id,
990 start_frame: None,
991 levels: None,
992 }
993 .into(),
994 );
995 }
996
997 self.invalidate_generic();
998 self.threads.clear();
999 self.variables.clear();
1000 cx.emit(SessionEvent::Stopped(
1001 event
1002 .thread_id
1003 .map(Into::into)
1004 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1005 ));
1006 cx.notify();
1007 }
1008
1009 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1010 match *event {
1011 Events::Initialized(_) => {
1012 debug_assert!(
1013 false,
1014 "Initialized event should have been handled in LocalMode"
1015 );
1016 }
1017 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1018 Events::Continued(event) => {
1019 if event.all_threads_continued.unwrap_or_default() {
1020 self.thread_states.continue_all_threads();
1021 } else {
1022 self.thread_states
1023 .continue_thread(ThreadId(event.thread_id));
1024 }
1025 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1026 self.invalidate_generic();
1027 }
1028 Events::Exited(_event) => {
1029 self.clear_active_debug_line(cx);
1030 }
1031 Events::Terminated(_) => {
1032 self.is_session_terminated = true;
1033 self.clear_active_debug_line(cx);
1034 }
1035 Events::Thread(event) => {
1036 let thread_id = ThreadId(event.thread_id);
1037
1038 match event.reason {
1039 dap::ThreadEventReason::Started => {
1040 self.thread_states.continue_thread(thread_id);
1041 }
1042 dap::ThreadEventReason::Exited => {
1043 self.thread_states.exit_thread(thread_id);
1044 }
1045 reason => {
1046 log::error!("Unhandled thread event reason {:?}", reason);
1047 }
1048 }
1049 self.invalidate_state(&ThreadsCommand.into());
1050 cx.notify();
1051 }
1052 Events::Output(event) => {
1053 if event
1054 .category
1055 .as_ref()
1056 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1057 {
1058 return;
1059 }
1060
1061 self.output.push_back(event);
1062 self.output_token.0 += 1;
1063 cx.notify();
1064 }
1065 Events::Breakpoint(_) => {}
1066 Events::Module(event) => {
1067 match event.reason {
1068 dap::ModuleEventReason::New => {
1069 self.modules.push(event.module);
1070 }
1071 dap::ModuleEventReason::Changed => {
1072 if let Some(module) = self
1073 .modules
1074 .iter_mut()
1075 .find(|other| event.module.id == other.id)
1076 {
1077 *module = event.module;
1078 }
1079 }
1080 dap::ModuleEventReason::Removed => {
1081 self.modules.retain(|other| event.module.id != other.id);
1082 }
1083 }
1084
1085 // todo(debugger): We should only send the invalidate command to downstream clients.
1086 // self.invalidate_state(&ModulesCommand.into());
1087 }
1088 Events::LoadedSource(_) => {
1089 self.invalidate_state(&LoadedSourcesCommand.into());
1090 }
1091 Events::Capabilities(event) => {
1092 self.capabilities = self.capabilities.merge(event.capabilities);
1093 cx.notify();
1094 }
1095 Events::Memory(_) => {}
1096 Events::Process(_) => {}
1097 Events::ProgressEnd(_) => {}
1098 Events::ProgressStart(_) => {}
1099 Events::ProgressUpdate(_) => {}
1100 Events::Invalidated(_) => {}
1101 Events::Other(_) => {}
1102 }
1103 }
1104
1105 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1106 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1107 &mut self,
1108 request: T,
1109 process_result: impl FnOnce(
1110 &mut Self,
1111 Result<T::Response>,
1112 &mut Context<Self>,
1113 ) -> Option<T::Response>
1114 + 'static,
1115 cx: &mut Context<Self>,
1116 ) {
1117 const {
1118 assert!(
1119 T::CACHEABLE,
1120 "Only requests marked as cacheable should invoke `fetch`"
1121 );
1122 }
1123
1124 if !self.thread_states.any_stopped_thread()
1125 && request.type_id() != TypeId::of::<ThreadsCommand>()
1126 || self.is_session_terminated
1127 {
1128 return;
1129 }
1130
1131 let request_map = self
1132 .requests
1133 .entry(std::any::TypeId::of::<T>())
1134 .or_default();
1135
1136 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1137 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1138
1139 let task = Self::request_inner::<Arc<T>>(
1140 &self.capabilities,
1141 self.id,
1142 &self.mode,
1143 command,
1144 process_result,
1145 cx,
1146 );
1147 let task = cx
1148 .background_executor()
1149 .spawn(async move {
1150 let _ = task.await?;
1151 Some(())
1152 })
1153 .shared();
1154
1155 vacant.insert(task);
1156 cx.notify();
1157 }
1158 }
1159
1160 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1161 capabilities: &Capabilities,
1162 session_id: SessionId,
1163 mode: &Mode,
1164 request: T,
1165 process_result: impl FnOnce(
1166 &mut Self,
1167 Result<T::Response>,
1168 &mut Context<Self>,
1169 ) -> Option<T::Response>
1170 + 'static,
1171 cx: &mut Context<Self>,
1172 ) -> Task<Option<T::Response>> {
1173 if !T::is_supported(&capabilities) {
1174 log::warn!(
1175 "Attempted to send a DAP request that isn't supported: {:?}",
1176 request
1177 );
1178 let error = Err(anyhow::Error::msg(
1179 "Couldn't complete request because it's not supported",
1180 ));
1181 return cx.spawn(async move |this, cx| {
1182 this.update(cx, |this, cx| process_result(this, error, cx))
1183 .log_err()
1184 .flatten()
1185 });
1186 }
1187
1188 let request = mode.request_dap(session_id, request, cx);
1189 cx.spawn(async move |this, cx| {
1190 let result = request.await;
1191 this.update(cx, |this, cx| process_result(this, result, cx))
1192 .log_err()
1193 .flatten()
1194 })
1195 }
1196
1197 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1198 &self,
1199 request: T,
1200 process_result: impl FnOnce(
1201 &mut Self,
1202 Result<T::Response>,
1203 &mut Context<Self>,
1204 ) -> Option<T::Response>
1205 + 'static,
1206 cx: &mut Context<Self>,
1207 ) -> Task<Option<T::Response>> {
1208 Self::request_inner(
1209 &self.capabilities,
1210 self.id,
1211 &self.mode,
1212 request,
1213 process_result,
1214 cx,
1215 )
1216 }
1217
1218 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1219 self.requests.remove(&std::any::TypeId::of::<Command>());
1220 }
1221
1222 fn invalidate_generic(&mut self) {
1223 self.invalidate_command_type::<ModulesCommand>();
1224 self.invalidate_command_type::<LoadedSourcesCommand>();
1225 self.invalidate_command_type::<ThreadsCommand>();
1226 }
1227
1228 fn invalidate_state(&mut self, key: &RequestSlot) {
1229 self.requests
1230 .entry((&*key.0 as &dyn Any).type_id())
1231 .and_modify(|request_map| {
1232 request_map.remove(&key);
1233 });
1234 }
1235
1236 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1237 self.thread_states.thread_status(thread_id)
1238 }
1239
1240 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1241 self.fetch(
1242 dap_command::ThreadsCommand,
1243 |this, result, cx| {
1244 let result = result.log_err()?;
1245
1246 this.threads = result
1247 .iter()
1248 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1249 .collect();
1250
1251 this.invalidate_command_type::<StackTraceCommand>();
1252 cx.emit(SessionEvent::Threads);
1253 cx.notify();
1254
1255 Some(result)
1256 },
1257 cx,
1258 );
1259
1260 self.threads
1261 .values()
1262 .map(|thread| {
1263 (
1264 thread.dap.clone(),
1265 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1266 )
1267 })
1268 .collect()
1269 }
1270
1271 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1272 self.fetch(
1273 dap_command::ModulesCommand,
1274 |this, result, cx| {
1275 let result = result.log_err()?;
1276
1277 this.modules = result.iter().cloned().collect();
1278 cx.emit(SessionEvent::Modules);
1279 cx.notify();
1280
1281 Some(result)
1282 },
1283 cx,
1284 );
1285
1286 &self.modules
1287 }
1288
1289 pub fn ignore_breakpoints(&self) -> bool {
1290 self.ignore_breakpoints
1291 }
1292
1293 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1294 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1295 }
1296
1297 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1298 if self.ignore_breakpoints == ignore {
1299 return Task::ready(());
1300 }
1301
1302 self.ignore_breakpoints = ignore;
1303
1304 if let Some(local) = self.as_local() {
1305 local.send_source_breakpoints(ignore, cx)
1306 } else {
1307 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1308 unimplemented!()
1309 }
1310 }
1311
1312 pub fn exception_breakpoints(
1313 &self,
1314 ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1315 self.exception_breakpoints.values()
1316 }
1317
1318 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1319 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1320 *is_enabled = !*is_enabled;
1321 self.send_exception_breakpoints(cx);
1322 }
1323 }
1324
1325 fn send_exception_breakpoints(&mut self, cx: &App) {
1326 if let Some(local) = self.as_local() {
1327 let exception_filters = self
1328 .exception_breakpoints
1329 .values()
1330 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1331 .collect();
1332
1333 let supports_exception_filters = self
1334 .capabilities
1335 .supports_exception_filter_options
1336 .unwrap_or_default();
1337 local
1338 .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1339 .detach_and_log_err(cx);
1340 } else {
1341 debug_assert!(false, "Not implemented");
1342 }
1343 }
1344
1345 pub fn breakpoints_enabled(&self) -> bool {
1346 self.ignore_breakpoints
1347 }
1348
1349 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1350 self.fetch(
1351 dap_command::LoadedSourcesCommand,
1352 |this, result, cx| {
1353 let result = result.log_err()?;
1354 this.loaded_sources = result.iter().cloned().collect();
1355 cx.emit(SessionEvent::LoadedSources);
1356 cx.notify();
1357 Some(result)
1358 },
1359 cx,
1360 );
1361
1362 &self.loaded_sources
1363 }
1364
1365 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1366 res.log_err()?;
1367 Some(())
1368 }
1369
1370 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1371 thread_id: ThreadId,
1372 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1373 {
1374 move |this, response, cx| match response.log_err() {
1375 Some(response) => Some(response),
1376 None => {
1377 this.thread_states.stop_thread(thread_id);
1378 cx.notify();
1379 None
1380 }
1381 }
1382 }
1383
1384 fn clear_active_debug_line_response(
1385 &mut self,
1386 response: Result<()>,
1387 cx: &mut Context<Session>,
1388 ) -> Option<()> {
1389 response.log_err()?;
1390 self.clear_active_debug_line(cx);
1391 Some(())
1392 }
1393
1394 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1395 self.as_local()
1396 .expect("Message handler will only run in local mode")
1397 .breakpoint_store
1398 .update(cx, |store, cx| {
1399 store.remove_active_position(Some(self.id), cx)
1400 });
1401 }
1402
1403 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1404 self.request(
1405 PauseCommand {
1406 thread_id: thread_id.0,
1407 },
1408 Self::empty_response,
1409 cx,
1410 )
1411 .detach();
1412 }
1413
1414 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1415 self.request(
1416 RestartStackFrameCommand { stack_frame_id },
1417 Self::empty_response,
1418 cx,
1419 )
1420 .detach();
1421 }
1422
1423 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1424 if self.capabilities.supports_restart_request.unwrap_or(false) {
1425 self.request(
1426 RestartCommand {
1427 raw: args.unwrap_or(Value::Null),
1428 },
1429 Self::empty_response,
1430 cx,
1431 )
1432 .detach();
1433 } else {
1434 self.request(
1435 DisconnectCommand {
1436 restart: Some(false),
1437 terminate_debuggee: Some(true),
1438 suspend_debuggee: Some(false),
1439 },
1440 Self::empty_response,
1441 cx,
1442 )
1443 .detach();
1444 }
1445 }
1446
1447 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1448 self.is_session_terminated = true;
1449 self.thread_states.exit_all_threads();
1450 cx.notify();
1451
1452 let task = if self
1453 .capabilities
1454 .supports_terminate_request
1455 .unwrap_or_default()
1456 {
1457 self.request(
1458 TerminateCommand {
1459 restart: Some(false),
1460 },
1461 Self::clear_active_debug_line_response,
1462 cx,
1463 )
1464 } else {
1465 self.request(
1466 DisconnectCommand {
1467 restart: Some(false),
1468 terminate_debuggee: Some(true),
1469 suspend_debuggee: Some(false),
1470 },
1471 Self::clear_active_debug_line_response,
1472 cx,
1473 )
1474 };
1475
1476 cx.emit(SessionStateEvent::Shutdown);
1477
1478 cx.background_spawn(async move {
1479 let _ = task.await;
1480 })
1481 }
1482
1483 pub fn completions(
1484 &mut self,
1485 query: CompletionsQuery,
1486 cx: &mut Context<Self>,
1487 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1488 let task = self.request(query, |_, result, _| result.log_err(), cx);
1489
1490 cx.background_executor().spawn(async move {
1491 anyhow::Ok(
1492 task.await
1493 .map(|response| response.targets)
1494 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1495 )
1496 })
1497 }
1498
1499 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1500 self.thread_states.continue_thread(thread_id);
1501 self.request(
1502 ContinueCommand {
1503 args: ContinueArguments {
1504 thread_id: thread_id.0,
1505 single_thread: Some(true),
1506 },
1507 },
1508 Self::on_step_response::<ContinueCommand>(thread_id),
1509 cx,
1510 )
1511 .detach();
1512 }
1513
1514 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1515 match self.mode {
1516 Mode::Local(ref local) => Some(local.client.clone()),
1517 Mode::Remote(_) => None,
1518 }
1519 }
1520
1521 pub fn step_over(
1522 &mut self,
1523 thread_id: ThreadId,
1524 granularity: SteppingGranularity,
1525 cx: &mut Context<Self>,
1526 ) {
1527 let supports_single_thread_execution_requests =
1528 self.capabilities.supports_single_thread_execution_requests;
1529 let supports_stepping_granularity = self
1530 .capabilities
1531 .supports_stepping_granularity
1532 .unwrap_or_default();
1533
1534 let command = NextCommand {
1535 inner: StepCommand {
1536 thread_id: thread_id.0,
1537 granularity: supports_stepping_granularity.then(|| granularity),
1538 single_thread: supports_single_thread_execution_requests,
1539 },
1540 };
1541
1542 self.thread_states.process_step(thread_id);
1543 self.request(
1544 command,
1545 Self::on_step_response::<NextCommand>(thread_id),
1546 cx,
1547 )
1548 .detach();
1549 }
1550
1551 pub fn step_in(
1552 &mut self,
1553 thread_id: ThreadId,
1554 granularity: SteppingGranularity,
1555 cx: &mut Context<Self>,
1556 ) {
1557 let supports_single_thread_execution_requests =
1558 self.capabilities.supports_single_thread_execution_requests;
1559 let supports_stepping_granularity = self
1560 .capabilities
1561 .supports_stepping_granularity
1562 .unwrap_or_default();
1563
1564 let command = StepInCommand {
1565 inner: StepCommand {
1566 thread_id: thread_id.0,
1567 granularity: supports_stepping_granularity.then(|| granularity),
1568 single_thread: supports_single_thread_execution_requests,
1569 },
1570 };
1571
1572 self.thread_states.process_step(thread_id);
1573 self.request(
1574 command,
1575 Self::on_step_response::<StepInCommand>(thread_id),
1576 cx,
1577 )
1578 .detach();
1579 }
1580
1581 pub fn step_out(
1582 &mut self,
1583 thread_id: ThreadId,
1584 granularity: SteppingGranularity,
1585 cx: &mut Context<Self>,
1586 ) {
1587 let supports_single_thread_execution_requests =
1588 self.capabilities.supports_single_thread_execution_requests;
1589 let supports_stepping_granularity = self
1590 .capabilities
1591 .supports_stepping_granularity
1592 .unwrap_or_default();
1593
1594 let command = StepOutCommand {
1595 inner: StepCommand {
1596 thread_id: thread_id.0,
1597 granularity: supports_stepping_granularity.then(|| granularity),
1598 single_thread: supports_single_thread_execution_requests,
1599 },
1600 };
1601
1602 self.thread_states.process_step(thread_id);
1603 self.request(
1604 command,
1605 Self::on_step_response::<StepOutCommand>(thread_id),
1606 cx,
1607 )
1608 .detach();
1609 }
1610
1611 pub fn step_back(
1612 &mut self,
1613 thread_id: ThreadId,
1614 granularity: SteppingGranularity,
1615 cx: &mut Context<Self>,
1616 ) {
1617 let supports_single_thread_execution_requests =
1618 self.capabilities.supports_single_thread_execution_requests;
1619 let supports_stepping_granularity = self
1620 .capabilities
1621 .supports_stepping_granularity
1622 .unwrap_or_default();
1623
1624 let command = StepBackCommand {
1625 inner: StepCommand {
1626 thread_id: thread_id.0,
1627 granularity: supports_stepping_granularity.then(|| granularity),
1628 single_thread: supports_single_thread_execution_requests,
1629 },
1630 };
1631
1632 self.thread_states.process_step(thread_id);
1633
1634 self.request(
1635 command,
1636 Self::on_step_response::<StepBackCommand>(thread_id),
1637 cx,
1638 )
1639 .detach();
1640 }
1641
1642 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1643 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1644 && self.requests.contains_key(&ThreadsCommand.type_id())
1645 && self.threads.contains_key(&thread_id)
1646 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1647 // We could still be using an old thread id and have sent a new thread's request
1648 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1649 // But it very well could cause a minor bug in the future that is hard to track down
1650 {
1651 self.fetch(
1652 super::dap_command::StackTraceCommand {
1653 thread_id: thread_id.0,
1654 start_frame: None,
1655 levels: None,
1656 },
1657 move |this, stack_frames, cx| {
1658 let stack_frames = stack_frames.log_err()?;
1659
1660 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1661 thread.stack_frame_ids =
1662 stack_frames.iter().map(|frame| frame.id).collect();
1663 });
1664 debug_assert!(
1665 matches!(entry, indexmap::map::Entry::Occupied(_)),
1666 "Sent request for thread_id that doesn't exist"
1667 );
1668
1669 this.stack_frames.extend(
1670 stack_frames
1671 .iter()
1672 .cloned()
1673 .map(|frame| (frame.id, StackFrame::from(frame))),
1674 );
1675
1676 this.invalidate_command_type::<ScopesCommand>();
1677 this.invalidate_command_type::<VariablesCommand>();
1678
1679 cx.emit(SessionEvent::StackTrace);
1680 cx.notify();
1681 Some(stack_frames)
1682 },
1683 cx,
1684 );
1685 }
1686
1687 self.threads
1688 .get(&thread_id)
1689 .map(|thread| {
1690 thread
1691 .stack_frame_ids
1692 .iter()
1693 .filter_map(|id| self.stack_frames.get(id))
1694 .cloned()
1695 .collect()
1696 })
1697 .unwrap_or_default()
1698 }
1699
1700 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1701 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1702 && self
1703 .requests
1704 .contains_key(&TypeId::of::<StackTraceCommand>())
1705 {
1706 self.fetch(
1707 ScopesCommand { stack_frame_id },
1708 move |this, scopes, cx| {
1709 let scopes = scopes.log_err()?;
1710
1711 for scope in scopes .iter(){
1712 this.variables(scope.variables_reference, cx);
1713 }
1714
1715 let entry = this
1716 .stack_frames
1717 .entry(stack_frame_id)
1718 .and_modify(|stack_frame| {
1719 stack_frame.scopes = scopes.clone();
1720 });
1721
1722 cx.emit(SessionEvent::Variables);
1723
1724 debug_assert!(
1725 matches!(entry, indexmap::map::Entry::Occupied(_)),
1726 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1727 );
1728
1729 Some(scopes)
1730 },
1731 cx,
1732 );
1733 }
1734
1735 self.stack_frames
1736 .get(&stack_frame_id)
1737 .map(|frame| frame.scopes.as_slice())
1738 .unwrap_or_default()
1739 }
1740
1741 pub fn variables(
1742 &mut self,
1743 variables_reference: VariableReference,
1744 cx: &mut Context<Self>,
1745 ) -> Vec<dap::Variable> {
1746 let command = VariablesCommand {
1747 variables_reference,
1748 filter: None,
1749 start: None,
1750 count: None,
1751 format: None,
1752 };
1753
1754 self.fetch(
1755 command,
1756 move |this, variables, cx| {
1757 let variables = variables.log_err()?;
1758 this.variables
1759 .insert(variables_reference, variables.clone());
1760
1761 cx.emit(SessionEvent::Variables);
1762 Some(variables)
1763 },
1764 cx,
1765 );
1766
1767 self.variables
1768 .get(&variables_reference)
1769 .cloned()
1770 .unwrap_or_default()
1771 }
1772
1773 pub fn set_variable_value(
1774 &mut self,
1775 variables_reference: u64,
1776 name: String,
1777 value: String,
1778 cx: &mut Context<Self>,
1779 ) {
1780 if self.capabilities.supports_set_variable.unwrap_or_default() {
1781 self.request(
1782 SetVariableValueCommand {
1783 name,
1784 value,
1785 variables_reference,
1786 },
1787 move |this, response, cx| {
1788 let response = response.log_err()?;
1789 this.invalidate_command_type::<VariablesCommand>();
1790 cx.notify();
1791 Some(response)
1792 },
1793 cx,
1794 )
1795 .detach()
1796 }
1797 }
1798
1799 pub fn evaluate(
1800 &mut self,
1801 expression: String,
1802 context: Option<EvaluateArgumentsContext>,
1803 frame_id: Option<u64>,
1804 source: Option<Source>,
1805 cx: &mut Context<Self>,
1806 ) {
1807 self.request(
1808 EvaluateCommand {
1809 expression,
1810 context,
1811 frame_id,
1812 source,
1813 },
1814 |this, response, cx| {
1815 let response = response.log_err()?;
1816 this.output_token.0 += 1;
1817 this.output.push_back(dap::OutputEvent {
1818 category: None,
1819 output: response.result.clone(),
1820 group: None,
1821 variables_reference: Some(response.variables_reference),
1822 source: None,
1823 line: None,
1824 column: None,
1825 data: None,
1826 location_reference: None,
1827 });
1828
1829 this.invalidate_command_type::<ScopesCommand>();
1830 cx.notify();
1831 Some(response)
1832 },
1833 cx,
1834 )
1835 .detach();
1836 }
1837
1838 pub fn location(
1839 &mut self,
1840 reference: u64,
1841 cx: &mut Context<Self>,
1842 ) -> Option<dap::LocationsResponse> {
1843 self.fetch(
1844 LocationsCommand { reference },
1845 move |this, response, _| {
1846 let response = response.log_err()?;
1847 this.locations.insert(reference, response.clone());
1848 Some(response)
1849 },
1850 cx,
1851 );
1852 self.locations.get(&reference).cloned()
1853 }
1854 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1855 let command = DisconnectCommand {
1856 restart: Some(false),
1857 terminate_debuggee: Some(true),
1858 suspend_debuggee: Some(false),
1859 };
1860
1861 self.request(command, Self::empty_response, cx).detach()
1862 }
1863
1864 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1865 if self
1866 .capabilities
1867 .supports_terminate_threads_request
1868 .unwrap_or_default()
1869 {
1870 self.request(
1871 TerminateThreadsCommand {
1872 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1873 },
1874 Self::clear_active_debug_line_response,
1875 cx,
1876 )
1877 .detach();
1878 } else {
1879 self.shutdown(cx).detach();
1880 }
1881 }
1882}
1883
1884fn create_local_session(
1885 breakpoint_store: Entity<BreakpointStore>,
1886 session_id: SessionId,
1887 parent_session: Option<Entity<Session>>,
1888 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1889 initialized_tx: oneshot::Sender<()>,
1890 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1891 mode: LocalMode,
1892 cx: &mut Context<Session>,
1893) -> Session {
1894 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1895 let mut initialized_tx = Some(initialized_tx);
1896 while let Some(message) = message_rx.next().await {
1897 if let Message::Event(event) = message {
1898 if let Events::Initialized(_) = *event {
1899 if let Some(tx) = initialized_tx.take() {
1900 tx.send(()).ok();
1901 }
1902 } else {
1903 let Ok(_) = this.update(cx, |session, cx| {
1904 session.handle_dap_event(event, cx);
1905 }) else {
1906 break;
1907 };
1908 }
1909 } else {
1910 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1911 else {
1912 break;
1913 };
1914 }
1915 }
1916 })];
1917
1918 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1919 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1920 if let Some(local) = (!this.ignore_breakpoints)
1921 .then(|| this.as_local_mut())
1922 .flatten()
1923 {
1924 local
1925 .send_breakpoints_from_path(path.clone(), *reason, cx)
1926 .detach();
1927 };
1928 }
1929 BreakpointStoreEvent::BreakpointsCleared(paths) => {
1930 if let Some(local) = (!this.ignore_breakpoints)
1931 .then(|| this.as_local_mut())
1932 .flatten()
1933 {
1934 local.unset_breakpoints_from_paths(paths, cx).detach();
1935 }
1936 }
1937 BreakpointStoreEvent::ActiveDebugLineChanged => {}
1938 })
1939 .detach();
1940
1941 Session {
1942 mode: Mode::Local(mode),
1943 id: session_id,
1944 child_session_ids: HashSet::default(),
1945 parent_id: parent_session.map(|session| session.read(cx).id),
1946 variables: Default::default(),
1947 capabilities: Capabilities::default(),
1948 thread_states: ThreadStates::default(),
1949 output_token: OutputToken(0),
1950 ignore_breakpoints: false,
1951 output: circular_buffer::CircularBuffer::boxed(),
1952 requests: HashMap::default(),
1953 modules: Vec::default(),
1954 loaded_sources: Vec::default(),
1955 threads: IndexMap::default(),
1956 stack_frames: IndexMap::default(),
1957 locations: Default::default(),
1958 exception_breakpoints: Default::default(),
1959 _background_tasks,
1960 is_session_terminated: false,
1961 }
1962}