1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
9 StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
10 VariablesCommand,
11};
12use super::dap_store::DapAdapterDelegate;
13use anyhow::{anyhow, Result};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
16use dap::messages::Response;
17use dap::OutputEventCategory;
18use dap::{
19 adapters::{DapDelegate, DapStatus},
20 client::{DebugAdapterClient, SessionId},
21 messages::{Events, Message},
22 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
23 SteppingGranularity, StoppedEvent, VariableReference,
24};
25use dap_adapters::build_adapter;
26use futures::channel::oneshot;
27use futures::{future::Shared, FutureExt};
28use gpui::{
29 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
30};
31use rpc::AnyProtoClient;
32use serde_json::{json, Value};
33use settings::Settings;
34use smol::stream::StreamExt;
35use std::any::TypeId;
36use std::path::PathBuf;
37use std::u64;
38use std::{
39 any::Any,
40 collections::hash_map::Entry,
41 hash::{Hash, Hasher},
42 path::Path,
43 sync::Arc,
44};
45use task::DebugAdapterConfig;
46use text::{PointUtf16, ToPointUtf16};
47use util::{merge_json_value_into, ResultExt};
48
/// Identifier of a debuggee thread, wrapping the raw numeric id carried by DAP messages.
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// The smallest representable thread id.
    pub const MIN: Self = Self(u64::MIN);
    /// The largest representable thread id.
    pub const MAX: Self = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw thread id.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
63
64#[derive(Clone, Debug)]
65pub struct StackFrame {
66 pub dap: dap::StackFrame,
67 pub scopes: Vec<dap::Scope>,
68}
69
70impl From<dap::StackFrame> for StackFrame {
71 fn from(stack_frame: dap::StackFrame) -> Self {
72 Self {
73 scopes: vec![],
74 dap: stack_frame,
75 }
76 }
77}
78
/// Execution state tracked for a debuggee thread. Defaults to [`ThreadStatus::Running`].
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the human-readable name of this status.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
100
101#[derive(Debug)]
102pub struct Thread {
103 dap: dap::Thread,
104 stack_frame_ids: IndexSet<StackFrameId>,
105 _has_stopped: bool,
106}
107
108impl From<dap::Thread> for Thread {
109 fn from(dap: dap::Thread) -> Self {
110 Self {
111 dap,
112 stack_frame_ids: Default::default(),
113 _has_stopped: false,
114 }
115 }
116}
117
/// Numeric id of the upstream project a remote session belongs to.
type UpstreamProjectId = u64;

