1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
9 StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
10 VariablesCommand,
11};
12use super::dap_store::DapAdapterDelegate;
13use anyhow::{Context as _, Result, anyhow};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::{DebugAdapter, DebugAdapterBinary};
16use dap::messages::Response;
17use dap::{
18 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
19 SteppingGranularity, StoppedEvent, VariableReference,
20 adapters::{DapDelegate, DapStatus},
21 client::{DebugAdapterClient, SessionId},
22 messages::{Events, Message},
23};
24use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
25use futures::channel::oneshot;
26use futures::{FutureExt, future::Shared};
27use gpui::{
28 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
29};
30use rpc::AnyProtoClient;
31use serde_json::{Value, json};
32use settings::Settings;
33use smol::stream::StreamExt;
34use std::any::TypeId;
35use std::path::PathBuf;
36use std::u64;
37use std::{
38 any::Any,
39 collections::hash_map::Entry,
40 hash::{Hash, Hasher},
41 path::Path,
42 sync::Arc,
43};
44use task::{DebugAdapterConfig, DebugTaskDefinition};
45use text::{PointUtf16, ToPointUtf16};
46use util::{ResultExt, merge_json_value_into};
47
/// Identifier of a debuggee thread, wrapping the numeric id carried by DAP events.
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// Smallest representable thread id.
    pub const MIN: ThreadId = Self(u64::MIN);
    /// Largest representable thread id.
    pub const MAX: ThreadId = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw numeric id.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
