1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
9 StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
10 VariablesCommand,
11};
12use super::dap_store::DapAdapterDelegate;
13use anyhow::{anyhow, Result};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
16use dap::messages::Response;
17use dap::OutputEventCategory;
18use dap::{
19 adapters::{DapDelegate, DapStatus},
20 client::{DebugAdapterClient, SessionId},
21 messages::{Events, Message},
22 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
23 SteppingGranularity, StoppedEvent, VariableReference,
24};
25use dap_adapters::build_adapter;
26use futures::channel::oneshot;
27use futures::{future::Shared, FutureExt};
28use gpui::{
29 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
30};
31use rpc::AnyProtoClient;
32use serde_json::{json, Value};
33use settings::Settings;
34use smol::stream::StreamExt;
35use std::any::TypeId;
36use std::path::PathBuf;
37use std::u64;
38use std::{
39 any::Any,
40 collections::hash_map::Entry,
41 hash::{Hash, Hasher},
42 path::Path,
43 sync::Arc,
44};
45use task::DebugAdapterConfig;
46use text::{PointUtf16, ToPointUtf16};
47use util::{merge_json_value_into, ResultExt};
48
/// Strongly typed wrapper around the numeric thread id used in DAP events and requests.
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// Smallest representable thread id.
    pub const MIN: Self = Self(u64::MIN);
    /// Largest representable thread id.
    pub const MAX: Self = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw numeric id.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