/// Connection data for a session running in remote mode.
///
/// Both fields are currently unused (hence the leading underscores): sending DAP commands
/// over RPC is not supported yet.
struct RemoteConnection {
    _client: AnyProtoClient,
    _upstream_project_id: UpstreamProjectId,
}
124
125impl RemoteConnection {
126 fn send_proto_client_request<R: DapCommand>(
127 &self,
128 _request: R,
129 _session_id: SessionId,
130 cx: &mut App,
131 ) -> Task<Result<R::Response>> {
132 // let message = request.to_proto(session_id, self.upstream_project_id);
133 // let upstream_client = self.client.clone();
134 cx.background_executor().spawn(async move {
135 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
136 // let response = upstream_client.request(message).await?;
137 // request.response_from_proto(response)
138 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
139 })
140 }
141
142 fn request<R: DapCommand>(
143 &self,
144 request: R,
145 session_id: SessionId,
146 cx: &mut App,
147 ) -> Task<Result<R::Response>>
148 where
149 <R::DapRequest as dap::requests::Request>::Response: 'static,
150 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
151 {
152 return self.send_proto_client_request::<R>(request, session_id, cx);
153 }
154}
155
/// How a session reaches its debug adapter.
enum Mode {
    /// The session owns a client connected to the adapter.
    Local(LocalMode),
    /// The session would forward requests through a remote connection.
    Remote(RemoteConnection),
}
160
/// State of a session that talks to its debug adapter through a [`DebugAdapterClient`].
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests and messages to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Debug configuration this session was created with.
    config: DebugAdapterConfig,
    /// Adapter implementation; supplies the adapter name and the launch/attach arguments.
    adapter: Arc<dyn DebugAdapter>,
    /// Store that is read to obtain the breakpoints sent to the adapter.
    breakpoint_store: Entity<BreakpointStore>,
}
168
169fn client_source(abs_path: &Path) -> dap::Source {
170 dap::Source {
171 name: abs_path
172 .file_name()
173 .map(|filename| filename.to_string_lossy().to_string()),
174 path: Some(abs_path.to_string_lossy().to_string()),
175 source_reference: None,
176 presentation_hint: None,
177 origin: None,
178 sources: None,
179 adapter_data: None,
180 checksums: None,
181 }
182}
183
184impl LocalMode {
185 fn new(
186 session_id: SessionId,
187 parent_session: Option<Entity<Session>>,
188 breakpoint_store: Entity<BreakpointStore>,
189 config: DebugAdapterConfig,
190 delegate: DapAdapterDelegate,
191 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
192 cx: AsyncApp,
193 ) -> Task<Result<(Self, Capabilities)>> {
194 cx.spawn(async move |cx| {
195 let (adapter, binary) = Self::get_adapter_binary(&config, &delegate, cx).await?;
196
197 let message_handler = Box::new(move |message| {
198 messages_tx.unbounded_send(message).ok();
199 });
200
201 let client = Arc::new(
202 if let Some(client) = parent_session
203 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
204 .flatten()
205 {
206 client
207 .reconnect(session_id, binary, message_handler, cx.clone())
208 .await?
209 } else {
210 DebugAdapterClient::start(
211 session_id,
212 adapter.name(),
213 binary,
214 message_handler,
215 cx.clone(),
216 )
217 .await?
218 },
219 );
220
221 let adapter_id = adapter.name().to_string().to_owned();
222 let session = Self {
223 client,
224 adapter,
225 breakpoint_store,
226 config: config.clone(),
227 };
228
229 #[cfg(any(test, feature = "test-support"))]
230 {
231 let dap::DebugAdapterKind::Fake((fail, caps)) = session.config.kind.clone() else {
232 panic!("Only fake debug adapter configs should be used in tests");
233 };
234
235 session
236 .client
237 .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
238 .await;
239
240 let paths = cx.update(|cx| session.breakpoint_store.read(cx).breakpoint_paths()).expect("Breakpoint store should exist in all tests that start debuggers");
241
242 session.client.on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
243 let p = Arc::from(Path::new(&args.source.path.unwrap()));
244 if !paths.contains(&p) {
245 panic!("Sent breakpoints for path without any")
246 }
247
248 Ok(dap::SetBreakpointsResponse {
249 breakpoints: Vec::default(),
250 })
251 }).await;
252
253 match config.request.clone() {
254 dap::DebugRequestType::Launch if fail => {
255 session
256 .client
257 .on_request::<dap::requests::Launch, _>(move |_, _| {
258 Err(dap::ErrorResponse {
259 error: Some(dap::Message {
260 id: 1,
261 format: "error".into(),
262 variables: None,
263 send_telemetry: None,
264 show_user: None,
265 url: None,
266 url_label: None,
267 }),
268 })
269 })
270 .await;
271 }
272 dap::DebugRequestType::Launch => {
273 session
274 .client
275 .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
276 .await;
277 }
278 dap::DebugRequestType::Attach(_) if fail => {
279 session
280 .client
281 .on_request::<dap::requests::Attach, _>(move |_, _| {
282 Err(dap::ErrorResponse {
283 error: Some(dap::Message {
284 id: 1,
285 format: "error".into(),
286 variables: None,
287 send_telemetry: None,
288 show_user: None,
289 url: None,
290 url_label: None,
291 }),
292 })
293 })
294 .await;
295 }
296 dap::DebugRequestType::Attach(attach_config) => {
297 session
298 .client
299 .on_request::<dap::requests::Attach, _>(move |_, args| {
300 assert_eq!(
301 json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
302 args.raw
303 );
304
305 Ok(())
306 })
307 .await;
308 }
309 }
310
311 session.client.on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(())).await;
312 session.client.fake_event(Events::Initialized(None)).await;
313 }
314
315 let capabilities = session
316 .request(Initialize { adapter_id }, cx.background_executor().clone())
317 .await?;
318
319 Ok((session, capabilities))
320 })
321 }
322
323 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
324 let tasks: Vec<_> = paths
325 .into_iter()
326 .map(|path| {
327 self.request(
328 dap_command::SetBreakpoints {
329 source: client_source(path),
330 source_modified: None,
331 breakpoints: vec![],
332 },
333 cx.background_executor().clone(),
334 )
335 })
336 .collect();
337
338 cx.background_spawn(async move {
339 futures::future::join_all(tasks)
340 .await
341 .iter()
342 .for_each(|res| match res {
343 Ok(_) => {}
344 Err(err) => {
345 log::warn!("Set breakpoints request failed: {}", err);
346 }
347 });
348 })
349 }
350
351 fn send_breakpoints_from_path(
352 &self,
353 abs_path: Arc<Path>,
354 reason: BreakpointUpdatedReason,
355 cx: &mut App,
356 ) -> Task<()> {
357 let breakpoints = self
358 .breakpoint_store
359 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
360 .into_iter()
361 .map(Into::into)
362 .collect();
363
364 let task = self.request(
365 dap_command::SetBreakpoints {
366 source: client_source(&abs_path),
367 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
368 breakpoints,
369 },
370 cx.background_executor().clone(),
371 );
372
373 cx.background_spawn(async move {
374 match task.await {
375 Ok(_) => {}
376 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
377 }
378 })
379 }
380
381 fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
382 let mut breakpoint_tasks = Vec::new();
383 let breakpoints = self
384 .breakpoint_store
385 .read_with(cx, |store, cx| store.all_breakpoints(cx));
386
387 for (path, breakpoints) in breakpoints {
388 let breakpoints = if ignore_breakpoints {
389 vec![]
390 } else {
391 breakpoints.into_iter().map(Into::into).collect()
392 };
393
394 breakpoint_tasks.push(self.request(
395 dap_command::SetBreakpoints {
396 source: client_source(&path),
397 source_modified: Some(false),
398 breakpoints,
399 },
400 cx.background_executor().clone(),
401 ));
402 }
403
404 cx.background_spawn(async move {
405 futures::future::join_all(breakpoint_tasks)
406 .await
407 .iter()
408 .for_each(|res| match res {
409 Ok(_) => {}
410 Err(err) => {
411 log::warn!("Set breakpoints request failed: {}", err);
412 }
413 });
414 })
415 }
416
417 async fn get_adapter_binary(
418 config: &DebugAdapterConfig,
419 delegate: &DapAdapterDelegate,
420 cx: &mut AsyncApp,
421 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
422 let adapter = build_adapter(&config.kind).await?;
423
424 let binary = cx.update(|cx| {
425 ProjectSettings::get_global(cx)
426 .dap
427 .get(&adapter.name())
428 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
429 })?;
430
431 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
432 Err(error) => {
433 delegate.update_status(
434 adapter.name(),
435 DapStatus::Failed {
436 error: error.to_string(),
437 },
438 );
439
440 return Err(error);
441 }
442 Ok(mut binary) => {
443 delegate.update_status(adapter.name(), DapStatus::None);
444
445 let shell_env = delegate.shell_env().await;
446 let mut envs = binary.envs.unwrap_or_default();
447 envs.extend(shell_env);
448 binary.envs = Some(envs);
449
450 binary
451 }
452 };
453
454 Ok((adapter, binary))
455 }
456
457 pub fn initialize_sequence(
458 &self,
459 capabilities: &Capabilities,
460 initialized_rx: oneshot::Receiver<()>,
461 cx: &App,
462 ) -> Task<Result<()>> {
463 let mut raw = self.adapter.request_args(&self.config);
464 merge_json_value_into(
465 self.config.initialize_args.clone().unwrap_or(json!({})),
466 &mut raw,
467 );
468
469 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
470 let launch = match &self.config.request {
471 dap::DebugRequestType::Launch => {
472 self.request(Launch { raw }, cx.background_executor().clone())
473 }
474 dap::DebugRequestType::Attach(_) => {
475 self.request(Attach { raw }, cx.background_executor().clone())
476 }
477 };
478
479 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
480
481 let configuration_sequence = cx.spawn({
482 let this = self.clone();
483 async move |cx| {
484 initialized_rx.await?;
485 // todo(debugger) figure out if we want to handle a breakpoint response error
486 // This will probably consist of letting a user know that breakpoints failed to be set
487 cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
488
489 if configuration_done_supported {
490 this.request(ConfigurationDone, cx.background_executor().clone())
491 } else {
492 Task::ready(Ok(()))
493 }
494 .await
495 }
496 });
497
498 cx.background_spawn(async move {
499 futures::future::try_join(launch, configuration_sequence).await?;
500 Ok(())
501 })
502 }
503
504 fn request<R: LocalDapCommand>(
505 &self,
506 request: R,
507 executor: BackgroundExecutor,
508 ) -> Task<Result<R::Response>>
509 where
510 <R::DapRequest as dap::requests::Request>::Response: 'static,
511 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
512 {
513 let request = Arc::new(request);
514
515 let request_clone = request.clone();
516 let connection = self.client.clone();
517 let request_task = executor.spawn(async move {
518 let args = request_clone.to_dap();
519 connection.request::<R::DapRequest>(args).await
520 });
521
522 executor.spawn(async move {
523 let response = request.response_from_dap(request_task.await?);
524 response
525 })
526 }
527}
528impl From<RemoteConnection> for Mode {
529 fn from(value: RemoteConnection) -> Self {
530 Self::Remote(value)
531 }
532}
533
534impl Mode {
535 fn request_dap<R: DapCommand>(
536 &self,
537 session_id: SessionId,
538 request: R,
539 cx: &mut Context<Session>,
540 ) -> Task<Result<R::Response>>
541 where
542 <R::DapRequest as dap::requests::Request>::Response: 'static,
543 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
544 {
545 match self {
546 Mode::Local(debug_adapter_client) => {
547 debug_adapter_client.request(request, cx.background_executor().clone())
548 }
549 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
550 }
551 }
552}
553
554#[derive(Default)]
555struct ThreadStates {
556 global_state: Option<ThreadStatus>,
557 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
558}
559
560impl ThreadStates {
561 fn stop_all_threads(&mut self) {
562 self.global_state = Some(ThreadStatus::Stopped);
563 self.known_thread_states.clear();
564 }
565
566 fn exit_all_threads(&mut self) {
567 self.global_state = Some(ThreadStatus::Exited);
568 self.known_thread_states.clear();
569 }
570
571 fn continue_all_threads(&mut self) {
572 self.global_state = Some(ThreadStatus::Running);
573 self.known_thread_states.clear();
574 }
575
576 fn stop_thread(&mut self, thread_id: ThreadId) {
577 self.known_thread_states
578 .insert(thread_id, ThreadStatus::Stopped);
579 }
580
581 fn continue_thread(&mut self, thread_id: ThreadId) {
582 self.known_thread_states
583 .insert(thread_id, ThreadStatus::Running);
584 }
585
586 fn process_step(&mut self, thread_id: ThreadId) {
587 self.known_thread_states
588 .insert(thread_id, ThreadStatus::Stepping);
589 }
590
591 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
592 self.thread_state(thread_id)
593 .unwrap_or(ThreadStatus::Running)
594 }
595
596 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
597 self.known_thread_states
598 .get(&thread_id)
599 .copied()
600 .or(self.global_state)
601 }
602
603 fn exit_thread(&mut self, thread_id: ThreadId) {
604 self.known_thread_states
605 .insert(thread_id, ThreadStatus::Exited);
606 }
607
608 fn any_stopped_thread(&self) -> bool {
609 self.global_state
610 .is_some_and(|state| state == ThreadStatus::Stopped)
611 || self
612 .known_thread_states
613 .values()
614 .any(|status| *status == ThreadStatus::Stopped)
615 }
616}
/// Capacity of the circular buffer in which a `Session` keeps adapter output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
618
/// Counter identifying a position in a session's output stream.
///
/// `Session` increments its token for every output event it stores, and `Session::output`
/// uses a caller-supplied token to decide how many recent events to return.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether requests go to a local adapter client or a remote connection.
    mode: Mode,
    /// Adapter capabilities; merged with updates from `Capabilities` events.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    /// When set, breakpoint store changes are not forwarded to the adapter.
    ignore_breakpoints: bool,
    /// Modules tracked from `Module` events.
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Incremented once per stored output event.
    output_token: OutputToken,
    /// Most recent output events, bounded by `MAX_TRACKED_OUTPUT_EVENTS`.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    /// Threads from the latest threads response; cleared whenever a stopped event arrives.
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    /// Cached variables; cleared whenever a stopped event arrives.
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    /// Set once a `Terminated` event has been received.
    is_session_terminated: bool,
    /// Cacheable requests that have been issued, grouped by command type and keyed by command.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Tasks owned by the session; for local sessions this holds the adapter message loop.
    _background_tasks: Vec<Task<()>>,
}
642
/// Object-safe view of a command that allows type-erased equality and hashing, so commands of
/// different types can be stored behind `Arc<dyn CacheableCommand>`.
trait CacheableCommand: 'static + Send + Sync {
    /// Returns the command as `&dyn Any` for downcasting.
    fn as_any(&self) -> &dyn Any;
    /// Compares with another type-erased command.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds the command into `hasher`.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts a shared command into `Arc<dyn Any>` for downcasting to its concrete type.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
