1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
9 StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
10 VariablesCommand,
11};
12use super::dap_store::DapAdapterDelegate;
13use anyhow::{Context as _, Result, anyhow};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
16use dap::messages::Response;
17use dap::{
18 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
19 SteppingGranularity, StoppedEvent, VariableReference,
20 adapters::{DapDelegate, DapStatus},
21 client::{DebugAdapterClient, SessionId},
22 messages::{Events, Message},
23};
24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
25use futures::channel::oneshot;
26use futures::{FutureExt, future::Shared};
27use gpui::{
28 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
29};
30use rpc::AnyProtoClient;
31use serde_json::{Value, json};
32use settings::Settings;
33use smol::stream::StreamExt;
34use std::any::TypeId;
35use std::path::PathBuf;
36use std::u64;
37use std::{
38 any::Any,
39 collections::hash_map::Entry,
40 hash::{Hash, Hasher},
41 path::Path,
42 sync::Arc,
43};
44use task::{DebugAdapterConfig, DebugTaskDefinition};
45use text::{PointUtf16, ToPointUtf16};
46use util::{ResultExt, merge_json_value_into};
47
/// Identifier of a debuggee thread, wrapping a raw `u64` id.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// Smallest representable thread id.
    pub const MIN: Self = Self(u64::MIN);
    /// Largest representable thread id.
    pub const MAX: Self = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw numeric id.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
62
63#[derive(Clone, Debug)]
64pub struct StackFrame {
65 pub dap: dap::StackFrame,
66 pub scopes: Vec<dap::Scope>,
67}
68
69impl From<dap::StackFrame> for StackFrame {
70 fn from(stack_frame: dap::StackFrame) -> Self {
71 Self {
72 scopes: vec![],
73 dap: stack_frame,
74 }
75 }
76}
77
/// Execution state of a debuggee thread; defaults to [`ThreadStatus::Running`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the display name of this status (identical to the variant name).
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
99
100#[derive(Debug)]
101pub struct Thread {
102 dap: dap::Thread,
103 stack_frame_ids: IndexSet<StackFrameId>,
104 _has_stopped: bool,
105}
106
107impl From<dap::Thread> for Thread {
108 fn from(dap: dap::Thread) -> Self {
109 Self {
110 dap,
111 stack_frame_ids: Default::default(),
112 _has_stopped: false,
113 }
114 }
115}
116
/// Numeric id of the upstream project, stored in [`RemoteConnection`].
type UpstreamProjectId = u64;
118
119struct RemoteConnection {
120 _client: AnyProtoClient,
121 _upstream_project_id: UpstreamProjectId,
122}
123
124impl RemoteConnection {
125 fn send_proto_client_request<R: DapCommand>(
126 &self,
127 _request: R,
128 _session_id: SessionId,
129 cx: &mut App,
130 ) -> Task<Result<R::Response>> {
131 // let message = request.to_proto(session_id, self.upstream_project_id);
132 // let upstream_client = self.client.clone();
133 cx.background_executor().spawn(async move {
134 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
135 // let response = upstream_client.request(message).await?;
136 // request.response_from_proto(response)
137 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
138 })
139 }
140
141 fn request<R: DapCommand>(
142 &self,
143 request: R,
144 session_id: SessionId,
145 cx: &mut App,
146 ) -> Task<Result<R::Response>>
147 where
148 <R::DapRequest as dap::requests::Request>::Response: 'static,
149 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
150 {
151 return self.send_proto_client_request::<R>(request, session_id, cx);
152 }
153}
154
/// How a [`Session`] reaches its debug adapter.
enum Mode {
    /// The adapter is driven directly through a local client.
    Local(LocalMode),
    /// Requests would be forwarded through an upstream connection.
    Remote(RemoteConnection),
}
159
/// State needed to talk to a debug adapter through a local client.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests and messages to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration this session was created with.
    config: DebugAdapterConfig,
    /// Adapter implementation resolved from the registry.
    adapter: Arc<dyn DebugAdapter>,
    /// Store the breakpoints sent to the adapter are read from.
    breakpoint_store: Entity<BreakpointStore>,
}
167
168fn client_source(abs_path: &Path) -> dap::Source {
169 dap::Source {
170 name: abs_path
171 .file_name()
172 .map(|filename| filename.to_string_lossy().to_string()),
173 path: Some(abs_path.to_string_lossy().to_string()),
174 source_reference: None,
175 presentation_hint: None,
176 origin: None,
177 sources: None,
178 adapter_data: None,
179 checksums: None,
180 }
181}
182
183impl LocalMode {
    /// Creates a `LocalMode` by delegating to [`Self::new_inner`] with a no-op
    /// post-construction callback.
    fn new(
        debug_adapters: Arc<DapRegistry>,
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        breakpoint_store: Entity<BreakpointStore>,
        config: DebugAdapterConfig,
        delegate: DapAdapterDelegate,
        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
        cx: AsyncApp,
    ) -> Task<Result<Self>> {
        Self::new_inner(
            debug_adapters,
            session_id,
            parent_session,
            breakpoint_store,
            config,
            delegate,
            messages_tx,
            // Nothing extra to set up once the client exists.
            async |_, _| {},
            cx,
        )
    }
    /// Test-only constructor: builds a `LocalMode` from `DapRegistry::fake()` and
    /// registers canned request handlers on the client before returning it.
    ///
    /// `caps` is what the fake `Initialize` handler returns. When `fail` is true the
    /// fake `Launch`/`Attach` handler answers with an error response instead of `Ok(())`.
    #[cfg(any(test, feature = "test-support"))]
    fn new_fake(
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        breakpoint_store: Entity<BreakpointStore>,
        config: DebugAdapterConfig,
        delegate: DapAdapterDelegate,
        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
        caps: Capabilities,
        fail: bool,
        cx: AsyncApp,
    ) -> Task<Result<Self>> {
        use task::DebugRequestDisposition;

        // Work out whether this is a launch or an attach. Reverse requests get a
        // placeholder launch/attach config of the matching kind.
        let request = match config.request.clone() {
            DebugRequestDisposition::UserConfigured(request) => request,
            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
                match reverse_request_args.request {
                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
                        DebugRequestType::Launch(task::LaunchConfig {
                            program: "".to_owned(),
                            cwd: None,
                            args: Default::default(),
                        })
                    }
                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
                        DebugRequestType::Attach(task::AttachConfig {
                            process_id: Some(0),
                        })
                    }
                }
            }
        };

        // Runs from `new_inner` once the `LocalMode` has been constructed.
        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
            session
                .client
                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
                .await;

            let paths = cx
                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
                .expect("Breakpoint store should exist in all tests that start debuggers");

            // Panics if breakpoints arrive for a path that was not among the store's
            // breakpoint paths when this handler was registered.
            session
                .client
                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
                    if !paths.contains(&p) {
                        panic!("Sent breakpoints for path without any")
                    }

                    Ok(dap::SetBreakpointsResponse {
                        breakpoints: Vec::default(),
                    })
                })
                .await;

            match request {
                dap::DebugRequestType::Launch(_) => {
                    if fail {
                        session
                            .client
                            .on_request::<dap::requests::Launch, _>(move |_, _| {
                                Err(dap::ErrorResponse {
                                    error: Some(dap::Message {
                                        id: 1,
                                        format: "error".into(),
                                        variables: None,
                                        send_telemetry: None,
                                        show_user: None,
                                        url: None,
                                        url_label: None,
                                    }),
                                })
                            })
                            .await;
                    } else {
                        session
                            .client
                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
                            .await;
                    }
                }
                dap::DebugRequestType::Attach(attach_config) => {
                    if fail {
                        session
                            .client
                            .on_request::<dap::requests::Attach, _>(move |_, _| {
                                Err(dap::ErrorResponse {
                                    error: Some(dap::Message {
                                        id: 1,
                                        format: "error".into(),
                                        variables: None,
                                        send_telemetry: None,
                                        show_user: None,
                                        url: None,
                                        url_label: None,
                                    }),
                                })
                            })
                            .await;
                    } else {
                        // The successful attach handler also asserts the exact raw
                        // arguments it receives.
                        session
                            .client
                            .on_request::<dap::requests::Attach, _>(move |_, args| {
                                assert_eq!(
                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
                                    args.raw
                                );

                                Ok(())
                            })
                            .await;
                    }
                }
            }

            session
                .client
                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
                .await;
            // Emit a fake `Initialized` event once all handlers are registered.
            session.client.fake_event(Events::Initialized(None)).await;
        };
        Self::new_inner(
            DapRegistry::fake().into(),
            session_id,
            parent_session,
            breakpoint_store,
            config,
            delegate,
            messages_tx,
            callback,
            cx,
        )
    }
