1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
9 StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
10 VariablesCommand,
11};
12use super::dap_store::DapAdapterDelegate;
13use anyhow::{Result, anyhow};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
16use dap::messages::Response;
17use dap::{
18 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
19 SteppingGranularity, StoppedEvent, VariableReference,
20 adapters::{DapDelegate, DapStatus},
21 client::{DebugAdapterClient, SessionId},
22 messages::{Events, Message},
23};
24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
25use futures::channel::oneshot;
26use futures::{FutureExt, future::Shared};
27use gpui::{
28 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
29};
30use rpc::AnyProtoClient;
31use serde_json::{Value, json};
32use settings::Settings;
33use smol::stream::StreamExt;
34use std::any::TypeId;
35use std::path::PathBuf;
36use std::u64;
37use std::{
38 any::Any,
39 collections::hash_map::Entry,
40 hash::{Hash, Hasher},
41 path::Path,
42 sync::Arc,
43};
44use task::{DebugAdapterConfig, DebugTaskDefinition};
45use text::{PointUtf16, ToPointUtf16};
46use util::{ResultExt, merge_json_value_into};
47
/// Strongly typed wrapper around the numeric thread id used by the debug adapter protocol.
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// Smallest representable thread id.
    pub const MIN: Self = Self(u64::MIN);
    /// Largest representable thread id.
    pub const MAX: Self = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw numeric id.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