62
63#[derive(Clone, Debug)]
64pub struct StackFrame {
65 pub dap: dap::StackFrame,
66 pub scopes: Vec<dap::Scope>,
67}
68
69impl From<dap::StackFrame> for StackFrame {
70 fn from(stack_frame: dap::StackFrame) -> Self {
71 Self {
72 scopes: vec![],
73 dap: stack_frame,
74 }
75 }
76}
77
/// Execution state tracked for a thread; `Running` is the default.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the display label for this status (the variant name).
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
99
100#[derive(Debug)]
101pub struct Thread {
102 dap: dap::Thread,
103 stack_frame_ids: IndexSet<StackFrameId>,
104 _has_stopped: bool,
105}
106
107impl From<dap::Thread> for Thread {
108 fn from(dap: dap::Thread) -> Self {
109 Self {
110 dap,
111 stack_frame_ids: Default::default(),
112 _has_stopped: false,
113 }
114 }
115}
116
117type UpstreamProjectId = u64;
118
119struct RemoteConnection {
120 _client: AnyProtoClient,
121 _upstream_project_id: UpstreamProjectId,
122}
123
124impl RemoteConnection {
125 fn send_proto_client_request<R: DapCommand>(
126 &self,
127 _request: R,
128 _session_id: SessionId,
129 cx: &mut App,
130 ) -> Task<Result<R::Response>> {
131 // let message = request.to_proto(session_id, self.upstream_project_id);
132 // let upstream_client = self.client.clone();
133 cx.background_executor().spawn(async move {
134 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
135 // let response = upstream_client.request(message).await?;
136 // request.response_from_proto(response)
137 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
138 })
139 }
140
141 fn request<R: DapCommand>(
142 &self,
143 request: R,
144 session_id: SessionId,
145 cx: &mut App,
146 ) -> Task<Result<R::Response>>
147 where
148 <R::DapRequest as dap::requests::Request>::Response: 'static,
149 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
150 {
151 return self.send_proto_client_request::<R>(request, session_id, cx);
152 }
153}
154
/// How a session reaches its debug adapter.
enum Mode {
    /// The session holds a `LocalMode` with its own adapter client.
    Local(LocalMode),
    /// The session holds a `RemoteConnection`; its requests currently always fail.
    Remote(RemoteConnection),
}
159
/// State needed to drive a debug adapter through a local `DebugAdapterClient`.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests and messages to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration this session was started with.
    config: DebugAdapterConfig,
    /// Adapter implementation; supplies the adapter name and request arguments.
    adapter: Arc<dyn DebugAdapter>,
    /// Store from which breakpoints are read before being sent to the adapter.
    breakpoint_store: Entity<BreakpointStore>,
}
167
168fn client_source(abs_path: &Path) -> dap::Source {
169 dap::Source {
170 name: abs_path
171 .file_name()
172 .map(|filename| filename.to_string_lossy().to_string()),
173 path: Some(abs_path.to_string_lossy().to_string()),
174 source_reference: None,
175 presentation_hint: None,
176 origin: None,
177 sources: None,
178 adapter_data: None,
179 checksums: None,
180 }
181}
182
183impl LocalMode {
    /// Starts a `LocalMode` using adapters from `debug_adapters`.
    ///
    /// Delegates to [`Self::new_inner`] with a no-op `on_initialized` callback.
    fn new(
        debug_adapters: Arc<DapRegistry>,
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        breakpoint_store: Entity<BreakpointStore>,
        config: DebugAdapterConfig,
        delegate: DapAdapterDelegate,
        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
        cx: AsyncApp,
    ) -> Task<Result<Self>> {
        Self::new_inner(
            debug_adapters,
            session_id,
            parent_session,
            breakpoint_store,
            config,
            delegate,
            messages_tx,
            // Nothing extra to do once the client is up.
            async |_, _| {},
            cx,
        )
    }
206 #[cfg(any(test, feature = "test-support"))]
207 fn new_fake(
208 session_id: SessionId,
209 parent_session: Option<Entity<Session>>,
210 breakpoint_store: Entity<BreakpointStore>,
211 config: DebugAdapterConfig,
212 delegate: DapAdapterDelegate,
213 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
214 caps: Capabilities,
215 fail: bool,
216 cx: AsyncApp,
217 ) -> Task<Result<Self>> {
218 use task::DebugRequestDisposition;
219
220 let request = match config.request.clone() {
221 DebugRequestDisposition::UserConfigured(request) => request,
222 DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
223 match reverse_request_args.request {
224 dap::StartDebuggingRequestArgumentsRequest::Launch => {
225 DebugRequestType::Launch(task::LaunchConfig {
226 program: "".to_owned(),
227 cwd: None,
228 args: Default::default(),
229 })
230 }
231 dap::StartDebuggingRequestArgumentsRequest::Attach => {
232 DebugRequestType::Attach(task::AttachConfig {
233 process_id: Some(0),
234 })
235 }
236 }
237 }
238 };
239
240 let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
241 session
242 .client
243 .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
244 .await;
245
246 let paths = cx
247 .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
248 .expect("Breakpoint store should exist in all tests that start debuggers");
249
250 session
251 .client
252 .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
253 let p = Arc::from(Path::new(&args.source.path.unwrap()));
254 if !paths.contains(&p) {
255 panic!("Sent breakpoints for path without any")
256 }
257
258 Ok(dap::SetBreakpointsResponse {
259 breakpoints: Vec::default(),
260 })
261 })
262 .await;
263
264 match request {
265 dap::DebugRequestType::Launch(_) => {
266 if fail {
267 session
268 .client
269 .on_request::<dap::requests::Launch, _>(move |_, _| {
270 Err(dap::ErrorResponse {
271 error: Some(dap::Message {
272 id: 1,
273 format: "error".into(),
274 variables: None,
275 send_telemetry: None,
276 show_user: None,
277 url: None,
278 url_label: None,
279 }),
280 })
281 })
282 .await;
283 } else {
284 session
285 .client
286 .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
287 .await;
288 }
289 }
290 dap::DebugRequestType::Attach(attach_config) => {
291 if fail {
292 session
293 .client
294 .on_request::<dap::requests::Attach, _>(move |_, _| {
295 Err(dap::ErrorResponse {
296 error: Some(dap::Message {
297 id: 1,
298 format: "error".into(),
299 variables: None,
300 send_telemetry: None,
301 show_user: None,
302 url: None,
303 url_label: None,
304 }),
305 })
306 })
307 .await;
308 } else {
309 session
310 .client
311 .on_request::<dap::requests::Attach, _>(move |_, args| {
312 assert_eq!(
313 json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
314 args.raw
315 );
316
317 Ok(())
318 })
319 .await;
320 }
321 }
322 }
323
324 session
325 .client
326 .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
327 .await;
328 session.client.fake_event(Events::Initialized(None)).await;
329 };
330 Self::new_inner(
331 DapRegistry::fake().into(),
332 session_id,
333 parent_session,
334 breakpoint_store,
335 config,
336 delegate,
337 messages_tx,
338 callback,
339 cx,
340 )
341 }
342 fn new_inner(
343 registry: Arc<DapRegistry>,
344 session_id: SessionId,
345 parent_session: Option<Entity<Session>>,
346 breakpoint_store: Entity<BreakpointStore>,
347 config: DebugAdapterConfig,
348 delegate: DapAdapterDelegate,
349 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
350 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
351 cx: AsyncApp,
352 ) -> Task<Result<Self>> {
353 cx.spawn(async move |cx| {
354 let (adapter, binary) =
355 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
356
357 let message_handler = Box::new(move |message| {
358 messages_tx.unbounded_send(message).ok();
359 });
360
361 let client = Arc::new(
362 if let Some(client) = parent_session
363 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
364 .flatten()
365 {
366 client
367 .reconnect(session_id, binary, message_handler, cx.clone())
368 .await?
369 } else {
370 DebugAdapterClient::start(
371 session_id,
372 adapter.name(),
373 binary,
374 message_handler,
375 cx.clone(),
376 )
377 .await
378 .with_context(|| "Failed to start communication with debug adapter")?
379 },
380 );
381
382 let mut session = Self {
383 client,
384 adapter,
385 breakpoint_store,
386 config: config.clone(),
387 };
388
389 on_initialized(&mut session, cx.clone()).await;
390
391 Ok(session)
392 })
393 }
394
395 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
396 let tasks: Vec<_> = paths
397 .into_iter()
398 .map(|path| {
399 self.request(
400 dap_command::SetBreakpoints {
401 source: client_source(path),
402 source_modified: None,
403 breakpoints: vec![],
404 },
405 cx.background_executor().clone(),
406 )
407 })
408 .collect();
409
410 cx.background_spawn(async move {
411 futures::future::join_all(tasks)
412 .await
413 .iter()
414 .for_each(|res| match res {
415 Ok(_) => {}
416 Err(err) => {
417 log::warn!("Set breakpoints request failed: {}", err);
418 }
419 });
420 })
421 }
422
423 fn send_breakpoints_from_path(
424 &self,
425 abs_path: Arc<Path>,
426 reason: BreakpointUpdatedReason,
427 cx: &mut App,
428 ) -> Task<()> {
429 let breakpoints = self
430 .breakpoint_store
431 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
432 .into_iter()
433 .filter(|bp| bp.state.is_enabled())
434 .map(Into::into)
435 .collect();
436
437 let task = self.request(
438 dap_command::SetBreakpoints {
439 source: client_source(&abs_path),
440 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
441 breakpoints,
442 },
443 cx.background_executor().clone(),
444 );
445
446 cx.background_spawn(async move {
447 match task.await {
448 Ok(_) => {}
449 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
450 }
451 })
452 }
453
454 fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
455 let mut breakpoint_tasks = Vec::new();
456 let breakpoints = self
457 .breakpoint_store
458 .read_with(cx, |store, cx| store.all_breakpoints(cx));
459
460 for (path, breakpoints) in breakpoints {
461 let breakpoints = if ignore_breakpoints {
462 vec![]
463 } else {
464 breakpoints
465 .into_iter()
466 .filter(|bp| bp.state.is_enabled())
467 .map(Into::into)
468 .collect()
469 };
470
471 breakpoint_tasks.push(self.request(
472 dap_command::SetBreakpoints {
473 source: client_source(&path),
474 source_modified: Some(false),
475 breakpoints,
476 },
477 cx.background_executor().clone(),
478 ));
479 }
480
481 cx.background_spawn(async move {
482 futures::future::join_all(breakpoint_tasks)
483 .await
484 .iter()
485 .for_each(|res| match res {
486 Ok(_) => {}
487 Err(err) => {
488 log::warn!("Set breakpoints request failed: {}", err);
489 }
490 });
491 })
492 }
493
494 async fn get_adapter_binary(
495 registry: &Arc<DapRegistry>,
496 config: &DebugAdapterConfig,
497 delegate: &DapAdapterDelegate,
498 cx: &mut AsyncApp,
499 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
500 let adapter = registry
501 .adapter(&config.adapter)
502 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
503
504 let binary = cx.update(|cx| {
505 ProjectSettings::get_global(cx)
506 .dap
507 .get(&adapter.name())
508 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
509 })?;
510
511 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
512 Err(error) => {
513 delegate.update_status(
514 adapter.name(),
515 DapStatus::Failed {
516 error: error.to_string(),
517 },
518 );
519
520 return Err(error);
521 }
522 Ok(mut binary) => {
523 delegate.update_status(adapter.name(), DapStatus::None);
524
525 let shell_env = delegate.shell_env().await;
526 let mut envs = binary.envs.unwrap_or_default();
527 envs.extend(shell_env);
528 binary.envs = Some(envs);
529
530 binary
531 }
532 };
533
534 Ok((adapter, binary))
535 }
536
    /// Returns a copy of the label from this session's configuration.
    pub fn label(&self) -> String {
        self.config.label.clone()
    }