649
650impl<T> CacheableCommand for T
651where
652 T: DapCommand + PartialEq + Eq + Hash,
653{
654 fn as_any(&self) -> &dyn Any {
655 self
656 }
657
658 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
659 rhs.as_any()
660 .downcast_ref::<Self>()
661 .map_or(false, |rhs| self == rhs)
662 }
663
664 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
665 T::hash(self, &mut hasher);
666 }
667
668 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
669 self
670 }
671}
672
673pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
674
675impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
676 fn from(request: T) -> Self {
677 Self(Arc::new(request))
678 }
679}
680
681impl PartialEq for RequestSlot {
682 fn eq(&self, other: &Self) -> bool {
683 self.0.dyn_eq(other.0.as_ref())
684 }
685}
686
687impl Eq for RequestSlot {}
688
689impl Hash for RequestSlot {
690 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
691 self.0.dyn_hash(state);
692 self.0.as_any().type_id().hash(state)
693 }
694}
695
696#[derive(Debug, Clone, Hash, PartialEq, Eq)]
697pub struct CompletionsQuery {
698 pub query: String,
699 pub column: u64,
700 pub line: Option<u64>,
701 pub frame_id: Option<u64>,
702}
703
704impl CompletionsQuery {
705 pub fn new(
706 buffer: &language::Buffer,
707 cursor_position: language::Anchor,
708 frame_id: Option<u64>,
709 ) -> Self {
710 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
711 Self {
712 query: buffer.text(),
713 column: column as u64,
714 frame_id,
715 line: Some(row as u64),
716 }
717 }
718}
719
/// Events emitted by a [`Session`] to its observers.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// A stopped event was handled. Carries the stopped thread, or `None` when the event named
    /// no thread or asked to preserve focus.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    /// The thread list was refreshed from a threads response.
    Threads,
}

