1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
9 StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
10 VariablesCommand,
11};
12use super::dap_store::DapAdapterDelegate;
13use anyhow::{Result, anyhow};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
16use dap::messages::Response;
17use dap::{
18 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
19 SteppingGranularity, StoppedEvent, VariableReference,
20 adapters::{DapDelegate, DapStatus},
21 client::{DebugAdapterClient, SessionId},
22 messages::{Events, Message},
23};
24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
25use futures::channel::oneshot;
26use futures::{FutureExt, future::Shared};
27use gpui::{
28 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
29};
30use rpc::AnyProtoClient;
31use serde_json::{Value, json};
32use settings::Settings;
33use smol::stream::StreamExt;
34use std::any::TypeId;
35use std::path::PathBuf;
36use std::u64;
37use std::{
38 any::Any,
39 collections::hash_map::Entry,
40 hash::{Hash, Hasher},
41 path::Path,
42 sync::Arc,
43};
44use task::{DebugAdapterConfig, DebugTaskDefinition};
45use text::{PointUtf16, ToPointUtf16};
46use util::{ResultExt, merge_json_value_into};
47
48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
49#[repr(transparent)]
50pub struct ThreadId(pub u64);
51
52impl ThreadId {
53 pub const MIN: ThreadId = ThreadId(u64::MIN);
54 pub const MAX: ThreadId = ThreadId(u64::MAX);
55}
56
57impl From<u64> for ThreadId {
58 fn from(id: u64) -> Self {
59 Self(id)
60 }
61}
62
63#[derive(Clone, Debug)]
64pub struct StackFrame {
65 pub dap: dap::StackFrame,
66 pub scopes: Vec<dap::Scope>,
67}
68
69impl From<dap::StackFrame> for StackFrame {
70 fn from(stack_frame: dap::StackFrame) -> Self {
71 Self {
72 scopes: vec![],
73 dap: stack_frame,
74 }
75 }
76}
77
78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
79pub enum ThreadStatus {
80 #[default]
81 Running,
82 Stopped,
83 Stepping,
84 Exited,
85 Ended,
86}
87
88impl ThreadStatus {
89 pub fn label(&self) -> &'static str {
90 match self {
91 ThreadStatus::Running => "Running",
92 ThreadStatus::Stopped => "Stopped",
93 ThreadStatus::Stepping => "Stepping",
94 ThreadStatus::Exited => "Exited",
95 ThreadStatus::Ended => "Ended",
96 }
97 }
98}
99
100#[derive(Debug)]
101pub struct Thread {
102 dap: dap::Thread,
103 stack_frame_ids: IndexSet<StackFrameId>,
104 _has_stopped: bool,
105}
106
107impl From<dap::Thread> for Thread {
108 fn from(dap: dap::Thread) -> Self {
109 Self {
110 dap,
111 stack_frame_ids: Default::default(),
112 _has_stopped: false,
113 }
114 }
115}
116
117type UpstreamProjectId = u64;
118
119struct RemoteConnection {
120 _client: AnyProtoClient,
121 _upstream_project_id: UpstreamProjectId,
122}
123
124impl RemoteConnection {
125 fn send_proto_client_request<R: DapCommand>(
126 &self,
127 _request: R,
128 _session_id: SessionId,
129 cx: &mut App,
130 ) -> Task<Result<R::Response>> {
131 // let message = request.to_proto(session_id, self.upstream_project_id);
132 // let upstream_client = self.client.clone();
133 cx.background_executor().spawn(async move {
134 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
135 // let response = upstream_client.request(message).await?;
136 // request.response_from_proto(response)
137 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
138 })
139 }
140
141 fn request<R: DapCommand>(
142 &self,
143 request: R,
144 session_id: SessionId,
145 cx: &mut App,
146 ) -> Task<Result<R::Response>>
147 where
148 <R::DapRequest as dap::requests::Request>::Response: 'static,
149 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
150 {
151 return self.send_proto_client_request::<R>(request, session_id, cx);
152 }
153}
154
155enum Mode {
156 Local(LocalMode),
157 Remote(RemoteConnection),
158}
159
160#[derive(Clone)]
161pub struct LocalMode {
162 client: Arc<DebugAdapterClient>,
163 config: DebugAdapterConfig,
164 adapter: Arc<dyn DebugAdapter>,
165 breakpoint_store: Entity<BreakpointStore>,
166}
167
168fn client_source(abs_path: &Path) -> dap::Source {
169 dap::Source {
170 name: abs_path
171 .file_name()
172 .map(|filename| filename.to_string_lossy().to_string()),
173 path: Some(abs_path.to_string_lossy().to_string()),
174 source_reference: None,
175 presentation_hint: None,
176 origin: None,
177 sources: None,
178 adapter_data: None,
179 checksums: None,
180 }
181}
182
183impl LocalMode {
184 fn new(
185 debug_adapters: Arc<DapRegistry>,
186 session_id: SessionId,
187 parent_session: Option<Entity<Session>>,
188 breakpoint_store: Entity<BreakpointStore>,
189 config: DebugAdapterConfig,
190 delegate: DapAdapterDelegate,
191 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
192 cx: AsyncApp,
193 ) -> Task<Result<(Self, Capabilities)>> {
194 Self::new_inner(
195 debug_adapters,
196 session_id,
197 parent_session,
198 breakpoint_store,
199 config,
200 delegate,
201 messages_tx,
202 async |_, _| {},
203 cx,
204 )
205 }
206 #[cfg(any(test, feature = "test-support"))]
207 fn new_fake(
208 session_id: SessionId,
209 parent_session: Option<Entity<Session>>,
210 breakpoint_store: Entity<BreakpointStore>,
211 config: DebugAdapterConfig,
212 delegate: DapAdapterDelegate,
213 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
214 caps: Capabilities,
215 fail: bool,
216 cx: AsyncApp,
217 ) -> Task<Result<(Self, Capabilities)>> {
218 use task::DebugRequestDisposition;
219
220 let request = match config.request.clone() {
221 DebugRequestDisposition::UserConfigured(request) => request,
222 DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
223 match reverse_request_args.request {
224 dap::StartDebuggingRequestArgumentsRequest::Launch => {
225 DebugRequestType::Launch(task::LaunchConfig {
226 program: "".to_owned(),
227 cwd: None,
228 args: Default::default(),
229 })
230 }
231 dap::StartDebuggingRequestArgumentsRequest::Attach => {
232 DebugRequestType::Attach(task::AttachConfig {
233 process_id: Some(0),
234 })
235 }
236 }
237 }
238 };
239
240 let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
241 session
242 .client
243 .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
244 .await;
245
246 let paths = cx
247 .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
248 .expect("Breakpoint store should exist in all tests that start debuggers");
249
250 session
251 .client
252 .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
253 let p = Arc::from(Path::new(&args.source.path.unwrap()));
254 if !paths.contains(&p) {
255 panic!("Sent breakpoints for path without any")
256 }
257
258 Ok(dap::SetBreakpointsResponse {
259 breakpoints: Vec::default(),
260 })
261 })
262 .await;
263
264 match request {
265 dap::DebugRequestType::Launch(_) => {
266 if fail {
267 session
268 .client
269 .on_request::<dap::requests::Launch, _>(move |_, _| {
270 Err(dap::ErrorResponse {
271 error: Some(dap::Message {
272 id: 1,
273 format: "error".into(),
274 variables: None,
275 send_telemetry: None,
276 show_user: None,
277 url: None,
278 url_label: None,
279 }),
280 })
281 })
282 .await;
283 } else {
284 session
285 .client
286 .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
287 .await;
288 }
289 }
290 dap::DebugRequestType::Attach(attach_config) => {
291 if fail {
292 session
293 .client
294 .on_request::<dap::requests::Attach, _>(move |_, _| {
295 Err(dap::ErrorResponse {
296 error: Some(dap::Message {
297 id: 1,
298 format: "error".into(),
299 variables: None,
300 send_telemetry: None,
301 show_user: None,
302 url: None,
303 url_label: None,
304 }),
305 })
306 })
307 .await;
308 } else {
309 session
310 .client
311 .on_request::<dap::requests::Attach, _>(move |_, args| {
312 assert_eq!(
313 json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
314 args.raw
315 );
316
317 Ok(())
318 })
319 .await;
320 }
321 }
322 }
323
324 session
325 .client
326 .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
327 .await;
328 session.client.fake_event(Events::Initialized(None)).await;
329 };
330 Self::new_inner(
331 DapRegistry::fake().into(),
332 session_id,
333 parent_session,
334 breakpoint_store,
335 config,
336 delegate,
337 messages_tx,
338 callback,
339 cx,
340 )
341 }
342 fn new_inner(
343 registry: Arc<DapRegistry>,
344 session_id: SessionId,
345 parent_session: Option<Entity<Session>>,
346 breakpoint_store: Entity<BreakpointStore>,
347 config: DebugAdapterConfig,
348 delegate: DapAdapterDelegate,
349 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
350 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
351 cx: AsyncApp,
352 ) -> Task<Result<(Self, Capabilities)>> {
353 cx.spawn(async move |cx| {
354 let (adapter, binary) =
355 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
356
357 let message_handler = Box::new(move |message| {
358 messages_tx.unbounded_send(message).ok();
359 });
360
361 let client = Arc::new(
362 if let Some(client) = parent_session
363 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
364 .flatten()
365 {
366 client
367 .reconnect(session_id, binary, message_handler, cx.clone())
368 .await?
369 } else {
370 DebugAdapterClient::start(
371 session_id,
372 adapter.name(),
373 binary,
374 message_handler,
375 cx.clone(),
376 )
377 .await?
378 },
379 );
380
381 let adapter_id = adapter.name().to_string().to_owned();
382 let mut session = Self {
383 client,
384 adapter,
385 breakpoint_store,
386 config: config.clone(),
387 };
388
389 on_initialized(&mut session, cx.clone()).await;
390 let capabilities = session
391 .request(Initialize { adapter_id }, cx.background_executor().clone())
392 .await?;
393
394 Ok((session, capabilities))
395 })
396 }
397
398 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
399 let tasks: Vec<_> = paths
400 .into_iter()
401 .map(|path| {
402 self.request(
403 dap_command::SetBreakpoints {
404 source: client_source(path),
405 source_modified: None,
406 breakpoints: vec![],
407 },
408 cx.background_executor().clone(),
409 )
410 })
411 .collect();
412
413 cx.background_spawn(async move {
414 futures::future::join_all(tasks)
415 .await
416 .iter()
417 .for_each(|res| match res {
418 Ok(_) => {}
419 Err(err) => {
420 log::warn!("Set breakpoints request failed: {}", err);
421 }
422 });
423 })
424 }
425
426 fn send_breakpoints_from_path(
427 &self,
428 abs_path: Arc<Path>,
429 reason: BreakpointUpdatedReason,
430 cx: &mut App,
431 ) -> Task<()> {
432 let breakpoints = self
433 .breakpoint_store
434 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
435 .into_iter()
436 .filter(|bp| bp.state.is_enabled())
437 .map(Into::into)
438 .collect();
439
440 let task = self.request(
441 dap_command::SetBreakpoints {
442 source: client_source(&abs_path),
443 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
444 breakpoints,
445 },
446 cx.background_executor().clone(),
447 );
448
449 cx.background_spawn(async move {
450 match task.await {
451 Ok(_) => {}
452 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
453 }
454 })
455 }
456
457 fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
458 let mut breakpoint_tasks = Vec::new();
459 let breakpoints = self
460 .breakpoint_store
461 .read_with(cx, |store, cx| store.all_breakpoints(cx));
462
463 for (path, breakpoints) in breakpoints {
464 let breakpoints = if ignore_breakpoints {
465 vec![]
466 } else {
467 breakpoints
468 .into_iter()
469 .filter(|bp| bp.state.is_enabled())
470 .map(Into::into)
471 .collect()
472 };
473
474 breakpoint_tasks.push(self.request(
475 dap_command::SetBreakpoints {
476 source: client_source(&path),
477 source_modified: Some(false),
478 breakpoints,
479 },
480 cx.background_executor().clone(),
481 ));
482 }
483
484 cx.background_spawn(async move {
485 futures::future::join_all(breakpoint_tasks)
486 .await
487 .iter()
488 .for_each(|res| match res {
489 Ok(_) => {}
490 Err(err) => {
491 log::warn!("Set breakpoints request failed: {}", err);
492 }
493 });
494 })
495 }
496
497 async fn get_adapter_binary(
498 registry: &Arc<DapRegistry>,
499 config: &DebugAdapterConfig,
500 delegate: &DapAdapterDelegate,
501 cx: &mut AsyncApp,
502 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
503 let adapter = registry
504 .adapter(&config.adapter)
505 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
506
507 let binary = cx.update(|cx| {
508 ProjectSettings::get_global(cx)
509 .dap
510 .get(&adapter.name())
511 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
512 })?;
513
514 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
515 Err(error) => {
516 delegate.update_status(
517 adapter.name(),
518 DapStatus::Failed {
519 error: error.to_string(),
520 },
521 );
522
523 return Err(error);
524 }
525 Ok(mut binary) => {
526 delegate.update_status(adapter.name(), DapStatus::None);
527
528 let shell_env = delegate.shell_env().await;
529 let mut envs = binary.envs.unwrap_or_default();
530 envs.extend(shell_env);
531 binary.envs = Some(envs);
532
533 binary
534 }
535 };
536
537 Ok((adapter, binary))
538 }
539
540 pub fn initialize_sequence(
541 &self,
542 capabilities: &Capabilities,
543 initialized_rx: oneshot::Receiver<()>,
544 cx: &App,
545 ) -> Task<Result<()>> {
546 let (mut raw, is_launch) = match &self.config.request {
547 task::DebugRequestDisposition::UserConfigured(_) => {
548 let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
549 debug_assert!(false, "This part of code should be unreachable in practice");
550 return Task::ready(Err(anyhow!(
551 "Expected debug config conversion to succeed"
552 )));
553 };
554 let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
555 let raw = self.adapter.request_args(&raw);
556 (raw, is_launch)
557 }
558 task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
559 start_debugging_request_arguments.configuration.clone(),
560 matches!(
561 start_debugging_request_arguments.request,
562 dap::StartDebuggingRequestArgumentsRequest::Launch
563 ),
564 ),
565 };
566
567 merge_json_value_into(
568 self.config.initialize_args.clone().unwrap_or(json!({})),
569 &mut raw,
570 );
571 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
572 let launch = if is_launch {
573 self.request(Launch { raw }, cx.background_executor().clone())
574 } else {
575 self.request(Attach { raw }, cx.background_executor().clone())
576 };
577
578 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
579
580 let configuration_sequence = cx.spawn({
581 let this = self.clone();
582 async move |cx| {
583 initialized_rx.await?;
584 // todo(debugger) figure out if we want to handle a breakpoint response error
585 // This will probably consist of letting a user know that breakpoints failed to be set
586 cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
587
588 if configuration_done_supported {
589 this.request(ConfigurationDone, cx.background_executor().clone())
590 } else {
591 Task::ready(Ok(()))
592 }
593 .await
594 }
595 });
596
597 cx.background_spawn(async move {
598 futures::future::try_join(launch, configuration_sequence).await?;
599 Ok(())
600 })
601 }
602
603 fn request<R: LocalDapCommand>(
604 &self,
605 request: R,
606 executor: BackgroundExecutor,
607 ) -> Task<Result<R::Response>>
608 where
609 <R::DapRequest as dap::requests::Request>::Response: 'static,
610 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
611 {
612 let request = Arc::new(request);
613
614 let request_clone = request.clone();
615 let connection = self.client.clone();
616 let request_task = executor.spawn(async move {
617 let args = request_clone.to_dap();
618 connection.request::<R::DapRequest>(args).await
619 });
620
621 executor.spawn(async move {
622 let response = request.response_from_dap(request_task.await?);
623 response
624 })
625 }
626}
627impl From<RemoteConnection> for Mode {
628 fn from(value: RemoteConnection) -> Self {
629 Self::Remote(value)
630 }
631}
632
633impl Mode {
634 fn request_dap<R: DapCommand>(
635 &self,
636 session_id: SessionId,
637 request: R,
638 cx: &mut Context<Session>,
639 ) -> Task<Result<R::Response>>
640 where
641 <R::DapRequest as dap::requests::Request>::Response: 'static,
642 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
643 {
644 match self {
645 Mode::Local(debug_adapter_client) => {
646 debug_adapter_client.request(request, cx.background_executor().clone())
647 }
648 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
649 }
650 }
651}
652
653#[derive(Default)]
654struct ThreadStates {
655 global_state: Option<ThreadStatus>,
656 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
657}
658
659impl ThreadStates {
660 fn stop_all_threads(&mut self) {
661 self.global_state = Some(ThreadStatus::Stopped);
662 self.known_thread_states.clear();
663 }
664
665 fn exit_all_threads(&mut self) {
666 self.global_state = Some(ThreadStatus::Exited);
667 self.known_thread_states.clear();
668 }
669
670 fn continue_all_threads(&mut self) {
671 self.global_state = Some(ThreadStatus::Running);
672 self.known_thread_states.clear();
673 }
674
675 fn stop_thread(&mut self, thread_id: ThreadId) {
676 self.known_thread_states
677 .insert(thread_id, ThreadStatus::Stopped);
678 }
679
680 fn continue_thread(&mut self, thread_id: ThreadId) {
681 self.known_thread_states
682 .insert(thread_id, ThreadStatus::Running);
683 }
684
685 fn process_step(&mut self, thread_id: ThreadId) {
686 self.known_thread_states
687 .insert(thread_id, ThreadStatus::Stepping);
688 }
689
690 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
691 self.thread_state(thread_id)
692 .unwrap_or(ThreadStatus::Running)
693 }
694
695 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
696 self.known_thread_states
697 .get(&thread_id)
698 .copied()
699 .or(self.global_state)
700 }
701
702 fn exit_thread(&mut self, thread_id: ThreadId) {
703 self.known_thread_states
704 .insert(thread_id, ThreadStatus::Exited);
705 }
706
707 fn any_stopped_thread(&self) -> bool {
708 self.global_state
709 .is_some_and(|state| state == ThreadStatus::Stopped)
710 || self
711 .known_thread_states
712 .values()
713 .any(|status| *status == ThreadStatus::Stopped)
714 }
715}
716const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
717
718#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
719pub struct OutputToken(pub usize);
720/// Represents a current state of a single debug adapter and provides ways to mutate it.
721pub struct Session {
722 mode: Mode,
723 pub(super) capabilities: Capabilities,
724 id: SessionId,
725 child_session_ids: HashSet<SessionId>,
726 parent_id: Option<SessionId>,
727 ignore_breakpoints: bool,
728 modules: Vec<dap::Module>,
729 loaded_sources: Vec<dap::Source>,
730 output_token: OutputToken,
731 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
732 threads: IndexMap<ThreadId, Thread>,
733 thread_states: ThreadStates,
734 variables: HashMap<VariableReference, Vec<dap::Variable>>,
735 stack_frames: IndexMap<StackFrameId, StackFrame>,
736 locations: HashMap<u64, dap::LocationsResponse>,
737 is_session_terminated: bool,
738 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
739 _background_tasks: Vec<Task<()>>,
740}
741
742trait CacheableCommand: 'static + Send + Sync {
743 fn as_any(&self) -> &dyn Any;
744 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
745 fn dyn_hash(&self, hasher: &mut dyn Hasher);
746 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
747}
748
749impl<T> CacheableCommand for T
750where
751 T: DapCommand + PartialEq + Eq + Hash,
752{
753 fn as_any(&self) -> &dyn Any {
754 self
755 }
756
757 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
758 rhs.as_any()
759 .downcast_ref::<Self>()
760 .map_or(false, |rhs| self == rhs)
761 }
762
763 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
764 T::hash(self, &mut hasher);
765 }
766
767 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
768 self
769 }
770}
771
772pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
773
774impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
775 fn from(request: T) -> Self {
776 Self(Arc::new(request))
777 }
778}
779
780impl PartialEq for RequestSlot {
781 fn eq(&self, other: &Self) -> bool {
782 self.0.dyn_eq(other.0.as_ref())
783 }
784}
785
786impl Eq for RequestSlot {}
787
788impl Hash for RequestSlot {
789 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
790 self.0.dyn_hash(state);
791 self.0.as_any().type_id().hash(state)
792 }
793}
794
795#[derive(Debug, Clone, Hash, PartialEq, Eq)]
796pub struct CompletionsQuery {
797 pub query: String,
798 pub column: u64,
799 pub line: Option<u64>,
800 pub frame_id: Option<u64>,
801}
802
803impl CompletionsQuery {
804 pub fn new(
805 buffer: &language::Buffer,
806 cursor_position: language::Anchor,
807 frame_id: Option<u64>,
808 ) -> Self {
809 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
810 Self {
811 query: buffer.text(),
812 column: column as u64,
813 frame_id,
814 line: Some(row as u64),
815 }
816 }
817}
818
819pub enum SessionEvent {
820 Modules,
821 LoadedSources,
822 Stopped(Option<ThreadId>),
823 StackTrace,
824 Variables,
825 Threads,
826}
827
828impl EventEmitter<SessionEvent> for Session {}
829
830// local session will send breakpoint updates to DAP for all new breakpoints
831// remote side will only send breakpoint updates when it is a breakpoint created by that peer
832// BreakpointStore notifies session on breakpoint changes
833impl Session {
834 pub(crate) fn local(
835 breakpoint_store: Entity<BreakpointStore>,
836 session_id: SessionId,
837 parent_session: Option<Entity<Session>>,
838 delegate: DapAdapterDelegate,
839 config: DebugAdapterConfig,
840 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
841 initialized_tx: oneshot::Sender<()>,
842 debug_adapters: Arc<DapRegistry>,
843 cx: &mut App,
844 ) -> Task<Result<Entity<Self>>> {
845 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
846
847 cx.spawn(async move |cx| {
848 let (mode, capabilities) = LocalMode::new(
849 debug_adapters,
850 session_id,
851 parent_session.clone(),
852 breakpoint_store.clone(),
853 config.clone(),
854 delegate,
855 message_tx,
856 cx.clone(),
857 )
858 .await?;
859
860 cx.new(|cx| {
861 create_local_session(
862 breakpoint_store,
863 session_id,
864 parent_session,
865 start_debugging_requests_tx,
866 initialized_tx,
867 message_rx,
868 mode,
869 capabilities,
870 cx,
871 )
872 })
873 })
874 }
875
876 #[cfg(any(test, feature = "test-support"))]
877 pub(crate) fn fake(
878 breakpoint_store: Entity<BreakpointStore>,
879 session_id: SessionId,
880 parent_session: Option<Entity<Session>>,
881 delegate: DapAdapterDelegate,
882 config: DebugAdapterConfig,
883 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
884 initialized_tx: oneshot::Sender<()>,
885 caps: Capabilities,
886 fails: bool,
887 cx: &mut App,
888 ) -> Task<Result<Entity<Session>>> {
889 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
890
891 cx.spawn(async move |cx| {
892 let (mode, capabilities) = LocalMode::new_fake(
893 session_id,
894 parent_session.clone(),
895 breakpoint_store.clone(),
896 config.clone(),
897 delegate,
898 message_tx,
899 caps,
900 fails,
901 cx.clone(),
902 )
903 .await?;
904
905 cx.new(|cx| {
906 create_local_session(
907 breakpoint_store,
908 session_id,
909 parent_session,
910 start_debugging_requests_tx,
911 initialized_tx,
912 message_rx,
913 mode,
914 capabilities,
915 cx,
916 )
917 })
918 })
919 }
920
921 pub(crate) fn remote(
922 session_id: SessionId,
923 client: AnyProtoClient,
924 upstream_project_id: u64,
925 ignore_breakpoints: bool,
926 ) -> Self {
927 Self {
928 mode: Mode::Remote(RemoteConnection {
929 _client: client,
930 _upstream_project_id: upstream_project_id,
931 }),
932 id: session_id,
933 child_session_ids: HashSet::default(),
934 parent_id: None,
935 capabilities: Capabilities::default(),
936 ignore_breakpoints,
937 variables: Default::default(),
938 stack_frames: Default::default(),
939 thread_states: ThreadStates::default(),
940 output_token: OutputToken(0),
941 output: circular_buffer::CircularBuffer::boxed(),
942 requests: HashMap::default(),
943 modules: Vec::default(),
944 loaded_sources: Vec::default(),
945 threads: IndexMap::default(),
946 _background_tasks: Vec::default(),
947 locations: Default::default(),
948 is_session_terminated: false,
949 }
950 }
951
952 pub fn session_id(&self) -> SessionId {
953 self.id
954 }
955
956 pub fn child_session_ids(&self) -> HashSet<SessionId> {
957 self.child_session_ids.clone()
958 }
959
960 pub fn add_child_session_id(&mut self, session_id: SessionId) {
961 self.child_session_ids.insert(session_id);
962 }
963
964 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
965 self.child_session_ids.remove(&session_id);
966 }
967
968 pub fn parent_id(&self) -> Option<SessionId> {
969 self.parent_id
970 }
971
972 pub fn capabilities(&self) -> &Capabilities {
973 &self.capabilities
974 }
975
976 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
977 if let Mode::Local(local_mode) = &self.mode {
978 Some(local_mode.config.clone())
979 } else {
980 None
981 }
982 }
983
984 pub fn is_terminated(&self) -> bool {
985 self.is_session_terminated
986 }
987
988 pub fn is_local(&self) -> bool {
989 matches!(self.mode, Mode::Local(_))
990 }
991
992 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
993 match &mut self.mode {
994 Mode::Local(local_mode) => Some(local_mode),
995 Mode::Remote(_) => None,
996 }
997 }
998
999 pub fn as_local(&self) -> Option<&LocalMode> {
1000 match &self.mode {
1001 Mode::Local(local_mode) => Some(local_mode),
1002 Mode::Remote(_) => None,
1003 }
1004 }
1005
1006 pub(super) fn initialize_sequence(
1007 &mut self,
1008 initialize_rx: oneshot::Receiver<()>,
1009 cx: &mut Context<Self>,
1010 ) -> Task<Result<()>> {
1011 match &self.mode {
1012 Mode::Local(local_mode) => {
1013 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1014 }
1015 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1016 }
1017 }
1018
1019 pub fn output(
1020 &self,
1021 since: OutputToken,
1022 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1023 if self.output_token.0 == 0 {
1024 return (self.output.range(0..0), OutputToken(0));
1025 };
1026
1027 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1028
1029 let clamped_events_since = events_since.clamp(0, self.output.len());
1030 (
1031 self.output
1032 .range(self.output.len() - clamped_events_since..),
1033 self.output_token,
1034 )
1035 }
1036
1037 pub fn respond_to_client(
1038 &self,
1039 request_seq: u64,
1040 success: bool,
1041 command: String,
1042 body: Option<serde_json::Value>,
1043 cx: &mut Context<Self>,
1044 ) -> Task<Result<()>> {
1045 let Some(local_session) = self.as_local().cloned() else {
1046 unreachable!("Cannot respond to remote client");
1047 };
1048
1049 cx.background_spawn(async move {
1050 local_session
1051 .client
1052 .send_message(Message::Response(Response {
1053 body,
1054 success,
1055 command,
1056 seq: request_seq + 1,
1057 request_seq,
1058 message: None,
1059 }))
1060 .await
1061 })
1062 }
1063
1064 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1065 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1066 self.thread_states.stop_all_threads();
1067
1068 self.invalidate_command_type::<StackTraceCommand>();
1069 }
1070
1071 // Event if we stopped all threads we still need to insert the thread_id
1072 // to our own data
1073 if let Some(thread_id) = event.thread_id {
1074 self.thread_states.stop_thread(ThreadId(thread_id));
1075
1076 self.invalidate_state(
1077 &StackTraceCommand {
1078 thread_id,
1079 start_frame: None,
1080 levels: None,
1081 }
1082 .into(),
1083 );
1084 }
1085
1086 self.invalidate_generic();
1087 self.threads.clear();
1088 self.variables.clear();
1089 cx.emit(SessionEvent::Stopped(
1090 event
1091 .thread_id
1092 .map(Into::into)
1093 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1094 ));
1095 cx.notify();
1096 }
1097
1098 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1099 match *event {
1100 Events::Initialized(_) => {
1101 debug_assert!(
1102 false,
1103 "Initialized event should have been handled in LocalMode"
1104 );
1105 }
1106 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1107 Events::Continued(event) => {
1108 if event.all_threads_continued.unwrap_or_default() {
1109 self.thread_states.continue_all_threads();
1110 } else {
1111 self.thread_states
1112 .continue_thread(ThreadId(event.thread_id));
1113 }
1114 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1115 self.invalidate_generic();
1116 }
1117 Events::Exited(_event) => {
1118 self.clear_active_debug_line(cx);
1119 }
1120 Events::Terminated(_) => {
1121 self.is_session_terminated = true;
1122 self.clear_active_debug_line(cx);
1123 }
1124 Events::Thread(event) => {
1125 let thread_id = ThreadId(event.thread_id);
1126
1127 match event.reason {
1128 dap::ThreadEventReason::Started => {
1129 self.thread_states.continue_thread(thread_id);
1130 }
1131 dap::ThreadEventReason::Exited => {
1132 self.thread_states.exit_thread(thread_id);
1133 }
1134 reason => {
1135 log::error!("Unhandled thread event reason {:?}", reason);
1136 }
1137 }
1138 self.invalidate_state(&ThreadsCommand.into());
1139 cx.notify();
1140 }
1141 Events::Output(event) => {
1142 if event
1143 .category
1144 .as_ref()
1145 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1146 {
1147 return;
1148 }
1149
1150 self.output.push_back(event);
1151 self.output_token.0 += 1;
1152 cx.notify();
1153 }
1154 Events::Breakpoint(_) => {}
1155 Events::Module(event) => {
1156 match event.reason {
1157 dap::ModuleEventReason::New => {
1158 self.modules.push(event.module);
1159 }
1160 dap::ModuleEventReason::Changed => {
1161 if let Some(module) = self
1162 .modules
1163 .iter_mut()
1164 .find(|other| event.module.id == other.id)
1165 {
1166 *module = event.module;
1167 }
1168 }
1169 dap::ModuleEventReason::Removed => {
1170 self.modules.retain(|other| event.module.id != other.id);
1171 }
1172 }
1173
1174 // todo(debugger): We should only send the invalidate command to downstream clients.
1175 // self.invalidate_state(&ModulesCommand.into());
1176 }
1177 Events::LoadedSource(_) => {
1178 self.invalidate_state(&LoadedSourcesCommand.into());
1179 }
1180 Events::Capabilities(event) => {
1181 self.capabilities = self.capabilities.merge(event.capabilities);
1182 cx.notify();
1183 }
1184 Events::Memory(_) => {}
1185 Events::Process(_) => {}
1186 Events::ProgressEnd(_) => {}
1187 Events::ProgressStart(_) => {}
1188 Events::ProgressUpdate(_) => {}
1189 Events::Invalidated(_) => {}
1190 Events::Other(_) => {}
1191 }
1192 }
1193
1194 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1195 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1196 &mut self,
1197 request: T,
1198 process_result: impl FnOnce(
1199 &mut Self,
1200 Result<T::Response>,
1201 &mut Context<Self>,
1202 ) -> Option<T::Response>
1203 + 'static,
1204 cx: &mut Context<Self>,
1205 ) {
1206 const {
1207 assert!(
1208 T::CACHEABLE,
1209 "Only requests marked as cacheable should invoke `fetch`"
1210 );
1211 }
1212
1213 if !self.thread_states.any_stopped_thread()
1214 && request.type_id() != TypeId::of::<ThreadsCommand>()
1215 || self.is_session_terminated
1216 {
1217 return;
1218 }
1219
1220 let request_map = self
1221 .requests
1222 .entry(std::any::TypeId::of::<T>())
1223 .or_default();
1224
1225 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1226 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1227
1228 let task = Self::request_inner::<Arc<T>>(
1229 &self.capabilities,
1230 self.id,
1231 &self.mode,
1232 command,
1233 process_result,
1234 cx,
1235 );
1236 let task = cx
1237 .background_executor()
1238 .spawn(async move {
1239 let _ = task.await?;
1240 Some(())
1241 })
1242 .shared();
1243
1244 vacant.insert(task);
1245 cx.notify();
1246 }
1247 }
1248
1249 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1250 capabilities: &Capabilities,
1251 session_id: SessionId,
1252 mode: &Mode,
1253 request: T,
1254 process_result: impl FnOnce(
1255 &mut Self,
1256 Result<T::Response>,
1257 &mut Context<Self>,
1258 ) -> Option<T::Response>
1259 + 'static,
1260 cx: &mut Context<Self>,
1261 ) -> Task<Option<T::Response>> {
1262 if !T::is_supported(&capabilities) {
1263 log::warn!(
1264 "Attempted to send a DAP request that isn't supported: {:?}",
1265 request
1266 );
1267 let error = Err(anyhow::Error::msg(
1268 "Couldn't complete request because it's not supported",
1269 ));
1270 return cx.spawn(async move |this, cx| {
1271 this.update(cx, |this, cx| process_result(this, error, cx))
1272 .log_err()
1273 .flatten()
1274 });
1275 }
1276
1277 let request = mode.request_dap(session_id, request, cx);
1278 cx.spawn(async move |this, cx| {
1279 let result = request.await;
1280 this.update(cx, |this, cx| process_result(this, result, cx))
1281 .log_err()
1282 .flatten()
1283 })
1284 }
1285
1286 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1287 &self,
1288 request: T,
1289 process_result: impl FnOnce(
1290 &mut Self,
1291 Result<T::Response>,
1292 &mut Context<Self>,
1293 ) -> Option<T::Response>
1294 + 'static,
1295 cx: &mut Context<Self>,
1296 ) -> Task<Option<T::Response>> {
1297 Self::request_inner(
1298 &self.capabilities,
1299 self.id,
1300 &self.mode,
1301 request,
1302 process_result,
1303 cx,
1304 )
1305 }
1306
1307 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1308 self.requests.remove(&std::any::TypeId::of::<Command>());
1309 }
1310
1311 fn invalidate_generic(&mut self) {
1312 self.invalidate_command_type::<ModulesCommand>();
1313 self.invalidate_command_type::<LoadedSourcesCommand>();
1314 self.invalidate_command_type::<ThreadsCommand>();
1315 }
1316
1317 fn invalidate_state(&mut self, key: &RequestSlot) {
1318 self.requests
1319 .entry(key.0.as_any().type_id())
1320 .and_modify(|request_map| {
1321 request_map.remove(&key);
1322 });
1323 }
1324
1325 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1326 self.thread_states.thread_status(thread_id)
1327 }
1328
1329 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1330 self.fetch(
1331 dap_command::ThreadsCommand,
1332 |this, result, cx| {
1333 let result = result.log_err()?;
1334
1335 this.threads = result
1336 .iter()
1337 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1338 .collect();
1339
1340 this.invalidate_command_type::<StackTraceCommand>();
1341 cx.emit(SessionEvent::Threads);
1342 cx.notify();
1343
1344 Some(result)
1345 },
1346 cx,
1347 );
1348
1349 self.threads
1350 .values()
1351 .map(|thread| {
1352 (
1353 thread.dap.clone(),
1354 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1355 )
1356 })
1357 .collect()
1358 }
1359
1360 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1361 self.fetch(
1362 dap_command::ModulesCommand,
1363 |this, result, cx| {
1364 let result = result.log_err()?;
1365
1366 this.modules = result.iter().cloned().collect();
1367 cx.emit(SessionEvent::Modules);
1368 cx.notify();
1369
1370 Some(result)
1371 },
1372 cx,
1373 );
1374
1375 &self.modules
1376 }
1377
1378 pub fn ignore_breakpoints(&self) -> bool {
1379 self.ignore_breakpoints
1380 }
1381
1382 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1383 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1384 }
1385
1386 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1387 if self.ignore_breakpoints == ignore {
1388 return Task::ready(());
1389 }
1390
1391 self.ignore_breakpoints = ignore;
1392
1393 if let Some(local) = self.as_local() {
1394 local.send_all_breakpoints(ignore, cx)
1395 } else {
1396 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1397 unimplemented!()
1398 }
1399 }
1400
1401 pub fn breakpoints_enabled(&self) -> bool {
1402 self.ignore_breakpoints
1403 }
1404
1405 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1406 self.fetch(
1407 dap_command::LoadedSourcesCommand,
1408 |this, result, cx| {
1409 let result = result.log_err()?;
1410 this.loaded_sources = result.iter().cloned().collect();
1411 cx.emit(SessionEvent::LoadedSources);
1412 cx.notify();
1413 Some(result)
1414 },
1415 cx,
1416 );
1417
1418 &self.loaded_sources
1419 }
1420
1421 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1422 res.log_err()?;
1423 Some(())
1424 }
1425
1426 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1427 thread_id: ThreadId,
1428 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1429 {
1430 move |this, response, cx| match response.log_err() {
1431 Some(response) => Some(response),
1432 None => {
1433 this.thread_states.stop_thread(thread_id);
1434 cx.notify();
1435 None
1436 }
1437 }
1438 }
1439
1440 fn clear_active_debug_line_response(
1441 &mut self,
1442 response: Result<()>,
1443 cx: &mut Context<Session>,
1444 ) -> Option<()> {
1445 response.log_err()?;
1446 self.clear_active_debug_line(cx);
1447 Some(())
1448 }
1449
1450 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1451 self.as_local()
1452 .expect("Message handler will only run in local mode")
1453 .breakpoint_store
1454 .update(cx, |store, cx| {
1455 store.remove_active_position(Some(self.id), cx)
1456 });
1457 }
1458
1459 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1460 self.request(
1461 PauseCommand {
1462 thread_id: thread_id.0,
1463 },
1464 Self::empty_response,
1465 cx,
1466 )
1467 .detach();
1468 }
1469
1470 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1471 self.request(
1472 RestartStackFrameCommand { stack_frame_id },
1473 Self::empty_response,
1474 cx,
1475 )
1476 .detach();
1477 }
1478
1479 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1480 if self.capabilities.supports_restart_request.unwrap_or(false) {
1481 self.request(
1482 RestartCommand {
1483 raw: args.unwrap_or(Value::Null),
1484 },
1485 Self::empty_response,
1486 cx,
1487 )
1488 .detach();
1489 } else {
1490 self.request(
1491 DisconnectCommand {
1492 restart: Some(false),
1493 terminate_debuggee: Some(true),
1494 suspend_debuggee: Some(false),
1495 },
1496 Self::empty_response,
1497 cx,
1498 )
1499 .detach();
1500 }
1501 }
1502
1503 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1504 self.is_session_terminated = true;
1505 self.thread_states.exit_all_threads();
1506 cx.notify();
1507
1508 let task = if self
1509 .capabilities
1510 .supports_terminate_request
1511 .unwrap_or_default()
1512 {
1513 self.request(
1514 TerminateCommand {
1515 restart: Some(false),
1516 },
1517 Self::clear_active_debug_line_response,
1518 cx,
1519 )
1520 } else {
1521 self.request(
1522 DisconnectCommand {
1523 restart: Some(false),
1524 terminate_debuggee: Some(true),
1525 suspend_debuggee: Some(false),
1526 },
1527 Self::clear_active_debug_line_response,
1528 cx,
1529 )
1530 };
1531
1532 cx.background_spawn(async move {
1533 let _ = task.await;
1534 })
1535 }
1536
1537 pub fn completions(
1538 &mut self,
1539 query: CompletionsQuery,
1540 cx: &mut Context<Self>,
1541 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1542 let task = self.request(query, |_, result, _| result.log_err(), cx);
1543
1544 cx.background_executor().spawn(async move {
1545 anyhow::Ok(
1546 task.await
1547 .map(|response| response.targets)
1548 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1549 )
1550 })
1551 }
1552
1553 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1554 self.thread_states.continue_thread(thread_id);
1555 self.request(
1556 ContinueCommand {
1557 args: ContinueArguments {
1558 thread_id: thread_id.0,
1559 single_thread: Some(true),
1560 },
1561 },
1562 Self::on_step_response::<ContinueCommand>(thread_id),
1563 cx,
1564 )
1565 .detach();
1566 }
1567
1568 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1569 match self.mode {
1570 Mode::Local(ref local) => Some(local.client.clone()),
1571 Mode::Remote(_) => None,
1572 }
1573 }
1574
1575 pub fn step_over(
1576 &mut self,
1577 thread_id: ThreadId,
1578 granularity: SteppingGranularity,
1579 cx: &mut Context<Self>,
1580 ) {
1581 let supports_single_thread_execution_requests =
1582 self.capabilities.supports_single_thread_execution_requests;
1583 let supports_stepping_granularity = self
1584 .capabilities
1585 .supports_stepping_granularity
1586 .unwrap_or_default();
1587
1588 let command = NextCommand {
1589 inner: StepCommand {
1590 thread_id: thread_id.0,
1591 granularity: supports_stepping_granularity.then(|| granularity),
1592 single_thread: supports_single_thread_execution_requests,
1593 },
1594 };
1595
1596 self.thread_states.process_step(thread_id);
1597 self.request(
1598 command,
1599 Self::on_step_response::<NextCommand>(thread_id),
1600 cx,
1601 )
1602 .detach();
1603 }
1604
1605 pub fn step_in(
1606 &mut self,
1607 thread_id: ThreadId,
1608 granularity: SteppingGranularity,
1609 cx: &mut Context<Self>,
1610 ) {
1611 let supports_single_thread_execution_requests =
1612 self.capabilities.supports_single_thread_execution_requests;
1613 let supports_stepping_granularity = self
1614 .capabilities
1615 .supports_stepping_granularity
1616 .unwrap_or_default();
1617
1618 let command = StepInCommand {
1619 inner: StepCommand {
1620 thread_id: thread_id.0,
1621 granularity: supports_stepping_granularity.then(|| granularity),
1622 single_thread: supports_single_thread_execution_requests,
1623 },
1624 };
1625
1626 self.thread_states.process_step(thread_id);
1627 self.request(
1628 command,
1629 Self::on_step_response::<StepInCommand>(thread_id),
1630 cx,
1631 )
1632 .detach();
1633 }
1634
1635 pub fn step_out(
1636 &mut self,
1637 thread_id: ThreadId,
1638 granularity: SteppingGranularity,
1639 cx: &mut Context<Self>,
1640 ) {
1641 let supports_single_thread_execution_requests =
1642 self.capabilities.supports_single_thread_execution_requests;
1643 let supports_stepping_granularity = self
1644 .capabilities
1645 .supports_stepping_granularity
1646 .unwrap_or_default();
1647
1648 let command = StepOutCommand {
1649 inner: StepCommand {
1650 thread_id: thread_id.0,
1651 granularity: supports_stepping_granularity.then(|| granularity),
1652 single_thread: supports_single_thread_execution_requests,
1653 },
1654 };
1655
1656 self.thread_states.process_step(thread_id);
1657 self.request(
1658 command,
1659 Self::on_step_response::<StepOutCommand>(thread_id),
1660 cx,
1661 )
1662 .detach();
1663 }
1664
1665 pub fn step_back(
1666 &mut self,
1667 thread_id: ThreadId,
1668 granularity: SteppingGranularity,
1669 cx: &mut Context<Self>,
1670 ) {
1671 let supports_single_thread_execution_requests =
1672 self.capabilities.supports_single_thread_execution_requests;
1673 let supports_stepping_granularity = self
1674 .capabilities
1675 .supports_stepping_granularity
1676 .unwrap_or_default();
1677
1678 let command = StepBackCommand {
1679 inner: StepCommand {
1680 thread_id: thread_id.0,
1681 granularity: supports_stepping_granularity.then(|| granularity),
1682 single_thread: supports_single_thread_execution_requests,
1683 },
1684 };
1685
1686 self.thread_states.process_step(thread_id);
1687
1688 self.request(
1689 command,
1690 Self::on_step_response::<StepBackCommand>(thread_id),
1691 cx,
1692 )
1693 .detach();
1694 }
1695
1696 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1697 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1698 && self.requests.contains_key(&ThreadsCommand.type_id())
1699 && self.threads.contains_key(&thread_id)
1700 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1701 // We could still be using an old thread id and have sent a new thread's request
1702 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1703 // But it very well could cause a minor bug in the future that is hard to track down
1704 {
1705 self.fetch(
1706 super::dap_command::StackTraceCommand {
1707 thread_id: thread_id.0,
1708 start_frame: None,
1709 levels: None,
1710 },
1711 move |this, stack_frames, cx| {
1712 let stack_frames = stack_frames.log_err()?;
1713
1714 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1715 thread.stack_frame_ids =
1716 stack_frames.iter().map(|frame| frame.id).collect();
1717 });
1718 debug_assert!(
1719 matches!(entry, indexmap::map::Entry::Occupied(_)),
1720 "Sent request for thread_id that doesn't exist"
1721 );
1722
1723 this.stack_frames.extend(
1724 stack_frames
1725 .iter()
1726 .cloned()
1727 .map(|frame| (frame.id, StackFrame::from(frame))),
1728 );
1729
1730 this.invalidate_command_type::<ScopesCommand>();
1731 this.invalidate_command_type::<VariablesCommand>();
1732
1733 cx.emit(SessionEvent::StackTrace);
1734 cx.notify();
1735 Some(stack_frames)
1736 },
1737 cx,
1738 );
1739 }
1740
1741 self.threads
1742 .get(&thread_id)
1743 .map(|thread| {
1744 thread
1745 .stack_frame_ids
1746 .iter()
1747 .filter_map(|id| self.stack_frames.get(id))
1748 .cloned()
1749 .collect()
1750 })
1751 .unwrap_or_default()
1752 }
1753
1754 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1755 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1756 && self
1757 .requests
1758 .contains_key(&TypeId::of::<StackTraceCommand>())
1759 {
1760 self.fetch(
1761 ScopesCommand { stack_frame_id },
1762 move |this, scopes, cx| {
1763 let scopes = scopes.log_err()?;
1764
1765 for scope in scopes .iter(){
1766 this.variables(scope.variables_reference, cx);
1767 }
1768
1769 let entry = this
1770 .stack_frames
1771 .entry(stack_frame_id)
1772 .and_modify(|stack_frame| {
1773 stack_frame.scopes = scopes.clone();
1774 });
1775
1776 cx.emit(SessionEvent::Variables);
1777
1778 debug_assert!(
1779 matches!(entry, indexmap::map::Entry::Occupied(_)),
1780 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1781 );
1782
1783 Some(scopes)
1784 },
1785 cx,
1786 );
1787 }
1788
1789 self.stack_frames
1790 .get(&stack_frame_id)
1791 .map(|frame| frame.scopes.as_slice())
1792 .unwrap_or_default()
1793 }
1794
1795 pub fn variables(
1796 &mut self,
1797 variables_reference: VariableReference,
1798 cx: &mut Context<Self>,
1799 ) -> Vec<dap::Variable> {
1800 let command = VariablesCommand {
1801 variables_reference,
1802 filter: None,
1803 start: None,
1804 count: None,
1805 format: None,
1806 };
1807
1808 self.fetch(
1809 command,
1810 move |this, variables, cx| {
1811 let variables = variables.log_err()?;
1812 this.variables
1813 .insert(variables_reference, variables.clone());
1814
1815 cx.emit(SessionEvent::Variables);
1816 Some(variables)
1817 },
1818 cx,
1819 );
1820
1821 self.variables
1822 .get(&variables_reference)
1823 .cloned()
1824 .unwrap_or_default()
1825 }
1826
1827 pub fn set_variable_value(
1828 &mut self,
1829 variables_reference: u64,
1830 name: String,
1831 value: String,
1832 cx: &mut Context<Self>,
1833 ) {
1834 if self.capabilities.supports_set_variable.unwrap_or_default() {
1835 self.request(
1836 SetVariableValueCommand {
1837 name,
1838 value,
1839 variables_reference,
1840 },
1841 move |this, response, cx| {
1842 let response = response.log_err()?;
1843 this.invalidate_command_type::<VariablesCommand>();
1844 cx.notify();
1845 Some(response)
1846 },
1847 cx,
1848 )
1849 .detach()
1850 }
1851 }
1852
1853 pub fn evaluate(
1854 &mut self,
1855 expression: String,
1856 context: Option<EvaluateArgumentsContext>,
1857 frame_id: Option<u64>,
1858 source: Option<Source>,
1859 cx: &mut Context<Self>,
1860 ) {
1861 self.request(
1862 EvaluateCommand {
1863 expression,
1864 context,
1865 frame_id,
1866 source,
1867 },
1868 |this, response, cx| {
1869 let response = response.log_err()?;
1870 this.output_token.0 += 1;
1871 this.output.push_back(dap::OutputEvent {
1872 category: None,
1873 output: response.result.clone(),
1874 group: None,
1875 variables_reference: Some(response.variables_reference),
1876 source: None,
1877 line: None,
1878 column: None,
1879 data: None,
1880 location_reference: None,
1881 });
1882
1883 this.invalidate_command_type::<ScopesCommand>();
1884 cx.notify();
1885 Some(response)
1886 },
1887 cx,
1888 )
1889 .detach();
1890 }
1891
1892 pub fn location(
1893 &mut self,
1894 reference: u64,
1895 cx: &mut Context<Self>,
1896 ) -> Option<dap::LocationsResponse> {
1897 self.fetch(
1898 LocationsCommand { reference },
1899 move |this, response, _| {
1900 let response = response.log_err()?;
1901 this.locations.insert(reference, response.clone());
1902 Some(response)
1903 },
1904 cx,
1905 );
1906 self.locations.get(&reference).cloned()
1907 }
1908 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1909 let command = DisconnectCommand {
1910 restart: Some(false),
1911 terminate_debuggee: Some(true),
1912 suspend_debuggee: Some(false),
1913 };
1914
1915 self.request(command, Self::empty_response, cx).detach()
1916 }
1917
1918 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1919 if self
1920 .capabilities
1921 .supports_terminate_threads_request
1922 .unwrap_or_default()
1923 {
1924 self.request(
1925 TerminateThreadsCommand {
1926 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1927 },
1928 Self::clear_active_debug_line_response,
1929 cx,
1930 )
1931 .detach();
1932 } else {
1933 self.shutdown(cx).detach();
1934 }
1935 }
1936}
1937
1938fn create_local_session(
1939 breakpoint_store: Entity<BreakpointStore>,
1940 session_id: SessionId,
1941 parent_session: Option<Entity<Session>>,
1942 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1943 initialized_tx: oneshot::Sender<()>,
1944 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1945 mode: LocalMode,
1946 capabilities: Capabilities,
1947 cx: &mut Context<Session>,
1948) -> Session {
1949 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1950 let mut initialized_tx = Some(initialized_tx);
1951 while let Some(message) = message_rx.next().await {
1952 if let Message::Event(event) = message {
1953 if let Events::Initialized(_) = *event {
1954 if let Some(tx) = initialized_tx.take() {
1955 tx.send(()).ok();
1956 }
1957 } else {
1958 let Ok(_) = this.update(cx, |session, cx| {
1959 session.handle_dap_event(event, cx);
1960 }) else {
1961 break;
1962 };
1963 }
1964 } else {
1965 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1966 else {
1967 break;
1968 };
1969 }
1970 }
1971 })];
1972
1973 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1974 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1975 if let Some(local) = (!this.ignore_breakpoints)
1976 .then(|| this.as_local_mut())
1977 .flatten()
1978 {
1979 local
1980 .send_breakpoints_from_path(path.clone(), *reason, cx)
1981 .detach();
1982 };
1983 }
1984 BreakpointStoreEvent::BreakpointsCleared(paths) => {
1985 if let Some(local) = (!this.ignore_breakpoints)
1986 .then(|| this.as_local_mut())
1987 .flatten()
1988 {
1989 local.unset_breakpoints_from_paths(paths, cx).detach();
1990 }
1991 }
1992 BreakpointStoreEvent::ActiveDebugLineChanged => {}
1993 })
1994 .detach();
1995
1996 Session {
1997 mode: Mode::Local(mode),
1998 id: session_id,
1999 child_session_ids: HashSet::default(),
2000 parent_id: parent_session.map(|session| session.read(cx).id),
2001 variables: Default::default(),
2002 capabilities,
2003 thread_states: ThreadStates::default(),
2004 output_token: OutputToken(0),
2005 ignore_breakpoints: false,
2006 output: circular_buffer::CircularBuffer::boxed(),
2007 requests: HashMap::default(),
2008 modules: Vec::default(),
2009 loaded_sources: Vec::default(),
2010 threads: IndexMap::default(),
2011 stack_frames: IndexMap::default(),
2012 locations: Default::default(),
2013 _background_tasks,
2014 is_session_terminated: false,
2015 }
2016}