1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{
4 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
5};
6use super::dap_command::{
7 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
8 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
9 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
10 ScopesCommand, SetVariableValueCommand, StackTraceCommand, StepBackCommand, StepCommand,
11 StepInCommand, StepOutCommand, TerminateCommand, TerminateThreadsCommand, ThreadsCommand,
12 VariablesCommand,
13};
14use super::dap_store::DapAdapterDelegate;
15use anyhow::{Context as _, Result, anyhow};
16use collections::{HashMap, HashSet, IndexMap, IndexSet};
17use dap::adapters::{DebugAdapter, DebugAdapterBinary};
18use dap::messages::Response;
19use dap::{
20 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
21 SteppingGranularity, StoppedEvent, VariableReference,
22 adapters::{DapDelegate, DapStatus},
23 client::{DebugAdapterClient, SessionId},
24 messages::{Events, Message},
25};
26use dap::{DapRegistry, DebugRequestType, OutputEventCategory};
27use futures::channel::oneshot;
28use futures::{FutureExt, future::Shared};
29use gpui::{
30 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
31};
32use rpc::AnyProtoClient;
33use serde_json::{Value, json};
34use settings::Settings;
35use smol::stream::StreamExt;
36use std::any::TypeId;
37use std::path::PathBuf;
38use std::u64;
39use std::{
40 any::Any,
41 collections::hash_map::Entry,
42 hash::{Hash, Hasher},
43 path::Path,
44 sync::Arc,
45};
46use task::{DebugAdapterConfig, DebugTaskDefinition};
47use text::{PointUtf16, ToPointUtf16};
48use util::{ResultExt, merge_json_value_into};
49
50#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
51#[repr(transparent)]
52pub struct ThreadId(pub u64);
53
54impl ThreadId {
55 pub const MIN: ThreadId = ThreadId(u64::MIN);
56 pub const MAX: ThreadId = ThreadId(u64::MAX);
57}
58
59impl From<u64> for ThreadId {
60 fn from(id: u64) -> Self {
61 Self(id)
62 }
63}
64
65#[derive(Clone, Debug)]
66pub struct StackFrame {
67 pub dap: dap::StackFrame,
68 pub scopes: Vec<dap::Scope>,
69}
70
71impl From<dap::StackFrame> for StackFrame {
72 fn from(stack_frame: dap::StackFrame) -> Self {
73 Self {
74 scopes: vec![],
75 dap: stack_frame,
76 }
77 }
78}
79
80#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
81pub enum ThreadStatus {
82 #[default]
83 Running,
84 Stopped,
85 Stepping,
86 Exited,
87 Ended,
88}
89
90impl ThreadStatus {
91 pub fn label(&self) -> &'static str {
92 match self {
93 ThreadStatus::Running => "Running",
94 ThreadStatus::Stopped => "Stopped",
95 ThreadStatus::Stepping => "Stepping",
96 ThreadStatus::Exited => "Exited",
97 ThreadStatus::Ended => "Ended",
98 }
99 }
100}
101
102#[derive(Debug)]
103pub struct Thread {
104 dap: dap::Thread,
105 stack_frame_ids: IndexSet<StackFrameId>,
106 _has_stopped: bool,
107}
108
109impl From<dap::Thread> for Thread {
110 fn from(dap: dap::Thread) -> Self {
111 Self {
112 dap,
113 stack_frame_ids: Default::default(),
114 _has_stopped: false,
115 }
116 }
117}
118
119type UpstreamProjectId = u64;
120
121struct RemoteConnection {
122 _client: AnyProtoClient,
123 _upstream_project_id: UpstreamProjectId,
124}
125
126impl RemoteConnection {
127 fn send_proto_client_request<R: DapCommand>(
128 &self,
129 _request: R,
130 _session_id: SessionId,
131 cx: &mut App,
132 ) -> Task<Result<R::Response>> {
133 // let message = request.to_proto(session_id, self.upstream_project_id);
134 // let upstream_client = self.client.clone();
135 cx.background_executor().spawn(async move {
136 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
137 // let response = upstream_client.request(message).await?;
138 // request.response_from_proto(response)
139 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
140 })
141 }
142
143 fn request<R: DapCommand>(
144 &self,
145 request: R,
146 session_id: SessionId,
147 cx: &mut App,
148 ) -> Task<Result<R::Response>>
149 where
150 <R::DapRequest as dap::requests::Request>::Response: 'static,
151 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
152 {
153 return self.send_proto_client_request::<R>(request, session_id, cx);
154 }
155}
156
157enum Mode {
158 Local(LocalMode),
159 Remote(RemoteConnection),
160}
161
162#[derive(Clone)]
163pub struct LocalMode {
164 client: Arc<DebugAdapterClient>,
165 config: DebugAdapterConfig,
166 adapter: Arc<dyn DebugAdapter>,
167 breakpoint_store: Entity<BreakpointStore>,
168 tmp_breakpoint: Option<SourceBreakpoint>,
169}
170
171fn client_source(abs_path: &Path) -> dap::Source {
172 dap::Source {
173 name: abs_path
174 .file_name()
175 .map(|filename| filename.to_string_lossy().to_string()),
176 path: Some(abs_path.to_string_lossy().to_string()),
177 source_reference: None,
178 presentation_hint: None,
179 origin: None,
180 sources: None,
181 adapter_data: None,
182 checksums: None,
183 }
184}
185
186impl LocalMode {
187 fn new(
188 debug_adapters: Arc<DapRegistry>,
189 session_id: SessionId,
190 parent_session: Option<Entity<Session>>,
191 breakpoint_store: Entity<BreakpointStore>,
192 config: DebugAdapterConfig,
193 delegate: DapAdapterDelegate,
194 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
195 cx: AsyncApp,
196 ) -> Task<Result<Self>> {
197 Self::new_inner(
198 debug_adapters,
199 session_id,
200 parent_session,
201 breakpoint_store,
202 config,
203 delegate,
204 messages_tx,
205 async |_, _| {},
206 cx,
207 )
208 }
209 #[cfg(any(test, feature = "test-support"))]
210 fn new_fake(
211 session_id: SessionId,
212 parent_session: Option<Entity<Session>>,
213 breakpoint_store: Entity<BreakpointStore>,
214 config: DebugAdapterConfig,
215 delegate: DapAdapterDelegate,
216 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
217 caps: Capabilities,
218 fail: bool,
219 cx: AsyncApp,
220 ) -> Task<Result<Self>> {
221 use task::DebugRequestDisposition;
222
223 let request = match config.request.clone() {
224 DebugRequestDisposition::UserConfigured(request) => request,
225 DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
226 match reverse_request_args.request {
227 dap::StartDebuggingRequestArgumentsRequest::Launch => {
228 DebugRequestType::Launch(task::LaunchConfig {
229 program: "".to_owned(),
230 cwd: None,
231 args: Default::default(),
232 })
233 }
234 dap::StartDebuggingRequestArgumentsRequest::Attach => {
235 DebugRequestType::Attach(task::AttachConfig {
236 process_id: Some(0),
237 })
238 }
239 }
240 }
241 };
242
243 let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
244 session
245 .client
246 .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
247 .await;
248
249 let paths = cx
250 .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
251 .expect("Breakpoint store should exist in all tests that start debuggers");
252
253 session
254 .client
255 .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
256 let p = Arc::from(Path::new(&args.source.path.unwrap()));
257 if !paths.contains(&p) {
258 panic!("Sent breakpoints for path without any")
259 }
260
261 Ok(dap::SetBreakpointsResponse {
262 breakpoints: Vec::default(),
263 })
264 })
265 .await;
266
267 match request {
268 dap::DebugRequestType::Launch(_) => {
269 if fail {
270 session
271 .client
272 .on_request::<dap::requests::Launch, _>(move |_, _| {
273 Err(dap::ErrorResponse {
274 error: Some(dap::Message {
275 id: 1,
276 format: "error".into(),
277 variables: None,
278 send_telemetry: None,
279 show_user: None,
280 url: None,
281 url_label: None,
282 }),
283 })
284 })
285 .await;
286 } else {
287 session
288 .client
289 .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
290 .await;
291 }
292 }
293 dap::DebugRequestType::Attach(attach_config) => {
294 if fail {
295 session
296 .client
297 .on_request::<dap::requests::Attach, _>(move |_, _| {
298 Err(dap::ErrorResponse {
299 error: Some(dap::Message {
300 id: 1,
301 format: "error".into(),
302 variables: None,
303 send_telemetry: None,
304 show_user: None,
305 url: None,
306 url_label: None,
307 }),
308 })
309 })
310 .await;
311 } else {
312 session
313 .client
314 .on_request::<dap::requests::Attach, _>(move |_, args| {
315 assert_eq!(
316 json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
317 args.raw
318 );
319
320 Ok(())
321 })
322 .await;
323 }
324 }
325 }
326
327 session
328 .client
329 .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
330 .await;
331 session.client.fake_event(Events::Initialized(None)).await;
332 };
333 Self::new_inner(
334 DapRegistry::fake().into(),
335 session_id,
336 parent_session,
337 breakpoint_store,
338 config,
339 delegate,
340 messages_tx,
341 callback,
342 cx,
343 )
344 }
345 fn new_inner(
346 registry: Arc<DapRegistry>,
347 session_id: SessionId,
348 parent_session: Option<Entity<Session>>,
349 breakpoint_store: Entity<BreakpointStore>,
350 config: DebugAdapterConfig,
351 delegate: DapAdapterDelegate,
352 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
353 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
354 cx: AsyncApp,
355 ) -> Task<Result<Self>> {
356 cx.spawn(async move |cx| {
357 let (adapter, binary) =
358 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
359
360 let message_handler = Box::new(move |message| {
361 messages_tx.unbounded_send(message).ok();
362 });
363
364 let client = Arc::new(
365 if let Some(client) = parent_session
366 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
367 .flatten()
368 {
369 client
370 .reconnect(session_id, binary, message_handler, cx.clone())
371 .await?
372 } else {
373 DebugAdapterClient::start(
374 session_id,
375 adapter.name(),
376 binary,
377 message_handler,
378 cx.clone(),
379 )
380 .await
381 .with_context(|| "Failed to start communication with debug adapter")?
382 },
383 );
384
385 let mut session = Self {
386 client,
387 adapter,
388 breakpoint_store,
389 tmp_breakpoint: None,
390 config: config.clone(),
391 };
392
393 on_initialized(&mut session, cx.clone()).await;
394
395 Ok(session)
396 })
397 }
398
399 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
400 let tasks: Vec<_> = paths
401 .into_iter()
402 .map(|path| {
403 self.request(
404 dap_command::SetBreakpoints {
405 source: client_source(path),
406 source_modified: None,
407 breakpoints: vec![],
408 },
409 cx.background_executor().clone(),
410 )
411 })
412 .collect();
413
414 cx.background_spawn(async move {
415 futures::future::join_all(tasks)
416 .await
417 .iter()
418 .for_each(|res| match res {
419 Ok(_) => {}
420 Err(err) => {
421 log::warn!("Set breakpoints request failed: {}", err);
422 }
423 });
424 })
425 }
426
427 fn send_breakpoints_from_path(
428 &self,
429 abs_path: Arc<Path>,
430 reason: BreakpointUpdatedReason,
431 cx: &mut App,
432 ) -> Task<()> {
433 let breakpoints = self
434 .breakpoint_store
435 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
436 .into_iter()
437 .filter(|bp| bp.state.is_enabled())
438 .chain(self.tmp_breakpoint.clone())
439 .map(Into::into)
440 .collect();
441
442 let task = self.request(
443 dap_command::SetBreakpoints {
444 source: client_source(&abs_path),
445 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
446 breakpoints,
447 },
448 cx.background_executor().clone(),
449 );
450
451 cx.background_spawn(async move {
452 match task.await {
453 Ok(_) => {}
454 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
455 }
456 })
457 }
458
459 fn send_all_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
460 let mut breakpoint_tasks = Vec::new();
461 let breakpoints = self
462 .breakpoint_store
463 .read_with(cx, |store, cx| store.all_breakpoints(cx));
464
465 for (path, breakpoints) in breakpoints {
466 let breakpoints = if ignore_breakpoints {
467 vec![]
468 } else {
469 breakpoints
470 .into_iter()
471 .filter(|bp| bp.state.is_enabled())
472 .map(Into::into)
473 .collect()
474 };
475
476 breakpoint_tasks.push(self.request(
477 dap_command::SetBreakpoints {
478 source: client_source(&path),
479 source_modified: Some(false),
480 breakpoints,
481 },
482 cx.background_executor().clone(),
483 ));
484 }
485
486 cx.background_spawn(async move {
487 futures::future::join_all(breakpoint_tasks)
488 .await
489 .iter()
490 .for_each(|res| match res {
491 Ok(_) => {}
492 Err(err) => {
493 log::warn!("Set breakpoints request failed: {}", err);
494 }
495 });
496 })
497 }
498
499 async fn get_adapter_binary(
500 registry: &Arc<DapRegistry>,
501 config: &DebugAdapterConfig,
502 delegate: &DapAdapterDelegate,
503 cx: &mut AsyncApp,
504 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
505 let adapter = registry
506 .adapter(&config.adapter)
507 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
508
509 let binary = cx.update(|cx| {
510 ProjectSettings::get_global(cx)
511 .dap
512 .get(&adapter.name())
513 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
514 })?;
515
516 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
517 Err(error) => {
518 delegate.update_status(
519 adapter.name(),
520 DapStatus::Failed {
521 error: error.to_string(),
522 },
523 );
524
525 return Err(error);
526 }
527 Ok(mut binary) => {
528 delegate.update_status(adapter.name(), DapStatus::None);
529
530 let shell_env = delegate.shell_env().await;
531 let mut envs = binary.envs.unwrap_or_default();
532 envs.extend(shell_env);
533 binary.envs = Some(envs);
534
535 binary
536 }
537 };
538
539 Ok((adapter, binary))
540 }
541
542 pub fn label(&self) -> String {
543 self.config.label.clone()
544 }
545
546 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
547 let adapter_id = self.adapter.name().to_string();
548
549 self.request(Initialize { adapter_id }, cx.background_executor().clone())
550 }
551
552 fn initialize_sequence(
553 &self,
554 capabilities: &Capabilities,
555 initialized_rx: oneshot::Receiver<()>,
556 cx: &App,
557 ) -> Task<Result<()>> {
558 let (mut raw, is_launch) = match &self.config.request {
559 task::DebugRequestDisposition::UserConfigured(_) => {
560 let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
561 debug_assert!(false, "This part of code should be unreachable in practice");
562 return Task::ready(Err(anyhow!(
563 "Expected debug config conversion to succeed"
564 )));
565 };
566 let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
567 let raw = self.adapter.request_args(&raw);
568 (raw, is_launch)
569 }
570 task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
571 start_debugging_request_arguments.configuration.clone(),
572 matches!(
573 start_debugging_request_arguments.request,
574 dap::StartDebuggingRequestArgumentsRequest::Launch
575 ),
576 ),
577 };
578
579 merge_json_value_into(
580 self.config.initialize_args.clone().unwrap_or(json!({})),
581 &mut raw,
582 );
583 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
584 let launch = if is_launch {
585 self.request(Launch { raw }, cx.background_executor().clone())
586 } else {
587 self.request(Attach { raw }, cx.background_executor().clone())
588 };
589
590 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
591
592 let configuration_sequence = cx.spawn({
593 let this = self.clone();
594 async move |cx| {
595 initialized_rx.await?;
596 // todo(debugger) figure out if we want to handle a breakpoint response error
597 // This will probably consist of letting a user know that breakpoints failed to be set
598 cx.update(|cx| this.send_all_breakpoints(false, cx))?.await;
599
600 if configuration_done_supported {
601 this.request(ConfigurationDone {}, cx.background_executor().clone())
602 } else {
603 Task::ready(Ok(()))
604 }
605 .await
606 }
607 });
608
609 cx.background_spawn(async move {
610 futures::future::try_join(launch, configuration_sequence).await?;
611 Ok(())
612 })
613 }
614
615 fn request<R: LocalDapCommand>(
616 &self,
617 request: R,
618 executor: BackgroundExecutor,
619 ) -> Task<Result<R::Response>>
620 where
621 <R::DapRequest as dap::requests::Request>::Response: 'static,
622 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
623 {
624 let request = Arc::new(request);
625
626 let request_clone = request.clone();
627 let connection = self.client.clone();
628 let request_task = executor.spawn(async move {
629 let args = request_clone.to_dap();
630 connection.request::<R::DapRequest>(args).await
631 });
632
633 executor.spawn(async move {
634 let response = request.response_from_dap(request_task.await?);
635 response
636 })
637 }
638}
639impl From<RemoteConnection> for Mode {
640 fn from(value: RemoteConnection) -> Self {
641 Self::Remote(value)
642 }
643}
644
645impl Mode {
646 fn request_dap<R: DapCommand>(
647 &self,
648 session_id: SessionId,
649 request: R,
650 cx: &mut Context<Session>,
651 ) -> Task<Result<R::Response>>
652 where
653 <R::DapRequest as dap::requests::Request>::Response: 'static,
654 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
655 {
656 match self {
657 Mode::Local(debug_adapter_client) => {
658 debug_adapter_client.request(request, cx.background_executor().clone())
659 }
660 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
661 }
662 }
663}
664
665#[derive(Default)]
666struct ThreadStates {
667 global_state: Option<ThreadStatus>,
668 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
669}
670
671impl ThreadStates {
672 fn stop_all_threads(&mut self) {
673 self.global_state = Some(ThreadStatus::Stopped);
674 self.known_thread_states.clear();
675 }
676
677 fn exit_all_threads(&mut self) {
678 self.global_state = Some(ThreadStatus::Exited);
679 self.known_thread_states.clear();
680 }
681
682 fn continue_all_threads(&mut self) {
683 self.global_state = Some(ThreadStatus::Running);
684 self.known_thread_states.clear();
685 }
686
687 fn stop_thread(&mut self, thread_id: ThreadId) {
688 self.known_thread_states
689 .insert(thread_id, ThreadStatus::Stopped);
690 }
691
692 fn continue_thread(&mut self, thread_id: ThreadId) {
693 self.known_thread_states
694 .insert(thread_id, ThreadStatus::Running);
695 }
696
697 fn process_step(&mut self, thread_id: ThreadId) {
698 self.known_thread_states
699 .insert(thread_id, ThreadStatus::Stepping);
700 }
701
702 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
703 self.thread_state(thread_id)
704 .unwrap_or(ThreadStatus::Running)
705 }
706
707 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
708 self.known_thread_states
709 .get(&thread_id)
710 .copied()
711 .or(self.global_state)
712 }
713
714 fn exit_thread(&mut self, thread_id: ThreadId) {
715 self.known_thread_states
716 .insert(thread_id, ThreadStatus::Exited);
717 }
718
719 fn any_stopped_thread(&self) -> bool {
720 self.global_state
721 .is_some_and(|state| state == ThreadStatus::Stopped)
722 || self
723 .known_thread_states
724 .values()
725 .any(|status| *status == ThreadStatus::Stopped)
726 }
727}
728const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
729
730#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
731pub struct OutputToken(pub usize);
732/// Represents a current state of a single debug adapter and provides ways to mutate it.
733pub struct Session {
734 mode: Mode,
735 pub(super) capabilities: Capabilities,
736 id: SessionId,
737 child_session_ids: HashSet<SessionId>,
738 parent_id: Option<SessionId>,
739 ignore_breakpoints: bool,
740 modules: Vec<dap::Module>,
741 loaded_sources: Vec<dap::Source>,
742 output_token: OutputToken,
743 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
744 threads: IndexMap<ThreadId, Thread>,
745 thread_states: ThreadStates,
746 variables: HashMap<VariableReference, Vec<dap::Variable>>,
747 stack_frames: IndexMap<StackFrameId, StackFrame>,
748 locations: HashMap<u64, dap::LocationsResponse>,
749 is_session_terminated: bool,
750 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
751 _background_tasks: Vec<Task<()>>,
752}
753
754trait CacheableCommand: Any + Send + Sync {
755 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
756 fn dyn_hash(&self, hasher: &mut dyn Hasher);
757 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
758}
759
760impl<T> CacheableCommand for T
761where
762 T: DapCommand + PartialEq + Eq + Hash,
763{
764 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
765 (rhs as &dyn Any)
766 .downcast_ref::<Self>()
767 .map_or(false, |rhs| self == rhs)
768 }
769
770 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
771 T::hash(self, &mut hasher);
772 }
773
774 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
775 self
776 }
777}
778
779pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
780
781impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
782 fn from(request: T) -> Self {
783 Self(Arc::new(request))
784 }
785}
786
787impl PartialEq for RequestSlot {
788 fn eq(&self, other: &Self) -> bool {
789 self.0.dyn_eq(other.0.as_ref())
790 }
791}
792
793impl Eq for RequestSlot {}
794
795impl Hash for RequestSlot {
796 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
797 self.0.dyn_hash(state);
798 (&*self.0 as &dyn Any).type_id().hash(state)
799 }
800}
801
802#[derive(Debug, Clone, Hash, PartialEq, Eq)]
803pub struct CompletionsQuery {
804 pub query: String,
805 pub column: u64,
806 pub line: Option<u64>,
807 pub frame_id: Option<u64>,
808}
809
810impl CompletionsQuery {
811 pub fn new(
812 buffer: &language::Buffer,
813 cursor_position: language::Anchor,
814 frame_id: Option<u64>,
815 ) -> Self {
816 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
817 Self {
818 query: buffer.text(),
819 column: column as u64,
820 frame_id,
821 line: Some(row as u64),
822 }
823 }
824}
825
826pub enum SessionEvent {
827 Modules,
828 LoadedSources,
829 Stopped(Option<ThreadId>),
830 StackTrace,
831 Variables,
832 Threads,
833}
834
835pub(crate) enum SessionStateEvent {
836 Shutdown,
837}
838
839impl EventEmitter<SessionEvent> for Session {}
840impl EventEmitter<SessionStateEvent> for Session {}
841
842// local session will send breakpoint updates to DAP for all new breakpoints
843// remote side will only send breakpoint updates when it is a breakpoint created by that peer
844// BreakpointStore notifies session on breakpoint changes
845impl Session {
846 pub(crate) fn local(
847 breakpoint_store: Entity<BreakpointStore>,
848 session_id: SessionId,
849 parent_session: Option<Entity<Session>>,
850 delegate: DapAdapterDelegate,
851 config: DebugAdapterConfig,
852 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
853 initialized_tx: oneshot::Sender<()>,
854 debug_adapters: Arc<DapRegistry>,
855 cx: &mut App,
856 ) -> Task<Result<Entity<Self>>> {
857 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
858
859 cx.spawn(async move |cx| {
860 let mode = LocalMode::new(
861 debug_adapters,
862 session_id,
863 parent_session.clone(),
864 breakpoint_store.clone(),
865 config.clone(),
866 delegate,
867 message_tx,
868 cx.clone(),
869 )
870 .await?;
871
872 cx.new(|cx| {
873 create_local_session(
874 breakpoint_store,
875 session_id,
876 parent_session,
877 start_debugging_requests_tx,
878 initialized_tx,
879 message_rx,
880 mode,
881 cx,
882 )
883 })
884 })
885 }
886
887 #[cfg(any(test, feature = "test-support"))]
888 pub(crate) fn fake(
889 breakpoint_store: Entity<BreakpointStore>,
890 session_id: SessionId,
891 parent_session: Option<Entity<Session>>,
892 delegate: DapAdapterDelegate,
893 config: DebugAdapterConfig,
894 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
895 initialized_tx: oneshot::Sender<()>,
896 caps: Capabilities,
897 fails: bool,
898 cx: &mut App,
899 ) -> Task<Result<Entity<Session>>> {
900 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
901
902 cx.spawn(async move |cx| {
903 let mode = LocalMode::new_fake(
904 session_id,
905 parent_session.clone(),
906 breakpoint_store.clone(),
907 config.clone(),
908 delegate,
909 message_tx,
910 caps,
911 fails,
912 cx.clone(),
913 )
914 .await?;
915
916 cx.new(|cx| {
917 create_local_session(
918 breakpoint_store,
919 session_id,
920 parent_session,
921 start_debugging_requests_tx,
922 initialized_tx,
923 message_rx,
924 mode,
925 cx,
926 )
927 })
928 })
929 }
930
931 pub(crate) fn remote(
932 session_id: SessionId,
933 client: AnyProtoClient,
934 upstream_project_id: u64,
935 ignore_breakpoints: bool,
936 ) -> Self {
937 Self {
938 mode: Mode::Remote(RemoteConnection {
939 _client: client,
940 _upstream_project_id: upstream_project_id,
941 }),
942 id: session_id,
943 child_session_ids: HashSet::default(),
944 parent_id: None,
945 capabilities: Capabilities::default(),
946 ignore_breakpoints,
947 variables: Default::default(),
948 stack_frames: Default::default(),
949 thread_states: ThreadStates::default(),
950 output_token: OutputToken(0),
951 output: circular_buffer::CircularBuffer::boxed(),
952 requests: HashMap::default(),
953 modules: Vec::default(),
954 loaded_sources: Vec::default(),
955 threads: IndexMap::default(),
956 _background_tasks: Vec::default(),
957 locations: Default::default(),
958 is_session_terminated: false,
959 }
960 }
961
962 pub fn session_id(&self) -> SessionId {
963 self.id
964 }
965
966 pub fn child_session_ids(&self) -> HashSet<SessionId> {
967 self.child_session_ids.clone()
968 }
969
970 pub fn add_child_session_id(&mut self, session_id: SessionId) {
971 self.child_session_ids.insert(session_id);
972 }
973
974 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
975 self.child_session_ids.remove(&session_id);
976 }
977
978 pub fn parent_id(&self) -> Option<SessionId> {
979 self.parent_id
980 }
981
982 pub fn capabilities(&self) -> &Capabilities {
983 &self.capabilities
984 }
985
986 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
987 if let Mode::Local(local_mode) = &self.mode {
988 Some(local_mode.config.clone())
989 } else {
990 None
991 }
992 }
993
994 pub fn is_terminated(&self) -> bool {
995 self.is_session_terminated
996 }
997
998 pub fn is_local(&self) -> bool {
999 matches!(self.mode, Mode::Local(_))
1000 }
1001
1002 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
1003 match &mut self.mode {
1004 Mode::Local(local_mode) => Some(local_mode),
1005 Mode::Remote(_) => None,
1006 }
1007 }
1008
1009 pub fn as_local(&self) -> Option<&LocalMode> {
1010 match &self.mode {
1011 Mode::Local(local_mode) => Some(local_mode),
1012 Mode::Remote(_) => None,
1013 }
1014 }
1015
1016 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1017 match &self.mode {
1018 Mode::Local(local_mode) => {
1019 let capabilities = local_mode.clone().request_initialization(cx);
1020
1021 cx.spawn(async move |this, cx| {
1022 let capabilities = capabilities.await?;
1023 this.update(cx, |session, _| {
1024 session.capabilities = capabilities;
1025 })?;
1026 Ok(())
1027 })
1028 }
1029 Mode::Remote(_) => Task::ready(Err(anyhow!(
1030 "Cannot send initialize request from remote session"
1031 ))),
1032 }
1033 }
1034
1035 pub(super) fn initialize_sequence(
1036 &mut self,
1037 initialize_rx: oneshot::Receiver<()>,
1038 cx: &mut Context<Self>,
1039 ) -> Task<Result<()>> {
1040 match &self.mode {
1041 Mode::Local(local_mode) => {
1042 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1043 }
1044 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1045 }
1046 }
1047
1048 pub fn run_to_position(
1049 &mut self,
1050 breakpoint: SourceBreakpoint,
1051 active_thread_id: ThreadId,
1052 cx: &mut Context<Self>,
1053 ) {
1054 match &mut self.mode {
1055 Mode::Local(local_mode) => {
1056 if !matches!(
1057 self.thread_states.thread_state(active_thread_id),
1058 Some(ThreadStatus::Stopped)
1059 ) {
1060 return;
1061 };
1062 let path = breakpoint.path.clone();
1063 local_mode.tmp_breakpoint = Some(breakpoint);
1064 let task = local_mode.send_breakpoints_from_path(
1065 path,
1066 BreakpointUpdatedReason::Toggled,
1067 cx,
1068 );
1069
1070 cx.spawn(async move |this, cx| {
1071 task.await;
1072 this.update(cx, |this, cx| {
1073 this.continue_thread(active_thread_id, cx);
1074 })
1075 })
1076 .detach();
1077 }
1078 Mode::Remote(_) => {}
1079 }
1080 }
1081
1082 pub fn output(
1083 &self,
1084 since: OutputToken,
1085 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1086 if self.output_token.0 == 0 {
1087 return (self.output.range(0..0), OutputToken(0));
1088 };
1089
1090 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1091
1092 let clamped_events_since = events_since.clamp(0, self.output.len());
1093 (
1094 self.output
1095 .range(self.output.len() - clamped_events_since..),
1096 self.output_token,
1097 )
1098 }
1099
1100 pub fn respond_to_client(
1101 &self,
1102 request_seq: u64,
1103 success: bool,
1104 command: String,
1105 body: Option<serde_json::Value>,
1106 cx: &mut Context<Self>,
1107 ) -> Task<Result<()>> {
1108 let Some(local_session) = self.as_local().cloned() else {
1109 unreachable!("Cannot respond to remote client");
1110 };
1111
1112 cx.background_spawn(async move {
1113 local_session
1114 .client
1115 .send_message(Message::Response(Response {
1116 body,
1117 success,
1118 command,
1119 seq: request_seq + 1,
1120 request_seq,
1121 message: None,
1122 }))
1123 .await
1124 })
1125 }
1126
1127 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1128 if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1129 let breakpoint = local.tmp_breakpoint.take()?;
1130 let path = breakpoint.path.clone();
1131 Some((local, path))
1132 }) {
1133 local
1134 .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1135 .detach();
1136 };
1137
1138 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1139 self.thread_states.stop_all_threads();
1140
1141 self.invalidate_command_type::<StackTraceCommand>();
1142 }
1143
1144 // Event if we stopped all threads we still need to insert the thread_id
1145 // to our own data
1146 if let Some(thread_id) = event.thread_id {
1147 self.thread_states.stop_thread(ThreadId(thread_id));
1148
1149 self.invalidate_state(
1150 &StackTraceCommand {
1151 thread_id,
1152 start_frame: None,
1153 levels: None,
1154 }
1155 .into(),
1156 );
1157 }
1158
1159 self.invalidate_generic();
1160 self.threads.clear();
1161 self.variables.clear();
1162 cx.emit(SessionEvent::Stopped(
1163 event
1164 .thread_id
1165 .map(Into::into)
1166 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1167 ));
1168 cx.notify();
1169 }
1170
1171 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1172 match *event {
1173 Events::Initialized(_) => {
1174 debug_assert!(
1175 false,
1176 "Initialized event should have been handled in LocalMode"
1177 );
1178 }
1179 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1180 Events::Continued(event) => {
1181 if event.all_threads_continued.unwrap_or_default() {
1182 self.thread_states.continue_all_threads();
1183 } else {
1184 self.thread_states
1185 .continue_thread(ThreadId(event.thread_id));
1186 }
1187 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1188 self.invalidate_generic();
1189 }
1190 Events::Exited(_event) => {
1191 self.clear_active_debug_line(cx);
1192 }
1193 Events::Terminated(_) => {
1194 self.is_session_terminated = true;
1195 self.clear_active_debug_line(cx);
1196 }
1197 Events::Thread(event) => {
1198 let thread_id = ThreadId(event.thread_id);
1199
1200 match event.reason {
1201 dap::ThreadEventReason::Started => {
1202 self.thread_states.continue_thread(thread_id);
1203 }
1204 dap::ThreadEventReason::Exited => {
1205 self.thread_states.exit_thread(thread_id);
1206 }
1207 reason => {
1208 log::error!("Unhandled thread event reason {:?}", reason);
1209 }
1210 }
1211 self.invalidate_state(&ThreadsCommand.into());
1212 cx.notify();
1213 }
1214 Events::Output(event) => {
1215 if event
1216 .category
1217 .as_ref()
1218 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1219 {
1220 return;
1221 }
1222
1223 self.output.push_back(event);
1224 self.output_token.0 += 1;
1225 cx.notify();
1226 }
1227 Events::Breakpoint(_) => {}
1228 Events::Module(event) => {
1229 match event.reason {
1230 dap::ModuleEventReason::New => {
1231 self.modules.push(event.module);
1232 }
1233 dap::ModuleEventReason::Changed => {
1234 if let Some(module) = self
1235 .modules
1236 .iter_mut()
1237 .find(|other| event.module.id == other.id)
1238 {
1239 *module = event.module;
1240 }
1241 }
1242 dap::ModuleEventReason::Removed => {
1243 self.modules.retain(|other| event.module.id != other.id);
1244 }
1245 }
1246
1247 // todo(debugger): We should only send the invalidate command to downstream clients.
1248 // self.invalidate_state(&ModulesCommand.into());
1249 }
1250 Events::LoadedSource(_) => {
1251 self.invalidate_state(&LoadedSourcesCommand.into());
1252 }
1253 Events::Capabilities(event) => {
1254 self.capabilities = self.capabilities.merge(event.capabilities);
1255 cx.notify();
1256 }
1257 Events::Memory(_) => {}
1258 Events::Process(_) => {}
1259 Events::ProgressEnd(_) => {}
1260 Events::ProgressStart(_) => {}
1261 Events::ProgressUpdate(_) => {}
1262 Events::Invalidated(_) => {}
1263 Events::Other(_) => {}
1264 }
1265 }
1266
1267 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1268 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1269 &mut self,
1270 request: T,
1271 process_result: impl FnOnce(
1272 &mut Self,
1273 Result<T::Response>,
1274 &mut Context<Self>,
1275 ) -> Option<T::Response>
1276 + 'static,
1277 cx: &mut Context<Self>,
1278 ) {
1279 const {
1280 assert!(
1281 T::CACHEABLE,
1282 "Only requests marked as cacheable should invoke `fetch`"
1283 );
1284 }
1285
1286 if !self.thread_states.any_stopped_thread()
1287 && request.type_id() != TypeId::of::<ThreadsCommand>()
1288 || self.is_session_terminated
1289 {
1290 return;
1291 }
1292
1293 let request_map = self
1294 .requests
1295 .entry(std::any::TypeId::of::<T>())
1296 .or_default();
1297
1298 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1299 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1300
1301 let task = Self::request_inner::<Arc<T>>(
1302 &self.capabilities,
1303 self.id,
1304 &self.mode,
1305 command,
1306 process_result,
1307 cx,
1308 );
1309 let task = cx
1310 .background_executor()
1311 .spawn(async move {
1312 let _ = task.await?;
1313 Some(())
1314 })
1315 .shared();
1316
1317 vacant.insert(task);
1318 cx.notify();
1319 }
1320 }
1321
1322 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1323 capabilities: &Capabilities,
1324 session_id: SessionId,
1325 mode: &Mode,
1326 request: T,
1327 process_result: impl FnOnce(
1328 &mut Self,
1329 Result<T::Response>,
1330 &mut Context<Self>,
1331 ) -> Option<T::Response>
1332 + 'static,
1333 cx: &mut Context<Self>,
1334 ) -> Task<Option<T::Response>> {
1335 if !T::is_supported(&capabilities) {
1336 log::warn!(
1337 "Attempted to send a DAP request that isn't supported: {:?}",
1338 request
1339 );
1340 let error = Err(anyhow::Error::msg(
1341 "Couldn't complete request because it's not supported",
1342 ));
1343 return cx.spawn(async move |this, cx| {
1344 this.update(cx, |this, cx| process_result(this, error, cx))
1345 .log_err()
1346 .flatten()
1347 });
1348 }
1349
1350 let request = mode.request_dap(session_id, request, cx);
1351 cx.spawn(async move |this, cx| {
1352 let result = request.await;
1353 this.update(cx, |this, cx| process_result(this, result, cx))
1354 .log_err()
1355 .flatten()
1356 })
1357 }
1358
1359 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1360 &self,
1361 request: T,
1362 process_result: impl FnOnce(
1363 &mut Self,
1364 Result<T::Response>,
1365 &mut Context<Self>,
1366 ) -> Option<T::Response>
1367 + 'static,
1368 cx: &mut Context<Self>,
1369 ) -> Task<Option<T::Response>> {
1370 Self::request_inner(
1371 &self.capabilities,
1372 self.id,
1373 &self.mode,
1374 request,
1375 process_result,
1376 cx,
1377 )
1378 }
1379
1380 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1381 self.requests.remove(&std::any::TypeId::of::<Command>());
1382 }
1383
1384 fn invalidate_generic(&mut self) {
1385 self.invalidate_command_type::<ModulesCommand>();
1386 self.invalidate_command_type::<LoadedSourcesCommand>();
1387 self.invalidate_command_type::<ThreadsCommand>();
1388 }
1389
1390 fn invalidate_state(&mut self, key: &RequestSlot) {
1391 self.requests
1392 .entry((&*key.0 as &dyn Any).type_id())
1393 .and_modify(|request_map| {
1394 request_map.remove(&key);
1395 });
1396 }
1397
1398 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1399 self.thread_states.thread_status(thread_id)
1400 }
1401
1402 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1403 self.fetch(
1404 dap_command::ThreadsCommand,
1405 |this, result, cx| {
1406 let result = result.log_err()?;
1407
1408 this.threads = result
1409 .iter()
1410 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1411 .collect();
1412
1413 this.invalidate_command_type::<StackTraceCommand>();
1414 cx.emit(SessionEvent::Threads);
1415 cx.notify();
1416
1417 Some(result)
1418 },
1419 cx,
1420 );
1421
1422 self.threads
1423 .values()
1424 .map(|thread| {
1425 (
1426 thread.dap.clone(),
1427 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1428 )
1429 })
1430 .collect()
1431 }
1432
1433 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1434 self.fetch(
1435 dap_command::ModulesCommand,
1436 |this, result, cx| {
1437 let result = result.log_err()?;
1438
1439 this.modules = result.iter().cloned().collect();
1440 cx.emit(SessionEvent::Modules);
1441 cx.notify();
1442
1443 Some(result)
1444 },
1445 cx,
1446 );
1447
1448 &self.modules
1449 }
1450
1451 pub fn ignore_breakpoints(&self) -> bool {
1452 self.ignore_breakpoints
1453 }
1454
1455 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1456 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1457 }
1458
1459 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1460 if self.ignore_breakpoints == ignore {
1461 return Task::ready(());
1462 }
1463
1464 self.ignore_breakpoints = ignore;
1465
1466 if let Some(local) = self.as_local() {
1467 local.send_all_breakpoints(ignore, cx)
1468 } else {
1469 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1470 unimplemented!()
1471 }
1472 }
1473
1474 pub fn breakpoints_enabled(&self) -> bool {
1475 self.ignore_breakpoints
1476 }
1477
1478 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1479 self.fetch(
1480 dap_command::LoadedSourcesCommand,
1481 |this, result, cx| {
1482 let result = result.log_err()?;
1483 this.loaded_sources = result.iter().cloned().collect();
1484 cx.emit(SessionEvent::LoadedSources);
1485 cx.notify();
1486 Some(result)
1487 },
1488 cx,
1489 );
1490
1491 &self.loaded_sources
1492 }
1493
1494 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1495 res.log_err()?;
1496 Some(())
1497 }
1498
1499 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1500 thread_id: ThreadId,
1501 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1502 {
1503 move |this, response, cx| match response.log_err() {
1504 Some(response) => Some(response),
1505 None => {
1506 this.thread_states.stop_thread(thread_id);
1507 cx.notify();
1508 None
1509 }
1510 }
1511 }
1512
1513 fn clear_active_debug_line_response(
1514 &mut self,
1515 response: Result<()>,
1516 cx: &mut Context<Session>,
1517 ) -> Option<()> {
1518 response.log_err()?;
1519 self.clear_active_debug_line(cx);
1520 Some(())
1521 }
1522
1523 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1524 self.as_local()
1525 .expect("Message handler will only run in local mode")
1526 .breakpoint_store
1527 .update(cx, |store, cx| {
1528 store.remove_active_position(Some(self.id), cx)
1529 });
1530 }
1531
1532 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1533 self.request(
1534 PauseCommand {
1535 thread_id: thread_id.0,
1536 },
1537 Self::empty_response,
1538 cx,
1539 )
1540 .detach();
1541 }
1542
1543 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1544 self.request(
1545 RestartStackFrameCommand { stack_frame_id },
1546 Self::empty_response,
1547 cx,
1548 )
1549 .detach();
1550 }
1551
1552 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1553 if self.capabilities.supports_restart_request.unwrap_or(false) {
1554 self.request(
1555 RestartCommand {
1556 raw: args.unwrap_or(Value::Null),
1557 },
1558 Self::empty_response,
1559 cx,
1560 )
1561 .detach();
1562 } else {
1563 self.request(
1564 DisconnectCommand {
1565 restart: Some(false),
1566 terminate_debuggee: Some(true),
1567 suspend_debuggee: Some(false),
1568 },
1569 Self::empty_response,
1570 cx,
1571 )
1572 .detach();
1573 }
1574 }
1575
1576 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1577 self.is_session_terminated = true;
1578 self.thread_states.exit_all_threads();
1579 cx.notify();
1580
1581 let task = if self
1582 .capabilities
1583 .supports_terminate_request
1584 .unwrap_or_default()
1585 {
1586 self.request(
1587 TerminateCommand {
1588 restart: Some(false),
1589 },
1590 Self::clear_active_debug_line_response,
1591 cx,
1592 )
1593 } else {
1594 self.request(
1595 DisconnectCommand {
1596 restart: Some(false),
1597 terminate_debuggee: Some(true),
1598 suspend_debuggee: Some(false),
1599 },
1600 Self::clear_active_debug_line_response,
1601 cx,
1602 )
1603 };
1604
1605 cx.emit(SessionStateEvent::Shutdown);
1606
1607 cx.background_spawn(async move {
1608 let _ = task.await;
1609 })
1610 }
1611
1612 pub fn completions(
1613 &mut self,
1614 query: CompletionsQuery,
1615 cx: &mut Context<Self>,
1616 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1617 let task = self.request(query, |_, result, _| result.log_err(), cx);
1618
1619 cx.background_executor().spawn(async move {
1620 anyhow::Ok(
1621 task.await
1622 .map(|response| response.targets)
1623 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1624 )
1625 })
1626 }
1627
1628 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1629 self.thread_states.continue_thread(thread_id);
1630 self.request(
1631 ContinueCommand {
1632 args: ContinueArguments {
1633 thread_id: thread_id.0,
1634 single_thread: Some(true),
1635 },
1636 },
1637 Self::on_step_response::<ContinueCommand>(thread_id),
1638 cx,
1639 )
1640 .detach();
1641 }
1642
1643 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1644 match self.mode {
1645 Mode::Local(ref local) => Some(local.client.clone()),
1646 Mode::Remote(_) => None,
1647 }
1648 }
1649
1650 pub fn step_over(
1651 &mut self,
1652 thread_id: ThreadId,
1653 granularity: SteppingGranularity,
1654 cx: &mut Context<Self>,
1655 ) {
1656 let supports_single_thread_execution_requests =
1657 self.capabilities.supports_single_thread_execution_requests;
1658 let supports_stepping_granularity = self
1659 .capabilities
1660 .supports_stepping_granularity
1661 .unwrap_or_default();
1662
1663 let command = NextCommand {
1664 inner: StepCommand {
1665 thread_id: thread_id.0,
1666 granularity: supports_stepping_granularity.then(|| granularity),
1667 single_thread: supports_single_thread_execution_requests,
1668 },
1669 };
1670
1671 self.thread_states.process_step(thread_id);
1672 self.request(
1673 command,
1674 Self::on_step_response::<NextCommand>(thread_id),
1675 cx,
1676 )
1677 .detach();
1678 }
1679
1680 pub fn step_in(
1681 &mut self,
1682 thread_id: ThreadId,
1683 granularity: SteppingGranularity,
1684 cx: &mut Context<Self>,
1685 ) {
1686 let supports_single_thread_execution_requests =
1687 self.capabilities.supports_single_thread_execution_requests;
1688 let supports_stepping_granularity = self
1689 .capabilities
1690 .supports_stepping_granularity
1691 .unwrap_or_default();
1692
1693 let command = StepInCommand {
1694 inner: StepCommand {
1695 thread_id: thread_id.0,
1696 granularity: supports_stepping_granularity.then(|| granularity),
1697 single_thread: supports_single_thread_execution_requests,
1698 },
1699 };
1700
1701 self.thread_states.process_step(thread_id);
1702 self.request(
1703 command,
1704 Self::on_step_response::<StepInCommand>(thread_id),
1705 cx,
1706 )
1707 .detach();
1708 }
1709
1710 pub fn step_out(
1711 &mut self,
1712 thread_id: ThreadId,
1713 granularity: SteppingGranularity,
1714 cx: &mut Context<Self>,
1715 ) {
1716 let supports_single_thread_execution_requests =
1717 self.capabilities.supports_single_thread_execution_requests;
1718 let supports_stepping_granularity = self
1719 .capabilities
1720 .supports_stepping_granularity
1721 .unwrap_or_default();
1722
1723 let command = StepOutCommand {
1724 inner: StepCommand {
1725 thread_id: thread_id.0,
1726 granularity: supports_stepping_granularity.then(|| granularity),
1727 single_thread: supports_single_thread_execution_requests,
1728 },
1729 };
1730
1731 self.thread_states.process_step(thread_id);
1732 self.request(
1733 command,
1734 Self::on_step_response::<StepOutCommand>(thread_id),
1735 cx,
1736 )
1737 .detach();
1738 }
1739
1740 pub fn step_back(
1741 &mut self,
1742 thread_id: ThreadId,
1743 granularity: SteppingGranularity,
1744 cx: &mut Context<Self>,
1745 ) {
1746 let supports_single_thread_execution_requests =
1747 self.capabilities.supports_single_thread_execution_requests;
1748 let supports_stepping_granularity = self
1749 .capabilities
1750 .supports_stepping_granularity
1751 .unwrap_or_default();
1752
1753 let command = StepBackCommand {
1754 inner: StepCommand {
1755 thread_id: thread_id.0,
1756 granularity: supports_stepping_granularity.then(|| granularity),
1757 single_thread: supports_single_thread_execution_requests,
1758 },
1759 };
1760
1761 self.thread_states.process_step(thread_id);
1762
1763 self.request(
1764 command,
1765 Self::on_step_response::<StepBackCommand>(thread_id),
1766 cx,
1767 )
1768 .detach();
1769 }
1770
1771 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1772 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1773 && self.requests.contains_key(&ThreadsCommand.type_id())
1774 && self.threads.contains_key(&thread_id)
1775 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1776 // We could still be using an old thread id and have sent a new thread's request
1777 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1778 // But it very well could cause a minor bug in the future that is hard to track down
1779 {
1780 self.fetch(
1781 super::dap_command::StackTraceCommand {
1782 thread_id: thread_id.0,
1783 start_frame: None,
1784 levels: None,
1785 },
1786 move |this, stack_frames, cx| {
1787 let stack_frames = stack_frames.log_err()?;
1788
1789 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1790 thread.stack_frame_ids =
1791 stack_frames.iter().map(|frame| frame.id).collect();
1792 });
1793 debug_assert!(
1794 matches!(entry, indexmap::map::Entry::Occupied(_)),
1795 "Sent request for thread_id that doesn't exist"
1796 );
1797
1798 this.stack_frames.extend(
1799 stack_frames
1800 .iter()
1801 .cloned()
1802 .map(|frame| (frame.id, StackFrame::from(frame))),
1803 );
1804
1805 this.invalidate_command_type::<ScopesCommand>();
1806 this.invalidate_command_type::<VariablesCommand>();
1807
1808 cx.emit(SessionEvent::StackTrace);
1809 cx.notify();
1810 Some(stack_frames)
1811 },
1812 cx,
1813 );
1814 }
1815
1816 self.threads
1817 .get(&thread_id)
1818 .map(|thread| {
1819 thread
1820 .stack_frame_ids
1821 .iter()
1822 .filter_map(|id| self.stack_frames.get(id))
1823 .cloned()
1824 .collect()
1825 })
1826 .unwrap_or_default()
1827 }
1828
1829 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1830 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1831 && self
1832 .requests
1833 .contains_key(&TypeId::of::<StackTraceCommand>())
1834 {
1835 self.fetch(
1836 ScopesCommand { stack_frame_id },
1837 move |this, scopes, cx| {
1838 let scopes = scopes.log_err()?;
1839
1840 for scope in scopes .iter(){
1841 this.variables(scope.variables_reference, cx);
1842 }
1843
1844 let entry = this
1845 .stack_frames
1846 .entry(stack_frame_id)
1847 .and_modify(|stack_frame| {
1848 stack_frame.scopes = scopes.clone();
1849 });
1850
1851 cx.emit(SessionEvent::Variables);
1852
1853 debug_assert!(
1854 matches!(entry, indexmap::map::Entry::Occupied(_)),
1855 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1856 );
1857
1858 Some(scopes)
1859 },
1860 cx,
1861 );
1862 }
1863
1864 self.stack_frames
1865 .get(&stack_frame_id)
1866 .map(|frame| frame.scopes.as_slice())
1867 .unwrap_or_default()
1868 }
1869
1870 pub fn variables(
1871 &mut self,
1872 variables_reference: VariableReference,
1873 cx: &mut Context<Self>,
1874 ) -> Vec<dap::Variable> {
1875 let command = VariablesCommand {
1876 variables_reference,
1877 filter: None,
1878 start: None,
1879 count: None,
1880 format: None,
1881 };
1882
1883 self.fetch(
1884 command,
1885 move |this, variables, cx| {
1886 let variables = variables.log_err()?;
1887 this.variables
1888 .insert(variables_reference, variables.clone());
1889
1890 cx.emit(SessionEvent::Variables);
1891 Some(variables)
1892 },
1893 cx,
1894 );
1895
1896 self.variables
1897 .get(&variables_reference)
1898 .cloned()
1899 .unwrap_or_default()
1900 }
1901
1902 pub fn set_variable_value(
1903 &mut self,
1904 variables_reference: u64,
1905 name: String,
1906 value: String,
1907 cx: &mut Context<Self>,
1908 ) {
1909 if self.capabilities.supports_set_variable.unwrap_or_default() {
1910 self.request(
1911 SetVariableValueCommand {
1912 name,
1913 value,
1914 variables_reference,
1915 },
1916 move |this, response, cx| {
1917 let response = response.log_err()?;
1918 this.invalidate_command_type::<VariablesCommand>();
1919 cx.notify();
1920 Some(response)
1921 },
1922 cx,
1923 )
1924 .detach()
1925 }
1926 }
1927
1928 pub fn evaluate(
1929 &mut self,
1930 expression: String,
1931 context: Option<EvaluateArgumentsContext>,
1932 frame_id: Option<u64>,
1933 source: Option<Source>,
1934 cx: &mut Context<Self>,
1935 ) {
1936 self.request(
1937 EvaluateCommand {
1938 expression,
1939 context,
1940 frame_id,
1941 source,
1942 },
1943 |this, response, cx| {
1944 let response = response.log_err()?;
1945 this.output_token.0 += 1;
1946 this.output.push_back(dap::OutputEvent {
1947 category: None,
1948 output: response.result.clone(),
1949 group: None,
1950 variables_reference: Some(response.variables_reference),
1951 source: None,
1952 line: None,
1953 column: None,
1954 data: None,
1955 location_reference: None,
1956 });
1957
1958 this.invalidate_command_type::<ScopesCommand>();
1959 cx.notify();
1960 Some(response)
1961 },
1962 cx,
1963 )
1964 .detach();
1965 }
1966
1967 pub fn location(
1968 &mut self,
1969 reference: u64,
1970 cx: &mut Context<Self>,
1971 ) -> Option<dap::LocationsResponse> {
1972 self.fetch(
1973 LocationsCommand { reference },
1974 move |this, response, _| {
1975 let response = response.log_err()?;
1976 this.locations.insert(reference, response.clone());
1977 Some(response)
1978 },
1979 cx,
1980 );
1981 self.locations.get(&reference).cloned()
1982 }
1983 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1984 let command = DisconnectCommand {
1985 restart: Some(false),
1986 terminate_debuggee: Some(true),
1987 suspend_debuggee: Some(false),
1988 };
1989
1990 self.request(command, Self::empty_response, cx).detach()
1991 }
1992
1993 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1994 if self
1995 .capabilities
1996 .supports_terminate_threads_request
1997 .unwrap_or_default()
1998 {
1999 self.request(
2000 TerminateThreadsCommand {
2001 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2002 },
2003 Self::clear_active_debug_line_response,
2004 cx,
2005 )
2006 .detach();
2007 } else {
2008 self.shutdown(cx).detach();
2009 }
2010 }
2011}
2012
2013fn create_local_session(
2014 breakpoint_store: Entity<BreakpointStore>,
2015 session_id: SessionId,
2016 parent_session: Option<Entity<Session>>,
2017 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
2018 initialized_tx: oneshot::Sender<()>,
2019 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
2020 mode: LocalMode,
2021 cx: &mut Context<Session>,
2022) -> Session {
2023 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
2024 let mut initialized_tx = Some(initialized_tx);
2025 while let Some(message) = message_rx.next().await {
2026 if let Message::Event(event) = message {
2027 if let Events::Initialized(_) = *event {
2028 if let Some(tx) = initialized_tx.take() {
2029 tx.send(()).ok();
2030 }
2031 } else {
2032 let Ok(_) = this.update(cx, |session, cx| {
2033 session.handle_dap_event(event, cx);
2034 }) else {
2035 break;
2036 };
2037 }
2038 } else {
2039 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
2040 else {
2041 break;
2042 };
2043 }
2044 }
2045 })];
2046
2047 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
2048 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2049 if let Some(local) = (!this.ignore_breakpoints)
2050 .then(|| this.as_local_mut())
2051 .flatten()
2052 {
2053 local
2054 .send_breakpoints_from_path(path.clone(), *reason, cx)
2055 .detach();
2056 };
2057 }
2058 BreakpointStoreEvent::BreakpointsCleared(paths) => {
2059 if let Some(local) = (!this.ignore_breakpoints)
2060 .then(|| this.as_local_mut())
2061 .flatten()
2062 {
2063 local.unset_breakpoints_from_paths(paths, cx).detach();
2064 }
2065 }
2066 BreakpointStoreEvent::ActiveDebugLineChanged => {}
2067 })
2068 .detach();
2069
2070 Session {
2071 mode: Mode::Local(mode),
2072 id: session_id,
2073 child_session_ids: HashSet::default(),
2074 parent_id: parent_session.map(|session| session.read(cx).id),
2075 variables: Default::default(),
2076 capabilities: Capabilities::default(),
2077 thread_states: ThreadStates::default(),
2078 output_token: OutputToken(0),
2079 ignore_breakpoints: false,
2080 output: circular_buffer::CircularBuffer::boxed(),
2081 requests: HashMap::default(),
2082 modules: Vec::default(),
2083 loaded_sources: Vec::default(),
2084 threads: IndexMap::default(),
2085 stack_frames: IndexMap::default(),
2086 locations: Default::default(),
2087 _background_tasks,
2088 is_session_terminated: false,
2089 }
2090}