540
541 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
542 let adapter_id = self.adapter.name().to_string();
543
544 self.request(Initialize { adapter_id }, cx.background_executor().clone())
545 }
546
    /// Runs the launch/attach handshake with the adapter.
    ///
    /// Starts the `Launch` or `Attach` request right away and, concurrently, waits for
    /// `initialized_rx`, then sends all breakpoints and (when `capabilities` says it is
    /// supported) `ConfigurationDone`. The returned task succeeds only if both succeed.
    ///
    /// # Errors
    ///
    /// Fails if a user-configured config cannot be converted into a `DebugTaskDefinition`,
    /// if `initialized_rx` is cancelled, or if any of the requests fail.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        cx: &App,
    ) -> Task<Result<()>> {
        // Work out the raw request arguments and whether this is a launch or an attach.
        let (mut raw, is_launch) = match &self.config.request {
            task::DebugRequestDisposition::UserConfigured(_) => {
                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
                    debug_assert!(false, "This part of code should be unreachable in practice");
                    return Task::ready(Err(anyhow!(
                        "Expected debug config conversion to succeed"
                    )));
                };
                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
                let raw = self.adapter.request_args(&raw);
                (raw, is_launch)
            }
            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
                start_debugging_request_arguments.configuration.clone(),
                matches!(
                    start_debugging_request_arguments.request,
                    dap::StartDebuggingRequestArgumentsRequest::Launch
                ),
            ),
        };

        // Layer the configured `initialize_args` (or an empty object) onto the arguments.
        merge_json_value_into(
            self.config.initialize_args.clone().unwrap_or(json!({})),
            &mut raw,
        );
        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = if is_launch {
            self.request(Launch { raw }, cx.background_executor().clone())
        } else {
            self.request(Attach { raw }, cx.background_executor().clone())
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);

        // Runs alongside `launch`: breakpoints and `ConfigurationDone` are only sent after
        // `initialized_rx` resolves.
        let configuration_sequence = cx.spawn({
            let this = self.clone();
            async move |cx| {
                initialized_rx.await?;
                // todo(debugger) figure out if we want to handle a breakpoint response error
                // This will probably consist of letting a user know that breakpoints failed to be set
                cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;

                if configuration_done_supported {
                    this.request(ConfigurationDone {}, cx.background_executor().clone())
                } else {
                    Task::ready(Ok(()))
                }
                .await
            }
        });

        // Both halves must succeed; the first error wins.
        cx.background_spawn(async move {
            futures::future::try_join(launch, configuration_sequence).await?;
            Ok(())
        })
    }
609
610 fn request<R: LocalDapCommand>(
611 &self,
612 request: R,
613 executor: BackgroundExecutor,
614 ) -> Task<Result<R::Response>>
615 where
616 <R::DapRequest as dap::requests::Request>::Response: 'static,
617 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
618 {
619 let request = Arc::new(request);
620
621 let request_clone = request.clone();
622 let connection = self.client.clone();
623 let request_task = executor.spawn(async move {
624 let args = request_clone.to_dap();
625 connection.request::<R::DapRequest>(args).await
626 });
627
628 executor.spawn(async move {
629 let response = request.response_from_dap(request_task.await?);
630 response
631 })
632 }
633}
634impl From<RemoteConnection> for Mode {
635 fn from(value: RemoteConnection) -> Self {
636 Self::Remote(value)
637 }
638}
639
640impl Mode {
641 fn request_dap<R: DapCommand>(
642 &self,
643 session_id: SessionId,
644 request: R,
645 cx: &mut Context<Session>,
646 ) -> Task<Result<R::Response>>
647 where
648 <R::DapRequest as dap::requests::Request>::Response: 'static,
649 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
650 {
651 match self {
652 Mode::Local(debug_adapter_client) => {
653 debug_adapter_client.request(request, cx.background_executor().clone())
654 }
655 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
656 }
657 }
658}
659
660#[derive(Default)]
661struct ThreadStates {
662 global_state: Option<ThreadStatus>,
663 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
664}
665
666impl ThreadStates {
667 fn stop_all_threads(&mut self) {
668 self.global_state = Some(ThreadStatus::Stopped);
669 self.known_thread_states.clear();
670 }
671
672 fn exit_all_threads(&mut self) {
673 self.global_state = Some(ThreadStatus::Exited);
674 self.known_thread_states.clear();
675 }
676
677 fn continue_all_threads(&mut self) {
678 self.global_state = Some(ThreadStatus::Running);
679 self.known_thread_states.clear();
680 }
681
682 fn stop_thread(&mut self, thread_id: ThreadId) {
683 self.known_thread_states
684 .insert(thread_id, ThreadStatus::Stopped);
685 }
686
687 fn continue_thread(&mut self, thread_id: ThreadId) {
688 self.known_thread_states
689 .insert(thread_id, ThreadStatus::Running);
690 }
691
692 fn process_step(&mut self, thread_id: ThreadId) {
693 self.known_thread_states
694 .insert(thread_id, ThreadStatus::Stepping);
695 }
696
697 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
698 self.thread_state(thread_id)
699 .unwrap_or(ThreadStatus::Running)
700 }
701
702 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
703 self.known_thread_states
704 .get(&thread_id)
705 .copied()
706 .or(self.global_state)
707 }
708
709 fn exit_thread(&mut self, thread_id: ThreadId) {
710 self.known_thread_states
711 .insert(thread_id, ThreadStatus::Exited);
712 }
713
714 fn any_stopped_thread(&self) -> bool {
715 self.global_state
716 .is_some_and(|state| state == ThreadStatus::Stopped)
717 || self
718 .known_thread_states
719 .values()
720 .any(|status| *status == ThreadStatus::Stopped)
721 }
722}
/// Capacity of the circular buffer that stores a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Counter value identifying a position in a session's stream of output events.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether requests go to a local adapter client or a remote connection.
    mode: Mode,
    /// Capabilities stored from the initialize response and merged with capability events.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    /// Modules known to the session, kept up to date from module events.
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Incremented once for every output event stored in `output`.
    output_token: OutputToken,
    /// Stored output events, bounded by `MAX_TRACKED_OUTPUT_EVENTS`.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    /// Set once a `Terminated` event has been handled.
    is_session_terminated: bool,
    /// Request tasks, grouped by command type and keyed by the command value.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    _background_tasks: Vec<Task<()>>,
}
748
/// Object-safe equality, hashing and downcasting for commands stored behind `dyn`.
trait CacheableCommand: Any + Send + Sync {
    /// Whether `rhs` has the same concrete type as `self` and compares equal to it.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds this command's hash into `hasher`.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the shared command into an `Arc<dyn Any>` so it can be downcast.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
754
755impl<T> CacheableCommand for T
756where
757 T: DapCommand + PartialEq + Eq + Hash,
758{
759 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
760 (rhs as &dyn Any)
761 .downcast_ref::<Self>()
762 .map_or(false, |rhs| self == rhs)
763 }
764
765 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
766 T::hash(self, &mut hasher);
767 }
768
769 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
770 self
771 }
772}
773
774pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
775
776impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
777 fn from(request: T) -> Self {
778 Self(Arc::new(request))
779 }
780}
781
782impl PartialEq for RequestSlot {
783 fn eq(&self, other: &Self) -> bool {
784 self.0.dyn_eq(other.0.as_ref())
785 }
786}
787
788impl Eq for RequestSlot {}
789
790impl Hash for RequestSlot {
791 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
792 self.0.dyn_hash(state);
793 (&*self.0 as &dyn Any).type_id().hash(state)
794 }
795}
796
797#[derive(Debug, Clone, Hash, PartialEq, Eq)]
798pub struct CompletionsQuery {
799 pub query: String,
800 pub column: u64,
801 pub line: Option<u64>,
802 pub frame_id: Option<u64>,
803}
804
805impl CompletionsQuery {
806 pub fn new(
807 buffer: &language::Buffer,
808 cursor_position: language::Anchor,
809 frame_id: Option<u64>,
810 ) -> Self {
811 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
812 Self {
813 query: buffer.text(),
814 column: column as u64,
815 frame_id,
816 line: Some(row as u64),
817 }
818 }
819}
820
/// Events emitted by a `Session`.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Emitted when a stopped event is handled. Carries the stopped thread, or `None` when
    /// the event named no thread or asked to preserve focus.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