62
63#[derive(Clone, Debug)]
64pub struct StackFrame {
65 pub dap: dap::StackFrame,
66 pub scopes: Vec<dap::Scope>,
67}
68
69impl From<dap::StackFrame> for StackFrame {
70 fn from(stack_frame: dap::StackFrame) -> Self {
71 Self {
72 scopes: vec![],
73 dap: stack_frame,
74 }
75 }
76}
77
/// Execution state tracked for a debuggee thread. Defaults to `Running`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the human-readable name of this status.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
99
100#[derive(Debug)]
101pub struct Thread {
102 dap: dap::Thread,
103 stack_frame_ids: IndexSet<StackFrameId>,
104 _has_stopped: bool,
105}
106
107impl From<dap::Thread> for Thread {
108 fn from(dap: dap::Thread) -> Self {
109 Self {
110 dap,
111 stack_frame_ids: Default::default(),
112 _has_stopped: false,
113 }
114 }
115}
116
117type UpstreamProjectId = u64;
118
119struct RemoteConnection {
120 _client: AnyProtoClient,
121 _upstream_project_id: UpstreamProjectId,
122}
123
124impl RemoteConnection {
125 fn send_proto_client_request<R: DapCommand>(
126 &self,
127 _request: R,
128 _session_id: SessionId,
129 cx: &mut App,
130 ) -> Task<Result<R::Response>> {
131 // let message = request.to_proto(session_id, self.upstream_project_id);
132 // let upstream_client = self.client.clone();
133 cx.background_executor().spawn(async move {
134 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
135 // let response = upstream_client.request(message).await?;
136 // request.response_from_proto(response)
137 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
138 })
139 }
140
141 fn request<R: DapCommand>(
142 &self,
143 request: R,
144 session_id: SessionId,
145 cx: &mut App,
146 ) -> Task<Result<R::Response>>
147 where
148 <R::DapRequest as dap::requests::Request>::Response: 'static,
149 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
150 {
151 return self.send_proto_client_request::<R>(request, session_id, cx);
152 }
153}
154
/// How a session reaches its debug adapter.
enum Mode {
    /// The adapter is driven by a client owned by this process.
    Local(LocalMode),
    /// Requests are meant to be forwarded through an RPC connection.
    Remote(RemoteConnection),
}
159
/// State needed to talk to a debug adapter through a locally owned client.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests and messages to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration this session was started with.
    config: DebugAdapterConfig,
    /// Adapter implementation; used to build the launch/attach request arguments.
    adapter: Arc<dyn DebugAdapter>,
    /// Source of the breakpoints that get sent to the adapter.
    breakpoint_store: Entity<BreakpointStore>,
}
167
168fn client_source(abs_path: &Path) -> dap::Source {
169 dap::Source {
170 name: abs_path
171 .file_name()
172 .map(|filename| filename.to_string_lossy().to_string()),
173 path: Some(abs_path.to_string_lossy().to_string()),
174 source_reference: None,
175 presentation_hint: None,
176 origin: None,
177 sources: None,
178 adapter_data: None,
179 checksums: None,
180 }
181}
182
183impl LocalMode {
/// Starts a local debug adapter session.
///
/// Delegates to `new_inner` with a no-op `on_initialized` callback. The returned task
/// resolves to the constructed `LocalMode` and the capabilities returned by the
/// `Initialize` request.
fn new(
    debug_adapters: Arc<DapRegistry>,
    session_id: SessionId,
    parent_session: Option<Entity<Session>>,
    breakpoint_store: Entity<BreakpointStore>,
    config: DebugAdapterConfig,
    delegate: DapAdapterDelegate,
    messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
    cx: AsyncApp,
) -> Task<Result<(Self, Capabilities)>> {
    Self::new_inner(
        debug_adapters,
        session_id,
        parent_session,
        breakpoint_store,
        config,
        delegate,
        messages_tx,
        // Nothing needs to be set up before `Initialize` is sent on this path.
        async |_, _| {},
        cx,
    )
}
/// Test-only constructor that builds a `LocalMode` on top of `DapRegistry::fake()`.
///
/// Before the `Initialize` request is sent, request handlers are registered on the client:
/// - `Initialize` answers with `caps`,
/// - `SetBreakpoints` panics for paths the breakpoint store did not report and otherwise
///   answers with an empty breakpoint list,
/// - `Launch` / `Attach` (whichever matches the configured request) fails when `fail` is
///   true and succeeds otherwise,
/// - `Disconnect` always succeeds.
///
/// Finally an `Initialized` event is faked on the client.
#[cfg(any(test, feature = "test-support"))]
fn new_fake(
    session_id: SessionId,
    parent_session: Option<Entity<Session>>,
    breakpoint_store: Entity<BreakpointStore>,
    config: DebugAdapterConfig,
    delegate: DapAdapterDelegate,
    messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
    caps: Capabilities,
    fail: bool,
    cx: AsyncApp,
) -> Task<Result<(Self, Capabilities)>> {
    use task::DebugRequestDisposition;

    // Reverse requests carry no user configuration, so placeholder launch/attach
    // configurations are substituted for them.
    let request = match config.request.clone() {
        DebugRequestDisposition::UserConfigured(request) => request,
        DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
            match reverse_request_args.request {
                dap::StartDebuggingRequestArgumentsRequest::Launch => {
                    DebugRequestType::Launch(task::LaunchConfig {
                        program: "".to_owned(),
                        cwd: None,
                        args: Default::default(),
                    })
                }
                dap::StartDebuggingRequestArgumentsRequest::Attach => {
                    DebugRequestType::Attach(task::AttachConfig {
                        process_id: Some(0),
                    })
                }
            }
        }
    };

    // Runs inside `new_inner` after the client exists but before `Initialize` is requested.
    let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
        session
            .client
            .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
            .await;

        // Snapshot of the paths that have breakpoints at the time the callback runs.
        let paths = cx
            .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
            .expect("Breakpoint store should exist in all tests that start debuggers");

        session
            .client
            .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
                let p = Arc::from(Path::new(&args.source.path.unwrap()));
                if !paths.contains(&p) {
                    panic!("Sent breakpoints for path without any")
                }

                Ok(dap::SetBreakpointsResponse {
                    breakpoints: Vec::default(),
                })
            })
            .await;

        match request {
            dap::DebugRequestType::Launch(_) => {
                if fail {
                    session
                        .client
                        .on_request::<dap::requests::Launch, _>(move |_, _| {
                            Err(dap::ErrorResponse {
                                error: Some(dap::Message {
                                    id: 1,
                                    format: "error".into(),
                                    variables: None,
                                    send_telemetry: None,
                                    show_user: None,
                                    url: None,
                                    url_label: None,
                                }),
                            })
                        })
                        .await;
                } else {
                    session
                        .client
                        .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
                        .await;
                }
            }
            dap::DebugRequestType::Attach(attach_config) => {
                if fail {
                    session
                        .client
                        .on_request::<dap::requests::Attach, _>(move |_, _| {
                            Err(dap::ErrorResponse {
                                error: Some(dap::Message {
                                    id: 1,
                                    format: "error".into(),
                                    variables: None,
                                    send_telemetry: None,
                                    show_user: None,
                                    url: None,
                                    url_label: None,
                                }),
                            })
                        })
                        .await;
                } else {
                    session
                        .client
                        .on_request::<dap::requests::Attach, _>(move |_, args| {
                            // The fake adapter checks that the attach arguments carry the
                            // configured process id.
                            assert_eq!(
                                json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
                                args.raw
                            );

                            Ok(())
                        })
                        .await;
                }
            }
        }

        session
            .client
            .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
            .await;
        session.client.fake_event(Events::Initialized(None)).await;
    };
    Self::new_inner(
        DapRegistry::fake().into(),
        session_id,
        parent_session,
        breakpoint_store,
        config,
        delegate,
        messages_tx,
        callback,
        cx,
    )
}
342 fn new_inner(
343 registry: Arc<DapRegistry>,
344 session_id: SessionId,
345 parent_session: Option<Entity<Session>>,
346 breakpoint_store: Entity<BreakpointStore>,
347 config: DebugAdapterConfig,
348 delegate: DapAdapterDelegate,
349 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
350 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
351 cx: AsyncApp,
352 ) -> Task<Result<(Self, Capabilities)>> {
353 cx.spawn(async move |cx| {
354 let (adapter, binary) =
355 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
356
357 let message_handler = Box::new(move |message| {
358 messages_tx.unbounded_send(message).ok();
359 });
360
361 let client = Arc::new(
362 if let Some(client) = parent_session
363 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
364 .flatten()
365 {
366 client
367 .reconnect(session_id, binary, message_handler, cx.clone())
368 .await?
369 } else {
370 DebugAdapterClient::start(
371 session_id,
372 adapter.name(),
373 binary,
374 message_handler,
375 cx.clone(),
376 )
377 .await?
378 },
379 );
380
381 let adapter_id = adapter.name().to_string().to_owned();
382 let mut session = Self {
383 client,
384 adapter,
385 breakpoint_store,
386 config: config.clone(),
387 };
388
389 on_initialized(&mut session, cx.clone()).await;
390 let capabilities = session
391 .request(Initialize { adapter_id }, cx.background_executor().clone())
392 .await?;
393
394 Ok((session, capabilities))
395 })
396 }
397
398 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
399 let tasks: Vec<_> = paths
400 .into_iter()
401 .map(|path| {
402 self.request(
403 dap_command::SetBreakpoints {
404 source: client_source(path),
405 source_modified: None,
406 breakpoints: vec![],
407 },
408 cx.background_executor().clone(),
409 )
410 })
411 .collect();
412
413 cx.background_spawn(async move {
414 futures::future::join_all(tasks)
415 .await
416 .iter()
417 .for_each(|res| match res {
418 Ok(_) => {}
419 Err(err) => {
420 log::warn!("Set breakpoints request failed: {}", err);
421 }
422 });
423 })
424 }
425
426 fn send_breakpoints_from_path(
427 &self,
428 abs_path: Arc<Path>,
429 reason: BreakpointUpdatedReason,
430 cx: &mut App,
431 ) -> Task<()> {
432 let breakpoints = self
433 .breakpoint_store
434 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
435 .into_iter()
436 .filter(|bp| bp.state.is_enabled())
437 .map(Into::into)
438 .collect();
439
440 let task = self.request(
441 dap_command::SetBreakpoints {
442 source: client_source(&abs_path),
443 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
444 breakpoints,
445 },
446 cx.background_executor().clone(),
447 );
448
449 cx.background_spawn(async move {
450 match task.await {
451 Ok(_) => {}
452 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
453 }
454 })
455 }
456
457 fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
458 let mut breakpoint_tasks = Vec::new();
459 let breakpoints = self
460 .breakpoint_store
461 .read_with(cx, |store, cx| store.all_breakpoints(cx));
462
463 for (path, breakpoints) in breakpoints {
464 let breakpoints = if ignore_breakpoints {
465 vec![]
466 } else {
467 breakpoints
468 .into_iter()
469 .filter(|bp| bp.state.is_enabled())
470 .map(Into::into)
471 .collect()
472 };
473
474 breakpoint_tasks.push(self.request(
475 dap_command::SetBreakpoints {
476 source: client_source(&path),
477 source_modified: Some(false),
478 breakpoints,
479 },
480 cx.background_executor().clone(),
481 ));
482 }
483
484 cx.background_spawn(async move {
485 futures::future::join_all(breakpoint_tasks)
486 .await
487 .iter()
488 .for_each(|res| match res {
489 Ok(_) => {}
490 Err(err) => {
491 log::warn!("Set breakpoints request failed: {}", err);
492 }
493 });
494 })
495 }
496
497 async fn get_adapter_binary(
498 registry: &Arc<DapRegistry>,
499 config: &DebugAdapterConfig,
500 delegate: &DapAdapterDelegate,
501 cx: &mut AsyncApp,
502 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
503 let adapter = registry
504 .adapter(&config.adapter)
505 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
506
507 let binary = cx.update(|cx| {
508 ProjectSettings::get_global(cx)
509 .dap
510 .get(&adapter.name())
511 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
512 })?;
513
514 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
515 Err(error) => {
516 delegate.update_status(
517 adapter.name(),
518 DapStatus::Failed {
519 error: error.to_string(),
520 },
521 );
522
523 return Err(error);
524 }
525 Ok(mut binary) => {
526 delegate.update_status(adapter.name(), DapStatus::None);
527
528 let shell_env = delegate.shell_env().await;
529 let mut envs = binary.envs.unwrap_or_default();
530 envs.extend(shell_env);
531 binary.envs = Some(envs);
532
533 binary
534 }
535 };
536
537 Ok((adapter, binary))
538 }
539
/// Returns a copy of the label from this session's configuration.
pub fn label(&self) -> String {
    self.config.label.clone()
}
543
/// Runs the launch/attach and configuration phases that follow `Initialize`.
///
/// Two things run concurrently and both must succeed:
/// - the `Launch` or `Attach` request, built from the session configuration (or from the
///   reverse request's arguments) merged with `initialize_args`;
/// - the configuration sequence, which waits for `initialized_rx`, sends all breakpoints
///   and then sends `ConfigurationDone` if `capabilities` says it is supported.
fn initialize_sequence(
    &self,
    capabilities: &Capabilities,
    initialized_rx: oneshot::Receiver<()>,
    cx: &App,
) -> Task<Result<()>> {
    // Determine the raw request arguments and whether this is a launch (vs. attach).
    let (mut raw, is_launch) = match &self.config.request {
        task::DebugRequestDisposition::UserConfigured(_) => {
            let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
                debug_assert!(false, "This part of code should be unreachable in practice");
                return Task::ready(Err(anyhow!(
                    "Expected debug config conversion to succeed"
                )));
            };
            let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
            let raw = self.adapter.request_args(&raw);
            (raw, is_launch)
        }
        task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
            start_debugging_request_arguments.configuration.clone(),
            matches!(
                start_debugging_request_arguments.request,
                dap::StartDebuggingRequestArgumentsRequest::Launch
            ),
        ),
    };

    // Layer the user-provided `initialize_args` (if any) onto the request arguments.
    merge_json_value_into(
        self.config.initialize_args.clone().unwrap_or(json!({})),
        &mut raw,
    );
    // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
    let launch = if is_launch {
        self.request(Launch { raw }, cx.background_executor().clone())
    } else {
        self.request(Attach { raw }, cx.background_executor().clone())
    };

    let configuration_done_supported = ConfigurationDone::is_supported(capabilities);

    let configuration_sequence = cx.spawn({
        let this = self.clone();
        async move |cx| {
            // Do not configure anything until the initialized signal arrives.
            initialized_rx.await?;
            // todo(debugger) figure out if we want to handle a breakpoint response error
            // This will probably consist of letting a user know that breakpoints failed to be set
            cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;

            if configuration_done_supported {
                this.request(ConfigurationDone, cx.background_executor().clone())
            } else {
                Task::ready(Ok(()))
            }
            .await
        }
    });

    cx.background_spawn(async move {
        // Fails as soon as either the launch/attach request or the configuration fails.
        futures::future::try_join(launch, configuration_sequence).await?;
        Ok(())
    })
}
606
607 fn request<R: LocalDapCommand>(
608 &self,
609 request: R,
610 executor: BackgroundExecutor,
611 ) -> Task<Result<R::Response>>
612 where
613 <R::DapRequest as dap::requests::Request>::Response: 'static,
614 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
615 {
616 let request = Arc::new(request);
617
618 let request_clone = request.clone();
619 let connection = self.client.clone();
620 let request_task = executor.spawn(async move {
621 let args = request_clone.to_dap();
622 connection.request::<R::DapRequest>(args).await
623 });
624
625 executor.spawn(async move {
626 let response = request.response_from_dap(request_task.await?);
627 response
628 })
629 }
630}
631impl From<RemoteConnection> for Mode {
632 fn from(value: RemoteConnection) -> Self {
633 Self::Remote(value)
634 }
635}
636
637impl Mode {
638 fn request_dap<R: DapCommand>(
639 &self,
640 session_id: SessionId,
641 request: R,
642 cx: &mut Context<Session>,
643 ) -> Task<Result<R::Response>>
644 where
645 <R::DapRequest as dap::requests::Request>::Response: 'static,
646 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
647 {
648 match self {
649 Mode::Local(debug_adapter_client) => {
650 debug_adapter_client.request(request, cx.background_executor().clone())
651 }
652 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
653 }
654 }
655}
656
657#[derive(Default)]
658struct ThreadStates {
659 global_state: Option<ThreadStatus>,
660 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
661}
662
663impl ThreadStates {
664 fn stop_all_threads(&mut self) {
665 self.global_state = Some(ThreadStatus::Stopped);
666 self.known_thread_states.clear();
667 }
668
669 fn exit_all_threads(&mut self) {
670 self.global_state = Some(ThreadStatus::Exited);
671 self.known_thread_states.clear();
672 }
673
674 fn continue_all_threads(&mut self) {
675 self.global_state = Some(ThreadStatus::Running);
676 self.known_thread_states.clear();
677 }
678
679 fn stop_thread(&mut self, thread_id: ThreadId) {
680 self.known_thread_states
681 .insert(thread_id, ThreadStatus::Stopped);
682 }
683
684 fn continue_thread(&mut self, thread_id: ThreadId) {
685 self.known_thread_states
686 .insert(thread_id, ThreadStatus::Running);
687 }
688
689 fn process_step(&mut self, thread_id: ThreadId) {
690 self.known_thread_states
691 .insert(thread_id, ThreadStatus::Stepping);
692 }
693
694 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
695 self.thread_state(thread_id)
696 .unwrap_or(ThreadStatus::Running)
697 }
698
699 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
700 self.known_thread_states
701 .get(&thread_id)
702 .copied()
703 .or(self.global_state)
704 }
705
706 fn exit_thread(&mut self, thread_id: ThreadId) {
707 self.known_thread_states
708 .insert(thread_id, ThreadStatus::Exited);
709 }
710
711 fn any_stopped_thread(&self) -> bool {
712 self.global_state
713 .is_some_and(|state| state == ThreadStatus::Stopped)
714 || self
715 .known_thread_states
716 .values()
717 .any(|status| *status == ThreadStatus::Stopped)
718 }
719}
/// Capacity of the circular buffer holding output events for a session.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5_000;