342 fn new_inner(
343 registry: Arc<DapRegistry>,
344 session_id: SessionId,
345 parent_session: Option<Entity<Session>>,
346 breakpoint_store: Entity<BreakpointStore>,
347 config: DebugAdapterConfig,
348 delegate: DapAdapterDelegate,
349 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
350 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
351 cx: AsyncApp,
352 ) -> Task<Result<Self>> {
353 cx.spawn(async move |cx| {
354 let (adapter, binary) =
355 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
356
357 let message_handler = Box::new(move |message| {
358 messages_tx.unbounded_send(message).ok();
359 });
360
361 let client = Arc::new(
362 if let Some(client) = parent_session
363 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
364 .flatten()
365 {
366 client
367 .reconnect(session_id, binary, message_handler, cx.clone())
368 .await?
369 } else {
370 DebugAdapterClient::start(
371 session_id,
372 adapter.name(),
373 binary,
374 message_handler,
375 cx.clone(),
376 )
377 .await
378 .with_context(|| "Failed to start communication with debug adapter")?
379 },
380 );
381
382 let mut session = Self {
383 client,
384 adapter,
385 breakpoint_store,
386 config: config.clone(),
387 };
388
389 on_initialized(&mut session, cx.clone()).await;
390
391 Ok(session)
392 })
393 }
394
395 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
396 let tasks: Vec<_> = paths
397 .into_iter()
398 .map(|path| {
399 self.request(
400 dap_command::SetBreakpoints {
401 source: client_source(path),
402 source_modified: None,
403 breakpoints: vec![],
404 },
405 cx.background_executor().clone(),
406 )
407 })
408 .collect();
409
410 cx.background_spawn(async move {
411 futures::future::join_all(tasks)
412 .await
413 .iter()
414 .for_each(|res| match res {
415 Ok(_) => {}
416 Err(err) => {
417 log::warn!("Set breakpoints request failed: {}", err);
418 }
419 });
420 })
421 }
422
423 fn send_breakpoints_from_path(
424 &self,
425 abs_path: Arc<Path>,
426 reason: BreakpointUpdatedReason,
427 cx: &mut App,
428 ) -> Task<()> {
429 let breakpoints = self
430 .breakpoint_store
431 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
432 .into_iter()
433 .filter(|bp| bp.state.is_enabled())
434 .map(Into::into)
435 .collect();
436
437 let task = self.request(
438 dap_command::SetBreakpoints {
439 source: client_source(&abs_path),
440 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
441 breakpoints,
442 },
443 cx.background_executor().clone(),
444 );
445
446 cx.background_spawn(async move {
447 match task.await {
448 Ok(_) => {}
449 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
450 }
451 })
452 }
453
454 fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
455 let mut breakpoint_tasks = Vec::new();
456 let breakpoints = self
457 .breakpoint_store
458 .read_with(cx, |store, cx| store.all_breakpoints(cx));
459
460 for (path, breakpoints) in breakpoints {
461 let breakpoints = if ignore_breakpoints {
462 vec![]
463 } else {
464 breakpoints
465 .into_iter()
466 .filter(|bp| bp.state.is_enabled())
467 .map(Into::into)
468 .collect()
469 };
470
471 breakpoint_tasks.push(self.request(
472 dap_command::SetBreakpoints {
473 source: client_source(&path),
474 source_modified: Some(false),
475 breakpoints,
476 },
477 cx.background_executor().clone(),
478 ));
479 }
480
481 cx.background_spawn(async move {
482 futures::future::join_all(breakpoint_tasks)
483 .await
484 .iter()
485 .for_each(|res| match res {
486 Ok(_) => {}
487 Err(err) => {
488 log::warn!("Set breakpoints request failed: {}", err);
489 }
490 });
491 })
492 }
493
494 async fn get_adapter_binary(
495 registry: &Arc<DapRegistry>,
496 config: &DebugAdapterConfig,
497 delegate: &DapAdapterDelegate,
498 cx: &mut AsyncApp,
499 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
500 let adapter = registry
501 .adapter(&config.adapter)
502 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
503
504 let binary = cx.update(|cx| {
505 ProjectSettings::get_global(cx)
506 .dap
507 .get(&adapter.name())
508 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
509 })?;
510
511 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
512 Err(error) => {
513 delegate.update_status(
514 adapter.name(),
515 DapStatus::Failed {
516 error: error.to_string(),
517 },
518 );
519
520 return Err(error);
521 }
522 Ok(mut binary) => {
523 delegate.update_status(adapter.name(), DapStatus::None);
524
525 let shell_env = delegate.shell_env().await;
526 let mut envs = binary.envs.unwrap_or_default();
527 envs.extend(shell_env);
528 binary.envs = Some(envs);
529
530 binary
531 }
532 };
533
534 Ok((adapter, binary))
535 }
536
    /// Returns a copy of the label from this session's configuration.
    pub fn label(&self) -> String {
        self.config.label.clone()
    }