/// Crate-internal lifecycle events emitted by a `Session`.
pub(crate) enum SessionStateEvent {
    Shutdown,
}

impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
836
837// local session will send breakpoint updates to DAP for all new breakpoints
838// remote side will only send breakpoint updates when it is a breakpoint created by that peer
839// BreakpointStore notifies session on breakpoint changes
840impl Session {
    /// Creates a session backed by a local debug adapter.
    ///
    /// Builds the `LocalMode` first (which can fail), then creates the `Session` entity via
    /// `create_local_session`, wiring it to a fresh unbounded adapter message channel.
    pub(crate) fn local(
        breakpoint_store: Entity<BreakpointStore>,
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        delegate: DapAdapterDelegate,
        config: DebugAdapterConfig,
        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
        initialized_tx: oneshot::Sender<()>,
        debug_adapters: Arc<DapRegistry>,
        cx: &mut App,
    ) -> Task<Result<Entity<Self>>> {
        // The sender goes to the adapter client, the receiver to the session.
        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();

        cx.spawn(async move |cx| {
            let mode = LocalMode::new(
                debug_adapters,
                session_id,
                parent_session.clone(),
                breakpoint_store.clone(),
                config.clone(),
                delegate,
                message_tx,
                cx.clone(),
            )
            .await?;

            cx.new(|cx| {
                create_local_session(
                    breakpoint_store,
                    session_id,
                    parent_session,
                    start_debugging_requests_tx,
                    initialized_tx,
                    message_rx,
                    mode,
                    cx,
                )
            })
        })
    }
881
    /// Test-only counterpart of [`Self::local`] that builds the session on
    /// `LocalMode::new_fake`, passing through `caps` and `fails`.
    #[cfg(any(test, feature = "test-support"))]
    pub(crate) fn fake(
        breakpoint_store: Entity<BreakpointStore>,
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        delegate: DapAdapterDelegate,
        config: DebugAdapterConfig,
        start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
        initialized_tx: oneshot::Sender<()>,
        caps: Capabilities,
        fails: bool,
        cx: &mut App,
    ) -> Task<Result<Entity<Session>>> {
        // The sender goes to the adapter client, the receiver to the session.
        let (message_tx, message_rx) = futures::channel::mpsc::unbounded();

        cx.spawn(async move |cx| {
            let mode = LocalMode::new_fake(
                session_id,
                parent_session.clone(),
                breakpoint_store.clone(),
                config.clone(),
                delegate,
                message_tx,
                caps,
                fails,
                cx.clone(),
            )
            .await?;

            cx.new(|cx| {
                create_local_session(
                    breakpoint_store,
                    session_id,
                    parent_session,
                    start_debugging_requests_tx,
                    initialized_tx,
                    message_rx,
                    mode,
                    cx,
                )
            })
        })
    }
925
926 pub(crate) fn remote(
927 session_id: SessionId,
928 client: AnyProtoClient,
929 upstream_project_id: u64,
930 ignore_breakpoints: bool,
931 ) -> Self {
932 Self {
933 mode: Mode::Remote(RemoteConnection {
934 _client: client,
935 _upstream_project_id: upstream_project_id,
936 }),
937 id: session_id,
938 child_session_ids: HashSet::default(),
939 parent_id: None,
940 capabilities: Capabilities::default(),
941 ignore_breakpoints,
942 variables: Default::default(),
943 stack_frames: Default::default(),
944 thread_states: ThreadStates::default(),
945 output_token: OutputToken(0),
946 output: circular_buffer::CircularBuffer::boxed(),
947 requests: HashMap::default(),
948 modules: Vec::default(),
949 loaded_sources: Vec::default(),
950 threads: IndexMap::default(),
951 _background_tasks: Vec::default(),
952 locations: Default::default(),
953 is_session_terminated: false,
954 }
955 }
956
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
960
    /// Returns a copy of the set of child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
964
    /// Adds `session_id` to the set of child sessions.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
968
    /// Removes `session_id` from the set of child sessions, if present.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
972
    /// Returns the parent session's id, if this session has one.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
976
    /// Returns the capabilities currently stored for this session.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
980
981 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
982 if let Mode::Local(local_mode) = &self.mode {
983 Some(local_mode.config.clone())
984 } else {
985 None
986 }
987 }
988
    /// Whether a `Terminated` event has been handled for this session.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
