1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
9 StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
10 VariablesCommand,
11};
12use super::dap_store::DapAdapterDelegate;
13use anyhow::{Context as _, Result, anyhow};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
16use dap::messages::Response;
17use dap::{
18 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
19 SteppingGranularity, StoppedEvent, VariableReference,
20 adapters::{DapDelegate, DapStatus},
21 client::{DebugAdapterClient, SessionId},
22 messages::{Events, Message},
23};
24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
25use futures::channel::oneshot;
26use futures::{FutureExt, future::Shared};
27use gpui::{
28 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
29};
30use rpc::AnyProtoClient;
31use serde_json::{Value, json};
32use settings::Settings;
33use smol::stream::StreamExt;
34use std::any::TypeId;
35use std::path::PathBuf;
36use std::u64;
37use std::{
38 any::Any,
39 collections::hash_map::Entry,
40 hash::{Hash, Hasher},
41 path::Path,
42 sync::Arc,
43};
44use task::{DebugAdapterConfig, DebugTaskDefinition};
45use text::{PointUtf16, ToPointUtf16};
46use util::{ResultExt, merge_json_value_into};
47
48#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
49#[repr(transparent)]
50pub struct ThreadId(pub u64);
51
52impl ThreadId {
53 pub const MIN: ThreadId = ThreadId(u64::MIN);
54 pub const MAX: ThreadId = ThreadId(u64::MAX);
55}
56
57impl From<u64> for ThreadId {
58 fn from(id: u64) -> Self {
59 Self(id)
60 }
61}
62
63#[derive(Clone, Debug)]
64pub struct StackFrame {
65 pub dap: dap::StackFrame,
66 pub scopes: Vec<dap::Scope>,
67}
68
69impl From<dap::StackFrame> for StackFrame {
70 fn from(stack_frame: dap::StackFrame) -> Self {
71 Self {
72 scopes: vec![],
73 dap: stack_frame,
74 }
75 }
76}
77
78#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
79pub enum ThreadStatus {
80 #[default]
81 Running,
82 Stopped,
83 Stepping,
84 Exited,
85 Ended,
86}
87
88impl ThreadStatus {
89 pub fn label(&self) -> &'static str {
90 match self {
91 ThreadStatus::Running => "Running",
92 ThreadStatus::Stopped => "Stopped",
93 ThreadStatus::Stepping => "Stepping",
94 ThreadStatus::Exited => "Exited",
95 ThreadStatus::Ended => "Ended",
96 }
97 }
98}
99
100#[derive(Debug)]
101pub struct Thread {
102 dap: dap::Thread,
103 stack_frame_ids: IndexSet<StackFrameId>,
104 _has_stopped: bool,
105}
106
107impl From<dap::Thread> for Thread {
108 fn from(dap: dap::Thread) -> Self {
109 Self {
110 dap,
111 stack_frame_ids: Default::default(),
112 _has_stopped: false,
113 }
114 }
115}
116
117type UpstreamProjectId = u64;
118
119struct RemoteConnection {
120 _client: AnyProtoClient,
121 _upstream_project_id: UpstreamProjectId,
122}
123
124impl RemoteConnection {
125 fn send_proto_client_request<R: DapCommand>(
126 &self,
127 _request: R,
128 _session_id: SessionId,
129 cx: &mut App,
130 ) -> Task<Result<R::Response>> {
131 // let message = request.to_proto(session_id, self.upstream_project_id);
132 // let upstream_client = self.client.clone();
133 cx.background_executor().spawn(async move {
134 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
135 // let response = upstream_client.request(message).await?;
136 // request.response_from_proto(response)
137 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
138 })
139 }
140
141 fn request<R: DapCommand>(
142 &self,
143 request: R,
144 session_id: SessionId,
145 cx: &mut App,
146 ) -> Task<Result<R::Response>>
147 where
148 <R::DapRequest as dap::requests::Request>::Response: 'static,
149 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
150 {
151 return self.send_proto_client_request::<R>(request, session_id, cx);
152 }
153}
154
155enum Mode {
156 Local(LocalMode),
157 Remote(RemoteConnection),
158}
159
160#[derive(Clone)]
161pub struct LocalMode {
162 client: Arc<DebugAdapterClient>,
163 config: DebugAdapterConfig,
164 adapter: Arc<dyn DebugAdapter>,
165 breakpoint_store: Entity<BreakpointStore>,
166}
167
168fn client_source(abs_path: &Path) -> dap::Source {
169 dap::Source {
170 name: abs_path
171 .file_name()
172 .map(|filename| filename.to_string_lossy().to_string()),
173 path: Some(abs_path.to_string_lossy().to_string()),
174 source_reference: None,
175 presentation_hint: None,
176 origin: None,
177 sources: None,
178 adapter_data: None,
179 checksums: None,
180 }
181}
182
183impl LocalMode {
184 fn new(
185 debug_adapters: Arc<DapRegistry>,
186 session_id: SessionId,
187 parent_session: Option<Entity<Session>>,
188 breakpoint_store: Entity<BreakpointStore>,
189 config: DebugAdapterConfig,
190 delegate: DapAdapterDelegate,
191 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
192 cx: AsyncApp,
193 ) -> Task<Result<Self>> {
194 Self::new_inner(
195 debug_adapters,
196 session_id,
197 parent_session,
198 breakpoint_store,
199 config,
200 delegate,
201 messages_tx,
202 async |_, _| {},
203 cx,
204 )
205 }
206 #[cfg(any(test, feature = "test-support"))]
207 fn new_fake(
208 session_id: SessionId,
209 parent_session: Option<Entity<Session>>,
210 breakpoint_store: Entity<BreakpointStore>,
211 config: DebugAdapterConfig,
212 delegate: DapAdapterDelegate,
213 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
214 caps: Capabilities,
215 fail: bool,
216 cx: AsyncApp,
217 ) -> Task<Result<Self>> {
218 use task::DebugRequestDisposition;
219
220 let request = match config.request.clone() {
221 DebugRequestDisposition::UserConfigured(request) => request,
222 DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
223 match reverse_request_args.request {
224 dap::StartDebuggingRequestArgumentsRequest::Launch => {
225 DebugRequestType::Launch(task::LaunchConfig {
226 program: "".to_owned(),
227 cwd: None,
228 args: Default::default(),
229 })
230 }
231 dap::StartDebuggingRequestArgumentsRequest::Attach => {
232 DebugRequestType::Attach(task::AttachConfig {
233 process_id: Some(0),
234 })
235 }
236 }
237 }
238 };
239
240 let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
241 session
242 .client
243 .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
244 .await;
245
246 let paths = cx
247 .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
248 .expect("Breakpoint store should exist in all tests that start debuggers");
249
250 session
251 .client
252 .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
253 let p = Arc::from(Path::new(&args.source.path.unwrap()));
254 if !paths.contains(&p) {
255 panic!("Sent breakpoints for path without any")
256 }
257
258 Ok(dap::SetBreakpointsResponse {
259 breakpoints: Vec::default(),
260 })
261 })
262 .await;
263
264 match request {
265 dap::DebugRequestType::Launch(_) => {
266 if fail {
267 session
268 .client
269 .on_request::<dap::requests::Launch, _>(move |_, _| {
270 Err(dap::ErrorResponse {
271 error: Some(dap::Message {
272 id: 1,
273 format: "error".into(),
274 variables: None,
275 send_telemetry: None,
276 show_user: None,
277 url: None,
278 url_label: None,
279 }),
280 })
281 })
282 .await;
283 } else {
284 session
285 .client
286 .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
287 .await;
288 }
289 }
290 dap::DebugRequestType::Attach(attach_config) => {
291 if fail {
292 session
293 .client
294 .on_request::<dap::requests::Attach, _>(move |_, _| {
295 Err(dap::ErrorResponse {
296 error: Some(dap::Message {
297 id: 1,
298 format: "error".into(),
299 variables: None,
300 send_telemetry: None,
301 show_user: None,
302 url: None,
303 url_label: None,
304 }),
305 })
306 })
307 .await;
308 } else {
309 session
310 .client
311 .on_request::<dap::requests::Attach, _>(move |_, args| {
312 assert_eq!(
313 json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
314 args.raw
315 );
316
317 Ok(())
318 })
319 .await;
320 }
321 }
322 }
323
324 session
325 .client
326 .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
327 .await;
328 session.client.fake_event(Events::Initialized(None)).await;
329 };
330 Self::new_inner(
331 DapRegistry::fake().into(),
332 session_id,
333 parent_session,
334 breakpoint_store,
335 config,
336 delegate,
337 messages_tx,
338 callback,
339 cx,
340 )
341 }
342 fn new_inner(
343 registry: Arc<DapRegistry>,
344 session_id: SessionId,
345 parent_session: Option<Entity<Session>>,
346 breakpoint_store: Entity<BreakpointStore>,
347 config: DebugAdapterConfig,
348 delegate: DapAdapterDelegate,
349 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
350 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
351 cx: AsyncApp,
352 ) -> Task<Result<Self>> {
353 cx.spawn(async move |cx| {
354 let (adapter, binary) =
355 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
356
357 let message_handler = Box::new(move |message| {
358 messages_tx.unbounded_send(message).ok();
359 });
360
361 let client = Arc::new(
362 if let Some(client) = parent_session
363 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
364 .flatten()
365 {
366 client
367 .reconnect(session_id, binary, message_handler, cx.clone())
368 .await?
369 } else {
370 DebugAdapterClient::start(
371 session_id,
372 adapter.name(),
373 binary,
374 message_handler,
375 cx.clone(),
376 )
377 .await
378 .with_context(|| "Failed to start communication with debug adapter")?
379 },
380 );
381
382 let mut session = Self {
383 client,
384 adapter,
385 breakpoint_store,
386 config: config.clone(),
387 };
388
389 on_initialized(&mut session, cx.clone()).await;
390
391 Ok(session)
392 })
393 }
394
395 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
396 let tasks: Vec<_> = paths
397 .into_iter()
398 .map(|path| {
399 self.request(
400 dap_command::SetBreakpoints {
401 source: client_source(path),
402 source_modified: None,
403 breakpoints: vec![],
404 },
405 cx.background_executor().clone(),
406 )
407 })
408 .collect();
409
410 cx.background_spawn(async move {
411 futures::future::join_all(tasks)
412 .await
413 .iter()
414 .for_each(|res| match res {
415 Ok(_) => {}
416 Err(err) => {
417 log::warn!("Set breakpoints request failed: {}", err);
418 }
419 });
420 })
421 }
422
423 fn send_breakpoints_from_path(
424 &self,
425 abs_path: Arc<Path>,
426 reason: BreakpointUpdatedReason,
427 cx: &mut App,
428 ) -> Task<()> {
429 let breakpoints = self
430 .breakpoint_store
431 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
432 .into_iter()
433 .filter(|bp| bp.state.is_enabled())
434 .map(Into::into)
435 .collect();
436
437 let task = self.request(
438 dap_command::SetBreakpoints {
439 source: client_source(&abs_path),
440 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
441 breakpoints,
442 },
443 cx.background_executor().clone(),
444 );
445
446 cx.background_spawn(async move {
447 match task.await {
448 Ok(_) => {}
449 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
450 }
451 })
452 }
453
454 fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
455 let mut breakpoint_tasks = Vec::new();
456 let breakpoints = self
457 .breakpoint_store
458 .read_with(cx, |store, cx| store.all_breakpoints(cx));
459
460 for (path, breakpoints) in breakpoints {
461 let breakpoints = if ignore_breakpoints {
462 vec![]
463 } else {
464 breakpoints
465 .into_iter()
466 .filter(|bp| bp.state.is_enabled())
467 .map(Into::into)
468 .collect()
469 };
470
471 breakpoint_tasks.push(self.request(
472 dap_command::SetBreakpoints {
473 source: client_source(&path),
474 source_modified: Some(false),
475 breakpoints,
476 },
477 cx.background_executor().clone(),
478 ));
479 }
480
481 cx.background_spawn(async move {
482 futures::future::join_all(breakpoint_tasks)
483 .await
484 .iter()
485 .for_each(|res| match res {
486 Ok(_) => {}
487 Err(err) => {
488 log::warn!("Set breakpoints request failed: {}", err);
489 }
490 });
491 })
492 }
493
494 async fn get_adapter_binary(
495 registry: &Arc<DapRegistry>,
496 config: &DebugAdapterConfig,
497 delegate: &DapAdapterDelegate,
498 cx: &mut AsyncApp,
499 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
500 let adapter = registry
501 .adapter(&config.adapter)
502 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
503
504 let binary = cx.update(|cx| {
505 ProjectSettings::get_global(cx)
506 .dap
507 .get(&adapter.name())
508 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
509 })?;
510
511 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
512 Err(error) => {
513 delegate.update_status(
514 adapter.name(),
515 DapStatus::Failed {
516 error: error.to_string(),
517 },
518 );
519
520 return Err(error);
521 }
522 Ok(mut binary) => {
523 delegate.update_status(adapter.name(), DapStatus::None);
524
525 let shell_env = delegate.shell_env().await;
526 let mut envs = binary.envs.unwrap_or_default();
527 envs.extend(shell_env);
528 binary.envs = Some(envs);
529
530 binary
531 }
532 };
533
534 Ok((adapter, binary))
535 }
536
537 pub fn label(&self) -> String {
538 self.config.label.clone()
539 }
540
541 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
542 let adapter_id = self.adapter.name().to_string();
543
544 self.request(Initialize { adapter_id }, cx.background_executor().clone())
545 }
546
547 fn initialize_sequence(
548 &self,
549 capabilities: &Capabilities,
550 initialized_rx: oneshot::Receiver<()>,
551 cx: &App,
552 ) -> Task<Result<()>> {
553 let (mut raw, is_launch) = match &self.config.request {
554 task::DebugRequestDisposition::UserConfigured(_) => {
555 let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
556 debug_assert!(false, "This part of code should be unreachable in practice");
557 return Task::ready(Err(anyhow!(
558 "Expected debug config conversion to succeed"
559 )));
560 };
561 let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
562 let raw = self.adapter.request_args(&raw);
563 (raw, is_launch)
564 }
565 task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
566 start_debugging_request_arguments.configuration.clone(),
567 matches!(
568 start_debugging_request_arguments.request,
569 dap::StartDebuggingRequestArgumentsRequest::Launch
570 ),
571 ),
572 };
573
574 merge_json_value_into(
575 self.config.initialize_args.clone().unwrap_or(json!({})),
576 &mut raw,
577 );
578 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
579 let launch = if is_launch {
580 self.request(Launch { raw }, cx.background_executor().clone())
581 } else {
582 self.request(Attach { raw }, cx.background_executor().clone())
583 };
584
585 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
586
587 let configuration_sequence = cx.spawn({
588 let this = self.clone();
589 async move |cx| {
590 initialized_rx.await?;
591 // todo(debugger) figure out if we want to handle a breakpoint response error
592 // This will probably consist of letting a user know that breakpoints failed to be set
593 cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
594
595 if configuration_done_supported {
596 this.request(ConfigurationDone {}, cx.background_executor().clone())
597 } else {
598 Task::ready(Ok(()))
599 }
600 .await
601 }
602 });
603
604 cx.background_spawn(async move {
605 futures::future::try_join(launch, configuration_sequence).await?;
606 Ok(())
607 })
608 }
609
610 fn request<R: LocalDapCommand>(
611 &self,
612 request: R,
613 executor: BackgroundExecutor,
614 ) -> Task<Result<R::Response>>
615 where
616 <R::DapRequest as dap::requests::Request>::Response: 'static,
617 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
618 {
619 let request = Arc::new(request);
620
621 let request_clone = request.clone();
622 let connection = self.client.clone();
623 let request_task = executor.spawn(async move {
624 let args = request_clone.to_dap();
625 connection.request::<R::DapRequest>(args).await
626 });
627
628 executor.spawn(async move {
629 let response = request.response_from_dap(request_task.await?);
630 response
631 })
632 }
633}
634impl From<RemoteConnection> for Mode {
635 fn from(value: RemoteConnection) -> Self {
636 Self::Remote(value)
637 }
638}
639
640impl Mode {
641 fn request_dap<R: DapCommand>(
642 &self,
643 session_id: SessionId,
644 request: R,
645 cx: &mut Context<Session>,
646 ) -> Task<Result<R::Response>>
647 where
648 <R::DapRequest as dap::requests::Request>::Response: 'static,
649 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
650 {
651 match self {
652 Mode::Local(debug_adapter_client) => {
653 debug_adapter_client.request(request, cx.background_executor().clone())
654 }
655 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
656 }
657 }
658}
659
660#[derive(Default)]
661struct ThreadStates {
662 global_state: Option<ThreadStatus>,
663 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
664}
665
666impl ThreadStates {
667 fn stop_all_threads(&mut self) {
668 self.global_state = Some(ThreadStatus::Stopped);
669 self.known_thread_states.clear();
670 }
671
672 fn exit_all_threads(&mut self) {
673 self.global_state = Some(ThreadStatus::Exited);
674 self.known_thread_states.clear();
675 }
676
677 fn continue_all_threads(&mut self) {
678 self.global_state = Some(ThreadStatus::Running);
679 self.known_thread_states.clear();
680 }
681
682 fn stop_thread(&mut self, thread_id: ThreadId) {
683 self.known_thread_states
684 .insert(thread_id, ThreadStatus::Stopped);
685 }
686
687 fn continue_thread(&mut self, thread_id: ThreadId) {
688 self.known_thread_states
689 .insert(thread_id, ThreadStatus::Running);
690 }
691
692 fn process_step(&mut self, thread_id: ThreadId) {
693 self.known_thread_states
694 .insert(thread_id, ThreadStatus::Stepping);
695 }
696
697 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
698 self.thread_state(thread_id)
699 .unwrap_or(ThreadStatus::Running)
700 }
701
702 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
703 self.known_thread_states
704 .get(&thread_id)
705 .copied()
706 .or(self.global_state)
707 }
708
709 fn exit_thread(&mut self, thread_id: ThreadId) {
710 self.known_thread_states
711 .insert(thread_id, ThreadStatus::Exited);
712 }
713
714 fn any_stopped_thread(&self) -> bool {
715 self.global_state
716 .is_some_and(|state| state == ThreadStatus::Stopped)
717 || self
718 .known_thread_states
719 .values()
720 .any(|status| *status == ThreadStatus::Stopped)
721 }
722}
723const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
724
725#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
726pub struct OutputToken(pub usize);
727/// Represents a current state of a single debug adapter and provides ways to mutate it.
728pub struct Session {
729 mode: Mode,
730 pub(super) capabilities: Capabilities,
731 id: SessionId,
732 child_session_ids: HashSet<SessionId>,
733 parent_id: Option<SessionId>,
734 ignore_breakpoints: bool,
735 modules: Vec<dap::Module>,
736 loaded_sources: Vec<dap::Source>,
737 output_token: OutputToken,
738 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
739 threads: IndexMap<ThreadId, Thread>,
740 thread_states: ThreadStates,
741 variables: HashMap<VariableReference, Vec<dap::Variable>>,
742 stack_frames: IndexMap<StackFrameId, StackFrame>,
743 locations: HashMap<u64, dap::LocationsResponse>,
744 is_session_terminated: bool,
745 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
746 _background_tasks: Vec<Task<()>>,
747}
748
749trait CacheableCommand: 'static + Send + Sync {
750 fn as_any(&self) -> &dyn Any;
751 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
752 fn dyn_hash(&self, hasher: &mut dyn Hasher);
753 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
754}
755
756impl<T> CacheableCommand for T
757where
758 T: DapCommand + PartialEq + Eq + Hash,
759{
760 fn as_any(&self) -> &dyn Any {
761 self
762 }
763
764 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
765 rhs.as_any()
766 .downcast_ref::<Self>()
767 .map_or(false, |rhs| self == rhs)
768 }
769
770 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
771 T::hash(self, &mut hasher);
772 }
773
774 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
775 self
776 }
777}
778
779pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
780
781impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
782 fn from(request: T) -> Self {
783 Self(Arc::new(request))
784 }
785}
786
787impl PartialEq for RequestSlot {
788 fn eq(&self, other: &Self) -> bool {
789 self.0.dyn_eq(other.0.as_ref())
790 }
791}
792
793impl Eq for RequestSlot {}
794
795impl Hash for RequestSlot {
796 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
797 self.0.dyn_hash(state);
798 self.0.as_any().type_id().hash(state)
799 }
800}
801
802#[derive(Debug, Clone, Hash, PartialEq, Eq)]
803pub struct CompletionsQuery {
804 pub query: String,
805 pub column: u64,
806 pub line: Option<u64>,
807 pub frame_id: Option<u64>,
808}
809
810impl CompletionsQuery {
811 pub fn new(
812 buffer: &language::Buffer,
813 cursor_position: language::Anchor,
814 frame_id: Option<u64>,
815 ) -> Self {
816 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
817 Self {
818 query: buffer.text(),
819 column: column as u64,
820 frame_id,
821 line: Some(row as u64),
822 }
823 }
824}
825
826pub enum SessionEvent {
827 Modules,
828 LoadedSources,
829 Stopped(Option<ThreadId>),
830 StackTrace,
831 Variables,
832 Threads,
833}
834
835pub(crate) enum SessionStateEvent {
836 Shutdown,
837}
838
839impl EventEmitter<SessionEvent> for Session {}
840impl EventEmitter<SessionStateEvent> for Session {}
841
842// local session will send breakpoint updates to DAP for all new breakpoints
843// remote side will only send breakpoint updates when it is a breakpoint created by that peer
844// BreakpointStore notifies session on breakpoint changes
845impl Session {
846 pub(crate) fn local(
847 breakpoint_store: Entity<BreakpointStore>,
848 session_id: SessionId,
849 parent_session: Option<Entity<Session>>,
850 delegate: DapAdapterDelegate,
851 config: DebugAdapterConfig,
852 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
853 initialized_tx: oneshot::Sender<()>,
854 debug_adapters: Arc<DapRegistry>,
855 cx: &mut App,
856 ) -> Task<Result<Entity<Self>>> {
857 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
858
859 cx.spawn(async move |cx| {
860 let mode = LocalMode::new(
861 debug_adapters,
862 session_id,
863 parent_session.clone(),
864 breakpoint_store.clone(),
865 config.clone(),
866 delegate,
867 message_tx,
868 cx.clone(),
869 )
870 .await?;
871
872 cx.new(|cx| {
873 create_local_session(
874 breakpoint_store,
875 session_id,
876 parent_session,
877 start_debugging_requests_tx,
878 initialized_tx,
879 message_rx,
880 mode,
881 cx,
882 )
883 })
884 })
885 }
886
887 #[cfg(any(test, feature = "test-support"))]
888 pub(crate) fn fake(
889 breakpoint_store: Entity<BreakpointStore>,
890 session_id: SessionId,
891 parent_session: Option<Entity<Session>>,
892 delegate: DapAdapterDelegate,
893 config: DebugAdapterConfig,
894 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
895 initialized_tx: oneshot::Sender<()>,
896 caps: Capabilities,
897 fails: bool,
898 cx: &mut App,
899 ) -> Task<Result<Entity<Session>>> {
900 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
901
902 cx.spawn(async move |cx| {
903 let mode = LocalMode::new_fake(
904 session_id,
905 parent_session.clone(),
906 breakpoint_store.clone(),
907 config.clone(),
908 delegate,
909 message_tx,
910 caps,
911 fails,
912 cx.clone(),
913 )
914 .await?;
915
916 cx.new(|cx| {
917 create_local_session(
918 breakpoint_store,
919 session_id,
920 parent_session,
921 start_debugging_requests_tx,
922 initialized_tx,
923 message_rx,
924 mode,
925 cx,
926 )
927 })
928 })
929 }
930
931 pub(crate) fn remote(
932 session_id: SessionId,
933 client: AnyProtoClient,
934 upstream_project_id: u64,
935 ignore_breakpoints: bool,
936 ) -> Self {
937 Self {
938 mode: Mode::Remote(RemoteConnection {
939 _client: client,
940 _upstream_project_id: upstream_project_id,
941 }),
942 id: session_id,
943 child_session_ids: HashSet::default(),
944 parent_id: None,
945 capabilities: Capabilities::default(),
946 ignore_breakpoints,
947 variables: Default::default(),
948 stack_frames: Default::default(),
949 thread_states: ThreadStates::default(),
950 output_token: OutputToken(0),
951 output: circular_buffer::CircularBuffer::boxed(),
952 requests: HashMap::default(),
953 modules: Vec::default(),
954 loaded_sources: Vec::default(),
955 threads: IndexMap::default(),
956 _background_tasks: Vec::default(),
957 locations: Default::default(),
958 is_session_terminated: false,
959 }
960 }
961
962 pub fn session_id(&self) -> SessionId {
963 self.id
964 }
965
966 pub fn child_session_ids(&self) -> HashSet<SessionId> {
967 self.child_session_ids.clone()
968 }
969
970 pub fn add_child_session_id(&mut self, session_id: SessionId) {
971 self.child_session_ids.insert(session_id);
972 }
973
974 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
975 self.child_session_ids.remove(&session_id);
976 }
977
978 pub fn parent_id(&self) -> Option<SessionId> {
979 self.parent_id
980 }
981
982 pub fn capabilities(&self) -> &Capabilities {
983 &self.capabilities
984 }
985
986 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
987 if let Mode::Local(local_mode) = &self.mode {
988 Some(local_mode.config.clone())
989 } else {
990 None
991 }
992 }
993
994 pub fn is_terminated(&self) -> bool {
995 self.is_session_terminated
996 }
997
998 pub fn is_local(&self) -> bool {
999 matches!(self.mode, Mode::Local(_))
1000 }
1001
1002 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
1003 match &mut self.mode {
1004 Mode::Local(local_mode) => Some(local_mode),
1005 Mode::Remote(_) => None,
1006 }
1007 }
1008
1009 pub fn as_local(&self) -> Option<&LocalMode> {
1010 match &self.mode {
1011 Mode::Local(local_mode) => Some(local_mode),
1012 Mode::Remote(_) => None,
1013 }
1014 }
1015
1016 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1017 match &self.mode {
1018 Mode::Local(local_mode) => {
1019 let capabilities = local_mode.clone().request_initialization(cx);
1020
1021 cx.spawn(async move |this, cx| {
1022 let capabilities = capabilities.await?;
1023 this.update(cx, |session, _| {
1024 session.capabilities = capabilities;
1025 })?;
1026 Ok(())
1027 })
1028 }
1029 Mode::Remote(_) => Task::ready(Err(anyhow!(
1030 "Cannot send initialize request from remote session"
1031 ))),
1032 }
1033 }
1034
1035 pub(super) fn initialize_sequence(
1036 &mut self,
1037 initialize_rx: oneshot::Receiver<()>,
1038 cx: &mut Context<Self>,
1039 ) -> Task<Result<()>> {
1040 match &self.mode {
1041 Mode::Local(local_mode) => {
1042 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1043 }
1044 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1045 }
1046 }
1047
1048 pub fn output(
1049 &self,
1050 since: OutputToken,
1051 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1052 if self.output_token.0 == 0 {
1053 return (self.output.range(0..0), OutputToken(0));
1054 };
1055
1056 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1057
1058 let clamped_events_since = events_since.clamp(0, self.output.len());
1059 (
1060 self.output
1061 .range(self.output.len() - clamped_events_since..),
1062 self.output_token,
1063 )
1064 }
1065
1066 pub fn respond_to_client(
1067 &self,
1068 request_seq: u64,
1069 success: bool,
1070 command: String,
1071 body: Option<serde_json::Value>,
1072 cx: &mut Context<Self>,
1073 ) -> Task<Result<()>> {
1074 let Some(local_session) = self.as_local().cloned() else {
1075 unreachable!("Cannot respond to remote client");
1076 };
1077
1078 cx.background_spawn(async move {
1079 local_session
1080 .client
1081 .send_message(Message::Response(Response {
1082 body,
1083 success,
1084 command,
1085 seq: request_seq + 1,
1086 request_seq,
1087 message: None,
1088 }))
1089 .await
1090 })
1091 }
1092
1093 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1094 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1095 self.thread_states.stop_all_threads();
1096
1097 self.invalidate_command_type::<StackTraceCommand>();
1098 }
1099
1100 // Event if we stopped all threads we still need to insert the thread_id
1101 // to our own data
1102 if let Some(thread_id) = event.thread_id {
1103 self.thread_states.stop_thread(ThreadId(thread_id));
1104
1105 self.invalidate_state(
1106 &StackTraceCommand {
1107 thread_id,
1108 start_frame: None,
1109 levels: None,
1110 }
1111 .into(),
1112 );
1113 }
1114
1115 self.invalidate_generic();
1116 self.threads.clear();
1117 self.variables.clear();
1118 cx.emit(SessionEvent::Stopped(
1119 event
1120 .thread_id
1121 .map(Into::into)
1122 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1123 ));
1124 cx.notify();
1125 }
1126
1127 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1128 match *event {
1129 Events::Initialized(_) => {
1130 debug_assert!(
1131 false,
1132 "Initialized event should have been handled in LocalMode"
1133 );
1134 }
1135 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1136 Events::Continued(event) => {
1137 if event.all_threads_continued.unwrap_or_default() {
1138 self.thread_states.continue_all_threads();
1139 } else {
1140 self.thread_states
1141 .continue_thread(ThreadId(event.thread_id));
1142 }
1143 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1144 self.invalidate_generic();
1145 }
1146 Events::Exited(_event) => {
1147 self.clear_active_debug_line(cx);
1148 }
1149 Events::Terminated(_) => {
1150 self.is_session_terminated = true;
1151 self.clear_active_debug_line(cx);
1152 }
1153 Events::Thread(event) => {
1154 let thread_id = ThreadId(event.thread_id);
1155
1156 match event.reason {
1157 dap::ThreadEventReason::Started => {
1158 self.thread_states.continue_thread(thread_id);
1159 }
1160 dap::ThreadEventReason::Exited => {
1161 self.thread_states.exit_thread(thread_id);
1162 }
1163 reason => {
1164 log::error!("Unhandled thread event reason {:?}", reason);
1165 }
1166 }
1167 self.invalidate_state(&ThreadsCommand.into());
1168 cx.notify();
1169 }
1170 Events::Output(event) => {
1171 if event
1172 .category
1173 .as_ref()
1174 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1175 {
1176 return;
1177 }
1178
1179 self.output.push_back(event);
1180 self.output_token.0 += 1;
1181 cx.notify();
1182 }
1183 Events::Breakpoint(_) => {}
1184 Events::Module(event) => {
1185 match event.reason {
1186 dap::ModuleEventReason::New => {
1187 self.modules.push(event.module);
1188 }
1189 dap::ModuleEventReason::Changed => {
1190 if let Some(module) = self
1191 .modules
1192 .iter_mut()
1193 .find(|other| event.module.id == other.id)
1194 {
1195 *module = event.module;
1196 }
1197 }
1198 dap::ModuleEventReason::Removed => {
1199 self.modules.retain(|other| event.module.id != other.id);
1200 }
1201 }
1202
1203 // todo(debugger): We should only send the invalidate command to downstream clients.
1204 // self.invalidate_state(&ModulesCommand.into());
1205 }
1206 Events::LoadedSource(_) => {
1207 self.invalidate_state(&LoadedSourcesCommand.into());
1208 }
1209 Events::Capabilities(event) => {
1210 self.capabilities = self.capabilities.merge(event.capabilities);
1211 cx.notify();
1212 }
1213 Events::Memory(_) => {}
1214 Events::Process(_) => {}
1215 Events::ProgressEnd(_) => {}
1216 Events::ProgressStart(_) => {}
1217 Events::ProgressUpdate(_) => {}
1218 Events::Invalidated(_) => {}
1219 Events::Other(_) => {}
1220 }
1221 }
1222
1223 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1224 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1225 &mut self,
1226 request: T,
1227 process_result: impl FnOnce(
1228 &mut Self,
1229 Result<T::Response>,
1230 &mut Context<Self>,
1231 ) -> Option<T::Response>
1232 + 'static,
1233 cx: &mut Context<Self>,
1234 ) {
1235 const {
1236 assert!(
1237 T::CACHEABLE,
1238 "Only requests marked as cacheable should invoke `fetch`"
1239 );
1240 }
1241
1242 if !self.thread_states.any_stopped_thread()
1243 && request.type_id() != TypeId::of::<ThreadsCommand>()
1244 || self.is_session_terminated
1245 {
1246 return;
1247 }
1248
1249 let request_map = self
1250 .requests
1251 .entry(std::any::TypeId::of::<T>())
1252 .or_default();
1253
1254 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1255 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1256
1257 let task = Self::request_inner::<Arc<T>>(
1258 &self.capabilities,
1259 self.id,
1260 &self.mode,
1261 command,
1262 process_result,
1263 cx,
1264 );
1265 let task = cx
1266 .background_executor()
1267 .spawn(async move {
1268 let _ = task.await?;
1269 Some(())
1270 })
1271 .shared();
1272
1273 vacant.insert(task);
1274 cx.notify();
1275 }
1276 }
1277
1278 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1279 capabilities: &Capabilities,
1280 session_id: SessionId,
1281 mode: &Mode,
1282 request: T,
1283 process_result: impl FnOnce(
1284 &mut Self,
1285 Result<T::Response>,
1286 &mut Context<Self>,
1287 ) -> Option<T::Response>
1288 + 'static,
1289 cx: &mut Context<Self>,
1290 ) -> Task<Option<T::Response>> {
1291 if !T::is_supported(&capabilities) {
1292 log::warn!(
1293 "Attempted to send a DAP request that isn't supported: {:?}",
1294 request
1295 );
1296 let error = Err(anyhow::Error::msg(
1297 "Couldn't complete request because it's not supported",
1298 ));
1299 return cx.spawn(async move |this, cx| {
1300 this.update(cx, |this, cx| process_result(this, error, cx))
1301 .log_err()
1302 .flatten()
1303 });
1304 }
1305
1306 let request = mode.request_dap(session_id, request, cx);
1307 cx.spawn(async move |this, cx| {
1308 let result = request.await;
1309 this.update(cx, |this, cx| process_result(this, result, cx))
1310 .log_err()
1311 .flatten()
1312 })
1313 }
1314
1315 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1316 &self,
1317 request: T,
1318 process_result: impl FnOnce(
1319 &mut Self,
1320 Result<T::Response>,
1321 &mut Context<Self>,
1322 ) -> Option<T::Response>
1323 + 'static,
1324 cx: &mut Context<Self>,
1325 ) -> Task<Option<T::Response>> {
1326 Self::request_inner(
1327 &self.capabilities,
1328 self.id,
1329 &self.mode,
1330 request,
1331 process_result,
1332 cx,
1333 )
1334 }
1335
1336 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1337 self.requests.remove(&std::any::TypeId::of::<Command>());
1338 }
1339
1340 fn invalidate_generic(&mut self) {
1341 self.invalidate_command_type::<ModulesCommand>();
1342 self.invalidate_command_type::<LoadedSourcesCommand>();
1343 self.invalidate_command_type::<ThreadsCommand>();
1344 }
1345
1346 fn invalidate_state(&mut self, key: &RequestSlot) {
1347 self.requests
1348 .entry(key.0.as_any().type_id())
1349 .and_modify(|request_map| {
1350 request_map.remove(&key);
1351 });
1352 }
1353
1354 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1355 self.thread_states.thread_status(thread_id)
1356 }
1357
1358 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1359 self.fetch(
1360 dap_command::ThreadsCommand,
1361 |this, result, cx| {
1362 let result = result.log_err()?;
1363
1364 this.threads = result
1365 .iter()
1366 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1367 .collect();
1368
1369 this.invalidate_command_type::<StackTraceCommand>();
1370 cx.emit(SessionEvent::Threads);
1371 cx.notify();
1372
1373 Some(result)
1374 },
1375 cx,
1376 );
1377
1378 self.threads
1379 .values()
1380 .map(|thread| {
1381 (
1382 thread.dap.clone(),
1383 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1384 )
1385 })
1386 .collect()
1387 }
1388
1389 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1390 self.fetch(
1391 dap_command::ModulesCommand,
1392 |this, result, cx| {
1393 let result = result.log_err()?;
1394
1395 this.modules = result.iter().cloned().collect();
1396 cx.emit(SessionEvent::Modules);
1397 cx.notify();
1398
1399 Some(result)
1400 },
1401 cx,
1402 );
1403
1404 &self.modules
1405 }
1406
1407 pub fn ignore_breakpoints(&self) -> bool {
1408 self.ignore_breakpoints
1409 }
1410
1411 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1412 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1413 }
1414
1415 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1416 if self.ignore_breakpoints == ignore {
1417 return Task::ready(());
1418 }
1419
1420 self.ignore_breakpoints = ignore;
1421
1422 if let Some(local) = self.as_local() {
1423 local.send_all_breakpoints(ignore, cx)
1424 } else {
1425 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1426 unimplemented!()
1427 }
1428 }
1429
1430 pub fn breakpoints_enabled(&self) -> bool {
1431 self.ignore_breakpoints
1432 }
1433
1434 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1435 self.fetch(
1436 dap_command::LoadedSourcesCommand,
1437 |this, result, cx| {
1438 let result = result.log_err()?;
1439 this.loaded_sources = result.iter().cloned().collect();
1440 cx.emit(SessionEvent::LoadedSources);
1441 cx.notify();
1442 Some(result)
1443 },
1444 cx,
1445 );
1446
1447 &self.loaded_sources
1448 }
1449
1450 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1451 res.log_err()?;
1452 Some(())
1453 }
1454
1455 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1456 thread_id: ThreadId,
1457 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1458 {
1459 move |this, response, cx| match response.log_err() {
1460 Some(response) => Some(response),
1461 None => {
1462 this.thread_states.stop_thread(thread_id);
1463 cx.notify();
1464 None
1465 }
1466 }
1467 }
1468
1469 fn clear_active_debug_line_response(
1470 &mut self,
1471 response: Result<()>,
1472 cx: &mut Context<Session>,
1473 ) -> Option<()> {
1474 response.log_err()?;
1475 self.clear_active_debug_line(cx);
1476 Some(())
1477 }
1478
1479 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1480 self.as_local()
1481 .expect("Message handler will only run in local mode")
1482 .breakpoint_store
1483 .update(cx, |store, cx| {
1484 store.remove_active_position(Some(self.id), cx)
1485 });
1486 }
1487
1488 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1489 self.request(
1490 PauseCommand {
1491 thread_id: thread_id.0,
1492 },
1493 Self::empty_response,
1494 cx,
1495 )
1496 .detach();
1497 }
1498
1499 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1500 self.request(
1501 RestartStackFrameCommand { stack_frame_id },
1502 Self::empty_response,
1503 cx,
1504 )
1505 .detach();
1506 }
1507
1508 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1509 if self.capabilities.supports_restart_request.unwrap_or(false) {
1510 self.request(
1511 RestartCommand {
1512 raw: args.unwrap_or(Value::Null),
1513 },
1514 Self::empty_response,
1515 cx,
1516 )
1517 .detach();
1518 } else {
1519 self.request(
1520 DisconnectCommand {
1521 restart: Some(false),
1522 terminate_debuggee: Some(true),
1523 suspend_debuggee: Some(false),
1524 },
1525 Self::empty_response,
1526 cx,
1527 )
1528 .detach();
1529 }
1530 }
1531
1532 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1533 self.is_session_terminated = true;
1534 self.thread_states.exit_all_threads();
1535 cx.notify();
1536
1537 let task = if self
1538 .capabilities
1539 .supports_terminate_request
1540 .unwrap_or_default()
1541 {
1542 self.request(
1543 TerminateCommand {
1544 restart: Some(false),
1545 },
1546 Self::clear_active_debug_line_response,
1547 cx,
1548 )
1549 } else {
1550 self.request(
1551 DisconnectCommand {
1552 restart: Some(false),
1553 terminate_debuggee: Some(true),
1554 suspend_debuggee: Some(false),
1555 },
1556 Self::clear_active_debug_line_response,
1557 cx,
1558 )
1559 };
1560
1561 cx.emit(SessionStateEvent::Shutdown);
1562
1563 cx.background_spawn(async move {
1564 let _ = task.await;
1565 })
1566 }
1567
1568 pub fn completions(
1569 &mut self,
1570 query: CompletionsQuery,
1571 cx: &mut Context<Self>,
1572 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1573 let task = self.request(query, |_, result, _| result.log_err(), cx);
1574
1575 cx.background_executor().spawn(async move {
1576 anyhow::Ok(
1577 task.await
1578 .map(|response| response.targets)
1579 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1580 )
1581 })
1582 }
1583
1584 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1585 self.thread_states.continue_thread(thread_id);
1586 self.request(
1587 ContinueCommand {
1588 args: ContinueArguments {
1589 thread_id: thread_id.0,
1590 single_thread: Some(true),
1591 },
1592 },
1593 Self::on_step_response::<ContinueCommand>(thread_id),
1594 cx,
1595 )
1596 .detach();
1597 }
1598
1599 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1600 match self.mode {
1601 Mode::Local(ref local) => Some(local.client.clone()),
1602 Mode::Remote(_) => None,
1603 }
1604 }
1605
1606 pub fn step_over(
1607 &mut self,
1608 thread_id: ThreadId,
1609 granularity: SteppingGranularity,
1610 cx: &mut Context<Self>,
1611 ) {
1612 let supports_single_thread_execution_requests =
1613 self.capabilities.supports_single_thread_execution_requests;
1614 let supports_stepping_granularity = self
1615 .capabilities
1616 .supports_stepping_granularity
1617 .unwrap_or_default();
1618
1619 let command = NextCommand {
1620 inner: StepCommand {
1621 thread_id: thread_id.0,
1622 granularity: supports_stepping_granularity.then(|| granularity),
1623 single_thread: supports_single_thread_execution_requests,
1624 },
1625 };
1626
1627 self.thread_states.process_step(thread_id);
1628 self.request(
1629 command,
1630 Self::on_step_response::<NextCommand>(thread_id),
1631 cx,
1632 )
1633 .detach();
1634 }
1635
1636 pub fn step_in(
1637 &mut self,
1638 thread_id: ThreadId,
1639 granularity: SteppingGranularity,
1640 cx: &mut Context<Self>,
1641 ) {
1642 let supports_single_thread_execution_requests =
1643 self.capabilities.supports_single_thread_execution_requests;
1644 let supports_stepping_granularity = self
1645 .capabilities
1646 .supports_stepping_granularity
1647 .unwrap_or_default();
1648
1649 let command = StepInCommand {
1650 inner: StepCommand {
1651 thread_id: thread_id.0,
1652 granularity: supports_stepping_granularity.then(|| granularity),
1653 single_thread: supports_single_thread_execution_requests,
1654 },
1655 };
1656
1657 self.thread_states.process_step(thread_id);
1658 self.request(
1659 command,
1660 Self::on_step_response::<StepInCommand>(thread_id),
1661 cx,
1662 )
1663 .detach();
1664 }
1665
1666 pub fn step_out(
1667 &mut self,
1668 thread_id: ThreadId,
1669 granularity: SteppingGranularity,
1670 cx: &mut Context<Self>,
1671 ) {
1672 let supports_single_thread_execution_requests =
1673 self.capabilities.supports_single_thread_execution_requests;
1674 let supports_stepping_granularity = self
1675 .capabilities
1676 .supports_stepping_granularity
1677 .unwrap_or_default();
1678
1679 let command = StepOutCommand {
1680 inner: StepCommand {
1681 thread_id: thread_id.0,
1682 granularity: supports_stepping_granularity.then(|| granularity),
1683 single_thread: supports_single_thread_execution_requests,
1684 },
1685 };
1686
1687 self.thread_states.process_step(thread_id);
1688 self.request(
1689 command,
1690 Self::on_step_response::<StepOutCommand>(thread_id),
1691 cx,
1692 )
1693 .detach();
1694 }
1695
1696 pub fn step_back(
1697 &mut self,
1698 thread_id: ThreadId,
1699 granularity: SteppingGranularity,
1700 cx: &mut Context<Self>,
1701 ) {
1702 let supports_single_thread_execution_requests =
1703 self.capabilities.supports_single_thread_execution_requests;
1704 let supports_stepping_granularity = self
1705 .capabilities
1706 .supports_stepping_granularity
1707 .unwrap_or_default();
1708
1709 let command = StepBackCommand {
1710 inner: StepCommand {
1711 thread_id: thread_id.0,
1712 granularity: supports_stepping_granularity.then(|| granularity),
1713 single_thread: supports_single_thread_execution_requests,
1714 },
1715 };
1716
1717 self.thread_states.process_step(thread_id);
1718
1719 self.request(
1720 command,
1721 Self::on_step_response::<StepBackCommand>(thread_id),
1722 cx,
1723 )
1724 .detach();
1725 }
1726
1727 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1728 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1729 && self.requests.contains_key(&ThreadsCommand.type_id())
1730 && self.threads.contains_key(&thread_id)
1731 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1732 // We could still be using an old thread id and have sent a new thread's request
1733 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1734 // But it very well could cause a minor bug in the future that is hard to track down
1735 {
1736 self.fetch(
1737 super::dap_command::StackTraceCommand {
1738 thread_id: thread_id.0,
1739 start_frame: None,
1740 levels: None,
1741 },
1742 move |this, stack_frames, cx| {
1743 let stack_frames = stack_frames.log_err()?;
1744
1745 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1746 thread.stack_frame_ids =
1747 stack_frames.iter().map(|frame| frame.id).collect();
1748 });
1749 debug_assert!(
1750 matches!(entry, indexmap::map::Entry::Occupied(_)),
1751 "Sent request for thread_id that doesn't exist"
1752 );
1753
1754 this.stack_frames.extend(
1755 stack_frames
1756 .iter()
1757 .cloned()
1758 .map(|frame| (frame.id, StackFrame::from(frame))),
1759 );
1760
1761 this.invalidate_command_type::<ScopesCommand>();
1762 this.invalidate_command_type::<VariablesCommand>();
1763
1764 cx.emit(SessionEvent::StackTrace);
1765 cx.notify();
1766 Some(stack_frames)
1767 },
1768 cx,
1769 );
1770 }
1771
1772 self.threads
1773 .get(&thread_id)
1774 .map(|thread| {
1775 thread
1776 .stack_frame_ids
1777 .iter()
1778 .filter_map(|id| self.stack_frames.get(id))
1779 .cloned()
1780 .collect()
1781 })
1782 .unwrap_or_default()
1783 }
1784
1785 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1786 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1787 && self
1788 .requests
1789 .contains_key(&TypeId::of::<StackTraceCommand>())
1790 {
1791 self.fetch(
1792 ScopesCommand { stack_frame_id },
1793 move |this, scopes, cx| {
1794 let scopes = scopes.log_err()?;
1795
1796 for scope in scopes .iter(){
1797 this.variables(scope.variables_reference, cx);
1798 }
1799
1800 let entry = this
1801 .stack_frames
1802 .entry(stack_frame_id)
1803 .and_modify(|stack_frame| {
1804 stack_frame.scopes = scopes.clone();
1805 });
1806
1807 cx.emit(SessionEvent::Variables);
1808
1809 debug_assert!(
1810 matches!(entry, indexmap::map::Entry::Occupied(_)),
1811 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1812 );
1813
1814 Some(scopes)
1815 },
1816 cx,
1817 );
1818 }
1819
1820 self.stack_frames
1821 .get(&stack_frame_id)
1822 .map(|frame| frame.scopes.as_slice())
1823 .unwrap_or_default()
1824 }
1825
1826 pub fn variables(
1827 &mut self,
1828 variables_reference: VariableReference,
1829 cx: &mut Context<Self>,
1830 ) -> Vec<dap::Variable> {
1831 let command = VariablesCommand {
1832 variables_reference,
1833 filter: None,
1834 start: None,
1835 count: None,
1836 format: None,
1837 };
1838
1839 self.fetch(
1840 command,
1841 move |this, variables, cx| {
1842 let variables = variables.log_err()?;
1843 this.variables
1844 .insert(variables_reference, variables.clone());
1845
1846 cx.emit(SessionEvent::Variables);
1847 Some(variables)
1848 },
1849 cx,
1850 );
1851
1852 self.variables
1853 .get(&variables_reference)
1854 .cloned()
1855 .unwrap_or_default()
1856 }
1857
1858 pub fn set_variable_value(
1859 &mut self,
1860 variables_reference: u64,
1861 name: String,
1862 value: String,
1863 cx: &mut Context<Self>,
1864 ) {
1865 if self.capabilities.supports_set_variable.unwrap_or_default() {
1866 self.request(
1867 SetVariableValueCommand {
1868 name,
1869 value,
1870 variables_reference,
1871 },
1872 move |this, response, cx| {
1873 let response = response.log_err()?;
1874 this.invalidate_command_type::<VariablesCommand>();
1875 cx.notify();
1876 Some(response)
1877 },
1878 cx,
1879 )
1880 .detach()
1881 }
1882 }
1883
1884 pub fn evaluate(
1885 &mut self,
1886 expression: String,
1887 context: Option<EvaluateArgumentsContext>,
1888 frame_id: Option<u64>,
1889 source: Option<Source>,
1890 cx: &mut Context<Self>,
1891 ) {
1892 self.request(
1893 EvaluateCommand {
1894 expression,
1895 context,
1896 frame_id,
1897 source,
1898 },
1899 |this, response, cx| {
1900 let response = response.log_err()?;
1901 this.output_token.0 += 1;
1902 this.output.push_back(dap::OutputEvent {
1903 category: None,
1904 output: response.result.clone(),
1905 group: None,
1906 variables_reference: Some(response.variables_reference),
1907 source: None,
1908 line: None,
1909 column: None,
1910 data: None,
1911 location_reference: None,
1912 });
1913
1914 this.invalidate_command_type::<ScopesCommand>();
1915 cx.notify();
1916 Some(response)
1917 },
1918 cx,
1919 )
1920 .detach();
1921 }
1922
1923 pub fn location(
1924 &mut self,
1925 reference: u64,
1926 cx: &mut Context<Self>,
1927 ) -> Option<dap::LocationsResponse> {
1928 self.fetch(
1929 LocationsCommand { reference },
1930 move |this, response, _| {
1931 let response = response.log_err()?;
1932 this.locations.insert(reference, response.clone());
1933 Some(response)
1934 },
1935 cx,
1936 );
1937 self.locations.get(&reference).cloned()
1938 }
1939 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1940 let command = DisconnectCommand {
1941 restart: Some(false),
1942 terminate_debuggee: Some(true),
1943 suspend_debuggee: Some(false),
1944 };
1945
1946 self.request(command, Self::empty_response, cx).detach()
1947 }
1948
1949 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1950 if self
1951 .capabilities
1952 .supports_terminate_threads_request
1953 .unwrap_or_default()
1954 {
1955 self.request(
1956 TerminateThreadsCommand {
1957 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1958 },
1959 Self::clear_active_debug_line_response,
1960 cx,
1961 )
1962 .detach();
1963 } else {
1964 self.shutdown(cx).detach();
1965 }
1966 }
1967}
1968
1969fn create_local_session(
1970 breakpoint_store: Entity<BreakpointStore>,
1971 session_id: SessionId,
1972 parent_session: Option<Entity<Session>>,
1973 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1974 initialized_tx: oneshot::Sender<()>,
1975 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1976 mode: LocalMode,
1977 cx: &mut Context<Session>,
1978) -> Session {
1979 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1980 let mut initialized_tx = Some(initialized_tx);
1981 while let Some(message) = message_rx.next().await {
1982 if let Message::Event(event) = message {
1983 if let Events::Initialized(_) = *event {
1984 if let Some(tx) = initialized_tx.take() {
1985 tx.send(()).ok();
1986 }
1987 } else {
1988 let Ok(_) = this.update(cx, |session, cx| {
1989 session.handle_dap_event(event, cx);
1990 }) else {
1991 break;
1992 };
1993 }
1994 } else {
1995 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1996 else {
1997 break;
1998 };
1999 }
2000 }
2001 })];
2002
2003 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
2004 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2005 if let Some(local) = (!this.ignore_breakpoints)
2006 .then(|| this.as_local_mut())
2007 .flatten()
2008 {
2009 local
2010 .send_breakpoints_from_path(path.clone(), *reason, cx)
2011 .detach();
2012 };
2013 }
2014 BreakpointStoreEvent::BreakpointsCleared(paths) => {
2015 if let Some(local) = (!this.ignore_breakpoints)
2016 .then(|| this.as_local_mut())
2017 .flatten()
2018 {
2019 local.unset_breakpoints_from_paths(paths, cx).detach();
2020 }
2021 }
2022 BreakpointStoreEvent::ActiveDebugLineChanged => {}
2023 })
2024 .detach();
2025
2026 Session {
2027 mode: Mode::Local(mode),
2028 id: session_id,
2029 child_session_ids: HashSet::default(),
2030 parent_id: parent_session.map(|session| session.read(cx).id),
2031 variables: Default::default(),
2032 capabilities: Capabilities::default(),
2033 thread_states: ThreadStates::default(),
2034 output_token: OutputToken(0),
2035 ignore_breakpoints: false,
2036 output: circular_buffer::CircularBuffer::boxed(),
2037 requests: HashMap::default(),
2038 modules: Vec::default(),
2039 loaded_sources: Vec::default(),
2040 threads: IndexMap::default(),
2041 stack_frames: IndexMap::default(),
2042 locations: Default::default(),
2043 _background_tasks,
2044 is_session_terminated: false,
2045 }
2046}