540
    /// Sends the `Initialize` request, identifying the adapter by its name, and
    /// resolves to the capabilities from the response.
    fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
        let adapter_id = self.adapter.name().to_string();

        self.request(Initialize { adapter_id }, cx.background_executor().clone())
    }
546
    /// Runs the launch/attach half of adapter start-up.
    ///
    /// Builds the raw request arguments from the configuration, merges the optional
    /// `initialize_args` into them, and sends `Launch` or `Attach`. Concurrently it
    /// waits on `initialized_rx`, then sends all breakpoints and, if `capabilities`
    /// says it is supported, `ConfigurationDone`. The returned task resolves once both
    /// halves have succeeded, or with the first error.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        cx: &App,
    ) -> Task<Result<()>> {
        // `raw` is the JSON argument payload; `is_launch` picks Launch over Attach.
        let (mut raw, is_launch) = match &self.config.request {
            task::DebugRequestDisposition::UserConfigured(_) => {
                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
                    debug_assert!(false, "This part of code should be unreachable in practice");
                    return Task::ready(Err(anyhow!(
                        "Expected debug config conversion to succeed"
                    )));
                };
                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
                let raw = self.adapter.request_args(&raw);
                (raw, is_launch)
            }
            // Reverse requests already carry their configuration payload.
            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
                start_debugging_request_arguments.configuration.clone(),
                matches!(
                    start_debugging_request_arguments.request,
                    dap::StartDebuggingRequestArgumentsRequest::Launch
                ),
            ),
        };

        merge_json_value_into(
            self.config.initialize_args.clone().unwrap_or(json!({})),
            &mut raw,
        );
        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = if is_launch {
            self.request(Launch { raw }, cx.background_executor().clone())
        } else {
            self.request(Attach { raw }, cx.background_executor().clone())
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);

        let configuration_sequence = cx.spawn({
            let this = self.clone();
            async move |cx| {
                // Configuration only starts once the initialized signal arrives.
                initialized_rx.await?;
                // todo(debugger) figure out if we want to handle a breakpoint response error
                // This will probably consist of letting a user know that breakpoints failed to be set
                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;

                if configuration_done_supported {
                    this.request(ConfigurationDone {}, cx.background_executor().clone())
                } else {
                    Task::ready(Ok(()))
                }
                .await
            }
        });

        // Both halves must succeed; the first error from either is returned.
        cx.background_spawn(async move {
            futures::future::try_join(launch, configuration_sequence).await?;
            Ok(())
        })
    }
609
610 fn request<R: LocalDapCommand>(
611 &self,
612 request: R,
613 executor: BackgroundExecutor,
614 ) -> Task<Result<R::Response>>
615 where
616 <R::DapRequest as dap::requests::Request>::Response: 'static,
617 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
618 {
619 let request = Arc::new(request);
620
621 let request_clone = request.clone();
622 let connection = self.client.clone();
623 let request_task = executor.spawn(async move {
624 let args = request_clone.to_dap();
625 connection.request::<R::DapRequest>(args).await
626 });
627
628 executor.spawn(async move {
629 let response = request.response_from_dap(request_task.await?);
630 response
631 })
632 }
633}
634impl From<RemoteConnection> for Mode {
635 fn from(value: RemoteConnection) -> Self {
636 Self::Remote(value)
637 }
638}
639
640impl Mode {
641 fn request_dap<R: DapCommand>(
642 &self,
643 session_id: SessionId,
644 request: R,
645 cx: &mut Context<Session>,
646 ) -> Task<Result<R::Response>>
647 where
648 <R::DapRequest as dap::requests::Request>::Response: 'static,
649 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
650 {
651 match self {
652 Mode::Local(debug_adapter_client) => {
653 debug_adapter_client.request(request, cx.background_executor().clone())
654 }
655 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
656 }
657 }
658}
659
660#[derive(Default)]
661struct ThreadStates {
662 global_state: Option<ThreadStatus>,
663 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
664}
665
666impl ThreadStates {
667 fn stop_all_threads(&mut self) {
668 self.global_state = Some(ThreadStatus::Stopped);
669 self.known_thread_states.clear();
670 }
671
672 fn exit_all_threads(&mut self) {
673 self.global_state = Some(ThreadStatus::Exited);
674 self.known_thread_states.clear();
675 }
676
677 fn continue_all_threads(&mut self) {
678 self.global_state = Some(ThreadStatus::Running);
679 self.known_thread_states.clear();
680 }
681
682 fn stop_thread(&mut self, thread_id: ThreadId) {
683 self.known_thread_states
684 .insert(thread_id, ThreadStatus::Stopped);
685 }
686
687 fn continue_thread(&mut self, thread_id: ThreadId) {
688 self.known_thread_states
689 .insert(thread_id, ThreadStatus::Running);
690 }
691
692 fn process_step(&mut self, thread_id: ThreadId) {
693 self.known_thread_states
694 .insert(thread_id, ThreadStatus::Stepping);
695 }
696
697 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
698 self.thread_state(thread_id)
699 .unwrap_or(ThreadStatus::Running)
700 }
701
702 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
703 self.known_thread_states
704 .get(&thread_id)
705 .copied()
706 .or(self.global_state)
707 }
708
709 fn exit_thread(&mut self, thread_id: ThreadId) {
710 self.known_thread_states
711 .insert(thread_id, ThreadStatus::Exited);
712 }
713
714 fn any_stopped_thread(&self) -> bool {
715 self.global_state
716 .is_some_and(|state| state == ThreadStatus::Stopped)
717 || self
718 .known_thread_states
719 .values()
720 .any(|status| *status == ThreadStatus::Stopped)
721 }
722}
/// Capacity of the circular buffer that holds a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
724
/// Counter marking a position in a session's stream of output events.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether requests go to a local adapter client or a remote connection.
    mode: Mode,
    /// Adapter capabilities; set from the initialize response and merged with
    /// later `Capabilities` events.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    /// Modules tracked from `Module` events.
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Incremented once for every output event that is stored.
    output_token: OutputToken,
    /// Stored output events, bounded by `MAX_TRACKED_OUTPUT_EVENTS`.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    /// Cleared whenever a stopped event is handled.
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    /// Cleared whenever a stopped event is handled.
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    /// Set once a `Terminated` event has been handled.
    is_session_terminated: bool,
    /// Request tasks keyed first by command type, then by the command itself.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    _background_tasks: Vec<Task<()>>,
}
748
/// Object-safe view of a command that can be compared and hashed through a trait
/// object, so commands of different concrete types can be used as map keys.
trait CacheableCommand: 'static + Send + Sync {
    /// Borrows the command as `Any` for downcasting.
    fn as_any(&self) -> &dyn Any;
    /// Equality against another type-erased command.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds the command into a type-erased hasher.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts a shared command into a shared `Any` for downcasting.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
