1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
9 StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
10 VariablesCommand,
11};
12use super::dap_store::DapAdapterDelegate;
13use anyhow::{Result, anyhow};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
16use dap::messages::Response;
17use dap::{
18 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
19 SteppingGranularity, StoppedEvent, VariableReference,
20 adapters::{DapDelegate, DapStatus},
21 client::{DebugAdapterClient, SessionId},
22 messages::{Events, Message},
23};
24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
25use futures::channel::oneshot;
26use futures::{FutureExt, future::Shared};
27use gpui::{
28 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
29};
30use rpc::AnyProtoClient;
31use serde_json::{Value, json};
32use settings::Settings;
33use smol::stream::StreamExt;
34use std::any::TypeId;
35use std::path::PathBuf;
36use std::u64;
37use std::{
38 any::Any,
39 collections::hash_map::Entry,
40 hash::{Hash, Hasher},
41 path::Path,
42 sync::Arc,
43};
44use task::{DebugAdapterConfig, DebugTaskDefinition};
45use text::{PointUtf16, ToPointUtf16};
46use util::{ResultExt, merge_json_value_into};
47
48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
49#[repr(transparent)]
50pub struct ThreadId(pub u64);
51
52impl ThreadId {
53 pub const MIN: ThreadId = ThreadId(u64::MIN);
54 pub const MAX: ThreadId = ThreadId(u64::MAX);
55}
56
57impl From<u64> for ThreadId {
58 fn from(id: u64) -> Self {
59 Self(id)
60 }
61}
62
63#[derive(Clone, Debug)]
64pub struct StackFrame {
65 pub dap: dap::StackFrame,
66 pub scopes: Vec<dap::Scope>,
67}
68
69impl From<dap::StackFrame> for StackFrame {
70 fn from(stack_frame: dap::StackFrame) -> Self {
71 Self {
72 scopes: vec![],
73 dap: stack_frame,
74 }
75 }
76}
77
78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
79pub enum ThreadStatus {
80 #[default]
81 Running,
82 Stopped,
83 Stepping,
84 Exited,
85 Ended,
86}
87
88impl ThreadStatus {
89 pub fn label(&self) -> &'static str {
90 match self {
91 ThreadStatus::Running => "Running",
92 ThreadStatus::Stopped => "Stopped",
93 ThreadStatus::Stepping => "Stepping",
94 ThreadStatus::Exited => "Exited",
95 ThreadStatus::Ended => "Ended",
96 }
97 }
98}
99
100#[derive(Debug)]
101pub struct Thread {
102 dap: dap::Thread,
103 stack_frame_ids: IndexSet<StackFrameId>,
104 _has_stopped: bool,
105}
106
107impl From<dap::Thread> for Thread {
108 fn from(dap: dap::Thread) -> Self {
109 Self {
110 dap,
111 stack_frame_ids: Default::default(),
112 _has_stopped: false,
113 }
114 }
115}
116
117type UpstreamProjectId = u64;
118
119struct RemoteConnection {
120 _client: AnyProtoClient,
121 _upstream_project_id: UpstreamProjectId,
122}
123
124impl RemoteConnection {
125 fn send_proto_client_request<R: DapCommand>(
126 &self,
127 _request: R,
128 _session_id: SessionId,
129 cx: &mut App,
130 ) -> Task<Result<R::Response>> {
131 // let message = request.to_proto(session_id, self.upstream_project_id);
132 // let upstream_client = self.client.clone();
133 cx.background_executor().spawn(async move {
134 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
135 // let response = upstream_client.request(message).await?;
136 // request.response_from_proto(response)
137 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
138 })
139 }
140
141 fn request<R: DapCommand>(
142 &self,
143 request: R,
144 session_id: SessionId,
145 cx: &mut App,
146 ) -> Task<Result<R::Response>>
147 where
148 <R::DapRequest as dap::requests::Request>::Response: 'static,
149 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
150 {
151 return self.send_proto_client_request::<R>(request, session_id, cx);
152 }
153}
154
155enum Mode {
156 Local(LocalMode),
157 Remote(RemoteConnection),
158}
159
160#[derive(Clone)]
161pub struct LocalMode {
162 client: Arc<DebugAdapterClient>,
163 config: DebugAdapterConfig,
164 adapter: Arc<dyn DebugAdapter>,
165 breakpoint_store: Entity<BreakpointStore>,
166}
167
168fn client_source(abs_path: &Path) -> dap::Source {
169 dap::Source {
170 name: abs_path
171 .file_name()
172 .map(|filename| filename.to_string_lossy().to_string()),
173 path: Some(abs_path.to_string_lossy().to_string()),
174 source_reference: None,
175 presentation_hint: None,
176 origin: None,
177 sources: None,
178 adapter_data: None,
179 checksums: None,
180 }
181}
182
183impl LocalMode {
184 fn new(
185 debug_adapters: Arc<DapRegistry>,
186 session_id: SessionId,
187 parent_session: Option<Entity<Session>>,
188 breakpoint_store: Entity<BreakpointStore>,
189 config: DebugAdapterConfig,
190 delegate: DapAdapterDelegate,
191 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
192 cx: AsyncApp,
193 ) -> Task<Result<(Self, Capabilities)>> {
194 Self::new_inner(
195 debug_adapters,
196 session_id,
197 parent_session,
198 breakpoint_store,
199 config,
200 delegate,
201 messages_tx,
202 async |_, _| {},
203 cx,
204 )
205 }
206 #[cfg(any(test, feature = "test-support"))]
207 fn new_fake(
208 session_id: SessionId,
209 parent_session: Option<Entity<Session>>,
210 breakpoint_store: Entity<BreakpointStore>,
211 config: DebugAdapterConfig,
212 delegate: DapAdapterDelegate,
213 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
214 caps: Capabilities,
215 fail: bool,
216 cx: AsyncApp,
217 ) -> Task<Result<(Self, Capabilities)>> {
218 use task::DebugRequestDisposition;
219
220 let request = match config.request.clone() {
221 DebugRequestDisposition::UserConfigured(request) => request,
222 DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
223 match reverse_request_args.request {
224 dap::StartDebuggingRequestArgumentsRequest::Launch => {
225 DebugRequestType::Launch(task::LaunchConfig {
226 program: "".to_owned(),
227 cwd: None,
228 })
229 }
230 dap::StartDebuggingRequestArgumentsRequest::Attach => {
231 DebugRequestType::Attach(task::AttachConfig {
232 process_id: Some(0),
233 })
234 }
235 }
236 }
237 };
238
239 let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
240 session
241 .client
242 .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
243 .await;
244
245 let paths = cx
246 .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
247 .expect("Breakpoint store should exist in all tests that start debuggers");
248
249 session
250 .client
251 .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
252 let p = Arc::from(Path::new(&args.source.path.unwrap()));
253 if !paths.contains(&p) {
254 panic!("Sent breakpoints for path without any")
255 }
256
257 Ok(dap::SetBreakpointsResponse {
258 breakpoints: Vec::default(),
259 })
260 })
261 .await;
262
263 match request {
264 dap::DebugRequestType::Launch(_) => {
265 if fail {
266 session
267 .client
268 .on_request::<dap::requests::Launch, _>(move |_, _| {
269 Err(dap::ErrorResponse {
270 error: Some(dap::Message {
271 id: 1,
272 format: "error".into(),
273 variables: None,
274 send_telemetry: None,
275 show_user: None,
276 url: None,
277 url_label: None,
278 }),
279 })
280 })
281 .await;
282 } else {
283 session
284 .client
285 .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
286 .await;
287 }
288 }
289 dap::DebugRequestType::Attach(attach_config) => {
290 if fail {
291 session
292 .client
293 .on_request::<dap::requests::Attach, _>(move |_, _| {
294 Err(dap::ErrorResponse {
295 error: Some(dap::Message {
296 id: 1,
297 format: "error".into(),
298 variables: None,
299 send_telemetry: None,
300 show_user: None,
301 url: None,
302 url_label: None,
303 }),
304 })
305 })
306 .await;
307 } else {
308 session
309 .client
310 .on_request::<dap::requests::Attach, _>(move |_, args| {
311 assert_eq!(
312 json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
313 args.raw
314 );
315
316 Ok(())
317 })
318 .await;
319 }
320 }
321 }
322
323 session
324 .client
325 .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
326 .await;
327 session.client.fake_event(Events::Initialized(None)).await;
328 };
329 Self::new_inner(
330 DapRegistry::fake().into(),
331 session_id,
332 parent_session,
333 breakpoint_store,
334 config,
335 delegate,
336 messages_tx,
337 callback,
338 cx,
339 )
340 }
341 fn new_inner(
342 registry: Arc<DapRegistry>,
343 session_id: SessionId,
344 parent_session: Option<Entity<Session>>,
345 breakpoint_store: Entity<BreakpointStore>,
346 config: DebugAdapterConfig,
347 delegate: DapAdapterDelegate,
348 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
349 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
350 cx: AsyncApp,
351 ) -> Task<Result<(Self, Capabilities)>> {
352 cx.spawn(async move |cx| {
353 let (adapter, binary) =
354 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
355
356 let message_handler = Box::new(move |message| {
357 messages_tx.unbounded_send(message).ok();
358 });
359
360 let client = Arc::new(
361 if let Some(client) = parent_session
362 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
363 .flatten()
364 {
365 client
366 .reconnect(session_id, binary, message_handler, cx.clone())
367 .await?
368 } else {
369 DebugAdapterClient::start(
370 session_id,
371 adapter.name(),
372 binary,
373 message_handler,
374 cx.clone(),
375 )
376 .await?
377 },
378 );
379
380 let adapter_id = adapter.name().to_string().to_owned();
381 let mut session = Self {
382 client,
383 adapter,
384 breakpoint_store,
385 config: config.clone(),
386 };
387
388 on_initialized(&mut session, cx.clone()).await;
389 let capabilities = session
390 .request(Initialize { adapter_id }, cx.background_executor().clone())
391 .await?;
392
393 Ok((session, capabilities))
394 })
395 }
396
397 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
398 let tasks: Vec<_> = paths
399 .into_iter()
400 .map(|path| {
401 self.request(
402 dap_command::SetBreakpoints {
403 source: client_source(path),
404 source_modified: None,
405 breakpoints: vec![],
406 },
407 cx.background_executor().clone(),
408 )
409 })
410 .collect();
411
412 cx.background_spawn(async move {
413 futures::future::join_all(tasks)
414 .await
415 .iter()
416 .for_each(|res| match res {
417 Ok(_) => {}
418 Err(err) => {
419 log::warn!("Set breakpoints request failed: {}", err);
420 }
421 });
422 })
423 }
424
425 fn send_breakpoints_from_path(
426 &self,
427 abs_path: Arc<Path>,
428 reason: BreakpointUpdatedReason,
429 cx: &mut App,
430 ) -> Task<()> {
431 let breakpoints = self
432 .breakpoint_store
433 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
434 .into_iter()
435 .filter(|bp| bp.state.is_enabled())
436 .map(Into::into)
437 .collect();
438
439 let task = self.request(
440 dap_command::SetBreakpoints {
441 source: client_source(&abs_path),
442 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
443 breakpoints,
444 },
445 cx.background_executor().clone(),
446 );
447
448 cx.background_spawn(async move {
449 match task.await {
450 Ok(_) => {}
451 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
452 }
453 })
454 }
455
456 fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
457 let mut breakpoint_tasks = Vec::new();
458 let breakpoints = self
459 .breakpoint_store
460 .read_with(cx, |store, cx| store.all_breakpoints(cx));
461
462 for (path, breakpoints) in breakpoints {
463 let breakpoints = if ignore_breakpoints {
464 vec![]
465 } else {
466 breakpoints
467 .into_iter()
468 .filter(|bp| bp.state.is_enabled())
469 .map(Into::into)
470 .collect()
471 };
472
473 breakpoint_tasks.push(self.request(
474 dap_command::SetBreakpoints {
475 source: client_source(&path),
476 source_modified: Some(false),
477 breakpoints,
478 },
479 cx.background_executor().clone(),
480 ));
481 }
482
483 cx.background_spawn(async move {
484 futures::future::join_all(breakpoint_tasks)
485 .await
486 .iter()
487 .for_each(|res| match res {
488 Ok(_) => {}
489 Err(err) => {
490 log::warn!("Set breakpoints request failed: {}", err);
491 }
492 });
493 })
494 }
495
496 async fn get_adapter_binary(
497 registry: &Arc<DapRegistry>,
498 config: &DebugAdapterConfig,
499 delegate: &DapAdapterDelegate,
500 cx: &mut AsyncApp,
501 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
502 let adapter = registry
503 .adapter(&config.adapter)
504 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
505
506 let binary = cx.update(|cx| {
507 ProjectSettings::get_global(cx)
508 .dap
509 .get(&adapter.name())
510 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
511 })?;
512
513 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
514 Err(error) => {
515 delegate.update_status(
516 adapter.name(),
517 DapStatus::Failed {
518 error: error.to_string(),
519 },
520 );
521
522 return Err(error);
523 }
524 Ok(mut binary) => {
525 delegate.update_status(adapter.name(), DapStatus::None);
526
527 let shell_env = delegate.shell_env().await;
528 let mut envs = binary.envs.unwrap_or_default();
529 envs.extend(shell_env);
530 binary.envs = Some(envs);
531
532 binary
533 }
534 };
535
536 Ok((adapter, binary))
537 }
538
539 pub fn initialize_sequence(
540 &self,
541 capabilities: &Capabilities,
542 initialized_rx: oneshot::Receiver<()>,
543 cx: &App,
544 ) -> Task<Result<()>> {
545 let (mut raw, is_launch) = match &self.config.request {
546 task::DebugRequestDisposition::UserConfigured(_) => {
547 let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
548 debug_assert!(false, "This part of code should be unreachable in practice");
549 return Task::ready(Err(anyhow!(
550 "Expected debug config conversion to succeed"
551 )));
552 };
553 let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
554 let raw = self.adapter.request_args(&raw);
555 (raw, is_launch)
556 }
557 task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
558 start_debugging_request_arguments.configuration.clone(),
559 matches!(
560 start_debugging_request_arguments.request,
561 dap::StartDebuggingRequestArgumentsRequest::Launch
562 ),
563 ),
564 };
565
566 merge_json_value_into(
567 self.config.initialize_args.clone().unwrap_or(json!({})),
568 &mut raw,
569 );
570 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
571 let launch = if is_launch {
572 self.request(Launch { raw }, cx.background_executor().clone())
573 } else {
574 self.request(Attach { raw }, cx.background_executor().clone())
575 };
576
577 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
578
579 let configuration_sequence = cx.spawn({
580 let this = self.clone();
581 async move |cx| {
582 initialized_rx.await?;
583 // todo(debugger) figure out if we want to handle a breakpoint response error
584 // This will probably consist of letting a user know that breakpoints failed to be set
585 cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
586
587 if configuration_done_supported {
588 this.request(ConfigurationDone, cx.background_executor().clone())
589 } else {
590 Task::ready(Ok(()))
591 }
592 .await
593 }
594 });
595
596 cx.background_spawn(async move {
597 futures::future::try_join(launch, configuration_sequence).await?;
598 Ok(())
599 })
600 }
601
602 fn request<R: LocalDapCommand>(
603 &self,
604 request: R,
605 executor: BackgroundExecutor,
606 ) -> Task<Result<R::Response>>
607 where
608 <R::DapRequest as dap::requests::Request>::Response: 'static,
609 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
610 {
611 let request = Arc::new(request);
612
613 let request_clone = request.clone();
614 let connection = self.client.clone();
615 let request_task = executor.spawn(async move {
616 let args = request_clone.to_dap();
617 connection.request::<R::DapRequest>(args).await
618 });
619
620 executor.spawn(async move {
621 let response = request.response_from_dap(request_task.await?);
622 response
623 })
624 }
625}
626impl From<RemoteConnection> for Mode {
627 fn from(value: RemoteConnection) -> Self {
628 Self::Remote(value)
629 }
630}
631
632impl Mode {
633 fn request_dap<R: DapCommand>(
634 &self,
635 session_id: SessionId,
636 request: R,
637 cx: &mut Context<Session>,
638 ) -> Task<Result<R::Response>>
639 where
640 <R::DapRequest as dap::requests::Request>::Response: 'static,
641 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
642 {
643 match self {
644 Mode::Local(debug_adapter_client) => {
645 debug_adapter_client.request(request, cx.background_executor().clone())
646 }
647 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
648 }
649 }
650}
651
652#[derive(Default)]
653struct ThreadStates {
654 global_state: Option<ThreadStatus>,
655 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
656}
657
658impl ThreadStates {
659 fn stop_all_threads(&mut self) {
660 self.global_state = Some(ThreadStatus::Stopped);
661 self.known_thread_states.clear();
662 }
663
664 fn exit_all_threads(&mut self) {
665 self.global_state = Some(ThreadStatus::Exited);
666 self.known_thread_states.clear();
667 }
668
669 fn continue_all_threads(&mut self) {
670 self.global_state = Some(ThreadStatus::Running);
671 self.known_thread_states.clear();
672 }
673
674 fn stop_thread(&mut self, thread_id: ThreadId) {
675 self.known_thread_states
676 .insert(thread_id, ThreadStatus::Stopped);
677 }
678
679 fn continue_thread(&mut self, thread_id: ThreadId) {
680 self.known_thread_states
681 .insert(thread_id, ThreadStatus::Running);
682 }
683
684 fn process_step(&mut self, thread_id: ThreadId) {
685 self.known_thread_states
686 .insert(thread_id, ThreadStatus::Stepping);
687 }
688
689 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
690 self.thread_state(thread_id)
691 .unwrap_or(ThreadStatus::Running)
692 }
693
694 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
695 self.known_thread_states
696 .get(&thread_id)
697 .copied()
698 .or(self.global_state)
699 }
700
701 fn exit_thread(&mut self, thread_id: ThreadId) {
702 self.known_thread_states
703 .insert(thread_id, ThreadStatus::Exited);
704 }
705
706 fn any_stopped_thread(&self) -> bool {
707 self.global_state
708 .is_some_and(|state| state == ThreadStatus::Stopped)
709 || self
710 .known_thread_states
711 .values()
712 .any(|status| *status == ThreadStatus::Stopped)
713 }
714}
715const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
716
717#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
718pub struct OutputToken(pub usize);
719/// Represents a current state of a single debug adapter and provides ways to mutate it.
720pub struct Session {
721 mode: Mode,
722 pub(super) capabilities: Capabilities,
723 id: SessionId,
724 child_session_ids: HashSet<SessionId>,
725 parent_id: Option<SessionId>,
726 ignore_breakpoints: bool,
727 modules: Vec<dap::Module>,
728 loaded_sources: Vec<dap::Source>,
729 output_token: OutputToken,
730 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
731 threads: IndexMap<ThreadId, Thread>,
732 thread_states: ThreadStates,
733 variables: HashMap<VariableReference, Vec<dap::Variable>>,
734 stack_frames: IndexMap<StackFrameId, StackFrame>,
735 locations: HashMap<u64, dap::LocationsResponse>,
736 is_session_terminated: bool,
737 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
738 _background_tasks: Vec<Task<()>>,
739}
740
741trait CacheableCommand: 'static + Send + Sync {
742 fn as_any(&self) -> &dyn Any;
743 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
744 fn dyn_hash(&self, hasher: &mut dyn Hasher);
745 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
746}
747
748impl<T> CacheableCommand for T
749where
750 T: DapCommand + PartialEq + Eq + Hash,
751{
752 fn as_any(&self) -> &dyn Any {
753 self
754 }
755
756 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
757 rhs.as_any()
758 .downcast_ref::<Self>()
759 .map_or(false, |rhs| self == rhs)
760 }
761
762 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
763 T::hash(self, &mut hasher);
764 }
765
766 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
767 self
768 }
769}
770
771pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
772
773impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
774 fn from(request: T) -> Self {
775 Self(Arc::new(request))
776 }
777}
778
779impl PartialEq for RequestSlot {
780 fn eq(&self, other: &Self) -> bool {
781 self.0.dyn_eq(other.0.as_ref())
782 }
783}
784
785impl Eq for RequestSlot {}
786
787impl Hash for RequestSlot {
788 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
789 self.0.dyn_hash(state);
790 self.0.as_any().type_id().hash(state)
791 }
792}
793
794#[derive(Debug, Clone, Hash, PartialEq, Eq)]
795pub struct CompletionsQuery {
796 pub query: String,
797 pub column: u64,
798 pub line: Option<u64>,
799 pub frame_id: Option<u64>,
800}
801
802impl CompletionsQuery {
803 pub fn new(
804 buffer: &language::Buffer,
805 cursor_position: language::Anchor,
806 frame_id: Option<u64>,
807 ) -> Self {
808 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
809 Self {
810 query: buffer.text(),
811 column: column as u64,
812 frame_id,
813 line: Some(row as u64),
814 }
815 }
816}
817
818pub enum SessionEvent {
819 Modules,
820 LoadedSources,
821 Stopped(Option<ThreadId>),
822 StackTrace,
823 Variables,
824 Threads,
825}
826
827impl EventEmitter<SessionEvent> for Session {}
828
829// local session will send breakpoint updates to DAP for all new breakpoints
830// remote side will only send breakpoint updates when it is a breakpoint created by that peer
831// BreakpointStore notifies session on breakpoint changes
832impl Session {
833 pub(crate) fn local(
834 breakpoint_store: Entity<BreakpointStore>,
835 session_id: SessionId,
836 parent_session: Option<Entity<Session>>,
837 delegate: DapAdapterDelegate,
838 config: DebugAdapterConfig,
839 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
840 initialized_tx: oneshot::Sender<()>,
841 debug_adapters: Arc<DapRegistry>,
842 cx: &mut App,
843 ) -> Task<Result<Entity<Self>>> {
844 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
845
846 cx.spawn(async move |cx| {
847 let (mode, capabilities) = LocalMode::new(
848 debug_adapters,
849 session_id,
850 parent_session.clone(),
851 breakpoint_store.clone(),
852 config.clone(),
853 delegate,
854 message_tx,
855 cx.clone(),
856 )
857 .await?;
858
859 cx.new(|cx| {
860 create_local_session(
861 breakpoint_store,
862 session_id,
863 parent_session,
864 start_debugging_requests_tx,
865 initialized_tx,
866 message_rx,
867 mode,
868 capabilities,
869 cx,
870 )
871 })
872 })
873 }
874
875 #[cfg(any(test, feature = "test-support"))]
876 pub(crate) fn fake(
877 breakpoint_store: Entity<BreakpointStore>,
878 session_id: SessionId,
879 parent_session: Option<Entity<Session>>,
880 delegate: DapAdapterDelegate,
881 config: DebugAdapterConfig,
882 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
883 initialized_tx: oneshot::Sender<()>,
884 caps: Capabilities,
885 fails: bool,
886 cx: &mut App,
887 ) -> Task<Result<Entity<Session>>> {
888 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
889
890 cx.spawn(async move |cx| {
891 let (mode, capabilities) = LocalMode::new_fake(
892 session_id,
893 parent_session.clone(),
894 breakpoint_store.clone(),
895 config.clone(),
896 delegate,
897 message_tx,
898 caps,
899 fails,
900 cx.clone(),
901 )
902 .await?;
903
904 cx.new(|cx| {
905 create_local_session(
906 breakpoint_store,
907 session_id,
908 parent_session,
909 start_debugging_requests_tx,
910 initialized_tx,
911 message_rx,
912 mode,
913 capabilities,
914 cx,
915 )
916 })
917 })
918 }
919
920 pub(crate) fn remote(
921 session_id: SessionId,
922 client: AnyProtoClient,
923 upstream_project_id: u64,
924 ignore_breakpoints: bool,
925 ) -> Self {
926 Self {
927 mode: Mode::Remote(RemoteConnection {
928 _client: client,
929 _upstream_project_id: upstream_project_id,
930 }),
931 id: session_id,
932 child_session_ids: HashSet::default(),
933 parent_id: None,
934 capabilities: Capabilities::default(),
935 ignore_breakpoints,
936 variables: Default::default(),
937 stack_frames: Default::default(),
938 thread_states: ThreadStates::default(),
939 output_token: OutputToken(0),
940 output: circular_buffer::CircularBuffer::boxed(),
941 requests: HashMap::default(),
942 modules: Vec::default(),
943 loaded_sources: Vec::default(),
944 threads: IndexMap::default(),
945 _background_tasks: Vec::default(),
946 locations: Default::default(),
947 is_session_terminated: false,
948 }
949 }
950
951 pub fn session_id(&self) -> SessionId {
952 self.id
953 }
954
955 pub fn child_session_ids(&self) -> HashSet<SessionId> {
956 self.child_session_ids.clone()
957 }
958
959 pub fn add_child_session_id(&mut self, session_id: SessionId) {
960 self.child_session_ids.insert(session_id);
961 }
962
963 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
964 self.child_session_ids.remove(&session_id);
965 }
966
967 pub fn parent_id(&self) -> Option<SessionId> {
968 self.parent_id
969 }
970
971 pub fn capabilities(&self) -> &Capabilities {
972 &self.capabilities
973 }
974
975 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
976 if let Mode::Local(local_mode) = &self.mode {
977 Some(local_mode.config.clone())
978 } else {
979 None
980 }
981 }
982
983 pub fn is_terminated(&self) -> bool {
984 self.is_session_terminated
985 }
986
987 pub fn is_local(&self) -> bool {
988 matches!(self.mode, Mode::Local(_))
989 }
990
991 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
992 match &mut self.mode {
993 Mode::Local(local_mode) => Some(local_mode),
994 Mode::Remote(_) => None,
995 }
996 }
997
998 pub fn as_local(&self) -> Option<&LocalMode> {
999 match &self.mode {
1000 Mode::Local(local_mode) => Some(local_mode),
1001 Mode::Remote(_) => None,
1002 }
1003 }
1004
1005 pub(super) fn initialize_sequence(
1006 &mut self,
1007 initialize_rx: oneshot::Receiver<()>,
1008 cx: &mut Context<Self>,
1009 ) -> Task<Result<()>> {
1010 match &self.mode {
1011 Mode::Local(local_mode) => {
1012 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1013 }
1014 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1015 }
1016 }
1017
1018 pub fn output(
1019 &self,
1020 since: OutputToken,
1021 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1022 if self.output_token.0 == 0 {
1023 return (self.output.range(0..0), OutputToken(0));
1024 };
1025
1026 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1027
1028 let clamped_events_since = events_since.clamp(0, self.output.len());
1029 (
1030 self.output
1031 .range(self.output.len() - clamped_events_since..),
1032 self.output_token,
1033 )
1034 }
1035
1036 pub fn respond_to_client(
1037 &self,
1038 request_seq: u64,
1039 success: bool,
1040 command: String,
1041 body: Option<serde_json::Value>,
1042 cx: &mut Context<Self>,
1043 ) -> Task<Result<()>> {
1044 let Some(local_session) = self.as_local().cloned() else {
1045 unreachable!("Cannot respond to remote client");
1046 };
1047
1048 cx.background_spawn(async move {
1049 local_session
1050 .client
1051 .send_message(Message::Response(Response {
1052 body,
1053 success,
1054 command,
1055 seq: request_seq + 1,
1056 request_seq,
1057 message: None,
1058 }))
1059 .await
1060 })
1061 }
1062
1063 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1064 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1065 self.thread_states.stop_all_threads();
1066
1067 self.invalidate_command_type::<StackTraceCommand>();
1068 }
1069
1070 // Event if we stopped all threads we still need to insert the thread_id
1071 // to our own data
1072 if let Some(thread_id) = event.thread_id {
1073 self.thread_states.stop_thread(ThreadId(thread_id));
1074
1075 self.invalidate_state(
1076 &StackTraceCommand {
1077 thread_id,
1078 start_frame: None,
1079 levels: None,
1080 }
1081 .into(),
1082 );
1083 }
1084
1085 self.invalidate_generic();
1086 self.threads.clear();
1087 self.variables.clear();
1088 cx.emit(SessionEvent::Stopped(
1089 event
1090 .thread_id
1091 .map(Into::into)
1092 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1093 ));
1094 cx.notify();
1095 }
1096
1097 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1098 match *event {
1099 Events::Initialized(_) => {
1100 debug_assert!(
1101 false,
1102 "Initialized event should have been handled in LocalMode"
1103 );
1104 }
1105 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1106 Events::Continued(event) => {
1107 if event.all_threads_continued.unwrap_or_default() {
1108 self.thread_states.continue_all_threads();
1109 } else {
1110 self.thread_states
1111 .continue_thread(ThreadId(event.thread_id));
1112 }
1113 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1114 self.invalidate_generic();
1115 }
1116 Events::Exited(_event) => {
1117 self.clear_active_debug_line(cx);
1118 }
1119 Events::Terminated(_) => {
1120 self.is_session_terminated = true;
1121 self.clear_active_debug_line(cx);
1122 }
1123 Events::Thread(event) => {
1124 let thread_id = ThreadId(event.thread_id);
1125
1126 match event.reason {
1127 dap::ThreadEventReason::Started => {
1128 self.thread_states.continue_thread(thread_id);
1129 }
1130 dap::ThreadEventReason::Exited => {
1131 self.thread_states.exit_thread(thread_id);
1132 }
1133 reason => {
1134 log::error!("Unhandled thread event reason {:?}", reason);
1135 }
1136 }
1137 self.invalidate_state(&ThreadsCommand.into());
1138 cx.notify();
1139 }
1140 Events::Output(event) => {
1141 if event
1142 .category
1143 .as_ref()
1144 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1145 {
1146 return;
1147 }
1148
1149 self.output.push_back(event);
1150 self.output_token.0 += 1;
1151 cx.notify();
1152 }
1153 Events::Breakpoint(_) => {}
1154 Events::Module(event) => {
1155 match event.reason {
1156 dap::ModuleEventReason::New => {
1157 self.modules.push(event.module);
1158 }
1159 dap::ModuleEventReason::Changed => {
1160 if let Some(module) = self
1161 .modules
1162 .iter_mut()
1163 .find(|other| event.module.id == other.id)
1164 {
1165 *module = event.module;
1166 }
1167 }
1168 dap::ModuleEventReason::Removed => {
1169 self.modules.retain(|other| event.module.id != other.id);
1170 }
1171 }
1172
1173 // todo(debugger): We should only send the invalidate command to downstream clients.
1174 // self.invalidate_state(&ModulesCommand.into());
1175 }
1176 Events::LoadedSource(_) => {
1177 self.invalidate_state(&LoadedSourcesCommand.into());
1178 }
1179 Events::Capabilities(event) => {
1180 self.capabilities = self.capabilities.merge(event.capabilities);
1181 cx.notify();
1182 }
1183 Events::Memory(_) => {}
1184 Events::Process(_) => {}
1185 Events::ProgressEnd(_) => {}
1186 Events::ProgressStart(_) => {}
1187 Events::ProgressUpdate(_) => {}
1188 Events::Invalidated(_) => {}
1189 Events::Other(_) => {}
1190 }
1191 }
1192
1193 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1194 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1195 &mut self,
1196 request: T,
1197 process_result: impl FnOnce(
1198 &mut Self,
1199 Result<T::Response>,
1200 &mut Context<Self>,
1201 ) -> Option<T::Response>
1202 + 'static,
1203 cx: &mut Context<Self>,
1204 ) {
1205 const {
1206 assert!(
1207 T::CACHEABLE,
1208 "Only requests marked as cacheable should invoke `fetch`"
1209 );
1210 }
1211
1212 if !self.thread_states.any_stopped_thread()
1213 && request.type_id() != TypeId::of::<ThreadsCommand>()
1214 || self.is_session_terminated
1215 {
1216 return;
1217 }
1218
1219 let request_map = self
1220 .requests
1221 .entry(std::any::TypeId::of::<T>())
1222 .or_default();
1223
1224 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1225 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1226
1227 let task = Self::request_inner::<Arc<T>>(
1228 &self.capabilities,
1229 self.id,
1230 &self.mode,
1231 command,
1232 process_result,
1233 cx,
1234 );
1235 let task = cx
1236 .background_executor()
1237 .spawn(async move {
1238 let _ = task.await?;
1239 Some(())
1240 })
1241 .shared();
1242
1243 vacant.insert(task);
1244 cx.notify();
1245 }
1246 }
1247
1248 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1249 capabilities: &Capabilities,
1250 session_id: SessionId,
1251 mode: &Mode,
1252 request: T,
1253 process_result: impl FnOnce(
1254 &mut Self,
1255 Result<T::Response>,
1256 &mut Context<Self>,
1257 ) -> Option<T::Response>
1258 + 'static,
1259 cx: &mut Context<Self>,
1260 ) -> Task<Option<T::Response>> {
1261 if !T::is_supported(&capabilities) {
1262 log::warn!(
1263 "Attempted to send a DAP request that isn't supported: {:?}",
1264 request
1265 );
1266 let error = Err(anyhow::Error::msg(
1267 "Couldn't complete request because it's not supported",
1268 ));
1269 return cx.spawn(async move |this, cx| {
1270 this.update(cx, |this, cx| process_result(this, error, cx))
1271 .log_err()
1272 .flatten()
1273 });
1274 }
1275
1276 let request = mode.request_dap(session_id, request, cx);
1277 cx.spawn(async move |this, cx| {
1278 let result = request.await;
1279 this.update(cx, |this, cx| process_result(this, result, cx))
1280 .log_err()
1281 .flatten()
1282 })
1283 }
1284
1285 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1286 &self,
1287 request: T,
1288 process_result: impl FnOnce(
1289 &mut Self,
1290 Result<T::Response>,
1291 &mut Context<Self>,
1292 ) -> Option<T::Response>
1293 + 'static,
1294 cx: &mut Context<Self>,
1295 ) -> Task<Option<T::Response>> {
1296 Self::request_inner(
1297 &self.capabilities,
1298 self.id,
1299 &self.mode,
1300 request,
1301 process_result,
1302 cx,
1303 )
1304 }
1305
1306 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1307 self.requests.remove(&std::any::TypeId::of::<Command>());
1308 }
1309
1310 fn invalidate_generic(&mut self) {
1311 self.invalidate_command_type::<ModulesCommand>();
1312 self.invalidate_command_type::<LoadedSourcesCommand>();
1313 self.invalidate_command_type::<ThreadsCommand>();
1314 }
1315
1316 fn invalidate_state(&mut self, key: &RequestSlot) {
1317 self.requests
1318 .entry(key.0.as_any().type_id())
1319 .and_modify(|request_map| {
1320 request_map.remove(&key);
1321 });
1322 }
1323
1324 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1325 self.thread_states.thread_status(thread_id)
1326 }
1327
1328 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1329 self.fetch(
1330 dap_command::ThreadsCommand,
1331 |this, result, cx| {
1332 let result = result.log_err()?;
1333
1334 this.threads = result
1335 .iter()
1336 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1337 .collect();
1338
1339 this.invalidate_command_type::<StackTraceCommand>();
1340 cx.emit(SessionEvent::Threads);
1341 cx.notify();
1342
1343 Some(result)
1344 },
1345 cx,
1346 );
1347
1348 self.threads
1349 .values()
1350 .map(|thread| {
1351 (
1352 thread.dap.clone(),
1353 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1354 )
1355 })
1356 .collect()
1357 }
1358
1359 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1360 self.fetch(
1361 dap_command::ModulesCommand,
1362 |this, result, cx| {
1363 let result = result.log_err()?;
1364
1365 this.modules = result.iter().cloned().collect();
1366 cx.emit(SessionEvent::Modules);
1367 cx.notify();
1368
1369 Some(result)
1370 },
1371 cx,
1372 );
1373
1374 &self.modules
1375 }
1376
1377 pub fn ignore_breakpoints(&self) -> bool {
1378 self.ignore_breakpoints
1379 }
1380
1381 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1382 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1383 }
1384
1385 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1386 if self.ignore_breakpoints == ignore {
1387 return Task::ready(());
1388 }
1389
1390 self.ignore_breakpoints = ignore;
1391
1392 if let Some(local) = self.as_local() {
1393 local.send_all_breakpoints(ignore, cx)
1394 } else {
1395 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1396 unimplemented!()
1397 }
1398 }
1399
1400 pub fn breakpoints_enabled(&self) -> bool {
1401 self.ignore_breakpoints
1402 }
1403
1404 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1405 self.fetch(
1406 dap_command::LoadedSourcesCommand,
1407 |this, result, cx| {
1408 let result = result.log_err()?;
1409 this.loaded_sources = result.iter().cloned().collect();
1410 cx.emit(SessionEvent::LoadedSources);
1411 cx.notify();
1412 Some(result)
1413 },
1414 cx,
1415 );
1416
1417 &self.loaded_sources
1418 }
1419
1420 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1421 res.log_err()?;
1422 Some(())
1423 }
1424
1425 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1426 thread_id: ThreadId,
1427 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1428 {
1429 move |this, response, cx| match response.log_err() {
1430 Some(response) => Some(response),
1431 None => {
1432 this.thread_states.stop_thread(thread_id);
1433 cx.notify();
1434 None
1435 }
1436 }
1437 }
1438
1439 fn clear_active_debug_line_response(
1440 &mut self,
1441 response: Result<()>,
1442 cx: &mut Context<Session>,
1443 ) -> Option<()> {
1444 response.log_err()?;
1445 self.clear_active_debug_line(cx);
1446 Some(())
1447 }
1448
1449 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1450 self.as_local()
1451 .expect("Message handler will only run in local mode")
1452 .breakpoint_store
1453 .update(cx, |store, cx| {
1454 store.remove_active_position(Some(self.id), cx)
1455 });
1456 }
1457
1458 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1459 self.request(
1460 PauseCommand {
1461 thread_id: thread_id.0,
1462 },
1463 Self::empty_response,
1464 cx,
1465 )
1466 .detach();
1467 }
1468
1469 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1470 self.request(
1471 RestartStackFrameCommand { stack_frame_id },
1472 Self::empty_response,
1473 cx,
1474 )
1475 .detach();
1476 }
1477
1478 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1479 if self.capabilities.supports_restart_request.unwrap_or(false) {
1480 self.request(
1481 RestartCommand {
1482 raw: args.unwrap_or(Value::Null),
1483 },
1484 Self::empty_response,
1485 cx,
1486 )
1487 .detach();
1488 } else {
1489 self.request(
1490 DisconnectCommand {
1491 restart: Some(false),
1492 terminate_debuggee: Some(true),
1493 suspend_debuggee: Some(false),
1494 },
1495 Self::empty_response,
1496 cx,
1497 )
1498 .detach();
1499 }
1500 }
1501
1502 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1503 self.is_session_terminated = true;
1504 self.thread_states.exit_all_threads();
1505 cx.notify();
1506
1507 let task = if self
1508 .capabilities
1509 .supports_terminate_request
1510 .unwrap_or_default()
1511 {
1512 self.request(
1513 TerminateCommand {
1514 restart: Some(false),
1515 },
1516 Self::clear_active_debug_line_response,
1517 cx,
1518 )
1519 } else {
1520 self.request(
1521 DisconnectCommand {
1522 restart: Some(false),
1523 terminate_debuggee: Some(true),
1524 suspend_debuggee: Some(false),
1525 },
1526 Self::clear_active_debug_line_response,
1527 cx,
1528 )
1529 };
1530
1531 cx.background_spawn(async move {
1532 let _ = task.await;
1533 })
1534 }
1535
1536 pub fn completions(
1537 &mut self,
1538 query: CompletionsQuery,
1539 cx: &mut Context<Self>,
1540 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1541 let task = self.request(query, |_, result, _| result.log_err(), cx);
1542
1543 cx.background_executor().spawn(async move {
1544 anyhow::Ok(
1545 task.await
1546 .map(|response| response.targets)
1547 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1548 )
1549 })
1550 }
1551
1552 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1553 self.thread_states.continue_thread(thread_id);
1554 self.request(
1555 ContinueCommand {
1556 args: ContinueArguments {
1557 thread_id: thread_id.0,
1558 single_thread: Some(true),
1559 },
1560 },
1561 Self::on_step_response::<ContinueCommand>(thread_id),
1562 cx,
1563 )
1564 .detach();
1565 }
1566
1567 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1568 match self.mode {
1569 Mode::Local(ref local) => Some(local.client.clone()),
1570 Mode::Remote(_) => None,
1571 }
1572 }
1573
1574 pub fn step_over(
1575 &mut self,
1576 thread_id: ThreadId,
1577 granularity: SteppingGranularity,
1578 cx: &mut Context<Self>,
1579 ) {
1580 let supports_single_thread_execution_requests =
1581 self.capabilities.supports_single_thread_execution_requests;
1582 let supports_stepping_granularity = self
1583 .capabilities
1584 .supports_stepping_granularity
1585 .unwrap_or_default();
1586
1587 let command = NextCommand {
1588 inner: StepCommand {
1589 thread_id: thread_id.0,
1590 granularity: supports_stepping_granularity.then(|| granularity),
1591 single_thread: supports_single_thread_execution_requests,
1592 },
1593 };
1594
1595 self.thread_states.process_step(thread_id);
1596 self.request(
1597 command,
1598 Self::on_step_response::<NextCommand>(thread_id),
1599 cx,
1600 )
1601 .detach();
1602 }
1603
1604 pub fn step_in(
1605 &mut self,
1606 thread_id: ThreadId,
1607 granularity: SteppingGranularity,
1608 cx: &mut Context<Self>,
1609 ) {
1610 let supports_single_thread_execution_requests =
1611 self.capabilities.supports_single_thread_execution_requests;
1612 let supports_stepping_granularity = self
1613 .capabilities
1614 .supports_stepping_granularity
1615 .unwrap_or_default();
1616
1617 let command = StepInCommand {
1618 inner: StepCommand {
1619 thread_id: thread_id.0,
1620 granularity: supports_stepping_granularity.then(|| granularity),
1621 single_thread: supports_single_thread_execution_requests,
1622 },
1623 };
1624
1625 self.thread_states.process_step(thread_id);
1626 self.request(
1627 command,
1628 Self::on_step_response::<StepInCommand>(thread_id),
1629 cx,
1630 )
1631 .detach();
1632 }
1633
1634 pub fn step_out(
1635 &mut self,
1636 thread_id: ThreadId,
1637 granularity: SteppingGranularity,
1638 cx: &mut Context<Self>,
1639 ) {
1640 let supports_single_thread_execution_requests =
1641 self.capabilities.supports_single_thread_execution_requests;
1642 let supports_stepping_granularity = self
1643 .capabilities
1644 .supports_stepping_granularity
1645 .unwrap_or_default();
1646
1647 let command = StepOutCommand {
1648 inner: StepCommand {
1649 thread_id: thread_id.0,
1650 granularity: supports_stepping_granularity.then(|| granularity),
1651 single_thread: supports_single_thread_execution_requests,
1652 },
1653 };
1654
1655 self.thread_states.process_step(thread_id);
1656 self.request(
1657 command,
1658 Self::on_step_response::<StepOutCommand>(thread_id),
1659 cx,
1660 )
1661 .detach();
1662 }
1663
1664 pub fn step_back(
1665 &mut self,
1666 thread_id: ThreadId,
1667 granularity: SteppingGranularity,
1668 cx: &mut Context<Self>,
1669 ) {
1670 let supports_single_thread_execution_requests =
1671 self.capabilities.supports_single_thread_execution_requests;
1672 let supports_stepping_granularity = self
1673 .capabilities
1674 .supports_stepping_granularity
1675 .unwrap_or_default();
1676
1677 let command = StepBackCommand {
1678 inner: StepCommand {
1679 thread_id: thread_id.0,
1680 granularity: supports_stepping_granularity.then(|| granularity),
1681 single_thread: supports_single_thread_execution_requests,
1682 },
1683 };
1684
1685 self.thread_states.process_step(thread_id);
1686
1687 self.request(
1688 command,
1689 Self::on_step_response::<StepBackCommand>(thread_id),
1690 cx,
1691 )
1692 .detach();
1693 }
1694
1695 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1696 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1697 && self.requests.contains_key(&ThreadsCommand.type_id())
1698 && self.threads.contains_key(&thread_id)
1699 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1700 // We could still be using an old thread id and have sent a new thread's request
1701 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1702 // But it very well could cause a minor bug in the future that is hard to track down
1703 {
1704 self.fetch(
1705 super::dap_command::StackTraceCommand {
1706 thread_id: thread_id.0,
1707 start_frame: None,
1708 levels: None,
1709 },
1710 move |this, stack_frames, cx| {
1711 let stack_frames = stack_frames.log_err()?;
1712
1713 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1714 thread.stack_frame_ids =
1715 stack_frames.iter().map(|frame| frame.id).collect();
1716 });
1717 debug_assert!(
1718 matches!(entry, indexmap::map::Entry::Occupied(_)),
1719 "Sent request for thread_id that doesn't exist"
1720 );
1721
1722 this.stack_frames.extend(
1723 stack_frames
1724 .iter()
1725 .cloned()
1726 .map(|frame| (frame.id, StackFrame::from(frame))),
1727 );
1728
1729 this.invalidate_command_type::<ScopesCommand>();
1730 this.invalidate_command_type::<VariablesCommand>();
1731
1732 cx.emit(SessionEvent::StackTrace);
1733 cx.notify();
1734 Some(stack_frames)
1735 },
1736 cx,
1737 );
1738 }
1739
1740 self.threads
1741 .get(&thread_id)
1742 .map(|thread| {
1743 thread
1744 .stack_frame_ids
1745 .iter()
1746 .filter_map(|id| self.stack_frames.get(id))
1747 .cloned()
1748 .collect()
1749 })
1750 .unwrap_or_default()
1751 }
1752
1753 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1754 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1755 && self
1756 .requests
1757 .contains_key(&TypeId::of::<StackTraceCommand>())
1758 {
1759 self.fetch(
1760 ScopesCommand { stack_frame_id },
1761 move |this, scopes, cx| {
1762 let scopes = scopes.log_err()?;
1763
1764 for scope in scopes .iter(){
1765 this.variables(scope.variables_reference, cx);
1766 }
1767
1768 let entry = this
1769 .stack_frames
1770 .entry(stack_frame_id)
1771 .and_modify(|stack_frame| {
1772 stack_frame.scopes = scopes.clone();
1773 });
1774
1775 cx.emit(SessionEvent::Variables);
1776
1777 debug_assert!(
1778 matches!(entry, indexmap::map::Entry::Occupied(_)),
1779 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1780 );
1781
1782 Some(scopes)
1783 },
1784 cx,
1785 );
1786 }
1787
1788 self.stack_frames
1789 .get(&stack_frame_id)
1790 .map(|frame| frame.scopes.as_slice())
1791 .unwrap_or_default()
1792 }
1793
1794 pub fn variables(
1795 &mut self,
1796 variables_reference: VariableReference,
1797 cx: &mut Context<Self>,
1798 ) -> Vec<dap::Variable> {
1799 let command = VariablesCommand {
1800 variables_reference,
1801 filter: None,
1802 start: None,
1803 count: None,
1804 format: None,
1805 };
1806
1807 self.fetch(
1808 command,
1809 move |this, variables, cx| {
1810 let variables = variables.log_err()?;
1811 this.variables
1812 .insert(variables_reference, variables.clone());
1813
1814 cx.emit(SessionEvent::Variables);
1815 Some(variables)
1816 },
1817 cx,
1818 );
1819
1820 self.variables
1821 .get(&variables_reference)
1822 .cloned()
1823 .unwrap_or_default()
1824 }
1825
1826 pub fn set_variable_value(
1827 &mut self,
1828 variables_reference: u64,
1829 name: String,
1830 value: String,
1831 cx: &mut Context<Self>,
1832 ) {
1833 if self.capabilities.supports_set_variable.unwrap_or_default() {
1834 self.request(
1835 SetVariableValueCommand {
1836 name,
1837 value,
1838 variables_reference,
1839 },
1840 move |this, response, cx| {
1841 let response = response.log_err()?;
1842 this.invalidate_command_type::<VariablesCommand>();
1843 cx.notify();
1844 Some(response)
1845 },
1846 cx,
1847 )
1848 .detach()
1849 }
1850 }
1851
1852 pub fn evaluate(
1853 &mut self,
1854 expression: String,
1855 context: Option<EvaluateArgumentsContext>,
1856 frame_id: Option<u64>,
1857 source: Option<Source>,
1858 cx: &mut Context<Self>,
1859 ) {
1860 self.request(
1861 EvaluateCommand {
1862 expression,
1863 context,
1864 frame_id,
1865 source,
1866 },
1867 |this, response, cx| {
1868 let response = response.log_err()?;
1869 this.output_token.0 += 1;
1870 this.output.push_back(dap::OutputEvent {
1871 category: None,
1872 output: response.result.clone(),
1873 group: None,
1874 variables_reference: Some(response.variables_reference),
1875 source: None,
1876 line: None,
1877 column: None,
1878 data: None,
1879 location_reference: None,
1880 });
1881
1882 this.invalidate_command_type::<ScopesCommand>();
1883 cx.notify();
1884 Some(response)
1885 },
1886 cx,
1887 )
1888 .detach();
1889 }
1890
1891 pub fn location(
1892 &mut self,
1893 reference: u64,
1894 cx: &mut Context<Self>,
1895 ) -> Option<dap::LocationsResponse> {
1896 self.fetch(
1897 LocationsCommand { reference },
1898 move |this, response, _| {
1899 let response = response.log_err()?;
1900 this.locations.insert(reference, response.clone());
1901 Some(response)
1902 },
1903 cx,
1904 );
1905 self.locations.get(&reference).cloned()
1906 }
1907 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1908 let command = DisconnectCommand {
1909 restart: Some(false),
1910 terminate_debuggee: Some(true),
1911 suspend_debuggee: Some(false),
1912 };
1913
1914 self.request(command, Self::empty_response, cx).detach()
1915 }
1916
1917 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1918 if self
1919 .capabilities
1920 .supports_terminate_threads_request
1921 .unwrap_or_default()
1922 {
1923 self.request(
1924 TerminateThreadsCommand {
1925 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1926 },
1927 Self::clear_active_debug_line_response,
1928 cx,
1929 )
1930 .detach();
1931 } else {
1932 self.shutdown(cx).detach();
1933 }
1934 }
1935}
1936
1937fn create_local_session(
1938 breakpoint_store: Entity<BreakpointStore>,
1939 session_id: SessionId,
1940 parent_session: Option<Entity<Session>>,
1941 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1942 initialized_tx: oneshot::Sender<()>,
1943 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1944 mode: LocalMode,
1945 capabilities: Capabilities,
1946 cx: &mut Context<Session>,
1947) -> Session {
1948 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1949 let mut initialized_tx = Some(initialized_tx);
1950 while let Some(message) = message_rx.next().await {
1951 if let Message::Event(event) = message {
1952 if let Events::Initialized(_) = *event {
1953 if let Some(tx) = initialized_tx.take() {
1954 tx.send(()).ok();
1955 }
1956 } else {
1957 let Ok(_) = this.update(cx, |session, cx| {
1958 session.handle_dap_event(event, cx);
1959 }) else {
1960 break;
1961 };
1962 }
1963 } else {
1964 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1965 else {
1966 break;
1967 };
1968 }
1969 }
1970 })];
1971
1972 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1973 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1974 if let Some(local) = (!this.ignore_breakpoints)
1975 .then(|| this.as_local_mut())
1976 .flatten()
1977 {
1978 local
1979 .send_breakpoints_from_path(path.clone(), *reason, cx)
1980 .detach();
1981 };
1982 }
1983 BreakpointStoreEvent::BreakpointsCleared(paths) => {
1984 if let Some(local) = (!this.ignore_breakpoints)
1985 .then(|| this.as_local_mut())
1986 .flatten()
1987 {
1988 local.unset_breakpoints_from_paths(paths, cx).detach();
1989 }
1990 }
1991 BreakpointStoreEvent::ActiveDebugLineChanged => {}
1992 })
1993 .detach();
1994
1995 Session {
1996 mode: Mode::Local(mode),
1997 id: session_id,
1998 child_session_ids: HashSet::default(),
1999 parent_id: parent_session.map(|session| session.read(cx).id),
2000 variables: Default::default(),
2001 capabilities,
2002 thread_states: ThreadStates::default(),
2003 output_token: OutputToken(0),
2004 ignore_breakpoints: false,
2005 output: circular_buffer::CircularBuffer::boxed(),
2006 requests: HashMap::default(),
2007 modules: Vec::default(),
2008 loaded_sources: Vec::default(),
2009 threads: IndexMap::default(),
2010 stack_frames: IndexMap::default(),
2011 locations: Default::default(),
2012 _background_tasks,
2013 is_session_terminated: false,
2014 }
2015}