impl EventEmitter<SessionEvent> for Session {}
730
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session whenever breakpoints change.
734impl Session {
735 pub(crate) fn local(
736 breakpoint_store: Entity<BreakpointStore>,
737 session_id: SessionId,
738 parent_session: Option<Entity<Session>>,
739 delegate: DapAdapterDelegate,
740 config: DebugAdapterConfig,
741 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
742 initialized_tx: oneshot::Sender<()>,
743 cx: &mut App,
744 ) -> Task<Result<Entity<Self>>> {
745 let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
746
747 cx.spawn(async move |cx| {
748 let (mode, capabilities) = LocalMode::new(
749 session_id,
750 parent_session.clone(),
751 breakpoint_store.clone(),
752 config.clone(),
753 delegate,
754 message_tx,
755 cx.clone(),
756 )
757 .await?;
758
759 cx.new(|cx| {
760 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Self>, cx| {
761 let mut initialized_tx = Some(initialized_tx);
762 while let Some(message) = message_rx.next().await {
763 if let Message::Event(event) = message {
764 if let Events::Initialized(_) = *event {
765 if let Some(tx) = initialized_tx.take() {
766 tx.send(()).ok();
767 }
768 } else {
769 let Ok(_) = this.update(cx, |session, cx| {
770 session.handle_dap_event(event, cx);
771 }) else {
772 break;
773 };
774 }
775 } else {
776 let Ok(_) =
777 start_debugging_requests_tx.unbounded_send((session_id, message))
778 else {
779 break;
780 };
781 }
782 }
783 })];
784
785 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
786 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
787 if let Some(local) = (!this.ignore_breakpoints)
788 .then(|| this.as_local_mut())
789 .flatten()
790 {
791 local
792 .send_breakpoints_from_path(path.clone(), *reason, cx)
793 .detach();
794 };
795 }
796 BreakpointStoreEvent::BreakpointsCleared(paths) => {
797 if let Some(local) = (!this.ignore_breakpoints)
798 .then(|| this.as_local_mut())
799 .flatten()
800 {
801 local.unset_breakpoints_from_paths(paths, cx).detach();
802 }
803 }
804 BreakpointStoreEvent::ActiveDebugLineChanged => {}
805 })
806 .detach();
807
808 Self {
809 mode: Mode::Local(mode),
810 id: session_id,
811 child_session_ids: HashSet::default(),
812 parent_id: parent_session.map(|session| session.read(cx).id),
813 variables: Default::default(),
814 capabilities,
815 thread_states: ThreadStates::default(),
816 output_token: OutputToken(0),
817 ignore_breakpoints: false,
818 output: circular_buffer::CircularBuffer::boxed(),
819 requests: HashMap::default(),
820 modules: Vec::default(),
821 loaded_sources: Vec::default(),
822 threads: IndexMap::default(),
823 stack_frames: IndexMap::default(),
824 locations: Default::default(),
825 _background_tasks,
826 is_session_terminated: false,
827 }
828 })
829 })
830 }
831
832 pub(crate) fn remote(
833 session_id: SessionId,
834 client: AnyProtoClient,
835 upstream_project_id: u64,
836 ignore_breakpoints: bool,
837 ) -> Self {
838 Self {
839 mode: Mode::Remote(RemoteConnection {
840 _client: client,
841 _upstream_project_id: upstream_project_id,
842 }),
843 id: session_id,
844 child_session_ids: HashSet::default(),
845 parent_id: None,
846 capabilities: Capabilities::default(),
847 ignore_breakpoints,
848 variables: Default::default(),
849 stack_frames: Default::default(),
850 thread_states: ThreadStates::default(),
851 output_token: OutputToken(0),
852 output: circular_buffer::CircularBuffer::boxed(),
853 requests: HashMap::default(),
854 modules: Vec::default(),
855 loaded_sources: Vec::default(),
856 threads: IndexMap::default(),
857 _background_tasks: Vec::default(),
858 locations: Default::default(),
859 is_session_terminated: false,
860 }
861 }
862
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
866
    /// Returns a copy of the ids of this session's child sessions.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
870
    /// Records `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
874
    /// Removes `session_id` from this session's children; a no-op when it is not present.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
878
    /// Returns the id of the parent session, if this session has one.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
882
    /// Returns the adapter capabilities currently known to this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
886
887 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
888 if let Mode::Local(local_mode) = &self.mode {
889 Some(local_mode.config.clone())
890 } else {
891 None
892 }
893 }
894
    /// Whether a `Terminated` event has been received for this session.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
898
    /// Whether this session runs in local mode.
    pub fn is_local(&self) -> bool {
        matches!(self.mode, Mode::Local(_))
    }