/// Counter value identifying a position in a session's stream of output events.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether requests go to a local adapter client or through a remote connection.
    mode: Mode,
    /// Adapter capabilities; merged with updates from `Capabilities` events.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    /// Modules kept in sync from `Module` events.
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Incremented once for every output event stored in `output`.
    output_token: OutputToken,
    /// Stored output events, bounded by `MAX_TRACKED_OUTPUT_EVENTS`.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    /// Cleared whenever a `Stopped` event is handled.
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    /// Cleared whenever a `Stopped` event is handled.
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    /// Set once a `Terminated` event has been received.
    is_session_terminated: bool,
    /// Cacheable requests keyed first by command type, then by the command value itself.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    _background_tasks: Vec<Task<()>>,
}
745
746trait CacheableCommand: 'static + Send + Sync {
747 fn as_any(&self) -> &dyn Any;
748 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
749 fn dyn_hash(&self, hasher: &mut dyn Hasher);
750 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
751}
752
753impl<T> CacheableCommand for T
754where
755 T: DapCommand + PartialEq + Eq + Hash,
756{
757 fn as_any(&self) -> &dyn Any {
758 self
759 }
760
761 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
762 rhs.as_any()
763 .downcast_ref::<Self>()
764 .map_or(false, |rhs| self == rhs)
765 }
766
767 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
768 T::hash(self, &mut hasher);
769 }
770
771 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
772 self
773 }
774}
775
776pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
777
778impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
779 fn from(request: T) -> Self {
780 Self(Arc::new(request))
781 }
782}
783
784impl PartialEq for RequestSlot {
785 fn eq(&self, other: &Self) -> bool {
786 self.0.dyn_eq(other.0.as_ref())
787 }
788}
789
790impl Eq for RequestSlot {}
791
792impl Hash for RequestSlot {
793 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
794 self.0.dyn_hash(state);
795 self.0.as_any().type_id().hash(state)
796 }
797}
798
799#[derive(Debug, Clone, Hash, PartialEq, Eq)]
800pub struct CompletionsQuery {
801 pub query: String,
802 pub column: u64,
803 pub line: Option<u64>,
804 pub frame_id: Option<u64>,
805}
806
807impl CompletionsQuery {
808 pub fn new(
809 buffer: &language::Buffer,
810 cursor_position: language::Anchor,
811 frame_id: Option<u64>,
812 ) -> Self {
813 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
814 Self {
815 query: buffer.text(),
816 column: column as u64,
817 frame_id,
818 line: Some(row as u64),
819 }
820 }
821}
822
/// Events emitted by a `Session` entity.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Emitted when a `Stopped` event is handled. Carries the stopped thread's id, or `None`
    /// when the event had no thread id or asked to preserve focus.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