755
756impl<T> CacheableCommand for T
757where
758 T: DapCommand + PartialEq + Eq + Hash,
759{
760 fn as_any(&self) -> &dyn Any {
761 self
762 }
763
764 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
765 rhs.as_any()
766 .downcast_ref::<Self>()
767 .map_or(false, |rhs| self == rhs)
768 }
769
770 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
771 T::hash(self, &mut hasher);
772 }
773
774 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
775 self
776 }
777}
778
779pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
780
781impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
782 fn from(request: T) -> Self {
783 Self(Arc::new(request))
784 }
785}
786
787impl PartialEq for RequestSlot {
788 fn eq(&self, other: &Self) -> bool {
789 self.0.dyn_eq(other.0.as_ref())
790 }
791}
792
793impl Eq for RequestSlot {}
794
795impl Hash for RequestSlot {
796 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
797 self.0.dyn_hash(state);
798 self.0.as_any().type_id().hash(state)
799 }
800}
801
802#[derive(Debug, Clone, Hash, PartialEq, Eq)]
803pub struct CompletionsQuery {
804 pub query: String,
805 pub column: u64,
806 pub line: Option<u64>,
807 pub frame_id: Option<u64>,
808}
809
810impl CompletionsQuery {
811 pub fn new(
812 buffer: &language::Buffer,
813 cursor_position: language::Anchor,
814 frame_id: Option<u64>,
815 ) -> Self {
816 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
817 Self {
818 query: buffer.text(),
819 column: column as u64,
820 frame_id,
821 line: Some(row as u64),
822 }
823 }
824}
825
/// Events emitted by a [`Session`].
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Emitted when a stopped event is handled. Carries the stopped thread's id,
    /// or `None` when no thread id was reported or the adapter set its
    /// preserve-focus hint.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

impl EventEmitter<SessionEvent> for Session {}
836
837// local session will send breakpoint updates to DAP for all new breakpoints
838// remote side will only send breakpoint updates when it is a breakpoint created by that peer
839// BreakpointStore notifies session on breakpoint changes
840impl Session {
841 pub(crate) fn local(
842 breakpoint_store: Entity<BreakpointStore>,
843 session_id: SessionId,
844 parent_session: Option<Entity<Session>>,
845 delegate: DapAdapterDelegate,
846 config: DebugAdapterConfig,
847 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
848 initialized_tx: oneshot::Sender<()>,
849 debug_adapters: Arc<DapRegistry>,
850 cx: &mut App,
851 ) -> Task<Result<Entity<Self>>> {
852 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
853
854 cx.spawn(async move |cx| {
855 let mode = LocalMode::new(
856 debug_adapters,
857 session_id,
858 parent_session.clone(),
859 breakpoint_store.clone(),
860 config.clone(),
861 delegate,
862 message_tx,
863 cx.clone(),
864 )
865 .await?;
866
867 cx.new(|cx| {
868 create_local_session(
869 breakpoint_store,
870 session_id,
871 parent_session,
872 start_debugging_requests_tx,
873 initialized_tx,
874 message_rx,
875 mode,
876 cx,
877 )
878 })
879 })
880 }
881
882 #[cfg(any(test, feature = "test-support"))]
883 pub(crate) fn fake(
884 breakpoint_store: Entity<BreakpointStore>,
885 session_id: SessionId,
886 parent_session: Option<Entity<Session>>,
887 delegate: DapAdapterDelegate,
888 config: DebugAdapterConfig,
889 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
890 initialized_tx: oneshot::Sender<()>,
891 caps: Capabilities,
892 fails: bool,
893 cx: &mut App,
894 ) -> Task<Result<Entity<Session>>> {
895 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
896
897 cx.spawn(async move |cx| {
898 let mode = LocalMode::new_fake(
899 session_id,
900 parent_session.clone(),
901 breakpoint_store.clone(),
902 config.clone(),
903 delegate,
904 message_tx,
905 caps,
906 fails,
907 cx.clone(),
908 )
909 .await?;
910
911 cx.new(|cx| {
912 create_local_session(
913 breakpoint_store,
914 session_id,
915 parent_session,
916 start_debugging_requests_tx,
917 initialized_tx,
918 message_rx,
919 mode,
920 cx,
921 )
922 })
923 })
924 }
925
926 pub(crate) fn remote(
927 session_id: SessionId,
928 client: AnyProtoClient,
929 upstream_project_id: u64,
930 ignore_breakpoints: bool,
931 ) -> Self {
932 Self {
933 mode: Mode::Remote(RemoteConnection {
934 _client: client,
935 _upstream_project_id: upstream_project_id,
936 }),
937 id: session_id,
938 child_session_ids: HashSet::default(),
939 parent_id: None,
940 capabilities: Capabilities::default(),
941 ignore_breakpoints,
942 variables: Default::default(),
943 stack_frames: Default::default(),
944 thread_states: ThreadStates::default(),
945 output_token: OutputToken(0),
946 output: circular_buffer::CircularBuffer::boxed(),
947 requests: HashMap::default(),
948 modules: Vec::default(),
949 loaded_sources: Vec::default(),
950 threads: IndexMap::default(),
951 _background_tasks: Vec::default(),
952 locations: Default::default(),
953 is_session_terminated: false,
954 }
955 }
956
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
960
    /// Returns a copy of the set of child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
964
    /// Records `session_id` as a child of this session; adding an id that is
    /// already present has no effect.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
968
    /// Forgets `session_id` as a child of this session; unknown ids are ignored.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
972
    /// Returns the id of the parent session, if there is one.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
976
    /// Returns the adapter capabilities currently recorded for this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
980
981 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
982 if let Mode::Local(local_mode) = &self.mode {
983 Some(local_mode.config.clone())
984 } else {
985 None
986 }
987 }
988
    /// Whether this session has been marked as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