902
903 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
904 match &mut self.mode {
905 Mode::Local(local_mode) => Some(local_mode),
906 Mode::Remote(_) => None,
907 }
908 }
909
910 pub fn as_local(&self) -> Option<&LocalMode> {
911 match &self.mode {
912 Mode::Local(local_mode) => Some(local_mode),
913 Mode::Remote(_) => None,
914 }
915 }
916
917 pub(super) fn initialize_sequence(
918 &mut self,
919 initialize_rx: oneshot::Receiver<()>,
920 cx: &mut Context<Self>,
921 ) -> Task<Result<()>> {
922 match &self.mode {
923 Mode::Local(local_mode) => {
924 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
925 }
926 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
927 }
928 }
929
930 pub fn output(
931 &self,
932 since: OutputToken,
933 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
934 if self.output_token.0 == 0 {
935 return (self.output.range(0..0), OutputToken(0));
936 };
937
938 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
939
940 let clamped_events_since = events_since.clamp(0, self.output.len());
941 (
942 self.output
943 .range(self.output.len() - clamped_events_since..),
944 self.output_token,
945 )
946 }
947
948 pub fn respond_to_client(
949 &self,
950 request_seq: u64,
951 success: bool,
952 command: String,
953 body: Option<serde_json::Value>,
954 cx: &mut Context<Self>,
955 ) -> Task<Result<()>> {
956 let Some(local_session) = self.as_local().cloned() else {
957 unreachable!("Cannot respond to remote client");
958 };
959
960 cx.background_spawn(async move {
961 local_session
962 .client
963 .send_message(Message::Response(Response {
964 body,
965 success,
966 command,
967 seq: request_seq + 1,
968 request_seq,
969 message: None,
970 }))
971 .await
972 })
973 }
974
975 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
976 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
977 self.thread_states.stop_all_threads();
978
979 self.invalidate_command_type::<StackTraceCommand>();
980 }
981
982 // Event if we stopped all threads we still need to insert the thread_id
983 // to our own data
984 if let Some(thread_id) = event.thread_id {
985 self.thread_states.stop_thread(ThreadId(thread_id));
986
987 self.invalidate_state(
988 &StackTraceCommand {
989 thread_id,
990 start_frame: None,
991 levels: None,
992 }
993 .into(),
994 );
995 }
996
997 self.invalidate_generic();
998 self.threads.clear();
999 self.variables.clear();
1000 cx.emit(SessionEvent::Stopped(
1001 event
1002 .thread_id
1003 .map(Into::into)
1004 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1005 ));
1006 cx.notify();
1007 }
1008
1009 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1010 match *event {
1011 Events::Initialized(_) => {
1012 debug_assert!(
1013 false,
1014 "Initialized event should have been handled in LocalMode"
1015 );
1016 }
1017 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1018 Events::Continued(event) => {
1019 if event.all_threads_continued.unwrap_or_default() {
1020 self.thread_states.continue_all_threads();
1021 } else {
1022 self.thread_states
1023 .continue_thread(ThreadId(event.thread_id));
1024 }
1025 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1026 self.invalidate_generic();
1027 }
1028 Events::Exited(_event) => {
1029 self.clear_active_debug_line(cx);
1030 }
1031 Events::Terminated(_) => {
1032 self.is_session_terminated = true;
1033 self.clear_active_debug_line(cx);
1034 }
1035 Events::Thread(event) => {
1036 let thread_id = ThreadId(event.thread_id);
1037
1038 match event.reason {
1039 dap::ThreadEventReason::Started => {
1040 self.thread_states.continue_thread(thread_id);
1041 }
1042 dap::ThreadEventReason::Exited => {
1043 self.thread_states.exit_thread(thread_id);
1044 }
1045 reason => {
1046 log::error!("Unhandled thread event reason {:?}", reason);
1047 }
1048 }
1049 self.invalidate_state(&ThreadsCommand.into());
1050 cx.notify();
1051 }
1052 Events::Output(event) => {
1053 if event
1054 .category
1055 .as_ref()
1056 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1057 {
1058 return;
1059 }
1060
1061 self.output.push_back(event);
1062 self.output_token.0 += 1;
1063 cx.notify();
1064 }
1065 Events::Breakpoint(_) => {}
1066 Events::Module(event) => {
1067 match event.reason {
1068 dap::ModuleEventReason::New => {
1069 self.modules.push(event.module);
1070 }
1071 dap::ModuleEventReason::Changed => {
1072 if let Some(module) = self
1073 .modules
1074 .iter_mut()
1075 .find(|other| event.module.id == other.id)
1076 {
1077 *module = event.module;
1078 }
1079 }
1080 dap::ModuleEventReason::Removed => {
1081 self.modules.retain(|other| event.module.id != other.id);
1082 }
1083 }
1084
1085 // todo(debugger): We should only send the invalidate command to downstream clients.
1086 // self.invalidate_state(&ModulesCommand.into());
1087 }
1088 Events::LoadedSource(_) => {
1089 self.invalidate_state(&LoadedSourcesCommand.into());
1090 }
1091 Events::Capabilities(event) => {
1092 self.capabilities = self.capabilities.merge(event.capabilities);
1093 cx.notify();
1094 }
1095 Events::Memory(_) => {}
1096 Events::Process(_) => {}
1097 Events::ProgressEnd(_) => {}
1098 Events::ProgressStart(_) => {}
1099 Events::ProgressUpdate(_) => {}
1100 Events::Invalidated(_) => {}
1101 Events::Other(_) => {}
1102 }
1103 }
1104
1105 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1106 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1107 &mut self,
1108 request: T,
1109 process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1110 + 'static,
1111 cx: &mut Context<Self>,
1112 ) {
1113 const {
1114 assert!(
1115 T::CACHEABLE,
1116 "Only requests marked as cacheable should invoke `fetch`"
1117 );
1118 }
1119
1120 if !self.thread_states.any_stopped_thread()
1121 && request.type_id() != TypeId::of::<ThreadsCommand>()
1122 || self.is_session_terminated
1123 {
1124 return;
1125 }
1126
1127 let request_map = self
1128 .requests
1129 .entry(std::any::TypeId::of::<T>())
1130 .or_default();
1131
1132 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1133 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1134
1135 let task = Self::request_inner::<Arc<T>>(
1136 &self.capabilities,
1137 self.id,
1138 &self.mode,
1139 command,
1140 process_result,
1141 cx,
1142 );
1143 let task = cx
1144 .background_executor()
1145 .spawn(async move {
1146 let _ = task.await?;
1147 Some(())
1148 })
1149 .shared();
1150
1151 vacant.insert(task);
1152 cx.notify();
1153 }
1154 }
1155
1156 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1157 capabilities: &Capabilities,
1158 session_id: SessionId,
1159 mode: &Mode,
1160 request: T,
1161 process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1162 + 'static,
1163 cx: &mut Context<Self>,
1164 ) -> Task<Option<T::Response>> {
1165 if !T::is_supported(&capabilities) {
1166 log::warn!(
1167 "Attempted to send a DAP request that isn't supported: {:?}",
1168 request
1169 );
1170 let error = Err(anyhow::Error::msg(
1171 "Couldn't complete request because it's not supported",
1172 ));
1173 return cx.spawn(async move |this, cx| {
1174 this.update(cx, |this, cx| process_result(this, error, cx))
1175 .log_err()
1176 .flatten()
1177 });
1178 }
1179
1180 let request = mode.request_dap(session_id, request, cx);
1181 cx.spawn(async move |this, cx| {
1182 let result = request.await;
1183 this.update(cx, |this, cx| process_result(this, result, cx))
1184 .log_err()
1185 .flatten()
1186 })
1187 }
1188
1189 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1190 &self,
1191 request: T,
1192 process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1193 + 'static,
1194 cx: &mut Context<Self>,
1195 ) -> Task<Option<T::Response>> {
1196 Self::request_inner(
1197 &self.capabilities,
1198 self.id,
1199 &self.mode,
1200 request,
1201 process_result,
1202 cx,
1203 )
1204 }
1205
1206 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1207 self.requests.remove(&std::any::TypeId::of::<Command>());
1208 }
1209
1210 fn invalidate_generic(&mut self) {
1211 self.invalidate_command_type::<ModulesCommand>();
1212 self.invalidate_command_type::<LoadedSourcesCommand>();
1213 self.invalidate_command_type::<ThreadsCommand>();
1214 }
1215
1216 fn invalidate_state(&mut self, key: &RequestSlot) {
1217 self.requests
1218 .entry(key.0.as_any().type_id())
1219 .and_modify(|request_map| {
1220 request_map.remove(&key);
1221 });
1222 }
1223
1224 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1225 self.thread_states.thread_status(thread_id)
1226 }
1227
1228 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1229 self.fetch(
1230 dap_command::ThreadsCommand,
1231 |this, result, cx| {
1232 let result = result.log_err()?;
1233
1234 this.threads = result
1235 .iter()
1236 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1237 .collect();
1238
1239 this.invalidate_command_type::<StackTraceCommand>();
1240 cx.emit(SessionEvent::Threads);
1241 cx.notify();
1242
1243 Some(result)
1244 },
1245 cx,
1246 );
1247
1248 self.threads
1249 .values()
1250 .map(|thread| {
1251 (
1252 thread.dap.clone(),
1253 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1254 )
1255 })
1256 .collect()
1257 }
1258
1259 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1260 self.fetch(
1261 dap_command::ModulesCommand,
1262 |this, result, cx| {
1263 let result = result.log_err()?;
1264
1265 this.modules = result.iter().cloned().collect();
1266 cx.emit(SessionEvent::Modules);
1267 cx.notify();
1268
1269 Some(result)
1270 },
1271 cx,
1272 );
1273
1274 &self.modules
1275 }
1276
1277 pub fn ignore_breakpoints(&self) -> bool {
1278 self.ignore_breakpoints
1279 }
1280
1281 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1282 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1283 }
1284
1285 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1286 if self.ignore_breakpoints == ignore {
1287 return Task::ready(());
1288 }
1289
1290 self.ignore_breakpoints = ignore;
1291
1292 if let Some(local) = self.as_local() {
1293 local.send_all_breakpoints(ignore, cx)
1294 } else {
1295 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1296 unimplemented!()
1297 }
1298 }
1299
1300 pub fn breakpoints_enabled(&self) -> bool {
1301 self.ignore_breakpoints
1302 }
1303
1304 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1305 self.fetch(
1306 dap_command::LoadedSourcesCommand,
1307 |this, result, cx| {
1308 let result = result.log_err()?;
1309 this.loaded_sources = result.iter().cloned().collect();
1310 cx.emit(SessionEvent::LoadedSources);
1311 cx.notify();
1312 Some(result)
1313 },
1314 cx,
1315 );
1316
1317 &self.loaded_sources
1318 }
1319
1320 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1321 res.log_err()?;
1322 Some(())
1323 }
1324
1325 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1326 thread_id: ThreadId,
1327 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1328 {
1329 move |this, response, cx| match response.log_err() {
1330 Some(response) => Some(response),
1331 None => {
1332 this.thread_states.stop_thread(thread_id);
1333 cx.notify();
1334 None
1335 }
1336 }
1337 }
1338
1339 fn clear_active_debug_line_response(
1340 &mut self,
1341 response: Result<()>,
1342 cx: &mut Context<'_, Session>,
1343 ) -> Option<()> {
1344 response.log_err()?;
1345 self.clear_active_debug_line(cx);
1346 Some(())
1347 }
1348
1349 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1350 self.as_local()
1351 .expect("Message handler will only run in local mode")
1352 .breakpoint_store
1353 .update(cx, |store, cx| {
1354 store.remove_active_position(Some(self.id), cx)
1355 });
1356 }
1357
1358 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1359 self.request(
1360 PauseCommand {
1361 thread_id: thread_id.0,
1362 },
1363 Self::empty_response,
1364 cx,
1365 )
1366 .detach();
1367 }
1368
1369 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1370 self.request(
1371 RestartStackFrameCommand { stack_frame_id },
1372 Self::empty_response,
1373 cx,
1374 )
1375 .detach();
1376 }
1377
1378 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1379 if self.capabilities.supports_restart_request.unwrap_or(false) {
1380 self.request(
1381 RestartCommand {
1382 raw: args.unwrap_or(Value::Null),
1383 },
1384 Self::empty_response,
1385 cx,
1386 )
1387 .detach();
1388 } else {
1389 self.request(
1390 DisconnectCommand {
1391 restart: Some(false),
1392 terminate_debuggee: Some(true),
1393 suspend_debuggee: Some(false),
1394 },
1395 Self::empty_response,
1396 cx,
1397 )
1398 .detach();
1399 }
1400 }
1401
1402 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1403 self.is_session_terminated = true;
1404 self.thread_states.exit_all_threads();
1405 cx.notify();
1406
1407 let task = if self
1408 .capabilities
1409 .supports_terminate_request
1410 .unwrap_or_default()
1411 {
1412 self.request(
1413 TerminateCommand {
1414 restart: Some(false),
1415 },
1416 Self::clear_active_debug_line_response,
1417 cx,
1418 )
1419 } else {
1420 self.request(
1421 DisconnectCommand {
1422 restart: Some(false),
1423 terminate_debuggee: Some(true),
1424 suspend_debuggee: Some(false),
1425 },
1426 Self::clear_active_debug_line_response,
1427 cx,
1428 )
1429 };
1430
1431 cx.background_spawn(async move {
1432 let _ = task.await;
1433 })
1434 }
1435
1436 pub fn completions(
1437 &mut self,
1438 query: CompletionsQuery,
1439 cx: &mut Context<Self>,
1440 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1441 let task = self.request(query, |_, result, _| result.log_err(), cx);
1442
1443 cx.background_executor().spawn(async move {
1444 anyhow::Ok(
1445 task.await
1446 .map(|response| response.targets)
1447 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1448 )
1449 })
1450 }
1451
1452 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1453 self.thread_states.continue_thread(thread_id);
1454 self.request(
1455 ContinueCommand {
1456 args: ContinueArguments {
1457 thread_id: thread_id.0,
1458 single_thread: Some(true),
1459 },
1460 },
1461 Self::on_step_response::<ContinueCommand>(thread_id),
1462 cx,
1463 )
1464 .detach();
1465 }
1466
1467 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1468 match self.mode {
1469 Mode::Local(ref local) => Some(local.client.clone()),
1470 Mode::Remote(_) => None,
1471 }
1472 }
1473
1474 pub fn step_over(
1475 &mut self,
1476 thread_id: ThreadId,
1477 granularity: SteppingGranularity,
1478 cx: &mut Context<Self>,
1479 ) {
1480 let supports_single_thread_execution_requests =
1481 self.capabilities.supports_single_thread_execution_requests;
1482 let supports_stepping_granularity = self
1483 .capabilities
1484 .supports_stepping_granularity
1485 .unwrap_or_default();
1486
1487 let command = NextCommand {
1488 inner: StepCommand {
1489 thread_id: thread_id.0,
1490 granularity: supports_stepping_granularity.then(|| granularity),
1491 single_thread: supports_single_thread_execution_requests,
1492 },
1493 };
1494
1495 self.thread_states.process_step(thread_id);
1496 self.request(
1497 command,
1498 Self::on_step_response::<NextCommand>(thread_id),
1499 cx,
1500 )
1501 .detach();
1502 }
1503
1504 pub fn step_in(
1505 &mut self,
1506 thread_id: ThreadId,
1507 granularity: SteppingGranularity,
1508 cx: &mut Context<Self>,
1509 ) {
1510 let supports_single_thread_execution_requests =
1511 self.capabilities.supports_single_thread_execution_requests;
1512 let supports_stepping_granularity = self
1513 .capabilities
1514 .supports_stepping_granularity
1515 .unwrap_or_default();
1516
1517 let command = StepInCommand {
1518 inner: StepCommand {
1519 thread_id: thread_id.0,
1520 granularity: supports_stepping_granularity.then(|| granularity),
1521 single_thread: supports_single_thread_execution_requests,
1522 },
1523 };
1524
1525 self.thread_states.process_step(thread_id);
1526 self.request(
1527 command,
1528 Self::on_step_response::<StepInCommand>(thread_id),
1529 cx,
1530 )
1531 .detach();
1532 }
1533
1534 pub fn step_out(
1535 &mut self,
1536 thread_id: ThreadId,
1537 granularity: SteppingGranularity,
1538 cx: &mut Context<Self>,
1539 ) {
1540 let supports_single_thread_execution_requests =
1541 self.capabilities.supports_single_thread_execution_requests;
1542 let supports_stepping_granularity = self
1543 .capabilities
1544 .supports_stepping_granularity
1545 .unwrap_or_default();
1546
1547 let command = StepOutCommand {
1548 inner: StepCommand {
1549 thread_id: thread_id.0,
1550 granularity: supports_stepping_granularity.then(|| granularity),
1551 single_thread: supports_single_thread_execution_requests,
1552 },
1553 };
1554
1555 self.thread_states.process_step(thread_id);
1556 self.request(
1557 command,
1558 Self::on_step_response::<StepOutCommand>(thread_id),
1559 cx,
1560 )
1561 .detach();
1562 }
1563
1564 pub fn step_back(
1565 &mut self,
1566 thread_id: ThreadId,
1567 granularity: SteppingGranularity,
1568 cx: &mut Context<Self>,
1569 ) {
1570 let supports_single_thread_execution_requests =
1571 self.capabilities.supports_single_thread_execution_requests;
1572 let supports_stepping_granularity = self
1573 .capabilities
1574 .supports_stepping_granularity
1575 .unwrap_or_default();
1576
1577 let command = StepBackCommand {
1578 inner: StepCommand {
1579 thread_id: thread_id.0,
1580 granularity: supports_stepping_granularity.then(|| granularity),
1581 single_thread: supports_single_thread_execution_requests,
1582 },
1583 };
1584
1585 self.thread_states.process_step(thread_id);
1586
1587 self.request(
1588 command,
1589 Self::on_step_response::<StepBackCommand>(thread_id),
1590 cx,
1591 )
1592 .detach();
1593 }
1594
1595 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1596 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1597 && self.requests.contains_key(&ThreadsCommand.type_id())
1598 && self.threads.contains_key(&thread_id)
1599 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1600 // We could still be using an old thread id and have sent a new thread's request
1601 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1602 // But it very well could cause a minor bug in the future that is hard to track down
1603 {
1604 self.fetch(
1605 super::dap_command::StackTraceCommand {
1606 thread_id: thread_id.0,
1607 start_frame: None,
1608 levels: None,
1609 },
1610 move |this, stack_frames, cx| {
1611 let stack_frames = stack_frames.log_err()?;
1612
1613 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1614 thread.stack_frame_ids =
1615 stack_frames.iter().map(|frame| frame.id).collect();
1616 });
1617 debug_assert!(
1618 matches!(entry, indexmap::map::Entry::Occupied(_)),
1619 "Sent request for thread_id that doesn't exist"
1620 );
1621
1622 this.stack_frames.extend(
1623 stack_frames
1624 .iter()
1625 .cloned()
1626 .map(|frame| (frame.id, StackFrame::from(frame))),
1627 );
1628
1629 this.invalidate_command_type::<ScopesCommand>();
1630 this.invalidate_command_type::<VariablesCommand>();
1631
1632 cx.emit(SessionEvent::StackTrace);
1633 cx.notify();
1634 Some(stack_frames)
1635 },
1636 cx,
1637 );
1638 }
1639
1640 self.threads
1641 .get(&thread_id)
1642 .map(|thread| {
1643 thread
1644 .stack_frame_ids
1645 .iter()
1646 .filter_map(|id| self.stack_frames.get(id))
1647 .cloned()
1648 .collect()
1649 })
1650 .unwrap_or_default()
1651 }
1652
1653 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1654 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1655 && self
1656 .requests
1657 .contains_key(&TypeId::of::<StackTraceCommand>())
1658 {
1659 self.fetch(
1660 ScopesCommand { stack_frame_id },
1661 move |this, scopes, cx| {
1662 let scopes = scopes.log_err()?;
1663
1664 for scope in scopes .iter(){
1665 this.variables(scope.variables_reference, cx);
1666 }
1667
1668 let entry = this
1669 .stack_frames
1670 .entry(stack_frame_id)
1671 .and_modify(|stack_frame| {
1672 stack_frame.scopes = scopes.clone();
1673 });
1674
1675 cx.emit(SessionEvent::Variables);
1676
1677 debug_assert!(
1678 matches!(entry, indexmap::map::Entry::Occupied(_)),
1679 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1680 );
1681
1682 Some(scopes)
1683 },
1684 cx,
1685 );
1686 }
1687
1688 self.stack_frames
1689 .get(&stack_frame_id)
1690 .map(|frame| frame.scopes.as_slice())
1691 .unwrap_or_default()
1692 }
1693
1694 pub fn variables(
1695 &mut self,
1696 variables_reference: VariableReference,
1697 cx: &mut Context<Self>,
1698 ) -> Vec<dap::Variable> {
1699 let command = VariablesCommand {
1700 variables_reference,
1701 filter: None,
1702 start: None,
1703 count: None,
1704 format: None,
1705 };
1706
1707 self.fetch(
1708 command,
1709 move |this, variables, cx| {
1710 let variables = variables.log_err()?;
1711 this.variables
1712 .insert(variables_reference, variables.clone());
1713
1714 cx.emit(SessionEvent::Variables);
1715 Some(variables)
1716 },
1717 cx,
1718 );
1719
1720 self.variables
1721 .get(&variables_reference)
1722 .cloned()
1723 .unwrap_or_default()
1724 }
1725
1726 pub fn set_variable_value(
1727 &mut self,
1728 variables_reference: u64,
1729 name: String,
1730 value: String,
1731 cx: &mut Context<Self>,
1732 ) {
1733 if self.capabilities.supports_set_variable.unwrap_or_default() {
1734 self.request(
1735 SetVariableValueCommand {
1736 name,
1737 value,
1738 variables_reference,
1739 },
1740 move |this, response, cx| {
1741 let response = response.log_err()?;
1742 this.invalidate_command_type::<VariablesCommand>();
1743 cx.notify();
1744 Some(response)
1745 },
1746 cx,
1747 )
1748 .detach()
1749 }
1750 }
1751
1752 pub fn evaluate(
1753 &mut self,
1754 expression: String,
1755 context: Option<EvaluateArgumentsContext>,
1756 frame_id: Option<u64>,
1757 source: Option<Source>,
1758 cx: &mut Context<Self>,
1759 ) {
1760 self.request(
1761 EvaluateCommand {
1762 expression,
1763 context,
1764 frame_id,
1765 source,
1766 },
1767 |this, response, cx| {
1768 let response = response.log_err()?;
1769 this.output_token.0 += 1;
1770 this.output.push_back(dap::OutputEvent {
1771 category: None,
1772 output: response.result.clone(),
1773 group: None,
1774 variables_reference: Some(response.variables_reference),
1775 source: None,
1776 line: None,
1777 column: None,
1778 data: None,
1779 location_reference: None,
1780 });
1781
1782 this.invalidate_command_type::<ScopesCommand>();
1783 cx.notify();
1784 Some(response)
1785 },
1786 cx,
1787 )
1788 .detach();
1789 }
1790
1791 pub fn location(
1792 &mut self,
1793 reference: u64,
1794 cx: &mut Context<Self>,
1795 ) -> Option<dap::LocationsResponse> {
1796 self.fetch(
1797 LocationsCommand { reference },
1798 move |this, response, _| {
1799 let response = response.log_err()?;
1800 this.locations.insert(reference, response.clone());
1801 Some(response)
1802 },
1803 cx,
1804 );
1805 self.locations.get(&reference).cloned()
1806 }
1807 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1808 let command = DisconnectCommand {
1809 restart: Some(false),
1810 terminate_debuggee: Some(true),
1811 suspend_debuggee: Some(false),
1812 };
1813
1814 self.request(command, Self::empty_response, cx).detach()
1815 }
1816
1817 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1818 if self
1819 .capabilities
1820 .supports_terminate_threads_request
1821 .unwrap_or_default()
1822 {
1823 self.request(
1824 TerminateThreadsCommand {
1825 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1826 },
1827 Self::clear_active_debug_line_response,
1828 cx,
1829 )
1830 .detach();
1831 } else {
1832 self.shutdown(cx).detach();
1833 }
1834 }
1835}