impl EventEmitter<SessionEvent> for Session {}
833
834// local session will send breakpoint updates to DAP for all new breakpoints
835// remote side will only send breakpoint updates when it is a breakpoint created by that peer
836// BreakpointStore notifies session on breakpoint changes
837impl Session {
838 pub(crate) fn local(
839 breakpoint_store: Entity<BreakpointStore>,
840 session_id: SessionId,
841 parent_session: Option<Entity<Session>>,
842 delegate: DapAdapterDelegate,
843 config: DebugAdapterConfig,
844 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
845 initialized_tx: oneshot::Sender<()>,
846 debug_adapters: Arc<DapRegistry>,
847 cx: &mut App,
848 ) -> Task<Result<Entity<Self>>> {
849 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
850
851 cx.spawn(async move |cx| {
852 let (mode, capabilities) = LocalMode::new(
853 debug_adapters,
854 session_id,
855 parent_session.clone(),
856 breakpoint_store.clone(),
857 config.clone(),
858 delegate,
859 message_tx,
860 cx.clone(),
861 )
862 .await?;
863
864 cx.new(|cx| {
865 create_local_session(
866 breakpoint_store,
867 session_id,
868 parent_session,
869 start_debugging_requests_tx,
870 initialized_tx,
871 message_rx,
872 mode,
873 capabilities,
874 cx,
875 )
876 })
877 })
878 }
879
/// Test-only counterpart of `local` that builds the session on top of `LocalMode::new_fake`.
///
/// `caps` is what the fake adapter reports as capabilities and `fails` controls whether its
/// launch/attach handler fails.
#[cfg(any(test, feature = "test-support"))]
pub(crate) fn fake(
    breakpoint_store: Entity<BreakpointStore>,
    session_id: SessionId,
    parent_session: Option<Entity<Session>>,
    delegate: DapAdapterDelegate,
    config: DebugAdapterConfig,
    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
    initialized_tx: oneshot::Sender<()>,
    caps: Capabilities,
    fails: bool,
    cx: &mut App,
) -> Task<Result<Entity<Session>>> {
    let (adapter_messages_tx, adapter_messages_rx) = futures::channel::mpsc::unbounded();

    cx.spawn(async move |cx| {
        let starting_mode = LocalMode::new_fake(
            session_id,
            parent_session.clone(),
            breakpoint_store.clone(),
            config.clone(),
            delegate,
            adapter_messages_tx,
            caps,
            fails,
            cx.clone(),
        );
        let (mode, capabilities) = starting_mode.await?;

        cx.new(|cx| {
            create_local_session(
                breakpoint_store,
                session_id,
                parent_session,
                start_debugging_requests_tx,
                initialized_tx,
                adapter_messages_rx,
                mode,
                capabilities,
                cx,
            )
        })
    })
}
924
925 pub(crate) fn remote(
926 session_id: SessionId,
927 client: AnyProtoClient,
928 upstream_project_id: u64,
929 ignore_breakpoints: bool,
930 ) -> Self {
931 Self {
932 mode: Mode::Remote(RemoteConnection {
933 _client: client,
934 _upstream_project_id: upstream_project_id,
935 }),
936 id: session_id,
937 child_session_ids: HashSet::default(),
938 parent_id: None,
939 capabilities: Capabilities::default(),
940 ignore_breakpoints,
941 variables: Default::default(),
942 stack_frames: Default::default(),
943 thread_states: ThreadStates::default(),
944 output_token: OutputToken(0),
945 output: circular_buffer::CircularBuffer::boxed(),
946 requests: HashMap::default(),
947 modules: Vec::default(),
948 loaded_sources: Vec::default(),
949 threads: IndexMap::default(),
950 _background_tasks: Vec::default(),
951 locations: Default::default(),
952 is_session_terminated: false,
953 }
954 }
955
956 pub fn session_id(&self) -> SessionId {
957 self.id
958 }
959
960 pub fn child_session_ids(&self) -> HashSet<SessionId> {
961 self.child_session_ids.clone()
962 }
963
964 pub fn add_child_session_id(&mut self, session_id: SessionId) {
965 self.child_session_ids.insert(session_id);
966 }
967
968 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
969 self.child_session_ids.remove(&session_id);
970 }
971
972 pub fn parent_id(&self) -> Option<SessionId> {
973 self.parent_id
974 }
975
976 pub fn capabilities(&self) -> &Capabilities {
977 &self.capabilities
978 }
979
980 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
981 if let Mode::Local(local_mode) = &self.mode {
982 Some(local_mode.config.clone())
983 } else {
984 None
985 }
986 }
987
988 pub fn is_terminated(&self) -> bool {
989 self.is_session_terminated
990 }
991
992 pub fn is_local(&self) -> bool {
993 matches!(self.mode, Mode::Local(_))
994 }
995
996 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
997 match &mut self.mode {
998 Mode::Local(local_mode) => Some(local_mode),
999 Mode::Remote(_) => None,
1000 }
1001 }
1002
1003 pub fn as_local(&self) -> Option<&LocalMode> {
1004 match &self.mode {
1005 Mode::Local(local_mode) => Some(local_mode),
1006 Mode::Remote(_) => None,
1007 }
1008 }
1009
1010 pub(super) fn initialize_sequence(
1011 &mut self,
1012 initialize_rx: oneshot::Receiver<()>,
1013 cx: &mut Context<Self>,
1014 ) -> Task<Result<()>> {
1015 match &self.mode {
1016 Mode::Local(local_mode) => {
1017 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1018 }
1019 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1020 }
1021 }
1022
1023 pub fn output(
1024 &self,
1025 since: OutputToken,
1026 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1027 if self.output_token.0 == 0 {
1028 return (self.output.range(0..0), OutputToken(0));
1029 };
1030
1031 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1032
1033 let clamped_events_since = events_since.clamp(0, self.output.len());
1034 (
1035 self.output
1036 .range(self.output.len() - clamped_events_since..),
1037 self.output_token,
1038 )
1039 }
1040
1041 pub fn respond_to_client(
1042 &self,
1043 request_seq: u64,
1044 success: bool,
1045 command: String,
1046 body: Option<serde_json::Value>,
1047 cx: &mut Context<Self>,
1048 ) -> Task<Result<()>> {
1049 let Some(local_session) = self.as_local().cloned() else {
1050 unreachable!("Cannot respond to remote client");
1051 };
1052
1053 cx.background_spawn(async move {
1054 local_session
1055 .client
1056 .send_message(Message::Response(Response {
1057 body,
1058 success,
1059 command,
1060 seq: request_seq + 1,
1061 request_seq,
1062 message: None,
1063 }))
1064 .await
1065 })
1066 }
1067
/// Applies a DAP `Stopped` event: updates thread states, invalidates cached requests,
/// clears cached threads and variables, and emits `SessionEvent::Stopped`.
fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
    // A missing thread id is treated the same as "all threads stopped".
    if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
        self.thread_states.stop_all_threads();

        self.invalidate_command_type::<StackTraceCommand>();
    }

    // Even if we stopped all threads we still need to insert the thread_id
    // to our own data
    if let Some(thread_id) = event.thread_id {
        self.thread_states.stop_thread(ThreadId(thread_id));

        self.invalidate_state(
            &StackTraceCommand {
                thread_id,
                start_frame: None,
                levels: None,
            }
            .into(),
        );
    }

    self.invalidate_generic();
    self.threads.clear();
    self.variables.clear();
    // The thread id is withheld from the event when the adapter set `preserve_focus_hint`.
    cx.emit(SessionEvent::Stopped(
        event
            .thread_id
            .map(Into::into)
            .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
    ));
    cx.notify();
}
1101
/// Updates session state in response to an event received from the debug adapter.
///
/// Several event kinds are currently accepted but ignored (see the empty arms at the end).
pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
    match *event {
        Events::Initialized(_) => {
            debug_assert!(
                false,
                "Initialized event should have been handled in LocalMode"
            );
        }
        Events::Stopped(event) => self.handle_stopped_event(event, cx),
        Events::Continued(event) => {
            if event.all_threads_continued.unwrap_or_default() {
                self.thread_states.continue_all_threads();
            } else {
                self.thread_states
                    .continue_thread(ThreadId(event.thread_id));
            }
            // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
            self.invalidate_generic();
        }
        Events::Exited(_event) => {
            self.clear_active_debug_line(cx);
        }
        Events::Terminated(_) => {
            // From here on `fetch` stops issuing requests.
            self.is_session_terminated = true;
            self.clear_active_debug_line(cx);
        }
        Events::Thread(event) => {
            let thread_id = ThreadId(event.thread_id);

            match event.reason {
                dap::ThreadEventReason::Started => {
                    self.thread_states.continue_thread(thread_id);
                }
                dap::ThreadEventReason::Exited => {
                    self.thread_states.exit_thread(thread_id);
                }
                reason => {
                    log::error!("Unhandled thread event reason {:?}", reason);
                }
            }
            // The cached thread list is stale after any thread event.
            self.invalidate_state(&ThreadsCommand.into());
            cx.notify();
        }
        Events::Output(event) => {
            // Telemetry output is not stored.
            if event
                .category
                .as_ref()
                .is_some_and(|category| *category == OutputEventCategory::Telemetry)
            {
                return;
            }

            self.output.push_back(event);
            self.output_token.0 += 1;
            cx.notify();
        }
        Events::Breakpoint(_) => {}
        Events::Module(event) => {
            // Keep the module list in sync; modules are matched by id.
            match event.reason {
                dap::ModuleEventReason::New => {
                    self.modules.push(event.module);
                }
                dap::ModuleEventReason::Changed => {
                    if let Some(module) = self
                        .modules
                        .iter_mut()
                        .find(|other| event.module.id == other.id)
                    {
                        *module = event.module;
                    }
                }
                dap::ModuleEventReason::Removed => {
                    self.modules.retain(|other| event.module.id != other.id);
                }
            }

            // todo(debugger): We should only send the invalidate command to downstream clients.
            // self.invalidate_state(&ModulesCommand.into());
        }
        Events::LoadedSource(_) => {
            self.invalidate_state(&LoadedSourcesCommand.into());
        }
        Events::Capabilities(event) => {
            self.capabilities = self.capabilities.merge(event.capabilities);
            cx.notify();
        }
        Events::Memory(_) => {}
        Events::Process(_) => {}
        Events::ProgressEnd(_) => {}
        Events::ProgressStart(_) => {}
        Events::ProgressUpdate(_) => {}
        Events::Invalidated(_) => {}
        Events::Other(_) => {}
    }
}
1197
1198 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1199 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1200 &mut self,
1201 request: T,
1202 process_result: impl FnOnce(
1203 &mut Self,
1204 Result<T::Response>,
1205 &mut Context<Self>,
1206 ) -> Option<T::Response>
1207 + 'static,
1208 cx: &mut Context<Self>,
1209 ) {
1210 const {
1211 assert!(
1212 T::CACHEABLE,
1213 "Only requests marked as cacheable should invoke `fetch`"
1214 );
1215 }
1216
1217 if !self.thread_states.any_stopped_thread()
1218 && request.type_id() != TypeId::of::<ThreadsCommand>()
1219 || self.is_session_terminated
1220 {
1221 return;
1222 }
1223
1224 let request_map = self
1225 .requests
1226 .entry(std::any::TypeId::of::<T>())
1227 .or_default();
1228
1229 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1230 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1231
1232 let task = Self::request_inner::<Arc<T>>(
1233 &self.capabilities,
1234 self.id,
1235 &self.mode,
1236 command,
1237 process_result,
1238 cx,
1239 );
1240 let task = cx
1241 .background_executor()
1242 .spawn(async move {
1243 let _ = task.await?;
1244 Some(())
1245 })
1246 .shared();
1247
1248 vacant.insert(task);
1249 cx.notify();
1250 }
1251 }
1252
1253 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1254 capabilities: &Capabilities,
1255 session_id: SessionId,
1256 mode: &Mode,
1257 request: T,
1258 process_result: impl FnOnce(
1259 &mut Self,
1260 Result<T::Response>,
1261 &mut Context<Self>,
1262 ) -> Option<T::Response>
1263 + 'static,
1264 cx: &mut Context<Self>,
1265 ) -> Task<Option<T::Response>> {
1266 if !T::is_supported(&capabilities) {
1267 log::warn!(
1268 "Attempted to send a DAP request that isn't supported: {:?}",
1269 request
1270 );
1271 let error = Err(anyhow::Error::msg(
1272 "Couldn't complete request because it's not supported",
1273 ));
1274 return cx.spawn(async move |this, cx| {
1275 this.update(cx, |this, cx| process_result(this, error, cx))
1276 .log_err()
1277 .flatten()
1278 });
1279 }
1280
1281 let request = mode.request_dap(session_id, request, cx);
1282 cx.spawn(async move |this, cx| {
1283 let result = request.await;
1284 this.update(cx, |this, cx| process_result(this, result, cx))
1285 .log_err()
1286 .flatten()
1287 })
1288 }
1289
1290 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1291 &self,
1292 request: T,
1293 process_result: impl FnOnce(
1294 &mut Self,
1295 Result<T::Response>,
1296 &mut Context<Self>,
1297 ) -> Option<T::Response>
1298 + 'static,
1299 cx: &mut Context<Self>,
1300 ) -> Task<Option<T::Response>> {
1301 Self::request_inner(
1302 &self.capabilities,
1303 self.id,
1304 &self.mode,
1305 request,
1306 process_result,
1307 cx,
1308 )
1309 }
1310
1311 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1312 self.requests.remove(&std::any::TypeId::of::<Command>());
1313 }
1314
1315 fn invalidate_generic(&mut self) {
1316 self.invalidate_command_type::<ModulesCommand>();
1317 self.invalidate_command_type::<LoadedSourcesCommand>();
1318 self.invalidate_command_type::<ThreadsCommand>();
1319 }
1320
1321 fn invalidate_state(&mut self, key: &RequestSlot) {
1322 self.requests
1323 .entry(key.0.as_any().type_id())
1324 .and_modify(|request_map| {
1325 request_map.remove(&key);
1326 });
1327 }
1328
    /// Returns the status this session currently tracks for `thread_id`, as reported by the
    /// session's `thread_states`.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1332