992
993 pub fn is_local(&self) -> bool {
994 matches!(self.mode, Mode::Local(_))
995 }
996
997 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
998 match &mut self.mode {
999 Mode::Local(local_mode) => Some(local_mode),
1000 Mode::Remote(_) => None,
1001 }
1002 }
1003
1004 pub fn as_local(&self) -> Option<&LocalMode> {
1005 match &self.mode {
1006 Mode::Local(local_mode) => Some(local_mode),
1007 Mode::Remote(_) => None,
1008 }
1009 }
1010
1011 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1012 match &self.mode {
1013 Mode::Local(local_mode) => {
1014 let capabilities = local_mode.clone().request_initialization(cx);
1015
1016 cx.spawn(async move |this, cx| {
1017 let capabilities = capabilities.await?;
1018 this.update(cx, |session, _| {
1019 session.capabilities = capabilities;
1020 })?;
1021 Ok(())
1022 })
1023 }
1024 Mode::Remote(_) => Task::ready(Err(anyhow!(
1025 "Cannot send initialize request from remote session"
1026 ))),
1027 }
1028 }
1029
1030 pub(super) fn initialize_sequence(
1031 &mut self,
1032 initialize_rx: oneshot::Receiver<()>,
1033 cx: &mut Context<Self>,
1034 ) -> Task<Result<()>> {
1035 match &self.mode {
1036 Mode::Local(local_mode) => {
1037 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1038 }
1039 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1040 }
1041 }
1042
1043 pub fn output(
1044 &self,
1045 since: OutputToken,
1046 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1047 if self.output_token.0 == 0 {
1048 return (self.output.range(0..0), OutputToken(0));
1049 };
1050
1051 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1052
1053 let clamped_events_since = events_since.clamp(0, self.output.len());
1054 (
1055 self.output
1056 .range(self.output.len() - clamped_events_since..),
1057 self.output_token,
1058 )
1059 }
1060
1061 pub fn respond_to_client(
1062 &self,
1063 request_seq: u64,
1064 success: bool,
1065 command: String,
1066 body: Option<serde_json::Value>,
1067 cx: &mut Context<Self>,
1068 ) -> Task<Result<()>> {
1069 let Some(local_session) = self.as_local().cloned() else {
1070 unreachable!("Cannot respond to remote client");
1071 };
1072
1073 cx.background_spawn(async move {
1074 local_session
1075 .client
1076 .send_message(Message::Response(Response {
1077 body,
1078 success,
1079 command,
1080 seq: request_seq + 1,
1081 request_seq,
1082 message: None,
1083 }))
1084 .await
1085 })
1086 }
1087
1088 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1089 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1090 self.thread_states.stop_all_threads();
1091
1092 self.invalidate_command_type::<StackTraceCommand>();
1093 }
1094
1095 // Event if we stopped all threads we still need to insert the thread_id
1096 // to our own data
1097 if let Some(thread_id) = event.thread_id {
1098 self.thread_states.stop_thread(ThreadId(thread_id));
1099
1100 self.invalidate_state(
1101 &StackTraceCommand {
1102 thread_id,
1103 start_frame: None,
1104 levels: None,
1105 }
1106 .into(),
1107 );
1108 }
1109
1110 self.invalidate_generic();
1111 self.threads.clear();
1112 self.variables.clear();
1113 cx.emit(SessionEvent::Stopped(
1114 event
1115 .thread_id
1116 .map(Into::into)
1117 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1118 ));
1119 cx.notify();
1120 }
1121
    /// Applies an event received from the debug adapter to the session state.
    ///
    /// Several event kinds are accepted but currently ignored; see the empty arms
    /// at the end of the match.
    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
        match *event {
            Events::Initialized(_) => {
                debug_assert!(
                    false,
                    "Initialized event should have been handled in LocalMode"
                );
            }
            Events::Stopped(event) => self.handle_stopped_event(event, cx),
            Events::Continued(event) => {
                if event.all_threads_continued.unwrap_or_default() {
                    self.thread_states.continue_all_threads();
                } else {
                    self.thread_states
                        .continue_thread(ThreadId(event.thread_id));
                }
                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
                self.invalidate_generic();
            }
            Events::Exited(_event) => {
                self.clear_active_debug_line(cx);
            }
            Events::Terminated(_) => {
                self.is_session_terminated = true;
                self.clear_active_debug_line(cx);
            }
            Events::Thread(event) => {
                let thread_id = ThreadId(event.thread_id);

                match event.reason {
                    dap::ThreadEventReason::Started => {
                        self.thread_states.continue_thread(thread_id);
                    }
                    dap::ThreadEventReason::Exited => {
                        self.thread_states.exit_thread(thread_id);
                    }
                    reason => {
                        log::error!("Unhandled thread event reason {:?}", reason);
                    }
                }
                // The cached thread list is invalidated for every reason, handled or not.
                self.invalidate_state(&ThreadsCommand.into());
                cx.notify();
            }
            Events::Output(event) => {
                // Telemetry output is dropped without being stored or counted.
                if event
                    .category
                    .as_ref()
                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
                {
                    return;
                }

                self.output.push_back(event);
                self.output_token.0 += 1;
                cx.notify();
            }
            Events::Breakpoint(_) => {}
            Events::Module(event) => {
                // Keep the module list in sync; modules are matched by id.
                match event.reason {
                    dap::ModuleEventReason::New => {
                        self.modules.push(event.module);
                    }
                    dap::ModuleEventReason::Changed => {
                        if let Some(module) = self
                            .modules
                            .iter_mut()
                            .find(|other| event.module.id == other.id)
                        {
                            *module = event.module;
                        }
                    }
                    dap::ModuleEventReason::Removed => {
                        self.modules.retain(|other| event.module.id != other.id);
                    }
                }

                // todo(debugger): We should only send the invalidate command to downstream clients.
                // self.invalidate_state(&ModulesCommand.into());
            }
            Events::LoadedSource(_) => {
                self.invalidate_state(&LoadedSourcesCommand.into());
            }
            Events::Capabilities(event) => {
                self.capabilities = self.capabilities.merge(event.capabilities);
                cx.notify();
            }
            // The remaining events are currently ignored.
            Events::Memory(_) => {}
            Events::Process(_) => {}
            Events::ProgressEnd(_) => {}
            Events::ProgressStart(_) => {}
            Events::ProgressUpdate(_) => {}
            Events::Invalidated(_) => {}
            Events::Other(_) => {}
        }
    }