63
64#[derive(Clone, Debug)]
65pub struct StackFrame {
66 pub dap: dap::StackFrame,
67 pub scopes: Vec<dap::Scope>,
68}
69
70impl From<dap::StackFrame> for StackFrame {
71 fn from(stack_frame: dap::StackFrame) -> Self {
72 Self {
73 scopes: vec![],
74 dap: stack_frame,
75 }
76 }
77}
78
/// Execution state of a debuggee thread as tracked by the session.
///
/// `Running` is the default.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the display name of this status.
    pub fn label(&self) -> &'static str {
        use ThreadStatus::*;

        match *self {
            Running => "Running",
            Stopped => "Stopped",
            Stepping => "Stepping",
            Exited => "Exited",
            Ended => "Ended",
        }
    }
}
100
101#[derive(Debug)]
102pub struct Thread {
103 dap: dap::Thread,
104 stack_frame_ids: IndexSet<StackFrameId>,
105 _has_stopped: bool,
106}
107
108impl From<dap::Thread> for Thread {
109 fn from(dap: dap::Thread) -> Self {
110 Self {
111 dap,
112 stack_frame_ids: Default::default(),
113 _has_stopped: false,
114 }
115 }
116}
117
118type UpstreamProjectId = u64;
119
120struct RemoteConnection {
121 _client: AnyProtoClient,
122 _upstream_project_id: UpstreamProjectId,
123}
124
125impl RemoteConnection {
126 fn send_proto_client_request<R: DapCommand>(
127 &self,
128 _request: R,
129 _session_id: SessionId,
130 cx: &mut App,
131 ) -> Task<Result<R::Response>> {
132 // let message = request.to_proto(session_id, self.upstream_project_id);
133 // let upstream_client = self.client.clone();
134 cx.background_executor().spawn(async move {
135 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
136 // let response = upstream_client.request(message).await?;
137 // request.response_from_proto(response)
138 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
139 })
140 }
141
142 fn request<R: DapCommand>(
143 &self,
144 request: R,
145 session_id: SessionId,
146 cx: &mut App,
147 ) -> Task<Result<R::Response>>
148 where
149 <R::DapRequest as dap::requests::Request>::Response: 'static,
150 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
151 {
152 return self.send_proto_client_request::<R>(request, session_id, cx);
153 }
154}
155
156enum Mode {
157 Local(LocalMode),
158 Remote(RemoteConnection),
159}
160
161#[derive(Clone)]
162pub struct LocalMode {
163 client: Arc<DebugAdapterClient>,
164 config: DebugAdapterConfig,
165 adapter: Arc<dyn DebugAdapter>,
166 breakpoint_store: Entity<BreakpointStore>,
167}
168
169fn client_source(abs_path: &Path) -> dap::Source {
170 dap::Source {
171 name: abs_path
172 .file_name()
173 .map(|filename| filename.to_string_lossy().to_string()),
174 path: Some(abs_path.to_string_lossy().to_string()),
175 source_reference: None,
176 presentation_hint: None,
177 origin: None,
178 sources: None,
179 adapter_data: None,
180 checksums: None,
181 }
182}
183
184impl LocalMode {
185 fn new(
186 session_id: SessionId,
187 parent_session: Option<Entity<Session>>,
188 breakpoint_store: Entity<BreakpointStore>,
189 config: DebugAdapterConfig,
190 delegate: DapAdapterDelegate,
191 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
192 cx: AsyncApp,
193 ) -> Task<Result<(Self, Capabilities)>> {
194 cx.spawn(async move |cx| {
195 let (adapter, binary) = Self::get_adapter_binary(&config, &delegate, cx).await?;
196
197 let message_handler = Box::new(move |message| {
198 messages_tx.unbounded_send(message).ok();
199 });
200
201 let client = Arc::new(
202 if let Some(client) = parent_session
203 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
204 .flatten()
205 {
206 client
207 .reconnect(session_id, binary, message_handler, cx.clone())
208 .await?
209 } else {
210 DebugAdapterClient::start(
211 session_id,
212 adapter.name(),
213 binary,
214 message_handler,
215 cx.clone(),
216 )
217 .await?
218 },
219 );
220
221 let adapter_id = adapter.name().to_string().to_owned();
222 let session = Self {
223 client,
224 adapter,
225 breakpoint_store,
226 config: config.clone(),
227 };
228
229 #[cfg(any(test, feature = "test-support"))]
230 {
231 let dap::DebugAdapterKind::Fake((fail, caps)) = session.config.kind.clone() else {
232 panic!("Only fake debug adapter configs should be used in tests");
233 };
234
235 session
236 .client
237 .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
238 .await;
239
240 match config.request.clone() {
241 dap::DebugRequestType::Launch if fail => {
242 session
243 .client
244 .on_request::<dap::requests::Launch, _>(move |_, _| {
245 Err(dap::ErrorResponse {
246 error: Some(dap::Message {
247 id: 1,
248 format: "error".into(),
249 variables: None,
250 send_telemetry: None,
251 show_user: None,
252 url: None,
253 url_label: None,
254 }),
255 })
256 })
257 .await;
258 }
259 dap::DebugRequestType::Launch => {
260 session
261 .client
262 .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
263 .await;
264 }
265 dap::DebugRequestType::Attach(_) if fail => {
266 session
267 .client
268 .on_request::<dap::requests::Attach, _>(move |_, _| {
269 Err(dap::ErrorResponse {
270 error: Some(dap::Message {
271 id: 1,
272 format: "error".into(),
273 variables: None,
274 send_telemetry: None,
275 show_user: None,
276 url: None,
277 url_label: None,
278 }),
279 })
280 })
281 .await;
282 }
283 dap::DebugRequestType::Attach(attach_config) => {
284 session
285 .client
286 .on_request::<dap::requests::Attach, _>(move |_, args| {
287 assert_eq!(
288 json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
289 args.raw
290 );
291
292 Ok(())
293 })
294 .await;
295 }
296 }
297
298 session.client.on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(())).await;
299 session.client.fake_event(Events::Initialized(None)).await;
300 }
301
302 let capabilities = session
303 .request(Initialize { adapter_id }, cx.background_executor().clone())
304 .await?;
305
306 Ok((session, capabilities))
307 })
308 }
309
310 fn send_breakpoints_from_path(
311 &self,
312 abs_path: Arc<Path>,
313 reason: BreakpointUpdatedReason,
314 cx: &mut App,
315 ) -> Task<()> {
316 let breakpoints = self
317 .breakpoint_store
318 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
319 .into_iter()
320 .map(Into::into)
321 .collect();
322
323 let task = self.request(
324 dap_command::SetBreakpoints {
325 source: client_source(&abs_path),
326 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
327 breakpoints,
328 },
329 cx.background_executor().clone(),
330 );
331
332 cx.background_spawn(async move {
333 match task.await {
334 Ok(_) => {}
335 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
336 }
337 })
338 }
339
340 fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
341 let mut breakpoint_tasks = Vec::new();
342 let breakpoints = self
343 .breakpoint_store
344 .read_with(cx, |store, cx| store.all_breakpoints(cx));
345
346 for (path, breakpoints) in breakpoints {
347 let breakpoints = if ignore_breakpoints {
348 vec![]
349 } else {
350 breakpoints.into_iter().map(Into::into).collect()
351 };
352
353 breakpoint_tasks.push(self.request(
354 dap_command::SetBreakpoints {
355 source: client_source(&path),
356 source_modified: Some(false),
357 breakpoints,
358 },
359 cx.background_executor().clone(),
360 ));
361 }
362
363 cx.background_spawn(async move {
364 futures::future::join_all(breakpoint_tasks)
365 .await
366 .iter()
367 .for_each(|res| match res {
368 Ok(_) => {}
369 Err(err) => {
370 log::warn!("Set breakpoints request failed: {}", err);
371 }
372 });
373 })
374 }
375
376 async fn get_adapter_binary(
377 config: &DebugAdapterConfig,
378 delegate: &DapAdapterDelegate,
379 cx: &mut AsyncApp,
380 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
381 let adapter = build_adapter(&config.kind).await?;
382
383 let binary = cx.update(|cx| {
384 ProjectSettings::get_global(cx)
385 .dap
386 .get(&adapter.name())
387 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
388 })?;
389
390 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
391 Err(error) => {
392 delegate.update_status(
393 adapter.name(),
394 DapStatus::Failed {
395 error: error.to_string(),
396 },
397 );
398
399 return Err(error);
400 }
401 Ok(mut binary) => {
402 delegate.update_status(adapter.name(), DapStatus::None);
403
404 let shell_env = delegate.shell_env().await;
405 let mut envs = binary.envs.unwrap_or_default();
406 envs.extend(shell_env);
407 binary.envs = Some(envs);
408
409 binary
410 }
411 };
412
413 Ok((adapter, binary))
414 }
415
416 pub fn initialize_sequence(
417 &self,
418 capabilities: &Capabilities,
419 initialized_rx: oneshot::Receiver<()>,
420 cx: &App,
421 ) -> Task<Result<()>> {
422 let mut raw = self.adapter.request_args(&self.config);
423 merge_json_value_into(
424 self.config.initialize_args.clone().unwrap_or(json!({})),
425 &mut raw,
426 );
427
428 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
429 let launch = match &self.config.request {
430 dap::DebugRequestType::Launch => {
431 self.request(Launch { raw }, cx.background_executor().clone())
432 }
433 dap::DebugRequestType::Attach(_) => {
434 self.request(Attach { raw }, cx.background_executor().clone())
435 }
436 };
437
438 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
439
440 let configuration_sequence = cx.spawn({
441 let this = self.clone();
442 async move |cx| {
443 initialized_rx.await?;
444 // todo(debugger) figure out if we want to handle a breakpoint response error
445 // This will probably consist of letting a user know that breakpoints failed to be set
446 cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
447
448 if configuration_done_supported {
449 this.request(ConfigurationDone, cx.background_executor().clone())
450 } else {
451 Task::ready(Ok(()))
452 }
453 .await
454 }
455 });
456
457 cx.background_spawn(async move {
458 futures::future::try_join(launch, configuration_sequence).await?;
459 Ok(())
460 })
461 }
462
463 fn request<R: LocalDapCommand>(
464 &self,
465 request: R,
466 executor: BackgroundExecutor,
467 ) -> Task<Result<R::Response>>
468 where
469 <R::DapRequest as dap::requests::Request>::Response: 'static,
470 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
471 {
472 let request = Arc::new(request);
473
474 let request_clone = request.clone();
475 let connection = self.client.clone();
476 let request_task = executor.spawn(async move {
477 let args = request_clone.to_dap();
478 connection.request::<R::DapRequest>(args).await
479 });
480
481 executor.spawn(async move {
482 let response = request.response_from_dap(request_task.await?);
483 response
484 })
485 }
486}
487impl From<RemoteConnection> for Mode {
488 fn from(value: RemoteConnection) -> Self {
489 Self::Remote(value)
490 }
491}
492
493impl Mode {
494 fn request_dap<R: DapCommand>(
495 &self,
496 session_id: SessionId,
497 request: R,
498 cx: &mut Context<Session>,
499 ) -> Task<Result<R::Response>>
500 where
501 <R::DapRequest as dap::requests::Request>::Response: 'static,
502 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
503 {
504 match self {
505 Mode::Local(debug_adapter_client) => {
506 debug_adapter_client.request(request, cx.background_executor().clone())
507 }
508 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
509 }
510 }
511}
512
513#[derive(Default)]
514struct ThreadStates {
515 global_state: Option<ThreadStatus>,
516 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
517}
518
519impl ThreadStates {
520 fn stop_all_threads(&mut self) {
521 self.global_state = Some(ThreadStatus::Stopped);
522 self.known_thread_states.clear();
523 }
524
525 fn exit_all_threads(&mut self) {
526 self.global_state = Some(ThreadStatus::Exited);
527 self.known_thread_states.clear();
528 }
529
530 fn continue_all_threads(&mut self) {
531 self.global_state = Some(ThreadStatus::Running);
532 self.known_thread_states.clear();
533 }
534
535 fn stop_thread(&mut self, thread_id: ThreadId) {
536 self.known_thread_states
537 .insert(thread_id, ThreadStatus::Stopped);
538 }
539
540 fn continue_thread(&mut self, thread_id: ThreadId) {
541 self.known_thread_states
542 .insert(thread_id, ThreadStatus::Running);
543 }
544
545 fn process_step(&mut self, thread_id: ThreadId) {
546 self.known_thread_states
547 .insert(thread_id, ThreadStatus::Stepping);
548 }
549
550 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
551 self.thread_state(thread_id)
552 .unwrap_or(ThreadStatus::Running)
553 }
554
555 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
556 self.known_thread_states
557 .get(&thread_id)
558 .copied()
559 .or(self.global_state)
560 }
561
562 fn exit_thread(&mut self, thread_id: ThreadId) {
563 self.known_thread_states
564 .insert(thread_id, ThreadStatus::Exited);
565 }
566
567 fn any_stopped_thread(&self) -> bool {
568 self.global_state
569 .is_some_and(|state| state == ThreadStatus::Stopped)
570 || self
571 .known_thread_states
572 .values()
573 .any(|status| *status == ThreadStatus::Stopped)
574 }
575}
/// Capacity of the circular buffer that retains a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5_000;
577
/// Position in a session's output stream, expressed as a count of recorded output events.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
580/// Represents a current state of a single debug adapter and provides ways to mutate it.
581pub struct Session {
582 mode: Mode,
583 pub(super) capabilities: Capabilities,
584 id: SessionId,
585 child_session_ids: HashSet<SessionId>,
586 parent_id: Option<SessionId>,
587 ignore_breakpoints: bool,
588 modules: Vec<dap::Module>,
589 loaded_sources: Vec<dap::Source>,
590 output_token: OutputToken,
591 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
592 threads: IndexMap<ThreadId, Thread>,
593 thread_states: ThreadStates,
594 variables: HashMap<VariableReference, Vec<dap::Variable>>,
595 stack_frames: IndexMap<StackFrameId, StackFrame>,
596 locations: HashMap<u64, dap::LocationsResponse>,
597 is_session_terminated: bool,
598 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
599 _background_tasks: Vec<Task<()>>,
600}
601
602trait CacheableCommand: 'static + Send + Sync {
603 fn as_any(&self) -> &dyn Any;
604 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
605 fn dyn_hash(&self, hasher: &mut dyn Hasher);
606 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
607}
608
609impl<T> CacheableCommand for T
610where
611 T: DapCommand + PartialEq + Eq + Hash,
612{
613 fn as_any(&self) -> &dyn Any {
614 self
615 }
616
617 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
618 rhs.as_any()
619 .downcast_ref::<Self>()
620 .map_or(false, |rhs| self == rhs)
621 }
622
623 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
624 T::hash(self, &mut hasher);
625 }
626
627 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
628 self
629 }
630}
631
632pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
633
634impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
635 fn from(request: T) -> Self {
636 Self(Arc::new(request))
637 }
638}
639
640impl PartialEq for RequestSlot {
641 fn eq(&self, other: &Self) -> bool {
642 self.0.dyn_eq(other.0.as_ref())
643 }
644}
645
646impl Eq for RequestSlot {}
647
648impl Hash for RequestSlot {
649 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
650 self.0.dyn_hash(state);
651 self.0.as_any().type_id().hash(state)
652 }
653}
654
/// Parameters of a completions request.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CompletionsQuery {
    /// Text to complete within.
    pub query: String,
    /// Column of the cursor.
    pub column: u64,
    /// Line of the cursor, when known.
    pub line: Option<u64>,
    /// Stack frame the completion is evaluated in, if any.
    pub frame_id: Option<u64>,
}
662
663impl CompletionsQuery {
664 pub fn new(
665 buffer: &language::Buffer,
666 cursor_position: language::Anchor,
667 frame_id: Option<u64>,
668 ) -> Self {
669 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
670 Self {
671 query: buffer.text(),
672 column: column as u64,
673 frame_id,
674 line: Some(row as u64),
675 }
676 }
677}
678
679pub enum SessionEvent {
680 Modules,
681 LoadedSources,
682 Stopped(Option<ThreadId>),
683 StackTrace,
684 Variables,
685 Threads,
686}
687
688impl EventEmitter<SessionEvent> for Session {}
689
690// local session will send breakpoint updates to DAP for all new breakpoints
691// remote side will only send breakpoint updates when it is a breakpoint created by that peer
692// BreakpointStore notifies session on breakpoint changes
693impl Session {
694 pub(crate) fn local(
695 breakpoint_store: Entity<BreakpointStore>,
696 session_id: SessionId,
697 parent_session: Option<Entity<Session>>,
698 delegate: DapAdapterDelegate,
699 config: DebugAdapterConfig,
700 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
701 initialized_tx: oneshot::Sender<()>,
702 cx: &mut App,
703 ) -> Task<Result<Entity<Self>>> {
704 let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
705
706 cx.spawn(async move |cx| {
707 let (mode, capabilities) = LocalMode::new(
708 session_id,
709 parent_session.clone(),
710 breakpoint_store.clone(),
711 config.clone(),
712 delegate,
713 message_tx,
714 cx.clone(),
715 )
716 .await?;
717
718 cx.new(|cx| {
719 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Self>, cx| {
720 let mut initialized_tx = Some(initialized_tx);
721 while let Some(message) = message_rx.next().await {
722 if let Message::Event(event) = message {
723 if let Events::Initialized(_) = *event {
724 if let Some(tx) = initialized_tx.take() {
725 tx.send(()).ok();
726 }
727 } else {
728 let Ok(_) = this.update(cx, |session, cx| {
729 session.handle_dap_event(event, cx);
730 }) else {
731 break;
732 };
733 }
734 } else {
735 let Ok(_) =
736 start_debugging_requests_tx.unbounded_send((session_id, message))
737 else {
738 break;
739 };
740 }
741 }
742 })];
743
744 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
745 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
746 if let Some(local) = (!this.ignore_breakpoints)
747 .then(|| this.as_local_mut())
748 .flatten()
749 {
750 local
751 .send_breakpoints_from_path(path.clone(), *reason, cx)
752 .detach();
753 };
754 }
755 BreakpointStoreEvent::ActiveDebugLineChanged => {}
756 })
757 .detach();
758
759 Self {
760 mode: Mode::Local(mode),
761 id: session_id,
762 child_session_ids: HashSet::default(),
763 parent_id: parent_session.map(|session| session.read(cx).id),
764 variables: Default::default(),
765 capabilities,
766 thread_states: ThreadStates::default(),
767 output_token: OutputToken(0),
768 ignore_breakpoints: false,
769 output: circular_buffer::CircularBuffer::boxed(),
770 requests: HashMap::default(),
771 modules: Vec::default(),
772 loaded_sources: Vec::default(),
773 threads: IndexMap::default(),
774 stack_frames: IndexMap::default(),
775 locations: Default::default(),
776 _background_tasks,
777 is_session_terminated: false,
778 }
779 })
780 })
781 }
782
783 pub(crate) fn remote(
784 session_id: SessionId,
785 client: AnyProtoClient,
786 upstream_project_id: u64,
787 ignore_breakpoints: bool,
788 ) -> Self {
789 Self {
790 mode: Mode::Remote(RemoteConnection {
791 _client: client,
792 _upstream_project_id: upstream_project_id,
793 }),
794 id: session_id,
795 child_session_ids: HashSet::default(),
796 parent_id: None,
797 capabilities: Capabilities::default(),
798 ignore_breakpoints,
799 variables: Default::default(),
800 stack_frames: Default::default(),
801 thread_states: ThreadStates::default(),
802 output_token: OutputToken(0),
803 output: circular_buffer::CircularBuffer::boxed(),
804 requests: HashMap::default(),
805 modules: Vec::default(),
806 loaded_sources: Vec::default(),
807 threads: IndexMap::default(),
808 _background_tasks: Vec::default(),
809 locations: Default::default(),
810 is_session_terminated: false,
811 }
812 }
813
814 pub fn session_id(&self) -> SessionId {
815 self.id
816 }
817
818 pub fn child_session_ids(&self) -> HashSet<SessionId> {
819 self.child_session_ids.clone()
820 }
821
822 pub fn add_child_session_id(&mut self, session_id: SessionId) {
823 self.child_session_ids.insert(session_id);
824 }
825
826 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
827 self.child_session_ids.remove(&session_id);
828 }
829
830 pub fn parent_id(&self) -> Option<SessionId> {
831 self.parent_id
832 }
833
834 pub fn capabilities(&self) -> &Capabilities {
835 &self.capabilities
836 }
837
838 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
839 if let Mode::Local(local_mode) = &self.mode {
840 Some(local_mode.config.clone())
841 } else {
842 None
843 }
844 }
845
846 pub fn is_terminated(&self) -> bool {
847 self.is_session_terminated
848 }
849
850 pub fn is_local(&self) -> bool {
851 matches!(self.mode, Mode::Local(_))
852 }
853
854 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
855 match &mut self.mode {
856 Mode::Local(local_mode) => Some(local_mode),
857 Mode::Remote(_) => None,
858 }
859 }
860
861 pub fn as_local(&self) -> Option<&LocalMode> {
862 match &self.mode {
863 Mode::Local(local_mode) => Some(local_mode),
864 Mode::Remote(_) => None,
865 }
866 }
867
868 pub(super) fn initialize_sequence(
869 &mut self,
870 initialize_rx: oneshot::Receiver<()>,
871 cx: &mut Context<Self>,
872 ) -> Task<Result<()>> {
873 match &self.mode {
874 Mode::Local(local_mode) => {
875 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
876 }
877 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
878 }
879 }
880
881 pub fn output(
882 &self,
883 since: OutputToken,
884 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
885 if self.output_token.0 == 0 {
886 return (self.output.range(0..0), OutputToken(0));
887 };
888
889 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
890
891 let clamped_events_since = events_since.clamp(0, self.output.len());
892 (
893 self.output
894 .range(self.output.len() - clamped_events_since..),
895 self.output_token,
896 )
897 }
898
899 pub fn respond_to_client(
900 &self,
901 request_seq: u64,
902 success: bool,
903 command: String,
904 body: Option<serde_json::Value>,
905 cx: &mut Context<Self>,
906 ) -> Task<Result<()>> {
907 let Some(local_session) = self.as_local().cloned() else {
908 unreachable!("Cannot respond to remote client");
909 };
910
911 cx.background_spawn(async move {
912 local_session
913 .client
914 .send_message(Message::Response(Response {
915 body,
916 success,
917 command,
918 seq: request_seq + 1,
919 request_seq,
920 message: None,
921 }))
922 .await
923 })
924 }
925
926 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
927 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
928 self.thread_states.stop_all_threads();
929
930 self.invalidate_command_type::<StackTraceCommand>();
931 }
932
933 // Event if we stopped all threads we still need to insert the thread_id
934 // to our own data
935 if let Some(thread_id) = event.thread_id {
936 self.thread_states.stop_thread(ThreadId(thread_id));
937
938 self.invalidate_state(
939 &StackTraceCommand {
940 thread_id,
941 start_frame: None,
942 levels: None,
943 }
944 .into(),
945 );
946 }
947
948 self.invalidate_generic();
949 self.threads.clear();
950 self.variables.clear();
951 cx.emit(SessionEvent::Stopped(
952 event
953 .thread_id
954 .map(Into::into)
955 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
956 ));
957 cx.notify();
958 }
959
960 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
961 match *event {
962 Events::Initialized(_) => {
963 debug_assert!(
964 false,
965 "Initialized event should have been handled in LocalMode"
966 );
967 }
968 Events::Stopped(event) => self.handle_stopped_event(event, cx),
969 Events::Continued(event) => {
970 if event.all_threads_continued.unwrap_or_default() {
971 self.thread_states.continue_all_threads();
972 } else {
973 self.thread_states
974 .continue_thread(ThreadId(event.thread_id));
975 }
976 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
977 self.invalidate_generic();
978 }
979 Events::Exited(_event) => {
980 self.clear_active_debug_line(cx);
981 }
982 Events::Terminated(_) => {
983 self.is_session_terminated = true;
984 self.clear_active_debug_line(cx);
985 }
986 Events::Thread(event) => {
987 let thread_id = ThreadId(event.thread_id);
988
989 match event.reason {
990 dap::ThreadEventReason::Started => {
991 self.thread_states.continue_thread(thread_id);
992 }
993 dap::ThreadEventReason::Exited => {
994 self.thread_states.exit_thread(thread_id);
995 }
996 reason => {
997 log::error!("Unhandled thread event reason {:?}", reason);
998 }
999 }
1000 self.invalidate_state(&ThreadsCommand.into());
1001 cx.notify();
1002 }
1003 Events::Output(event) => {
1004 if event
1005 .category
1006 .as_ref()
1007 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1008 {
1009 return;
1010 }
1011
1012 self.output.push_back(event);
1013 self.output_token.0 += 1;
1014 cx.notify();
1015 }
1016 Events::Breakpoint(_) => {}
1017 Events::Module(event) => {
1018 match event.reason {
1019 dap::ModuleEventReason::New => {
1020 self.modules.push(event.module);
1021 }
1022 dap::ModuleEventReason::Changed => {
1023 if let Some(module) = self
1024 .modules
1025 .iter_mut()
1026 .find(|other| event.module.id == other.id)
1027 {
1028 *module = event.module;
1029 }
1030 }
1031 dap::ModuleEventReason::Removed => {
1032 self.modules.retain(|other| event.module.id != other.id);
1033 }
1034 }
1035
1036 // todo(debugger): We should only send the invalidate command to downstream clients.
1037 // self.invalidate_state(&ModulesCommand.into());
1038 }
1039 Events::LoadedSource(_) => {
1040 self.invalidate_state(&LoadedSourcesCommand.into());
1041 }
1042 Events::Capabilities(event) => {
1043 self.capabilities = self.capabilities.merge(event.capabilities);
1044 cx.notify();
1045 }
1046 Events::Memory(_) => {}
1047 Events::Process(_) => {}
1048 Events::ProgressEnd(_) => {}
1049 Events::ProgressStart(_) => {}
1050 Events::ProgressUpdate(_) => {}
1051 Events::Invalidated(_) => {}
1052 Events::Other(_) => {}
1053 }
1054 }
1055
1056 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1057 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1058 &mut self,
1059 request: T,
1060 process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1061 + 'static,
1062 cx: &mut Context<Self>,
1063 ) {
1064 const {
1065 assert!(
1066 T::CACHEABLE,
1067 "Only requests marked as cacheable should invoke `fetch`"
1068 );
1069 }
1070
1071 if !self.thread_states.any_stopped_thread()
1072 && request.type_id() != TypeId::of::<ThreadsCommand>()
1073 || self.is_session_terminated
1074 {
1075 return;
1076 }
1077
1078 let request_map = self
1079 .requests
1080 .entry(std::any::TypeId::of::<T>())
1081 .or_default();
1082
1083 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1084 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1085
1086 let task = Self::request_inner::<Arc<T>>(
1087 &self.capabilities,
1088 self.id,
1089 &self.mode,
1090 command,
1091 process_result,
1092 cx,
1093 );
1094 let task = cx
1095 .background_executor()
1096 .spawn(async move {
1097 let _ = task.await?;
1098 Some(())
1099 })
1100 .shared();
1101
1102 vacant.insert(task);
1103 cx.notify();
1104 }
1105 }
1106
1107 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1108 capabilities: &Capabilities,
1109 session_id: SessionId,
1110 mode: &Mode,
1111 request: T,
1112 process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1113 + 'static,
1114 cx: &mut Context<Self>,
1115 ) -> Task<Option<T::Response>> {
1116 if !T::is_supported(&capabilities) {
1117 log::warn!(
1118 "Attempted to send a DAP request that isn't supported: {:?}",
1119 request
1120 );
1121 let error = Err(anyhow::Error::msg(
1122 "Couldn't complete request because it's not supported",
1123 ));
1124 return cx.spawn(async move |this, cx| {
1125 this.update(cx, |this, cx| process_result(this, error, cx))
1126 .log_err()
1127 .flatten()
1128 });
1129 }
1130
1131 let request = mode.request_dap(session_id, request, cx);
1132 cx.spawn(async move |this, cx| {
1133 let result = request.await;
1134 this.update(cx, |this, cx| process_result(this, result, cx))
1135 .log_err()
1136 .flatten()
1137 })
1138 }
1139
1140 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1141 &self,
1142 request: T,
1143 process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response>
1144 + 'static,
1145 cx: &mut Context<Self>,
1146 ) -> Task<Option<T::Response>> {
1147 Self::request_inner(
1148 &self.capabilities,
1149 self.id,
1150 &self.mode,
1151 request,
1152 process_result,
1153 cx,
1154 )
1155 }
1156
1157 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1158 self.requests.remove(&std::any::TypeId::of::<Command>());
1159 }
1160
1161 fn invalidate_generic(&mut self) {
1162 self.invalidate_command_type::<ModulesCommand>();
1163 self.invalidate_command_type::<LoadedSourcesCommand>();
1164 self.invalidate_command_type::<ThreadsCommand>();
1165 }
1166
1167 fn invalidate_state(&mut self, key: &RequestSlot) {
1168 self.requests
1169 .entry(key.0.as_any().type_id())
1170 .and_modify(|request_map| {
1171 request_map.remove(&key);
1172 });
1173 }
1174
1175 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1176 self.thread_states.thread_status(thread_id)
1177 }
1178
1179 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1180 self.fetch(
1181 dap_command::ThreadsCommand,
1182 |this, result, cx| {
1183 let result = result.log_err()?;
1184
1185 this.threads = result
1186 .iter()
1187 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1188 .collect();
1189
1190 this.invalidate_command_type::<StackTraceCommand>();
1191 cx.emit(SessionEvent::Threads);
1192 cx.notify();
1193
1194 Some(result)
1195 },
1196 cx,
1197 );
1198
1199 self.threads
1200 .values()
1201 .map(|thread| {
1202 (
1203 thread.dap.clone(),
1204 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1205 )
1206 })
1207 .collect()
1208 }
1209
1210 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1211 self.fetch(
1212 dap_command::ModulesCommand,
1213 |this, result, cx| {
1214 let result = result.log_err()?;
1215
1216 this.modules = result.iter().cloned().collect();
1217 cx.emit(SessionEvent::Modules);
1218 cx.notify();
1219
1220 Some(result)
1221 },
1222 cx,
1223 );
1224
1225 &self.modules
1226 }
1227
1228 pub fn ignore_breakpoints(&self) -> bool {
1229 self.ignore_breakpoints
1230 }
1231
1232 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1233 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1234 }
1235
1236 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1237 if self.ignore_breakpoints == ignore {
1238 return Task::ready(());
1239 }
1240
1241 self.ignore_breakpoints = ignore;
1242
1243 if let Some(local) = self.as_local() {
1244 local.send_all_breakpoints(ignore, cx)
1245 } else {
1246 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1247 unimplemented!()
1248 }
1249 }
1250
1251 pub fn breakpoints_enabled(&self) -> bool {
1252 self.ignore_breakpoints
1253 }
1254
1255 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1256 self.fetch(
1257 dap_command::LoadedSourcesCommand,
1258 |this, result, cx| {
1259 let result = result.log_err()?;
1260 this.loaded_sources = result.iter().cloned().collect();
1261 cx.emit(SessionEvent::LoadedSources);
1262 cx.notify();
1263 Some(result)
1264 },
1265 cx,
1266 );
1267
1268 &self.loaded_sources
1269 }
1270
1271 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1272 res.log_err()?;
1273 Some(())
1274 }
1275
1276 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1277 thread_id: ThreadId,
1278 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1279 {
1280 move |this, response, cx| match response.log_err() {
1281 Some(response) => Some(response),
1282 None => {
1283 this.thread_states.stop_thread(thread_id);
1284 cx.notify();
1285 None
1286 }
1287 }
1288 }
1289
1290 fn clear_active_debug_line_response(
1291 &mut self,
1292 response: Result<()>,
1293 cx: &mut Context<'_, Session>,
1294 ) -> Option<()> {
1295 response.log_err()?;
1296 self.clear_active_debug_line(cx);
1297 Some(())
1298 }
1299
1300 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1301 self.as_local()
1302 .expect("Message handler will only run in local mode")
1303 .breakpoint_store
1304 .update(cx, |store, cx| {
1305 store.remove_active_position(Some(self.id), cx)
1306 });
1307 }
1308
1309 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1310 self.request(
1311 PauseCommand {
1312 thread_id: thread_id.0,
1313 },
1314 Self::empty_response,
1315 cx,
1316 )
1317 .detach();
1318 }
1319
1320 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1321 self.request(
1322 RestartStackFrameCommand { stack_frame_id },
1323 Self::empty_response,
1324 cx,
1325 )
1326 .detach();
1327 }
1328
1329 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1330 if self.capabilities.supports_restart_request.unwrap_or(false) {
1331 self.request(
1332 RestartCommand {
1333 raw: args.unwrap_or(Value::Null),
1334 },
1335 Self::empty_response,
1336 cx,
1337 )
1338 .detach();
1339 } else {
1340 self.request(
1341 DisconnectCommand {
1342 restart: Some(false),
1343 terminate_debuggee: Some(true),
1344 suspend_debuggee: Some(false),
1345 },
1346 Self::empty_response,
1347 cx,
1348 )
1349 .detach();
1350 }
1351 }
1352
1353 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1354 self.is_session_terminated = true;
1355 self.thread_states.exit_all_threads();
1356 cx.notify();
1357
1358 let task = if self
1359 .capabilities
1360 .supports_terminate_request
1361 .unwrap_or_default()
1362 {
1363 self.request(
1364 TerminateCommand {
1365 restart: Some(false),
1366 },
1367 Self::clear_active_debug_line_response,
1368 cx,
1369 )
1370 } else {
1371 self.request(
1372 DisconnectCommand {
1373 restart: Some(false),
1374 terminate_debuggee: Some(true),
1375 suspend_debuggee: Some(false),
1376 },
1377 Self::clear_active_debug_line_response,
1378 cx,
1379 )
1380 };
1381
1382 cx.background_spawn(async move {
1383 let _ = task.await;
1384 })
1385 }
1386
1387 pub fn completions(
1388 &mut self,
1389 query: CompletionsQuery,
1390 cx: &mut Context<Self>,
1391 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1392 let task = self.request(query, |_, result, _| result.log_err(), cx);
1393
1394 cx.background_executor().spawn(async move {
1395 anyhow::Ok(
1396 task.await
1397 .map(|response| response.targets)
1398 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1399 )
1400 })
1401 }
1402
1403 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1404 self.thread_states.continue_thread(thread_id);
1405 self.request(
1406 ContinueCommand {
1407 args: ContinueArguments {
1408 thread_id: thread_id.0,
1409 single_thread: Some(true),
1410 },
1411 },
1412 Self::on_step_response::<ContinueCommand>(thread_id),
1413 cx,
1414 )
1415 .detach();
1416 }
1417
1418 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1419 match self.mode {
1420 Mode::Local(ref local) => Some(local.client.clone()),
1421 Mode::Remote(_) => None,
1422 }
1423 }
1424
1425 pub fn step_over(
1426 &mut self,
1427 thread_id: ThreadId,
1428 granularity: SteppingGranularity,
1429 cx: &mut Context<Self>,
1430 ) {
1431 let supports_single_thread_execution_requests =
1432 self.capabilities.supports_single_thread_execution_requests;
1433 let supports_stepping_granularity = self
1434 .capabilities
1435 .supports_stepping_granularity
1436 .unwrap_or_default();
1437
1438 let command = NextCommand {
1439 inner: StepCommand {
1440 thread_id: thread_id.0,
1441 granularity: supports_stepping_granularity.then(|| granularity),
1442 single_thread: supports_single_thread_execution_requests,
1443 },
1444 };
1445
1446 self.thread_states.process_step(thread_id);
1447 self.request(
1448 command,
1449 Self::on_step_response::<NextCommand>(thread_id),
1450 cx,
1451 )
1452 .detach();
1453 }
1454
1455 pub fn step_in(
1456 &mut self,
1457 thread_id: ThreadId,
1458 granularity: SteppingGranularity,
1459 cx: &mut Context<Self>,
1460 ) {
1461 let supports_single_thread_execution_requests =
1462 self.capabilities.supports_single_thread_execution_requests;
1463 let supports_stepping_granularity = self
1464 .capabilities
1465 .supports_stepping_granularity
1466 .unwrap_or_default();
1467
1468 let command = StepInCommand {
1469 inner: StepCommand {
1470 thread_id: thread_id.0,
1471 granularity: supports_stepping_granularity.then(|| granularity),
1472 single_thread: supports_single_thread_execution_requests,
1473 },
1474 };
1475
1476 self.thread_states.process_step(thread_id);
1477 self.request(
1478 command,
1479 Self::on_step_response::<StepInCommand>(thread_id),
1480 cx,
1481 )
1482 .detach();
1483 }
1484
1485 pub fn step_out(
1486 &mut self,
1487 thread_id: ThreadId,
1488 granularity: SteppingGranularity,
1489 cx: &mut Context<Self>,
1490 ) {
1491 let supports_single_thread_execution_requests =
1492 self.capabilities.supports_single_thread_execution_requests;
1493 let supports_stepping_granularity = self
1494 .capabilities
1495 .supports_stepping_granularity
1496 .unwrap_or_default();
1497
1498 let command = StepOutCommand {
1499 inner: StepCommand {
1500 thread_id: thread_id.0,
1501 granularity: supports_stepping_granularity.then(|| granularity),
1502 single_thread: supports_single_thread_execution_requests,
1503 },
1504 };
1505
1506 self.thread_states.process_step(thread_id);
1507 self.request(
1508 command,
1509 Self::on_step_response::<StepOutCommand>(thread_id),
1510 cx,
1511 )
1512 .detach();
1513 }
1514
1515 pub fn step_back(
1516 &mut self,
1517 thread_id: ThreadId,
1518 granularity: SteppingGranularity,
1519 cx: &mut Context<Self>,
1520 ) {
1521 let supports_single_thread_execution_requests =
1522 self.capabilities.supports_single_thread_execution_requests;
1523 let supports_stepping_granularity = self
1524 .capabilities
1525 .supports_stepping_granularity
1526 .unwrap_or_default();
1527
1528 let command = StepBackCommand {
1529 inner: StepCommand {
1530 thread_id: thread_id.0,
1531 granularity: supports_stepping_granularity.then(|| granularity),
1532 single_thread: supports_single_thread_execution_requests,
1533 },
1534 };
1535
1536 self.thread_states.process_step(thread_id);
1537
1538 self.request(
1539 command,
1540 Self::on_step_response::<StepBackCommand>(thread_id),
1541 cx,
1542 )
1543 .detach();
1544 }
1545
1546 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1547 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1548 && self.requests.contains_key(&ThreadsCommand.type_id())
1549 && self.threads.contains_key(&thread_id)
1550 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1551 // We could still be using an old thread id and have sent a new thread's request
1552 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1553 // But it very well could cause a minor bug in the future that is hard to track down
1554 {
1555 self.fetch(
1556 super::dap_command::StackTraceCommand {
1557 thread_id: thread_id.0,
1558 start_frame: None,
1559 levels: None,
1560 },
1561 move |this, stack_frames, cx| {
1562 let stack_frames = stack_frames.log_err()?;
1563
1564 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1565 thread.stack_frame_ids =
1566 stack_frames.iter().map(|frame| frame.id).collect();
1567 });
1568 debug_assert!(
1569 matches!(entry, indexmap::map::Entry::Occupied(_)),
1570 "Sent request for thread_id that doesn't exist"
1571 );
1572
1573 this.stack_frames.extend(
1574 stack_frames
1575 .iter()
1576 .cloned()
1577 .map(|frame| (frame.id, StackFrame::from(frame))),
1578 );
1579
1580 this.invalidate_command_type::<ScopesCommand>();
1581 this.invalidate_command_type::<VariablesCommand>();
1582
1583 cx.emit(SessionEvent::StackTrace);
1584 cx.notify();
1585 Some(stack_frames)
1586 },
1587 cx,
1588 );
1589 }
1590
1591 self.threads
1592 .get(&thread_id)
1593 .map(|thread| {
1594 thread
1595 .stack_frame_ids
1596 .iter()
1597 .filter_map(|id| self.stack_frames.get(id))
1598 .cloned()
1599 .collect()
1600 })
1601 .unwrap_or_default()
1602 }
1603
1604 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1605 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1606 && self
1607 .requests
1608 .contains_key(&TypeId::of::<StackTraceCommand>())
1609 {
1610 self.fetch(
1611 ScopesCommand { stack_frame_id },
1612 move |this, scopes, cx| {
1613 let scopes = scopes.log_err()?;
1614
1615 for scope in scopes .iter(){
1616 this.variables(scope.variables_reference, cx);
1617 }
1618
1619 let entry = this
1620 .stack_frames
1621 .entry(stack_frame_id)
1622 .and_modify(|stack_frame| {
1623 stack_frame.scopes = scopes.clone();
1624 });
1625
1626 cx.emit(SessionEvent::Variables);
1627
1628 debug_assert!(
1629 matches!(entry, indexmap::map::Entry::Occupied(_)),
1630 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1631 );
1632
1633 Some(scopes)
1634 },
1635 cx,
1636 );
1637 }
1638
1639 self.stack_frames
1640 .get(&stack_frame_id)
1641 .map(|frame| frame.scopes.as_slice())
1642 .unwrap_or_default()
1643 }
1644
1645 pub fn variables(
1646 &mut self,
1647 variables_reference: VariableReference,
1648 cx: &mut Context<Self>,
1649 ) -> Vec<dap::Variable> {
1650 let command = VariablesCommand {
1651 variables_reference,
1652 filter: None,
1653 start: None,
1654 count: None,
1655 format: None,
1656 };
1657
1658 self.fetch(
1659 command,
1660 move |this, variables, cx| {
1661 let variables = variables.log_err()?;
1662 this.variables
1663 .insert(variables_reference, variables.clone());
1664
1665 cx.emit(SessionEvent::Variables);
1666 Some(variables)
1667 },
1668 cx,
1669 );
1670
1671 self.variables
1672 .get(&variables_reference)
1673 .cloned()
1674 .unwrap_or_default()
1675 }
1676
1677 pub fn set_variable_value(
1678 &mut self,
1679 variables_reference: u64,
1680 name: String,
1681 value: String,
1682 cx: &mut Context<Self>,
1683 ) {
1684 if self.capabilities.supports_set_variable.unwrap_or_default() {
1685 self.request(
1686 SetVariableValueCommand {
1687 name,
1688 value,
1689 variables_reference,
1690 },
1691 move |this, response, cx| {
1692 let response = response.log_err()?;
1693 this.invalidate_command_type::<VariablesCommand>();
1694 cx.notify();
1695 Some(response)
1696 },
1697 cx,
1698 )
1699 .detach()
1700 }
1701 }
1702
1703 pub fn evaluate(
1704 &mut self,
1705 expression: String,
1706 context: Option<EvaluateArgumentsContext>,
1707 frame_id: Option<u64>,
1708 source: Option<Source>,
1709 cx: &mut Context<Self>,
1710 ) {
1711 self.request(
1712 EvaluateCommand {
1713 expression,
1714 context,
1715 frame_id,
1716 source,
1717 },
1718 |this, response, cx| {
1719 let response = response.log_err()?;
1720 this.output_token.0 += 1;
1721 this.output.push_back(dap::OutputEvent {
1722 category: None,
1723 output: response.result.clone(),
1724 group: None,
1725 variables_reference: Some(response.variables_reference),
1726 source: None,
1727 line: None,
1728 column: None,
1729 data: None,
1730 location_reference: None,
1731 });
1732
1733 this.invalidate_command_type::<ScopesCommand>();
1734 cx.notify();
1735 Some(response)
1736 },
1737 cx,
1738 )
1739 .detach();
1740 }
1741
1742 pub fn location(
1743 &mut self,
1744 reference: u64,
1745 cx: &mut Context<Self>,
1746 ) -> Option<dap::LocationsResponse> {
1747 self.fetch(
1748 LocationsCommand { reference },
1749 move |this, response, _| {
1750 let response = response.log_err()?;
1751 this.locations.insert(reference, response.clone());
1752 Some(response)
1753 },
1754 cx,
1755 );
1756 self.locations.get(&reference).cloned()
1757 }
1758 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1759 let command = DisconnectCommand {
1760 restart: Some(false),
1761 terminate_debuggee: Some(true),
1762 suspend_debuggee: Some(false),
1763 };
1764
1765 self.request(command, Self::empty_response, cx).detach()
1766 }
1767
1768 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1769 if self
1770 .capabilities
1771 .supports_terminate_threads_request
1772 .unwrap_or_default()
1773 {
1774 self.request(
1775 TerminateThreadsCommand {
1776 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1777 },
1778 Self::clear_active_debug_line_response,
1779 cx,
1780 )
1781 .detach();
1782 } else {
1783 self.shutdown(cx).detach();
1784 }
1785 }
1786}