1333 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1334 self.fetch(
1335 dap_command::ThreadsCommand,
1336 |this, result, cx| {
1337 let result = result.log_err()?;
1338
1339 this.threads = result
1340 .iter()
1341 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1342 .collect();
1343
1344 this.invalidate_command_type::<StackTraceCommand>();
1345 cx.emit(SessionEvent::Threads);
1346 cx.notify();
1347
1348 Some(result)
1349 },
1350 cx,
1351 );
1352
1353 self.threads
1354 .values()
1355 .map(|thread| {
1356 (
1357 thread.dap.clone(),
1358 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1359 )
1360 })
1361 .collect()
1362 }
1363
1364 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1365 self.fetch(
1366 dap_command::ModulesCommand,
1367 |this, result, cx| {
1368 let result = result.log_err()?;
1369
1370 this.modules = result.iter().cloned().collect();
1371 cx.emit(SessionEvent::Modules);
1372 cx.notify();
1373
1374 Some(result)
1375 },
1376 cx,
1377 );
1378
1379 &self.modules
1380 }
1381
    /// Returns the session's `ignore_breakpoints` flag.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1385
1386 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1387 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1388 }
1389
1390 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1391 if self.ignore_breakpoints == ignore {
1392 return Task::ready(());
1393 }
1394
1395 self.ignore_breakpoints = ignore;
1396
1397 if let Some(local) = self.as_local() {
1398 local.send_all_breakpoints(ignore, cx)
1399 } else {
1400 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1401 unimplemented!()
1402 }
1403 }
1404
1405 pub fn breakpoints_enabled(&self) -> bool {
1406 self.ignore_breakpoints
1407 }
1408
1409 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1410 self.fetch(
1411 dap_command::LoadedSourcesCommand,
1412 |this, result, cx| {
1413 let result = result.log_err()?;
1414 this.loaded_sources = result.iter().cloned().collect();
1415 cx.emit(SessionEvent::LoadedSources);
1416 cx.notify();
1417 Some(result)
1418 },
1419 cx,
1420 );
1421
1422 &self.loaded_sources
1423 }
1424
1425 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1426 res.log_err()?;
1427 Some(())
1428 }
1429
1430 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1431 thread_id: ThreadId,
1432 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1433 {
1434 move |this, response, cx| match response.log_err() {
1435 Some(response) => Some(response),
1436 None => {
1437 this.thread_states.stop_thread(thread_id);
1438 cx.notify();
1439 None
1440 }
1441 }
1442 }
1443
1444 fn clear_active_debug_line_response(
1445 &mut self,
1446 response: Result<()>,
1447 cx: &mut Context<Session>,
1448 ) -> Option<()> {
1449 response.log_err()?;
1450 self.clear_active_debug_line(cx);
1451 Some(())
1452 }
1453
1454 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1455 self.as_local()
1456 .expect("Message handler will only run in local mode")
1457 .breakpoint_store
1458 .update(cx, |store, cx| {
1459 store.remove_active_position(Some(self.id), cx)
1460 });
1461 }
1462
1463 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1464 self.request(
1465 PauseCommand {
1466 thread_id: thread_id.0,
1467 },
1468 Self::empty_response,
1469 cx,
1470 )
1471 .detach();
1472 }
1473
1474 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1475 self.request(
1476 RestartStackFrameCommand { stack_frame_id },
1477 Self::empty_response,
1478 cx,
1479 )
1480 .detach();
1481 }
1482
1483 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1484 if self.capabilities.supports_restart_request.unwrap_or(false) {
1485 self.request(
1486 RestartCommand {
1487 raw: args.unwrap_or(Value::Null),
1488 },
1489 Self::empty_response,
1490 cx,
1491 )
1492 .detach();
1493 } else {
1494 self.request(
1495 DisconnectCommand {
1496 restart: Some(false),
1497 terminate_debuggee: Some(true),
1498 suspend_debuggee: Some(false),
1499 },
1500 Self::empty_response,
1501 cx,
1502 )
1503 .detach();
1504 }
1505 }
1506
1507 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1508 self.is_session_terminated = true;
1509 self.thread_states.exit_all_threads();
1510 cx.notify();
1511
1512 let task = if self
1513 .capabilities
1514 .supports_terminate_request
1515 .unwrap_or_default()
1516 {
1517 self.request(
1518 TerminateCommand {
1519 restart: Some(false),
1520 },
1521 Self::clear_active_debug_line_response,
1522 cx,
1523 )
1524 } else {
1525 self.request(
1526 DisconnectCommand {
1527 restart: Some(false),
1528 terminate_debuggee: Some(true),
1529 suspend_debuggee: Some(false),
1530 },
1531 Self::clear_active_debug_line_response,
1532 cx,
1533 )
1534 };
1535
1536 cx.background_spawn(async move {
1537 let _ = task.await;
1538 })
1539 }
1540
1541 pub fn completions(
1542 &mut self,
1543 query: CompletionsQuery,
1544 cx: &mut Context<Self>,
1545 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1546 let task = self.request(query, |_, result, _| result.log_err(), cx);
1547
1548 cx.background_executor().spawn(async move {
1549 anyhow::Ok(
1550 task.await
1551 .map(|response| response.targets)
1552 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1553 )
1554 })
1555 }
1556
1557 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1558 self.thread_states.continue_thread(thread_id);
1559 self.request(
1560 ContinueCommand {
1561 args: ContinueArguments {
1562 thread_id: thread_id.0,
1563 single_thread: Some(true),
1564 },
1565 },
1566 Self::on_step_response::<ContinueCommand>(thread_id),
1567 cx,
1568 )
1569 .detach();
1570 }
1571
1572 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1573 match self.mode {
1574 Mode::Local(ref local) => Some(local.client.clone()),
1575 Mode::Remote(_) => None,
1576 }
1577 }
1578
1579 pub fn step_over(
1580 &mut self,
1581 thread_id: ThreadId,
1582 granularity: SteppingGranularity,
1583 cx: &mut Context<Self>,
1584 ) {
1585 let supports_single_thread_execution_requests =
1586 self.capabilities.supports_single_thread_execution_requests;
1587 let supports_stepping_granularity = self
1588 .capabilities
1589 .supports_stepping_granularity
1590 .unwrap_or_default();
1591
1592 let command = NextCommand {
1593 inner: StepCommand {
1594 thread_id: thread_id.0,
1595 granularity: supports_stepping_granularity.then(|| granularity),
1596 single_thread: supports_single_thread_execution_requests,
1597 },
1598 };
1599
1600 self.thread_states.process_step(thread_id);
1601 self.request(
1602 command,
1603 Self::on_step_response::<NextCommand>(thread_id),
1604 cx,
1605 )
1606 .detach();
1607 }
1608
1609 pub fn step_in(
1610 &mut self,
1611 thread_id: ThreadId,
1612 granularity: SteppingGranularity,
1613 cx: &mut Context<Self>,
1614 ) {
1615 let supports_single_thread_execution_requests =
1616 self.capabilities.supports_single_thread_execution_requests;
1617 let supports_stepping_granularity = self
1618 .capabilities
1619 .supports_stepping_granularity
1620 .unwrap_or_default();
1621
1622 let command = StepInCommand {
1623 inner: StepCommand {
1624 thread_id: thread_id.0,
1625 granularity: supports_stepping_granularity.then(|| granularity),
1626 single_thread: supports_single_thread_execution_requests,
1627 },
1628 };
1629
1630 self.thread_states.process_step(thread_id);
1631 self.request(
1632 command,
1633 Self::on_step_response::<StepInCommand>(thread_id),
1634 cx,
1635 )
1636 .detach();
1637 }
1638
1639 pub fn step_out(
1640 &mut self,
1641 thread_id: ThreadId,
1642 granularity: SteppingGranularity,
1643 cx: &mut Context<Self>,
1644 ) {
1645 let supports_single_thread_execution_requests =
1646 self.capabilities.supports_single_thread_execution_requests;
1647 let supports_stepping_granularity = self
1648 .capabilities
1649 .supports_stepping_granularity
1650 .unwrap_or_default();
1651
1652 let command = StepOutCommand {
1653 inner: StepCommand {
1654 thread_id: thread_id.0,
1655 granularity: supports_stepping_granularity.then(|| granularity),
1656 single_thread: supports_single_thread_execution_requests,
1657 },
1658 };
1659
1660 self.thread_states.process_step(thread_id);
1661 self.request(
1662 command,
1663 Self::on_step_response::<StepOutCommand>(thread_id),
1664 cx,
1665 )
1666 .detach();
1667 }
1668
1669 pub fn step_back(
1670 &mut self,
1671 thread_id: ThreadId,
1672 granularity: SteppingGranularity,
1673 cx: &mut Context<Self>,
1674 ) {
1675 let supports_single_thread_execution_requests =
1676 self.capabilities.supports_single_thread_execution_requests;
1677 let supports_stepping_granularity = self
1678 .capabilities
1679 .supports_stepping_granularity
1680 .unwrap_or_default();
1681
1682 let command = StepBackCommand {
1683 inner: StepCommand {
1684 thread_id: thread_id.0,
1685 granularity: supports_stepping_granularity.then(|| granularity),
1686 single_thread: supports_single_thread_execution_requests,
1687 },
1688 };
1689
1690 self.thread_states.process_step(thread_id);
1691
1692 self.request(
1693 command,
1694 Self::on_step_response::<StepBackCommand>(thread_id),
1695 cx,
1696 )
1697 .detach();
1698 }
1699
1700 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1701 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1702 && self.requests.contains_key(&ThreadsCommand.type_id())
1703 && self.threads.contains_key(&thread_id)
1704 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1705 // We could still be using an old thread id and have sent a new thread's request
1706 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1707 // But it very well could cause a minor bug in the future that is hard to track down
1708 {
1709 self.fetch(
1710 super::dap_command::StackTraceCommand {
1711 thread_id: thread_id.0,
1712 start_frame: None,
1713 levels: None,
1714 },
1715 move |this, stack_frames, cx| {
1716 let stack_frames = stack_frames.log_err()?;
1717
1718 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1719 thread.stack_frame_ids =
1720 stack_frames.iter().map(|frame| frame.id).collect();
1721 });
1722 debug_assert!(
1723 matches!(entry, indexmap::map::Entry::Occupied(_)),
1724 "Sent request for thread_id that doesn't exist"
1725 );
1726
1727 this.stack_frames.extend(
1728 stack_frames
1729 .iter()
1730 .cloned()
1731 .map(|frame| (frame.id, StackFrame::from(frame))),
1732 );
1733
1734 this.invalidate_command_type::<ScopesCommand>();
1735 this.invalidate_command_type::<VariablesCommand>();
1736
1737 cx.emit(SessionEvent::StackTrace);
1738 cx.notify();
1739 Some(stack_frames)
1740 },
1741 cx,
1742 );
1743 }
1744
1745 self.threads
1746 .get(&thread_id)
1747 .map(|thread| {
1748 thread
1749 .stack_frame_ids
1750 .iter()
1751 .filter_map(|id| self.stack_frames.get(id))
1752 .cloned()
1753 .collect()
1754 })
1755 .unwrap_or_default()
1756 }
1757
1758 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1759 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1760 && self
1761 .requests
1762 .contains_key(&TypeId::of::<StackTraceCommand>())
1763 {
1764 self.fetch(
1765 ScopesCommand { stack_frame_id },
1766 move |this, scopes, cx| {
1767 let scopes = scopes.log_err()?;
1768
1769 for scope in scopes .iter(){
1770 this.variables(scope.variables_reference, cx);
1771 }
1772
1773 let entry = this
1774 .stack_frames
1775 .entry(stack_frame_id)
1776 .and_modify(|stack_frame| {
1777 stack_frame.scopes = scopes.clone();
1778 });
1779
1780 cx.emit(SessionEvent::Variables);
1781
1782 debug_assert!(
1783 matches!(entry, indexmap::map::Entry::Occupied(_)),
1784 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1785 );
1786
1787 Some(scopes)
1788 },
1789 cx,
1790 );
1791 }
1792
1793 self.stack_frames
1794 .get(&stack_frame_id)
1795 .map(|frame| frame.scopes.as_slice())
1796 .unwrap_or_default()
1797 }
1798
1799 pub fn variables(
1800 &mut self,
1801 variables_reference: VariableReference,
1802 cx: &mut Context<Self>,
1803 ) -> Vec<dap::Variable> {
1804 let command = VariablesCommand {
1805 variables_reference,
1806 filter: None,
1807 start: None,
1808 count: None,
1809 format: None,
1810 };
1811
1812 self.fetch(
1813 command,
1814 move |this, variables, cx| {
1815 let variables = variables.log_err()?;
1816 this.variables
1817 .insert(variables_reference, variables.clone());
1818
1819 cx.emit(SessionEvent::Variables);
1820 Some(variables)
1821 },
1822 cx,
1823 );
1824
1825 self.variables
1826 .get(&variables_reference)
1827 .cloned()
1828 .unwrap_or_default()
1829 }
1830
1831 pub fn set_variable_value(
1832 &mut self,
1833 variables_reference: u64,
1834 name: String,
1835 value: String,
1836 cx: &mut Context<Self>,
1837 ) {
1838 if self.capabilities.supports_set_variable.unwrap_or_default() {
1839 self.request(
1840 SetVariableValueCommand {
1841 name,
1842 value,
1843 variables_reference,
1844 },
1845 move |this, response, cx| {
1846 let response = response.log_err()?;
1847 this.invalidate_command_type::<VariablesCommand>();
1848 cx.notify();
1849 Some(response)
1850 },
1851 cx,
1852 )
1853 .detach()
1854 }
1855 }
1856
1857 pub fn evaluate(
1858 &mut self,
1859 expression: String,
1860 context: Option<EvaluateArgumentsContext>,
1861 frame_id: Option<u64>,
1862 source: Option<Source>,
1863 cx: &mut Context<Self>,
1864 ) {
1865 self.request(
1866 EvaluateCommand {
1867 expression,
1868 context,
1869 frame_id,
1870 source,
1871 },
1872 |this, response, cx| {
1873 let response = response.log_err()?;
1874 this.output_token.0 += 1;
1875 this.output.push_back(dap::OutputEvent {
1876 category: None,
1877 output: response.result.clone(),
1878 group: None,
1879 variables_reference: Some(response.variables_reference),
1880 source: None,
1881 line: None,
1882 column: None,
1883 data: None,
1884 location_reference: None,
1885 });
1886
1887 this.invalidate_command_type::<ScopesCommand>();
1888 cx.notify();
1889 Some(response)
1890 },
1891 cx,
1892 )
1893 .detach();
1894 }
1895
1896 pub fn location(
1897 &mut self,
1898 reference: u64,
1899 cx: &mut Context<Self>,
1900 ) -> Option<dap::LocationsResponse> {
1901 self.fetch(
1902 LocationsCommand { reference },
1903 move |this, response, _| {
1904 let response = response.log_err()?;
1905 this.locations.insert(reference, response.clone());
1906 Some(response)
1907 },
1908 cx,
1909 );
1910 self.locations.get(&reference).cloned()
1911 }
1912 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1913 let command = DisconnectCommand {
1914 restart: Some(false),
1915 terminate_debuggee: Some(true),
1916 suspend_debuggee: Some(false),
1917 };
1918
1919 self.request(command, Self::empty_response, cx).detach()
1920 }
1921
1922 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1923 if self
1924 .capabilities
1925 .supports_terminate_threads_request
1926 .unwrap_or_default()
1927 {
1928 self.request(
1929 TerminateThreadsCommand {
1930 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1931 },
1932 Self::clear_active_debug_line_response,
1933 cx,
1934 )
1935 .detach();
1936 } else {
1937 self.shutdown(cx).detach();
1938 }
1939 }
1940}
1941
1942fn create_local_session(
1943 breakpoint_store: Entity<BreakpointStore>,
1944 session_id: SessionId,
1945 parent_session: Option<Entity<Session>>,
1946 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1947 initialized_tx: oneshot::Sender<()>,
1948 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1949 mode: LocalMode,
1950 capabilities: Capabilities,
1951 cx: &mut Context<Session>,
1952) -> Session {
1953 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1954 let mut initialized_tx = Some(initialized_tx);
1955 while let Some(message) = message_rx.next().await {
1956 if let Message::Event(event) = message {
1957 if let Events::Initialized(_) = *event {
1958 if let Some(tx) = initialized_tx.take() {
1959 tx.send(()).ok();
1960 }
1961 } else {
1962 let Ok(_) = this.update(cx, |session, cx| {
1963 session.handle_dap_event(event, cx);
1964 }) else {
1965 break;
1966 };
1967 }
1968 } else {
1969 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1970 else {
1971 break;
1972 };
1973 }
1974 }
1975 })];
1976
1977 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1978 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1979 if let Some(local) = (!this.ignore_breakpoints)
1980 .then(|| this.as_local_mut())
1981 .flatten()
1982 {
1983 local
1984 .send_breakpoints_from_path(path.clone(), *reason, cx)
1985 .detach();
1986 };
1987 }
1988 BreakpointStoreEvent::BreakpointsCleared(paths) => {
1989 if let Some(local) = (!this.ignore_breakpoints)
1990 .then(|| this.as_local_mut())
1991 .flatten()
1992 {
1993 local.unset_breakpoints_from_paths(paths, cx).detach();
1994 }
1995 }
1996 BreakpointStoreEvent::ActiveDebugLineChanged => {}
1997 })
1998 .detach();
1999
2000 Session {
2001 mode: Mode::Local(mode),
2002 id: session_id,
2003 child_session_ids: HashSet::default(),
2004 parent_id: parent_session.map(|session| session.read(cx).id),
2005 variables: Default::default(),
2006 capabilities,
2007 thread_states: ThreadStates::default(),
2008 output_token: OutputToken(0),
2009 ignore_breakpoints: false,
2010 output: circular_buffer::CircularBuffer::boxed(),
2011 requests: HashMap::default(),
2012 modules: Vec::default(),
2013 loaded_sources: Vec::default(),
2014 threads: IndexMap::default(),
2015 stack_frames: IndexMap::default(),
2016 locations: Default::default(),
2017 _background_tasks,
2018 is_session_terminated: false,
2019 }
2020}