992
993 pub fn is_local(&self) -> bool {
994 matches!(self.mode, Mode::Local(_))
995 }
996
997 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
998 match &mut self.mode {
999 Mode::Local(local_mode) => Some(local_mode),
1000 Mode::Remote(_) => None,
1001 }
1002 }
1003
1004 pub fn as_local(&self) -> Option<&LocalMode> {
1005 match &self.mode {
1006 Mode::Local(local_mode) => Some(local_mode),
1007 Mode::Remote(_) => None,
1008 }
1009 }
1010
1011 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1012 match &self.mode {
1013 Mode::Local(local_mode) => {
1014 let capabilities = local_mode.clone().request_initialization(cx);
1015
1016 cx.spawn(async move |this, cx| {
1017 let capabilities = capabilities.await?;
1018 this.update(cx, |session, _| {
1019 session.capabilities = capabilities;
1020 })?;
1021 Ok(())
1022 })
1023 }
1024 Mode::Remote(_) => Task::ready(Err(anyhow!(
1025 "Cannot send initialize request from remote session"
1026 ))),
1027 }
1028 }
1029
1030 pub(super) fn initialize_sequence(
1031 &mut self,
1032 initialize_rx: oneshot::Receiver<()>,
1033 cx: &mut Context<Self>,
1034 ) -> Task<Result<()>> {
1035 match &self.mode {
1036 Mode::Local(local_mode) => {
1037 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1038 }
1039 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1040 }
1041 }
1042
1043 pub fn output(
1044 &self,
1045 since: OutputToken,
1046 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1047 if self.output_token.0 == 0 {
1048 return (self.output.range(0..0), OutputToken(0));
1049 };
1050
1051 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1052
1053 let clamped_events_since = events_since.clamp(0, self.output.len());
1054 (
1055 self.output
1056 .range(self.output.len() - clamped_events_since..),
1057 self.output_token,
1058 )
1059 }
1060
1061 pub fn respond_to_client(
1062 &self,
1063 request_seq: u64,
1064 success: bool,
1065 command: String,
1066 body: Option<serde_json::Value>,
1067 cx: &mut Context<Self>,
1068 ) -> Task<Result<()>> {
1069 let Some(local_session) = self.as_local().cloned() else {
1070 unreachable!("Cannot respond to remote client");
1071 };
1072
1073 cx.background_spawn(async move {
1074 local_session
1075 .client
1076 .send_message(Message::Response(Response {
1077 body,
1078 success,
1079 command,
1080 seq: request_seq + 1,
1081 request_seq,
1082 message: None,
1083 }))
1084 .await
1085 })
1086 }
1087
1088 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1089 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1090 self.thread_states.stop_all_threads();
1091
1092 self.invalidate_command_type::<StackTraceCommand>();
1093 }
1094
1095 // Event if we stopped all threads we still need to insert the thread_id
1096 // to our own data
1097 if let Some(thread_id) = event.thread_id {
1098 self.thread_states.stop_thread(ThreadId(thread_id));
1099
1100 self.invalidate_state(
1101 &StackTraceCommand {
1102 thread_id,
1103 start_frame: None,
1104 levels: None,
1105 }
1106 .into(),
1107 );
1108 }
1109
1110 self.invalidate_generic();
1111 self.threads.clear();
1112 self.variables.clear();
1113 cx.emit(SessionEvent::Stopped(
1114 event
1115 .thread_id
1116 .map(Into::into)
1117 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1118 ));
1119 cx.notify();
1120 }
1121
1122 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1123 match *event {
1124 Events::Initialized(_) => {
1125 debug_assert!(
1126 false,
1127 "Initialized event should have been handled in LocalMode"
1128 );
1129 }
1130 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1131 Events::Continued(event) => {
1132 if event.all_threads_continued.unwrap_or_default() {
1133 self.thread_states.continue_all_threads();
1134 } else {
1135 self.thread_states
1136 .continue_thread(ThreadId(event.thread_id));
1137 }
1138 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1139 self.invalidate_generic();
1140 }
1141 Events::Exited(_event) => {
1142 self.clear_active_debug_line(cx);
1143 }
1144 Events::Terminated(_) => {
1145 self.is_session_terminated = true;
1146 self.clear_active_debug_line(cx);
1147 }
1148 Events::Thread(event) => {
1149 let thread_id = ThreadId(event.thread_id);
1150
1151 match event.reason {
1152 dap::ThreadEventReason::Started => {
1153 self.thread_states.continue_thread(thread_id);
1154 }
1155 dap::ThreadEventReason::Exited => {
1156 self.thread_states.exit_thread(thread_id);
1157 }
1158 reason => {
1159 log::error!("Unhandled thread event reason {:?}", reason);
1160 }
1161 }
1162 self.invalidate_state(&ThreadsCommand.into());
1163 cx.notify();
1164 }
1165 Events::Output(event) => {
1166 if event
1167 .category
1168 .as_ref()
1169 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1170 {
1171 return;
1172 }
1173
1174 self.output.push_back(event);
1175 self.output_token.0 += 1;
1176 cx.notify();
1177 }
1178 Events::Breakpoint(_) => {}
1179 Events::Module(event) => {
1180 match event.reason {
1181 dap::ModuleEventReason::New => {
1182 self.modules.push(event.module);
1183 }
1184 dap::ModuleEventReason::Changed => {
1185 if let Some(module) = self
1186 .modules
1187 .iter_mut()
1188 .find(|other| event.module.id == other.id)
1189 {
1190 *module = event.module;
1191 }
1192 }
1193 dap::ModuleEventReason::Removed => {
1194 self.modules.retain(|other| event.module.id != other.id);
1195 }
1196 }
1197
1198 // todo(debugger): We should only send the invalidate command to downstream clients.
1199 // self.invalidate_state(&ModulesCommand.into());
1200 }
1201 Events::LoadedSource(_) => {
1202 self.invalidate_state(&LoadedSourcesCommand.into());
1203 }
1204 Events::Capabilities(event) => {
1205 self.capabilities = self.capabilities.merge(event.capabilities);
1206 cx.notify();
1207 }
1208 Events::Memory(_) => {}
1209 Events::Process(_) => {}
1210 Events::ProgressEnd(_) => {}
1211 Events::ProgressStart(_) => {}
1212 Events::ProgressUpdate(_) => {}
1213 Events::Invalidated(_) => {}
1214 Events::Other(_) => {}
1215 }
1216 }
1217
1218 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1219 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1220 &mut self,
1221 request: T,
1222 process_result: impl FnOnce(
1223 &mut Self,
1224 Result<T::Response>,
1225 &mut Context<Self>,
1226 ) -> Option<T::Response>
1227 + 'static,
1228 cx: &mut Context<Self>,
1229 ) {
1230 const {
1231 assert!(
1232 T::CACHEABLE,
1233 "Only requests marked as cacheable should invoke `fetch`"
1234 );
1235 }
1236
1237 if !self.thread_states.any_stopped_thread()
1238 && request.type_id() != TypeId::of::<ThreadsCommand>()
1239 || self.is_session_terminated
1240 {
1241 return;
1242 }
1243
1244 let request_map = self
1245 .requests
1246 .entry(std::any::TypeId::of::<T>())
1247 .or_default();
1248
1249 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1250 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1251
1252 let task = Self::request_inner::<Arc<T>>(
1253 &self.capabilities,
1254 self.id,
1255 &self.mode,
1256 command,
1257 process_result,
1258 cx,
1259 );
1260 let task = cx
1261 .background_executor()
1262 .spawn(async move {
1263 let _ = task.await?;
1264 Some(())
1265 })
1266 .shared();
1267
1268 vacant.insert(task);
1269 cx.notify();
1270 }
1271 }
1272
1273 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1274 capabilities: &Capabilities,
1275 session_id: SessionId,
1276 mode: &Mode,
1277 request: T,
1278 process_result: impl FnOnce(
1279 &mut Self,
1280 Result<T::Response>,
1281 &mut Context<Self>,
1282 ) -> Option<T::Response>
1283 + 'static,
1284 cx: &mut Context<Self>,
1285 ) -> Task<Option<T::Response>> {
1286 if !T::is_supported(&capabilities) {
1287 log::warn!(
1288 "Attempted to send a DAP request that isn't supported: {:?}",
1289 request
1290 );
1291 let error = Err(anyhow::Error::msg(
1292 "Couldn't complete request because it's not supported",
1293 ));
1294 return cx.spawn(async move |this, cx| {
1295 this.update(cx, |this, cx| process_result(this, error, cx))
1296 .log_err()
1297 .flatten()
1298 });
1299 }
1300
1301 let request = mode.request_dap(session_id, request, cx);
1302 cx.spawn(async move |this, cx| {
1303 let result = request.await;
1304 this.update(cx, |this, cx| process_result(this, result, cx))
1305 .log_err()
1306 .flatten()
1307 })
1308 }
1309
1310 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1311 &self,
1312 request: T,
1313 process_result: impl FnOnce(
1314 &mut Self,
1315 Result<T::Response>,
1316 &mut Context<Self>,
1317 ) -> Option<T::Response>
1318 + 'static,
1319 cx: &mut Context<Self>,
1320 ) -> Task<Option<T::Response>> {
1321 Self::request_inner(
1322 &self.capabilities,
1323 self.id,
1324 &self.mode,
1325 request,
1326 process_result,
1327 cx,
1328 )
1329 }
1330
1331 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1332 self.requests.remove(&std::any::TypeId::of::<Command>());
1333 }
1334
1335 fn invalidate_generic(&mut self) {
1336 self.invalidate_command_type::<ModulesCommand>();
1337 self.invalidate_command_type::<LoadedSourcesCommand>();
1338 self.invalidate_command_type::<ThreadsCommand>();
1339 }
1340
1341 fn invalidate_state(&mut self, key: &RequestSlot) {
1342 self.requests
1343 .entry((&*key.0 as &dyn Any).type_id())
1344 .and_modify(|request_map| {
1345 request_map.remove(&key);
1346 });
1347 }
1348
1349 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1350 self.thread_states.thread_status(thread_id)
1351 }
1352
1353 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1354 self.fetch(
1355 dap_command::ThreadsCommand,
1356 |this, result, cx| {
1357 let result = result.log_err()?;
1358
1359 this.threads = result
1360 .iter()
1361 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1362 .collect();
1363
1364 this.invalidate_command_type::<StackTraceCommand>();
1365 cx.emit(SessionEvent::Threads);
1366 cx.notify();
1367
1368 Some(result)
1369 },
1370 cx,
1371 );
1372
1373 self.threads
1374 .values()
1375 .map(|thread| {
1376 (
1377 thread.dap.clone(),
1378 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1379 )
1380 })
1381 .collect()
1382 }
1383
1384 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1385 self.fetch(
1386 dap_command::ModulesCommand,
1387 |this, result, cx| {
1388 let result = result.log_err()?;
1389
1390 this.modules = result.iter().cloned().collect();
1391 cx.emit(SessionEvent::Modules);
1392 cx.notify();
1393
1394 Some(result)
1395 },
1396 cx,
1397 );
1398
1399 &self.modules
1400 }
1401
1402 pub fn ignore_breakpoints(&self) -> bool {
1403 self.ignore_breakpoints
1404 }
1405
1406 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1407 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1408 }
1409
1410 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1411 if self.ignore_breakpoints == ignore {
1412 return Task::ready(());
1413 }
1414
1415 self.ignore_breakpoints = ignore;
1416
1417 if let Some(local) = self.as_local() {
1418 local.send_all_breakpoints(ignore, cx)
1419 } else {
1420 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1421 unimplemented!()
1422 }
1423 }
1424
1425 pub fn breakpoints_enabled(&self) -> bool {
1426 self.ignore_breakpoints
1427 }
1428
1429 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1430 self.fetch(
1431 dap_command::LoadedSourcesCommand,
1432 |this, result, cx| {
1433 let result = result.log_err()?;
1434 this.loaded_sources = result.iter().cloned().collect();
1435 cx.emit(SessionEvent::LoadedSources);
1436 cx.notify();
1437 Some(result)
1438 },
1439 cx,
1440 );
1441
1442 &self.loaded_sources
1443 }
1444
1445 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1446 res.log_err()?;
1447 Some(())
1448 }
1449
1450 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1451 thread_id: ThreadId,
1452 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1453 {
1454 move |this, response, cx| match response.log_err() {
1455 Some(response) => Some(response),
1456 None => {
1457 this.thread_states.stop_thread(thread_id);
1458 cx.notify();
1459 None
1460 }
1461 }
1462 }
1463
1464 fn clear_active_debug_line_response(
1465 &mut self,
1466 response: Result<()>,
1467 cx: &mut Context<Session>,
1468 ) -> Option<()> {
1469 response.log_err()?;
1470 self.clear_active_debug_line(cx);
1471 Some(())
1472 }
1473
1474 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1475 self.as_local()
1476 .expect("Message handler will only run in local mode")
1477 .breakpoint_store
1478 .update(cx, |store, cx| {
1479 store.remove_active_position(Some(self.id), cx)
1480 });
1481 }
1482
1483 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1484 self.request(
1485 PauseCommand {
1486 thread_id: thread_id.0,
1487 },
1488 Self::empty_response,
1489 cx,
1490 )
1491 .detach();
1492 }
1493
1494 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1495 self.request(
1496 RestartStackFrameCommand { stack_frame_id },
1497 Self::empty_response,
1498 cx,
1499 )
1500 .detach();
1501 }
1502
1503 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1504 if self.capabilities.supports_restart_request.unwrap_or(false) {
1505 self.request(
1506 RestartCommand {
1507 raw: args.unwrap_or(Value::Null),
1508 },
1509 Self::empty_response,
1510 cx,
1511 )
1512 .detach();
1513 } else {
1514 self.request(
1515 DisconnectCommand {
1516 restart: Some(false),
1517 terminate_debuggee: Some(true),
1518 suspend_debuggee: Some(false),
1519 },
1520 Self::empty_response,
1521 cx,
1522 )
1523 .detach();
1524 }
1525 }
1526
1527 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1528 self.is_session_terminated = true;
1529 self.thread_states.exit_all_threads();
1530 cx.notify();
1531
1532 let task = if self
1533 .capabilities
1534 .supports_terminate_request
1535 .unwrap_or_default()
1536 {
1537 self.request(
1538 TerminateCommand {
1539 restart: Some(false),
1540 },
1541 Self::clear_active_debug_line_response,
1542 cx,
1543 )
1544 } else {
1545 self.request(
1546 DisconnectCommand {
1547 restart: Some(false),
1548 terminate_debuggee: Some(true),
1549 suspend_debuggee: Some(false),
1550 },
1551 Self::clear_active_debug_line_response,
1552 cx,
1553 )
1554 };
1555
1556 cx.emit(SessionStateEvent::Shutdown);
1557
1558 cx.background_spawn(async move {
1559 let _ = task.await;
1560 })
1561 }
1562
1563 pub fn completions(
1564 &mut self,
1565 query: CompletionsQuery,
1566 cx: &mut Context<Self>,
1567 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1568 let task = self.request(query, |_, result, _| result.log_err(), cx);
1569
1570 cx.background_executor().spawn(async move {
1571 anyhow::Ok(
1572 task.await
1573 .map(|response| response.targets)
1574 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1575 )
1576 })
1577 }
1578
1579 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1580 self.thread_states.continue_thread(thread_id);
1581 self.request(
1582 ContinueCommand {
1583 args: ContinueArguments {
1584 thread_id: thread_id.0,
1585 single_thread: Some(true),
1586 },
1587 },
1588 Self::on_step_response::<ContinueCommand>(thread_id),
1589 cx,
1590 )
1591 .detach();
1592 }
1593
1594 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1595 match self.mode {
1596 Mode::Local(ref local) => Some(local.client.clone()),
1597 Mode::Remote(_) => None,
1598 }
1599 }
1600
1601 pub fn step_over(
1602 &mut self,
1603 thread_id: ThreadId,
1604 granularity: SteppingGranularity,
1605 cx: &mut Context<Self>,
1606 ) {
1607 let supports_single_thread_execution_requests =
1608 self.capabilities.supports_single_thread_execution_requests;
1609 let supports_stepping_granularity = self
1610 .capabilities
1611 .supports_stepping_granularity
1612 .unwrap_or_default();
1613
1614 let command = NextCommand {
1615 inner: StepCommand {
1616 thread_id: thread_id.0,
1617 granularity: supports_stepping_granularity.then(|| granularity),
1618 single_thread: supports_single_thread_execution_requests,
1619 },
1620 };
1621
1622 self.thread_states.process_step(thread_id);
1623 self.request(
1624 command,
1625 Self::on_step_response::<NextCommand>(thread_id),
1626 cx,
1627 )
1628 .detach();
1629 }
1630
1631 pub fn step_in(
1632 &mut self,
1633 thread_id: ThreadId,
1634 granularity: SteppingGranularity,
1635 cx: &mut Context<Self>,
1636 ) {
1637 let supports_single_thread_execution_requests =
1638 self.capabilities.supports_single_thread_execution_requests;
1639 let supports_stepping_granularity = self
1640 .capabilities
1641 .supports_stepping_granularity
1642 .unwrap_or_default();
1643
1644 let command = StepInCommand {
1645 inner: StepCommand {
1646 thread_id: thread_id.0,
1647 granularity: supports_stepping_granularity.then(|| granularity),
1648 single_thread: supports_single_thread_execution_requests,
1649 },
1650 };
1651
1652 self.thread_states.process_step(thread_id);
1653 self.request(
1654 command,
1655 Self::on_step_response::<StepInCommand>(thread_id),
1656 cx,
1657 )
1658 .detach();
1659 }
1660
1661 pub fn step_out(
1662 &mut self,
1663 thread_id: ThreadId,
1664 granularity: SteppingGranularity,
1665 cx: &mut Context<Self>,
1666 ) {
1667 let supports_single_thread_execution_requests =
1668 self.capabilities.supports_single_thread_execution_requests;
1669 let supports_stepping_granularity = self
1670 .capabilities
1671 .supports_stepping_granularity
1672 .unwrap_or_default();
1673
1674 let command = StepOutCommand {
1675 inner: StepCommand {
1676 thread_id: thread_id.0,
1677 granularity: supports_stepping_granularity.then(|| granularity),
1678 single_thread: supports_single_thread_execution_requests,
1679 },
1680 };
1681
1682 self.thread_states.process_step(thread_id);
1683 self.request(
1684 command,
1685 Self::on_step_response::<StepOutCommand>(thread_id),
1686 cx,
1687 )
1688 .detach();
1689 }
1690
1691 pub fn step_back(
1692 &mut self,
1693 thread_id: ThreadId,
1694 granularity: SteppingGranularity,
1695 cx: &mut Context<Self>,
1696 ) {
1697 let supports_single_thread_execution_requests =
1698 self.capabilities.supports_single_thread_execution_requests;
1699 let supports_stepping_granularity = self
1700 .capabilities
1701 .supports_stepping_granularity
1702 .unwrap_or_default();
1703
1704 let command = StepBackCommand {
1705 inner: StepCommand {
1706 thread_id: thread_id.0,
1707 granularity: supports_stepping_granularity.then(|| granularity),
1708 single_thread: supports_single_thread_execution_requests,
1709 },
1710 };
1711
1712 self.thread_states.process_step(thread_id);
1713
1714 self.request(
1715 command,
1716 Self::on_step_response::<StepBackCommand>(thread_id),
1717 cx,
1718 )
1719 .detach();
1720 }
1721
1722 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1723 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1724 && self.requests.contains_key(&ThreadsCommand.type_id())
1725 && self.threads.contains_key(&thread_id)
1726 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1727 // We could still be using an old thread id and have sent a new thread's request
1728 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1729 // But it very well could cause a minor bug in the future that is hard to track down
1730 {
1731 self.fetch(
1732 super::dap_command::StackTraceCommand {
1733 thread_id: thread_id.0,
1734 start_frame: None,
1735 levels: None,
1736 },
1737 move |this, stack_frames, cx| {
1738 let stack_frames = stack_frames.log_err()?;
1739
1740 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1741 thread.stack_frame_ids =
1742 stack_frames.iter().map(|frame| frame.id).collect();
1743 });
1744 debug_assert!(
1745 matches!(entry, indexmap::map::Entry::Occupied(_)),
1746 "Sent request for thread_id that doesn't exist"
1747 );
1748
1749 this.stack_frames.extend(
1750 stack_frames
1751 .iter()
1752 .cloned()
1753 .map(|frame| (frame.id, StackFrame::from(frame))),
1754 );
1755
1756 this.invalidate_command_type::<ScopesCommand>();
1757 this.invalidate_command_type::<VariablesCommand>();
1758
1759 cx.emit(SessionEvent::StackTrace);
1760 cx.notify();
1761 Some(stack_frames)
1762 },
1763 cx,
1764 );
1765 }
1766
1767 self.threads
1768 .get(&thread_id)
1769 .map(|thread| {
1770 thread
1771 .stack_frame_ids
1772 .iter()
1773 .filter_map(|id| self.stack_frames.get(id))
1774 .cloned()
1775 .collect()
1776 })
1777 .unwrap_or_default()
1778 }
1779
1780 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1781 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1782 && self
1783 .requests
1784 .contains_key(&TypeId::of::<StackTraceCommand>())
1785 {
1786 self.fetch(
1787 ScopesCommand { stack_frame_id },
1788 move |this, scopes, cx| {
1789 let scopes = scopes.log_err()?;
1790
1791 for scope in scopes .iter(){
1792 this.variables(scope.variables_reference, cx);
1793 }
1794
1795 let entry = this
1796 .stack_frames
1797 .entry(stack_frame_id)
1798 .and_modify(|stack_frame| {
1799 stack_frame.scopes = scopes.clone();
1800 });
1801
1802 cx.emit(SessionEvent::Variables);
1803
1804 debug_assert!(
1805 matches!(entry, indexmap::map::Entry::Occupied(_)),
1806 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1807 );
1808
1809 Some(scopes)
1810 },
1811 cx,
1812 );
1813 }
1814
1815 self.stack_frames
1816 .get(&stack_frame_id)
1817 .map(|frame| frame.scopes.as_slice())
1818 .unwrap_or_default()
1819 }
1820
1821 pub fn variables(
1822 &mut self,
1823 variables_reference: VariableReference,
1824 cx: &mut Context<Self>,
1825 ) -> Vec<dap::Variable> {
1826 let command = VariablesCommand {
1827 variables_reference,
1828 filter: None,
1829 start: None,
1830 count: None,
1831 format: None,
1832 };
1833
1834 self.fetch(
1835 command,
1836 move |this, variables, cx| {
1837 let variables = variables.log_err()?;
1838 this.variables
1839 .insert(variables_reference, variables.clone());
1840
1841 cx.emit(SessionEvent::Variables);
1842 Some(variables)
1843 },
1844 cx,
1845 );
1846
1847 self.variables
1848 .get(&variables_reference)
1849 .cloned()
1850 .unwrap_or_default()
1851 }
1852
1853 pub fn set_variable_value(
1854 &mut self,
1855 variables_reference: u64,
1856 name: String,
1857 value: String,
1858 cx: &mut Context<Self>,
1859 ) {
1860 if self.capabilities.supports_set_variable.unwrap_or_default() {
1861 self.request(
1862 SetVariableValueCommand {
1863 name,
1864 value,
1865 variables_reference,
1866 },
1867 move |this, response, cx| {
1868 let response = response.log_err()?;
1869 this.invalidate_command_type::<VariablesCommand>();
1870 cx.notify();
1871 Some(response)
1872 },
1873 cx,
1874 )
1875 .detach()
1876 }
1877 }
1878
1879 pub fn evaluate(
1880 &mut self,
1881 expression: String,
1882 context: Option<EvaluateArgumentsContext>,
1883 frame_id: Option<u64>,
1884 source: Option<Source>,
1885 cx: &mut Context<Self>,
1886 ) {
1887 self.request(
1888 EvaluateCommand {
1889 expression,
1890 context,
1891 frame_id,
1892 source,
1893 },
1894 |this, response, cx| {
1895 let response = response.log_err()?;
1896 this.output_token.0 += 1;
1897 this.output.push_back(dap::OutputEvent {
1898 category: None,
1899 output: response.result.clone(),
1900 group: None,
1901 variables_reference: Some(response.variables_reference),
1902 source: None,
1903 line: None,
1904 column: None,
1905 data: None,
1906 location_reference: None,
1907 });
1908
1909 this.invalidate_command_type::<ScopesCommand>();
1910 cx.notify();
1911 Some(response)
1912 },
1913 cx,
1914 )
1915 .detach();
1916 }
1917
1918 pub fn location(
1919 &mut self,
1920 reference: u64,
1921 cx: &mut Context<Self>,
1922 ) -> Option<dap::LocationsResponse> {
1923 self.fetch(
1924 LocationsCommand { reference },
1925 move |this, response, _| {
1926 let response = response.log_err()?;
1927 this.locations.insert(reference, response.clone());
1928 Some(response)
1929 },
1930 cx,
1931 );
1932 self.locations.get(&reference).cloned()
1933 }
1934 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1935 let command = DisconnectCommand {
1936 restart: Some(false),
1937 terminate_debuggee: Some(true),
1938 suspend_debuggee: Some(false),
1939 };
1940
1941 self.request(command, Self::empty_response, cx).detach()
1942 }
1943
1944 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1945 if self
1946 .capabilities
1947 .supports_terminate_threads_request
1948 .unwrap_or_default()
1949 {
1950 self.request(
1951 TerminateThreadsCommand {
1952 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1953 },
1954 Self::clear_active_debug_line_response,
1955 cx,
1956 )
1957 .detach();
1958 } else {
1959 self.shutdown(cx).detach();
1960 }
1961 }
1962}
1963
1964fn create_local_session(
1965 breakpoint_store: Entity<BreakpointStore>,
1966 session_id: SessionId,
1967 parent_session: Option<Entity<Session>>,
1968 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1969 initialized_tx: oneshot::Sender<()>,
1970 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1971 mode: LocalMode,
1972 cx: &mut Context<Session>,
1973) -> Session {
1974 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1975 let mut initialized_tx = Some(initialized_tx);
1976 while let Some(message) = message_rx.next().await {
1977 if let Message::Event(event) = message {
1978 if let Events::Initialized(_) = *event {
1979 if let Some(tx) = initialized_tx.take() {
1980 tx.send(()).ok();
1981 }
1982 } else {
1983 let Ok(_) = this.update(cx, |session, cx| {
1984 session.handle_dap_event(event, cx);
1985 }) else {
1986 break;
1987 };
1988 }
1989 } else {
1990 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1991 else {
1992 break;
1993 };
1994 }
1995 }
1996 })];
1997
1998 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1999 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2000 if let Some(local) = (!this.ignore_breakpoints)
2001 .then(|| this.as_local_mut())
2002 .flatten()
2003 {
2004 local
2005 .send_breakpoints_from_path(path.clone(), *reason, cx)
2006 .detach();
2007 };
2008 }
2009 BreakpointStoreEvent::BreakpointsCleared(paths) => {
2010 if let Some(local) = (!this.ignore_breakpoints)
2011 .then(|| this.as_local_mut())
2012 .flatten()
2013 {
2014 local.unset_breakpoints_from_paths(paths, cx).detach();
2015 }
2016 }
2017 BreakpointStoreEvent::ActiveDebugLineChanged => {}
2018 })
2019 .detach();
2020
2021 Session {
2022 mode: Mode::Local(mode),
2023 id: session_id,
2024 child_session_ids: HashSet::default(),
2025 parent_id: parent_session.map(|session| session.read(cx).id),
2026 variables: Default::default(),
2027 capabilities: Capabilities::default(),
2028 thread_states: ThreadStates::default(),
2029 output_token: OutputToken(0),
2030 ignore_breakpoints: false,
2031 output: circular_buffer::CircularBuffer::boxed(),
2032 requests: HashMap::default(),
2033 modules: Vec::default(),
2034 loaded_sources: Vec::default(),
2035 threads: IndexMap::default(),
2036 stack_frames: IndexMap::default(),
2037 locations: Default::default(),
2038 _background_tasks,
2039 is_session_terminated: false,
2040 }
2041}