1217
1218 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1219 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1220 &mut self,
1221 request: T,
1222 process_result: impl FnOnce(
1223 &mut Self,
1224 Result<T::Response>,
1225 &mut Context<Self>,
1226 ) -> Option<T::Response>
1227 + 'static,
1228 cx: &mut Context<Self>,
1229 ) {
1230 const {
1231 assert!(
1232 T::CACHEABLE,
1233 "Only requests marked as cacheable should invoke `fetch`"
1234 );
1235 }
1236
1237 if !self.thread_states.any_stopped_thread()
1238 && request.type_id() != TypeId::of::<ThreadsCommand>()
1239 || self.is_session_terminated
1240 {
1241 return;
1242 }
1243
1244 let request_map = self
1245 .requests
1246 .entry(std::any::TypeId::of::<T>())
1247 .or_default();
1248
1249 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1250 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1251
1252 let task = Self::request_inner::<Arc<T>>(
1253 &self.capabilities,
1254 self.id,
1255 &self.mode,
1256 command,
1257 process_result,
1258 cx,
1259 );
1260 let task = cx
1261 .background_executor()
1262 .spawn(async move {
1263 let _ = task.await?;
1264 Some(())
1265 })
1266 .shared();
1267
1268 vacant.insert(task);
1269 cx.notify();
1270 }
1271 }
1272
1273 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1274 capabilities: &Capabilities,
1275 session_id: SessionId,
1276 mode: &Mode,
1277 request: T,
1278 process_result: impl FnOnce(
1279 &mut Self,
1280 Result<T::Response>,
1281 &mut Context<Self>,
1282 ) -> Option<T::Response>
1283 + 'static,
1284 cx: &mut Context<Self>,
1285 ) -> Task<Option<T::Response>> {
1286 if !T::is_supported(&capabilities) {
1287 log::warn!(
1288 "Attempted to send a DAP request that isn't supported: {:?}",
1289 request
1290 );
1291 let error = Err(anyhow::Error::msg(
1292 "Couldn't complete request because it's not supported",
1293 ));
1294 return cx.spawn(async move |this, cx| {
1295 this.update(cx, |this, cx| process_result(this, error, cx))
1296 .log_err()
1297 .flatten()
1298 });
1299 }
1300
1301 let request = mode.request_dap(session_id, request, cx);
1302 cx.spawn(async move |this, cx| {
1303 let result = request.await;
1304 this.update(cx, |this, cx| process_result(this, result, cx))
1305 .log_err()
1306 .flatten()
1307 })
1308 }
1309
1310 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1311 &self,
1312 request: T,
1313 process_result: impl FnOnce(
1314 &mut Self,
1315 Result<T::Response>,
1316 &mut Context<Self>,
1317 ) -> Option<T::Response>
1318 + 'static,
1319 cx: &mut Context<Self>,
1320 ) -> Task<Option<T::Response>> {
1321 Self::request_inner(
1322 &self.capabilities,
1323 self.id,
1324 &self.mode,
1325 request,
1326 process_result,
1327 cx,
1328 )
1329 }
1330
1331 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1332 self.requests.remove(&std::any::TypeId::of::<Command>());
1333 }
1334
1335 fn invalidate_generic(&mut self) {
1336 self.invalidate_command_type::<ModulesCommand>();
1337 self.invalidate_command_type::<LoadedSourcesCommand>();
1338 self.invalidate_command_type::<ThreadsCommand>();
1339 }
1340
1341 fn invalidate_state(&mut self, key: &RequestSlot) {
1342 self.requests
1343 .entry(key.0.as_any().type_id())
1344 .and_modify(|request_map| {
1345 request_map.remove(&key);
1346 });
1347 }
1348
1349 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1350 self.thread_states.thread_status(thread_id)
1351 }
1352
1353 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1354 self.fetch(
1355 dap_command::ThreadsCommand,
1356 |this, result, cx| {
1357 let result = result.log_err()?;
1358
1359 this.threads = result
1360 .iter()
1361 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1362 .collect();
1363
1364 this.invalidate_command_type::<StackTraceCommand>();
1365 cx.emit(SessionEvent::Threads);
1366 cx.notify();
1367
1368 Some(result)
1369 },
1370 cx,
1371 );
1372
1373 self.threads
1374 .values()
1375 .map(|thread| {
1376 (
1377 thread.dap.clone(),
1378 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1379 )
1380 })
1381 .collect()
1382 }
1383
1384 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1385 self.fetch(
1386 dap_command::ModulesCommand,
1387 |this, result, cx| {
1388 let result = result.log_err()?;
1389
1390 this.modules = result.iter().cloned().collect();
1391 cx.emit(SessionEvent::Modules);
1392 cx.notify();
1393
1394 Some(result)
1395 },
1396 cx,
1397 );
1398
1399 &self.modules
1400 }
1401
1402 pub fn ignore_breakpoints(&self) -> bool {
1403 self.ignore_breakpoints
1404 }
1405
1406 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1407 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1408 }
1409
1410 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1411 if self.ignore_breakpoints == ignore {
1412 return Task::ready(());
1413 }
1414
1415 self.ignore_breakpoints = ignore;
1416
1417 if let Some(local) = self.as_local() {
1418 local.send_all_breakpoints(ignore, cx)
1419 } else {
1420 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1421 unimplemented!()
1422 }
1423 }
1424
1425 pub fn breakpoints_enabled(&self) -> bool {
1426 self.ignore_breakpoints
1427 }
1428
1429 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1430 self.fetch(
1431 dap_command::LoadedSourcesCommand,
1432 |this, result, cx| {
1433 let result = result.log_err()?;
1434 this.loaded_sources = result.iter().cloned().collect();
1435 cx.emit(SessionEvent::LoadedSources);
1436 cx.notify();
1437 Some(result)
1438 },
1439 cx,
1440 );
1441
1442 &self.loaded_sources
1443 }
1444
1445 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1446 res.log_err()?;
1447 Some(())
1448 }
1449
1450 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1451 thread_id: ThreadId,
1452 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1453 {
1454 move |this, response, cx| match response.log_err() {
1455 Some(response) => Some(response),
1456 None => {
1457 this.thread_states.stop_thread(thread_id);
1458 cx.notify();
1459 None
1460 }
1461 }
1462 }
1463
1464 fn clear_active_debug_line_response(
1465 &mut self,
1466 response: Result<()>,
1467 cx: &mut Context<Session>,
1468 ) -> Option<()> {
1469 response.log_err()?;
1470 self.clear_active_debug_line(cx);
1471 Some(())
1472 }
1473
1474 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1475 self.as_local()
1476 .expect("Message handler will only run in local mode")
1477 .breakpoint_store
1478 .update(cx, |store, cx| {
1479 store.remove_active_position(Some(self.id), cx)
1480 });
1481 }
1482
1483 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1484 self.request(
1485 PauseCommand {
1486 thread_id: thread_id.0,
1487 },
1488 Self::empty_response,
1489 cx,
1490 )
1491 .detach();
1492 }
1493
1494 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1495 self.request(
1496 RestartStackFrameCommand { stack_frame_id },
1497 Self::empty_response,
1498 cx,
1499 )
1500 .detach();
1501 }
1502
1503 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1504 if self.capabilities.supports_restart_request.unwrap_or(false) {
1505 self.request(
1506 RestartCommand {
1507 raw: args.unwrap_or(Value::Null),
1508 },
1509 Self::empty_response,
1510 cx,
1511 )
1512 .detach();
1513 } else {
1514 self.request(
1515 DisconnectCommand {
1516 restart: Some(false),
1517 terminate_debuggee: Some(true),
1518 suspend_debuggee: Some(false),
1519 },
1520 Self::empty_response,
1521 cx,
1522 )
1523 .detach();
1524 }
1525 }
1526
1527 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1528 self.is_session_terminated = true;
1529 self.thread_states.exit_all_threads();
1530 cx.notify();
1531
1532 let task = if self
1533 .capabilities
1534 .supports_terminate_request
1535 .unwrap_or_default()
1536 {
1537 self.request(
1538 TerminateCommand {
1539 restart: Some(false),
1540 },
1541 Self::clear_active_debug_line_response,
1542 cx,
1543 )
1544 } else {
1545 self.request(
1546 DisconnectCommand {
1547 restart: Some(false),
1548 terminate_debuggee: Some(true),
1549 suspend_debuggee: Some(false),
1550 },
1551 Self::clear_active_debug_line_response,
1552 cx,
1553 )
1554 };
1555
1556 cx.background_spawn(async move {
1557 let _ = task.await;
1558 })
1559 }
1560
1561 pub fn completions(
1562 &mut self,
1563 query: CompletionsQuery,
1564 cx: &mut Context<Self>,
1565 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1566 let task = self.request(query, |_, result, _| result.log_err(), cx);
1567
1568 cx.background_executor().spawn(async move {
1569 anyhow::Ok(
1570 task.await
1571 .map(|response| response.targets)
1572 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1573 )
1574 })
1575 }
1576
1577 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1578 self.thread_states.continue_thread(thread_id);
1579 self.request(
1580 ContinueCommand {
1581 args: ContinueArguments {
1582 thread_id: thread_id.0,
1583 single_thread: Some(true),
1584 },
1585 },
1586 Self::on_step_response::<ContinueCommand>(thread_id),
1587 cx,
1588 )
1589 .detach();
1590 }
1591
1592 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1593 match self.mode {
1594 Mode::Local(ref local) => Some(local.client.clone()),
1595 Mode::Remote(_) => None,
1596 }
1597 }
1598
1599 pub fn step_over(
1600 &mut self,
1601 thread_id: ThreadId,
1602 granularity: SteppingGranularity,
1603 cx: &mut Context<Self>,
1604 ) {
1605 let supports_single_thread_execution_requests =
1606 self.capabilities.supports_single_thread_execution_requests;
1607 let supports_stepping_granularity = self
1608 .capabilities
1609 .supports_stepping_granularity
1610 .unwrap_or_default();
1611
1612 let command = NextCommand {
1613 inner: StepCommand {
1614 thread_id: thread_id.0,
1615 granularity: supports_stepping_granularity.then(|| granularity),
1616 single_thread: supports_single_thread_execution_requests,
1617 },
1618 };
1619
1620 self.thread_states.process_step(thread_id);
1621 self.request(
1622 command,
1623 Self::on_step_response::<NextCommand>(thread_id),
1624 cx,
1625 )
1626 .detach();
1627 }
1628
1629 pub fn step_in(
1630 &mut self,
1631 thread_id: ThreadId,
1632 granularity: SteppingGranularity,
1633 cx: &mut Context<Self>,
1634 ) {
1635 let supports_single_thread_execution_requests =
1636 self.capabilities.supports_single_thread_execution_requests;
1637 let supports_stepping_granularity = self
1638 .capabilities
1639 .supports_stepping_granularity
1640 .unwrap_or_default();
1641
1642 let command = StepInCommand {
1643 inner: StepCommand {
1644 thread_id: thread_id.0,
1645 granularity: supports_stepping_granularity.then(|| granularity),
1646 single_thread: supports_single_thread_execution_requests,
1647 },
1648 };
1649
1650 self.thread_states.process_step(thread_id);
1651 self.request(
1652 command,
1653 Self::on_step_response::<StepInCommand>(thread_id),
1654 cx,
1655 )
1656 .detach();
1657 }
1658
1659 pub fn step_out(
1660 &mut self,
1661 thread_id: ThreadId,
1662 granularity: SteppingGranularity,
1663 cx: &mut Context<Self>,
1664 ) {
1665 let supports_single_thread_execution_requests =
1666 self.capabilities.supports_single_thread_execution_requests;
1667 let supports_stepping_granularity = self
1668 .capabilities
1669 .supports_stepping_granularity
1670 .unwrap_or_default();
1671
1672 let command = StepOutCommand {
1673 inner: StepCommand {
1674 thread_id: thread_id.0,
1675 granularity: supports_stepping_granularity.then(|| granularity),
1676 single_thread: supports_single_thread_execution_requests,
1677 },
1678 };
1679
1680 self.thread_states.process_step(thread_id);
1681 self.request(
1682 command,
1683 Self::on_step_response::<StepOutCommand>(thread_id),
1684 cx,
1685 )
1686 .detach();
1687 }
1688
1689 pub fn step_back(
1690 &mut self,
1691 thread_id: ThreadId,
1692 granularity: SteppingGranularity,
1693 cx: &mut Context<Self>,
1694 ) {
1695 let supports_single_thread_execution_requests =
1696 self.capabilities.supports_single_thread_execution_requests;
1697 let supports_stepping_granularity = self
1698 .capabilities
1699 .supports_stepping_granularity
1700 .unwrap_or_default();
1701
1702 let command = StepBackCommand {
1703 inner: StepCommand {
1704 thread_id: thread_id.0,
1705 granularity: supports_stepping_granularity.then(|| granularity),
1706 single_thread: supports_single_thread_execution_requests,
1707 },
1708 };
1709
1710 self.thread_states.process_step(thread_id);
1711
1712 self.request(
1713 command,
1714 Self::on_step_response::<StepBackCommand>(thread_id),
1715 cx,
1716 )
1717 .detach();
1718 }
1719
1720 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1721 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1722 && self.requests.contains_key(&ThreadsCommand.type_id())
1723 && self.threads.contains_key(&thread_id)
1724 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1725 // We could still be using an old thread id and have sent a new thread's request
1726 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1727 // But it very well could cause a minor bug in the future that is hard to track down
1728 {
1729 self.fetch(
1730 super::dap_command::StackTraceCommand {
1731 thread_id: thread_id.0,
1732 start_frame: None,
1733 levels: None,
1734 },
1735 move |this, stack_frames, cx| {
1736 let stack_frames = stack_frames.log_err()?;
1737
1738 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1739 thread.stack_frame_ids =
1740 stack_frames.iter().map(|frame| frame.id).collect();
1741 });
1742 debug_assert!(
1743 matches!(entry, indexmap::map::Entry::Occupied(_)),
1744 "Sent request for thread_id that doesn't exist"
1745 );
1746
1747 this.stack_frames.extend(
1748 stack_frames
1749 .iter()
1750 .cloned()
1751 .map(|frame| (frame.id, StackFrame::from(frame))),
1752 );
1753
1754 this.invalidate_command_type::<ScopesCommand>();
1755 this.invalidate_command_type::<VariablesCommand>();
1756
1757 cx.emit(SessionEvent::StackTrace);
1758 cx.notify();
1759 Some(stack_frames)
1760 },
1761 cx,
1762 );
1763 }
1764
1765 self.threads
1766 .get(&thread_id)
1767 .map(|thread| {
1768 thread
1769 .stack_frame_ids
1770 .iter()
1771 .filter_map(|id| self.stack_frames.get(id))
1772 .cloned()
1773 .collect()
1774 })
1775 .unwrap_or_default()
1776 }
1777
1778 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1779 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1780 && self
1781 .requests
1782 .contains_key(&TypeId::of::<StackTraceCommand>())
1783 {
1784 self.fetch(
1785 ScopesCommand { stack_frame_id },
1786 move |this, scopes, cx| {
1787 let scopes = scopes.log_err()?;
1788
1789 for scope in scopes .iter(){
1790 this.variables(scope.variables_reference, cx);
1791 }
1792
1793 let entry = this
1794 .stack_frames
1795 .entry(stack_frame_id)
1796 .and_modify(|stack_frame| {
1797 stack_frame.scopes = scopes.clone();
1798 });
1799
1800 cx.emit(SessionEvent::Variables);
1801
1802 debug_assert!(
1803 matches!(entry, indexmap::map::Entry::Occupied(_)),
1804 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1805 );
1806
1807 Some(scopes)
1808 },
1809 cx,
1810 );
1811 }
1812
1813 self.stack_frames
1814 .get(&stack_frame_id)
1815 .map(|frame| frame.scopes.as_slice())
1816 .unwrap_or_default()
1817 }
1818
1819 pub fn variables(
1820 &mut self,
1821 variables_reference: VariableReference,
1822 cx: &mut Context<Self>,
1823 ) -> Vec<dap::Variable> {
1824 let command = VariablesCommand {
1825 variables_reference,
1826 filter: None,
1827 start: None,
1828 count: None,
1829 format: None,
1830 };
1831
1832 self.fetch(
1833 command,
1834 move |this, variables, cx| {
1835 let variables = variables.log_err()?;
1836 this.variables
1837 .insert(variables_reference, variables.clone());
1838
1839 cx.emit(SessionEvent::Variables);
1840 Some(variables)
1841 },
1842 cx,
1843 );
1844
1845 self.variables
1846 .get(&variables_reference)
1847 .cloned()
1848 .unwrap_or_default()
1849 }
1850
1851 pub fn set_variable_value(
1852 &mut self,
1853 variables_reference: u64,
1854 name: String,
1855 value: String,
1856 cx: &mut Context<Self>,
1857 ) {
1858 if self.capabilities.supports_set_variable.unwrap_or_default() {
1859 self.request(
1860 SetVariableValueCommand {
1861 name,
1862 value,
1863 variables_reference,
1864 },
1865 move |this, response, cx| {
1866 let response = response.log_err()?;
1867 this.invalidate_command_type::<VariablesCommand>();
1868 cx.notify();
1869 Some(response)
1870 },
1871 cx,
1872 )
1873 .detach()
1874 }
1875 }
1876
1877 pub fn evaluate(
1878 &mut self,
1879 expression: String,
1880 context: Option<EvaluateArgumentsContext>,
1881 frame_id: Option<u64>,
1882 source: Option<Source>,
1883 cx: &mut Context<Self>,
1884 ) {
1885 self.request(
1886 EvaluateCommand {
1887 expression,
1888 context,
1889 frame_id,
1890 source,
1891 },
1892 |this, response, cx| {
1893 let response = response.log_err()?;
1894 this.output_token.0 += 1;
1895 this.output.push_back(dap::OutputEvent {
1896 category: None,
1897 output: response.result.clone(),
1898 group: None,
1899 variables_reference: Some(response.variables_reference),
1900 source: None,
1901 line: None,
1902 column: None,
1903 data: None,
1904 location_reference: None,
1905 });
1906
1907 this.invalidate_command_type::<ScopesCommand>();
1908 cx.notify();
1909 Some(response)
1910 },
1911 cx,
1912 )
1913 .detach();
1914 }
1915
1916 pub fn location(
1917 &mut self,
1918 reference: u64,
1919 cx: &mut Context<Self>,
1920 ) -> Option<dap::LocationsResponse> {
1921 self.fetch(
1922 LocationsCommand { reference },
1923 move |this, response, _| {
1924 let response = response.log_err()?;
1925 this.locations.insert(reference, response.clone());
1926 Some(response)
1927 },
1928 cx,
1929 );
1930 self.locations.get(&reference).cloned()
1931 }
1932 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1933 let command = DisconnectCommand {
1934 restart: Some(false),
1935 terminate_debuggee: Some(true),
1936 suspend_debuggee: Some(false),
1937 };
1938
1939 self.request(command, Self::empty_response, cx).detach()
1940 }
1941
1942 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1943 if self
1944 .capabilities
1945 .supports_terminate_threads_request
1946 .unwrap_or_default()
1947 {
1948 self.request(
1949 TerminateThreadsCommand {
1950 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1951 },
1952 Self::clear_active_debug_line_response,
1953 cx,
1954 )
1955 .detach();
1956 } else {
1957 self.shutdown(cx).detach();
1958 }
1959 }
1960}
1961
1962fn create_local_session(
1963 breakpoint_store: Entity<BreakpointStore>,
1964 session_id: SessionId,
1965 parent_session: Option<Entity<Session>>,
1966 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1967 initialized_tx: oneshot::Sender<()>,
1968 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1969 mode: LocalMode,
1970 cx: &mut Context<Session>,
1971) -> Session {
1972 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1973 let mut initialized_tx = Some(initialized_tx);
1974 while let Some(message) = message_rx.next().await {
1975 if let Message::Event(event) = message {
1976 if let Events::Initialized(_) = *event {
1977 if let Some(tx) = initialized_tx.take() {
1978 tx.send(()).ok();
1979 }
1980 } else {
1981 let Ok(_) = this.update(cx, |session, cx| {
1982 session.handle_dap_event(event, cx);
1983 }) else {
1984 break;
1985 };
1986 }
1987 } else {
1988 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1989 else {
1990 break;
1991 };
1992 }
1993 }
1994 })];
1995
1996 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1997 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1998 if let Some(local) = (!this.ignore_breakpoints)
1999 .then(|| this.as_local_mut())
2000 .flatten()
2001 {
2002 local
2003 .send_breakpoints_from_path(path.clone(), *reason, cx)
2004 .detach();
2005 };
2006 }
2007 BreakpointStoreEvent::BreakpointsCleared(paths) => {
2008 if let Some(local) = (!this.ignore_breakpoints)
2009 .then(|| this.as_local_mut())
2010 .flatten()
2011 {
2012 local.unset_breakpoints_from_paths(paths, cx).detach();
2013 }
2014 }
2015 BreakpointStoreEvent::ActiveDebugLineChanged => {}
2016 })
2017 .detach();
2018
2019 Session {
2020 mode: Mode::Local(mode),
2021 id: session_id,
2022 child_session_ids: HashSet::default(),
2023 parent_id: parent_session.map(|session| session.read(cx).id),
2024 variables: Default::default(),
2025 capabilities: Capabilities::default(),
2026 thread_states: ThreadStates::default(),
2027 output_token: OutputToken(0),
2028 ignore_breakpoints: false,
2029 output: circular_buffer::CircularBuffer::boxed(),
2030 requests: HashMap::default(),
2031 modules: Vec::default(),
2032 loaded_sources: Vec::default(),
2033 threads: IndexMap::default(),
2034 stack_frames: IndexMap::default(),
2035 locations: Default::default(),
2036 _background_tasks,
2037 is_session_terminated: false,
2038 }
2039}