1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{
4 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
5};
6use super::dap_command::{
7 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
8 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
9 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
10 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
11 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
12 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
13};
14use super::dap_store::DapAdapterDelegate;
15use anyhow::{Context as _, Result, anyhow};
16use collections::{HashMap, HashSet, IndexMap, IndexSet};
17use dap::adapters::{DebugAdapter, DebugAdapterBinary};
18use dap::messages::Response;
19use dap::{
20 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
21 SteppingGranularity, StoppedEvent, VariableReference,
22 adapters::{DapDelegate, DapStatus},
23 client::{DebugAdapterClient, SessionId},
24 messages::{Events, Message},
25};
26use dap::{
27 DapRegistry, DebugRequestType, ExceptionBreakpointsFilter, ExceptionFilterOptions,
28 OutputEventCategory,
29};
30use futures::channel::oneshot;
31use futures::{FutureExt, future::Shared};
32use gpui::{
33 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
34 Task, WeakEntity,
35};
36use rpc::AnyProtoClient;
37use serde_json::{Value, json};
38use settings::Settings;
39use smol::stream::StreamExt;
40use std::any::TypeId;
41use std::collections::BTreeMap;
42use std::path::PathBuf;
43use std::u64;
44use std::{
45 any::Any,
46 collections::hash_map::Entry,
47 hash::{Hash, Hasher},
48 path::Path,
49 sync::Arc,
50};
51use task::{DebugAdapterConfig, DebugTaskDefinition};
52use text::{PointUtf16, ToPointUtf16};
53use util::{ResultExt, merge_json_value_into};
54
/// Identifier of a debuggee thread, wrapping the raw numeric id.
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// Smallest representable thread id.
    pub const MIN: Self = Self(u64::MIN);
    /// Largest representable thread id.
    pub const MAX: Self = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw numeric id without any validation.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
69
70#[derive(Clone, Debug)]
71pub struct StackFrame {
72 pub dap: dap::StackFrame,
73 pub scopes: Vec<dap::Scope>,
74}
75
76impl From<dap::StackFrame> for StackFrame {
77 fn from(stack_frame: dap::StackFrame) -> Self {
78 Self {
79 scopes: vec![],
80 dap: stack_frame,
81 }
82 }
83}
84
/// Execution state tracked for a debuggee thread.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    /// The default state.
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns a static, human-readable label for this status.
    ///
    /// The label is the variant name spelled out verbatim.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
106
107#[derive(Debug)]
108pub struct Thread {
109 dap: dap::Thread,
110 stack_frame_ids: IndexSet<StackFrameId>,
111 _has_stopped: bool,
112}
113
114impl From<dap::Thread> for Thread {
115 fn from(dap: dap::Thread) -> Self {
116 Self {
117 dap,
118 stack_frame_ids: Default::default(),
119 _has_stopped: false,
120 }
121 }
122}
123
/// Numeric id of the upstream project a remote session belongs to.
type UpstreamProjectId = u64;

/// Connection details of a session whose debug adapter lives on another peer.
///
/// All fields are underscore-prefixed: the RPC transport for DAP commands is
/// not implemented yet, so nothing reads them except `Session::adapter_name`.
struct RemoteConnection {
    _client: AnyProtoClient,
    _upstream_project_id: UpstreamProjectId,
    _adapter_name: SharedString,
}
131
132impl RemoteConnection {
133 fn send_proto_client_request<R: DapCommand>(
134 &self,
135 _request: R,
136 _session_id: SessionId,
137 cx: &mut App,
138 ) -> Task<Result<R::Response>> {
139 // let message = request.to_proto(session_id, self.upstream_project_id);
140 // let upstream_client = self.client.clone();
141 cx.background_executor().spawn(async move {
142 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
143 // let response = upstream_client.request(message).await?;
144 // request.response_from_proto(response)
145 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
146 })
147 }
148
149 fn request<R: DapCommand>(
150 &self,
151 request: R,
152 session_id: SessionId,
153 cx: &mut App,
154 ) -> Task<Result<R::Response>>
155 where
156 <R::DapRequest as dap::requests::Request>::Response: 'static,
157 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
158 {
159 return self.send_proto_client_request::<R>(request, session_id, cx);
160 }
161}
162
/// How a session reaches its debug adapter.
enum Mode {
    /// The adapter is driven directly through a local client.
    Local(LocalMode),
    /// The adapter lives on another peer and is reached via a proto client.
    Remote(RemoteConnection),
}
167
/// State needed to talk to a debug adapter through a local client.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests and messages to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration this session was started with.
    config: DebugAdapterConfig,
    /// Adapter implementation providing the adapter name and request arguments.
    adapter: Arc<dyn DebugAdapter>,
    /// Source of the breakpoints that get sent to the adapter.
    breakpoint_store: Entity<BreakpointStore>,
    /// Temporary breakpoint set by `Session::run_to_position` and taken again
    /// when the next stopped event is handled.
    tmp_breakpoint: Option<SourceBreakpoint>,
}
176
177fn client_source(abs_path: &Path) -> dap::Source {
178 dap::Source {
179 name: abs_path
180 .file_name()
181 .map(|filename| filename.to_string_lossy().to_string()),
182 path: Some(abs_path.to_string_lossy().to_string()),
183 source_reference: None,
184 presentation_hint: None,
185 origin: None,
186 sources: None,
187 adapter_data: None,
188 checksums: None,
189 }
190}
191
192impl LocalMode {
193 fn new(
194 debug_adapters: Arc<DapRegistry>,
195 session_id: SessionId,
196 parent_session: Option<Entity<Session>>,
197 breakpoint_store: Entity<BreakpointStore>,
198 config: DebugAdapterConfig,
199 delegate: DapAdapterDelegate,
200 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
201 cx: AsyncApp,
202 ) -> Task<Result<Self>> {
203 Self::new_inner(
204 debug_adapters,
205 session_id,
206 parent_session,
207 breakpoint_store,
208 config,
209 delegate,
210 messages_tx,
211 async |_, _| {},
212 cx,
213 )
214 }
215 #[cfg(any(test, feature = "test-support"))]
216 fn new_fake(
217 session_id: SessionId,
218 parent_session: Option<Entity<Session>>,
219 breakpoint_store: Entity<BreakpointStore>,
220 config: DebugAdapterConfig,
221 delegate: DapAdapterDelegate,
222 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
223 caps: Capabilities,
224 fail: bool,
225 cx: AsyncApp,
226 ) -> Task<Result<Self>> {
227 use task::DebugRequestDisposition;
228
229 let request = match config.request.clone() {
230 DebugRequestDisposition::UserConfigured(request) => request,
231 DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
232 match reverse_request_args.request {
233 dap::StartDebuggingRequestArgumentsRequest::Launch => {
234 DebugRequestType::Launch(task::LaunchConfig {
235 program: "".to_owned(),
236 cwd: None,
237 args: Default::default(),
238 })
239 }
240 dap::StartDebuggingRequestArgumentsRequest::Attach => {
241 DebugRequestType::Attach(task::AttachConfig {
242 process_id: Some(0),
243 })
244 }
245 }
246 }
247 };
248
249 let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
250 session
251 .client
252 .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
253 .await;
254
255 let paths = cx
256 .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
257 .expect("Breakpoint store should exist in all tests that start debuggers");
258
259 session
260 .client
261 .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
262 let p = Arc::from(Path::new(&args.source.path.unwrap()));
263 if !paths.contains(&p) {
264 panic!("Sent breakpoints for path without any")
265 }
266
267 Ok(dap::SetBreakpointsResponse {
268 breakpoints: Vec::default(),
269 })
270 })
271 .await;
272
273 match request {
274 dap::DebugRequestType::Launch(_) => {
275 if fail {
276 session
277 .client
278 .on_request::<dap::requests::Launch, _>(move |_, _| {
279 Err(dap::ErrorResponse {
280 error: Some(dap::Message {
281 id: 1,
282 format: "error".into(),
283 variables: None,
284 send_telemetry: None,
285 show_user: None,
286 url: None,
287 url_label: None,
288 }),
289 })
290 })
291 .await;
292 } else {
293 session
294 .client
295 .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
296 .await;
297 }
298 }
299 dap::DebugRequestType::Attach(attach_config) => {
300 if fail {
301 session
302 .client
303 .on_request::<dap::requests::Attach, _>(move |_, _| {
304 Err(dap::ErrorResponse {
305 error: Some(dap::Message {
306 id: 1,
307 format: "error".into(),
308 variables: None,
309 send_telemetry: None,
310 show_user: None,
311 url: None,
312 url_label: None,
313 }),
314 })
315 })
316 .await;
317 } else {
318 session
319 .client
320 .on_request::<dap::requests::Attach, _>(move |_, args| {
321 assert_eq!(
322 json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
323 args.raw
324 );
325
326 Ok(())
327 })
328 .await;
329 }
330 }
331 }
332
333 session
334 .client
335 .on_request::<dap::requests::SetExceptionBreakpoints, _>(move |_, _| {
336 Ok(dap::SetExceptionBreakpointsResponse { breakpoints: None })
337 })
338 .await;
339
340 session
341 .client
342 .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
343 .await;
344 session.client.fake_event(Events::Initialized(None)).await;
345 };
346 Self::new_inner(
347 DapRegistry::fake().into(),
348 session_id,
349 parent_session,
350 breakpoint_store,
351 config,
352 delegate,
353 messages_tx,
354 callback,
355 cx,
356 )
357 }
358 fn new_inner(
359 registry: Arc<DapRegistry>,
360 session_id: SessionId,
361 parent_session: Option<Entity<Session>>,
362 breakpoint_store: Entity<BreakpointStore>,
363 config: DebugAdapterConfig,
364 delegate: DapAdapterDelegate,
365 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
366 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
367 cx: AsyncApp,
368 ) -> Task<Result<Self>> {
369 cx.spawn(async move |cx| {
370 let (adapter, binary) =
371 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
372
373 let message_handler = Box::new(move |message| {
374 messages_tx.unbounded_send(message).ok();
375 });
376
377 let client = Arc::new(
378 if let Some(client) = parent_session
379 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
380 .flatten()
381 {
382 client
383 .reconnect(session_id, binary, message_handler, cx.clone())
384 .await?
385 } else {
386 DebugAdapterClient::start(
387 session_id,
388 adapter.name(),
389 binary,
390 message_handler,
391 cx.clone(),
392 )
393 .await
394 .with_context(|| "Failed to start communication with debug adapter")?
395 },
396 );
397
398 let mut session = Self {
399 client,
400 adapter,
401 breakpoint_store,
402 tmp_breakpoint: None,
403 config: config.clone(),
404 };
405
406 on_initialized(&mut session, cx.clone()).await;
407
408 Ok(session)
409 })
410 }
411
412 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
413 let tasks: Vec<_> = paths
414 .into_iter()
415 .map(|path| {
416 self.request(
417 dap_command::SetBreakpoints {
418 source: client_source(path),
419 source_modified: None,
420 breakpoints: vec![],
421 },
422 cx.background_executor().clone(),
423 )
424 })
425 .collect();
426
427 cx.background_spawn(async move {
428 futures::future::join_all(tasks)
429 .await
430 .iter()
431 .for_each(|res| match res {
432 Ok(_) => {}
433 Err(err) => {
434 log::warn!("Set breakpoints request failed: {}", err);
435 }
436 });
437 })
438 }
439
440 fn send_breakpoints_from_path(
441 &self,
442 abs_path: Arc<Path>,
443 reason: BreakpointUpdatedReason,
444 cx: &mut App,
445 ) -> Task<()> {
446 let breakpoints = self
447 .breakpoint_store
448 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
449 .into_iter()
450 .filter(|bp| bp.state.is_enabled())
451 .chain(self.tmp_breakpoint.clone())
452 .map(Into::into)
453 .collect();
454
455 let task = self.request(
456 dap_command::SetBreakpoints {
457 source: client_source(&abs_path),
458 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
459 breakpoints,
460 },
461 cx.background_executor().clone(),
462 );
463
464 cx.background_spawn(async move {
465 match task.await {
466 Ok(_) => {}
467 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
468 }
469 })
470 }
471
472 fn send_exception_breakpoints(
473 &self,
474 filters: Vec<ExceptionBreakpointsFilter>,
475 supports_filter_options: bool,
476 cx: &App,
477 ) -> Task<Result<Vec<dap::Breakpoint>>> {
478 let arg = if supports_filter_options {
479 SetExceptionBreakpoints::WithOptions {
480 filters: filters
481 .into_iter()
482 .map(|filter| ExceptionFilterOptions {
483 filter_id: filter.filter,
484 condition: None,
485 mode: None,
486 })
487 .collect(),
488 }
489 } else {
490 SetExceptionBreakpoints::Plain {
491 filters: filters.into_iter().map(|filter| filter.filter).collect(),
492 }
493 };
494 self.request(arg, cx.background_executor().clone())
495 }
496 fn send_source_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
497 let mut breakpoint_tasks = Vec::new();
498 let breakpoints = self
499 .breakpoint_store
500 .read_with(cx, |store, cx| store.all_breakpoints(cx));
501
502 for (path, breakpoints) in breakpoints {
503 let breakpoints = if ignore_breakpoints {
504 vec![]
505 } else {
506 breakpoints
507 .into_iter()
508 .filter(|bp| bp.state.is_enabled())
509 .map(Into::into)
510 .collect()
511 };
512
513 breakpoint_tasks.push(self.request(
514 dap_command::SetBreakpoints {
515 source: client_source(&path),
516 source_modified: Some(false),
517 breakpoints,
518 },
519 cx.background_executor().clone(),
520 ));
521 }
522
523 cx.background_spawn(async move {
524 futures::future::join_all(breakpoint_tasks)
525 .await
526 .iter()
527 .for_each(|res| match res {
528 Ok(_) => {}
529 Err(err) => {
530 log::warn!("Set breakpoints request failed: {}", err);
531 }
532 });
533 })
534 }
535
536 async fn get_adapter_binary(
537 registry: &Arc<DapRegistry>,
538 config: &DebugAdapterConfig,
539 delegate: &DapAdapterDelegate,
540 cx: &mut AsyncApp,
541 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
542 let adapter = registry
543 .adapter(&config.adapter)
544 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
545
546 let binary = cx.update(|cx| {
547 ProjectSettings::get_global(cx)
548 .dap
549 .get(&adapter.name())
550 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
551 })?;
552
553 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
554 Err(error) => {
555 delegate.update_status(
556 adapter.name(),
557 DapStatus::Failed {
558 error: error.to_string(),
559 },
560 );
561
562 return Err(error);
563 }
564 Ok(mut binary) => {
565 delegate.update_status(adapter.name(), DapStatus::None);
566
567 let shell_env = delegate.shell_env().await;
568 let mut envs = binary.envs.unwrap_or_default();
569 envs.extend(shell_env);
570 binary.envs = Some(envs);
571
572 binary
573 }
574 };
575
576 Ok((adapter, binary))
577 }
578
    /// Returns a copy of the label stored in this session's configuration.
    pub fn label(&self) -> String {
        self.config.label.clone()
    }
582
583 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
584 let adapter_id = self.adapter.name().to_string();
585
586 self.request(Initialize { adapter_id }, cx.background_executor().clone())
587 }
588
    /// Runs the launch/attach request alongside the configuration phase.
    ///
    /// The launch (or attach) request is issued right away. The configuration
    /// sequence waits for `initialized_rx`, then sends source breakpoints and
    /// the adapter's default exception filters, and finally
    /// `ConfigurationDone` when the adapter supports it. The returned task
    /// resolves once both halves have finished and fails if either does.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        cx: &App,
    ) -> Task<Result<()>> {
        // Pick the raw request arguments and whether this is a launch or an attach.
        let (mut raw, is_launch) = match &self.config.request {
            task::DebugRequestDisposition::UserConfigured(_) => {
                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
                    debug_assert!(false, "This part of code should be unreachable in practice");
                    return Task::ready(Err(anyhow!(
                        "Expected debug config conversion to succeed"
                    )));
                };
                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
                let raw = self.adapter.request_args(&raw);
                (raw, is_launch)
            }
            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
                start_debugging_request_arguments.configuration.clone(),
                matches!(
                    start_debugging_request_arguments.request,
                    dap::StartDebuggingRequestArgumentsRequest::Launch
                ),
            ),
        };

        // User-supplied `initialize_args` (if any) are merged into the raw arguments.
        merge_json_value_into(
            self.config.initialize_args.clone().unwrap_or(json!({})),
            &mut raw,
        );
        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = if is_launch {
            self.request(Launch { raw }, cx.background_executor().clone())
        } else {
            self.request(Attach { raw }, cx.background_executor().clone())
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
        // Only filters the adapter marks as enabled by default are sent initially.
        let exception_filters = capabilities
            .exception_breakpoint_filters
            .as_ref()
            .map(|exception_filters| {
                exception_filters
                    .iter()
                    .filter(|filter| filter.default == Some(true))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        let supports_exception_filters = capabilities
            .supports_exception_filter_options
            .unwrap_or_default();
        let configuration_sequence = cx.spawn({
            let this = self.clone();
            async move |cx| {
                // Wait for the signal delivered through `initialized_rx` before configuring.
                initialized_rx.await?;
                // todo(debugger) figure out if we want to handle a breakpoint response error
                // This will probably consist of letting a user know that breakpoints failed to be set
                cx.update(|cx| this.send_source_breakpoints(false, cx))?
                    .await;
                // A failed exception-breakpoint request is deliberately ignored.
                cx.update(|cx| {
                    this.send_exception_breakpoints(
                        exception_filters,
                        supports_exception_filters,
                        cx,
                    )
                })?
                .await
                .ok();
                if configuration_done_supported {
                    this.request(ConfigurationDone {}, cx.background_executor().clone())
                } else {
                    Task::ready(Ok(()))
                }
                .await
            }
        });

        cx.background_spawn(async move {
            futures::future::try_join(launch, configuration_sequence).await?;
            Ok(())
        })
    }
673
674 fn request<R: LocalDapCommand>(
675 &self,
676 request: R,
677 executor: BackgroundExecutor,
678 ) -> Task<Result<R::Response>>
679 where
680 <R::DapRequest as dap::requests::Request>::Response: 'static,
681 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
682 {
683 let request = Arc::new(request);
684
685 let request_clone = request.clone();
686 let connection = self.client.clone();
687 let request_task = executor.spawn(async move {
688 let args = request_clone.to_dap();
689 connection.request::<R::DapRequest>(args).await
690 });
691
692 executor.spawn(async move {
693 let response = request.response_from_dap(request_task.await?);
694 response
695 })
696 }
697}
698impl From<RemoteConnection> for Mode {
699 fn from(value: RemoteConnection) -> Self {
700 Self::Remote(value)
701 }
702}
703
704impl Mode {
705 fn request_dap<R: DapCommand>(
706 &self,
707 session_id: SessionId,
708 request: R,
709 cx: &mut Context<Session>,
710 ) -> Task<Result<R::Response>>
711 where
712 <R::DapRequest as dap::requests::Request>::Response: 'static,
713 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
714 {
715 match self {
716 Mode::Local(debug_adapter_client) => {
717 debug_adapter_client.request(request, cx.background_executor().clone())
718 }
719 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
720 }
721 }
722}
723
/// Tracks the status of debuggee threads.
///
/// A per-thread entry in `known_thread_states` takes precedence over
/// `global_state`, which acts as the fallback for threads without an entry.
#[derive(Default)]
struct ThreadStates {
    /// Status applied to all threads at once; `None` until such an update happens.
    global_state: Option<ThreadStatus>,
    /// Per-thread overrides, cleared whenever the global state is replaced.
    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
}
729
730impl ThreadStates {
731 fn stop_all_threads(&mut self) {
732 self.global_state = Some(ThreadStatus::Stopped);
733 self.known_thread_states.clear();
734 }
735
736 fn exit_all_threads(&mut self) {
737 self.global_state = Some(ThreadStatus::Exited);
738 self.known_thread_states.clear();
739 }
740
741 fn continue_all_threads(&mut self) {
742 self.global_state = Some(ThreadStatus::Running);
743 self.known_thread_states.clear();
744 }
745
746 fn stop_thread(&mut self, thread_id: ThreadId) {
747 self.known_thread_states
748 .insert(thread_id, ThreadStatus::Stopped);
749 }
750
751 fn continue_thread(&mut self, thread_id: ThreadId) {
752 self.known_thread_states
753 .insert(thread_id, ThreadStatus::Running);
754 }
755
756 fn process_step(&mut self, thread_id: ThreadId) {
757 self.known_thread_states
758 .insert(thread_id, ThreadStatus::Stepping);
759 }
760
761 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
762 self.thread_state(thread_id)
763 .unwrap_or(ThreadStatus::Running)
764 }
765
766 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
767 self.known_thread_states
768 .get(&thread_id)
769 .copied()
770 .or(self.global_state)
771 }
772
773 fn exit_thread(&mut self, thread_id: ThreadId) {
774 self.known_thread_states
775 .insert(thread_id, ThreadStatus::Exited);
776 }
777
778 fn any_stopped_thread(&self) -> bool {
779 self.global_state
780 .is_some_and(|state| state == ThreadStatus::Stopped)
781 || self
782 .known_thread_states
783 .values()
784 .any(|status| *status == ThreadStatus::Stopped)
785 }
786}
/// Capacity of the circular buffer that stores a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Enabled flag stored next to each exception breakpoint filter.
type IsEnabled = bool;

/// Position in a session's output stream, used to ask for output newer than a
/// previously observed point (see `Session::has_new_output` and `Session::output`).
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether the adapter is reached locally or through a remote connection.
    mode: Mode,
    /// Capabilities reported by the adapter; replaced by `request_initialize`.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Token compared against callers' tokens to detect new output.
    output_token: OutputToken,
    /// Output events, kept in a buffer of at most `MAX_TRACKED_OUTPUT_EVENTS` entries.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    /// Known threads; cleared whenever a stopped event is handled.
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    /// Variables per variable reference; cleared whenever a stopped event is handled.
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Shared request tasks, grouped by command type and then by the concrete command.
    /// NOTE(review): presumably a cache of issued requests — confirm against its users.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Exception filters keyed by filter id, each with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    _background_tasks: Vec<Task<()>>,
}
815
/// Object-safe equality and hashing for commands, so that commands of
/// different concrete types can be stored behind one trait object.
trait CacheableCommand: Any + Send + Sync {
    /// Compares `self` with another command behind a trait object.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds `self` into a type-erased hasher.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the shared command into an `Arc<dyn Any>`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
821
822impl<T> CacheableCommand for T
823where
824 T: DapCommand + PartialEq + Eq + Hash,
825{
826 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
827 (rhs as &dyn Any)
828 .downcast_ref::<Self>()
829 .map_or(false, |rhs| self == rhs)
830 }
831
832 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
833 T::hash(self, &mut hasher);
834 }
835
836 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
837 self
838 }
839}
840
/// Type-erased command usable as a hash-map key.
///
/// Equality and hashing are delegated to the wrapped command through
/// [`CacheableCommand`].
pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);

impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
    fn from(request: T) -> Self {
        Self(Arc::new(request))
    }
}

impl PartialEq for RequestSlot {
    fn eq(&self, other: &Self) -> bool {
        self.0.dyn_eq(other.0.as_ref())
    }
}

impl Eq for RequestSlot {}

impl Hash for RequestSlot {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.0.dyn_hash(state);
        // The concrete type id is hashed as well, so identical field data of
        // different command types feeds different input to the hasher.
        (&*self.0 as &dyn Any).type_id().hash(state)
    }
}
863
864#[derive(Debug, Clone, Hash, PartialEq, Eq)]
865pub struct CompletionsQuery {
866 pub query: String,
867 pub column: u64,
868 pub line: Option<u64>,
869 pub frame_id: Option<u64>,
870}
871
872impl CompletionsQuery {
873 pub fn new(
874 buffer: &language::Buffer,
875 cursor_position: language::Anchor,
876 frame_id: Option<u64>,
877 ) -> Self {
878 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
879 Self {
880 query: buffer.text(),
881 column: column as u64,
882 frame_id,
883 line: Some(row as u64),
884 }
885 }
886}
887
/// Events a [`Session`] emits to its observers.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Emitted when a stopped event is handled. Carries the stopped thread's
    /// id unless the event had no thread id or set `preserve_focus_hint`.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}
896
/// Crate-internal lifecycle events of a [`Session`].
pub(crate) enum SessionStateEvent {
    Shutdown,
}

// A session emits both the public and the crate-internal event types.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
903
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session about breakpoint changes.
907impl Session {
908 pub(crate) fn local(
909 breakpoint_store: Entity<BreakpointStore>,
910 session_id: SessionId,
911 parent_session: Option<Entity<Session>>,
912 delegate: DapAdapterDelegate,
913 config: DebugAdapterConfig,
914 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
915 initialized_tx: oneshot::Sender<()>,
916 debug_adapters: Arc<DapRegistry>,
917 cx: &mut App,
918 ) -> Task<Result<Entity<Self>>> {
919 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
920
921 cx.spawn(async move |cx| {
922 let mode = LocalMode::new(
923 debug_adapters,
924 session_id,
925 parent_session.clone(),
926 breakpoint_store.clone(),
927 config.clone(),
928 delegate,
929 message_tx,
930 cx.clone(),
931 )
932 .await?;
933
934 cx.new(|cx| {
935 create_local_session(
936 breakpoint_store,
937 session_id,
938 parent_session,
939 start_debugging_requests_tx,
940 initialized_tx,
941 message_rx,
942 mode,
943 cx,
944 )
945 })
946 })
947 }
948
949 #[cfg(any(test, feature = "test-support"))]
950 pub(crate) fn fake(
951 breakpoint_store: Entity<BreakpointStore>,
952 session_id: SessionId,
953 parent_session: Option<Entity<Session>>,
954 delegate: DapAdapterDelegate,
955 config: DebugAdapterConfig,
956 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
957 initialized_tx: oneshot::Sender<()>,
958 caps: Capabilities,
959 fails: bool,
960 cx: &mut App,
961 ) -> Task<Result<Entity<Session>>> {
962 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
963
964 cx.spawn(async move |cx| {
965 let mode = LocalMode::new_fake(
966 session_id,
967 parent_session.clone(),
968 breakpoint_store.clone(),
969 config.clone(),
970 delegate,
971 message_tx,
972 caps,
973 fails,
974 cx.clone(),
975 )
976 .await?;
977
978 cx.new(|cx| {
979 create_local_session(
980 breakpoint_store,
981 session_id,
982 parent_session,
983 start_debugging_requests_tx,
984 initialized_tx,
985 message_rx,
986 mode,
987 cx,
988 )
989 })
990 })
991 }
992
993 pub(crate) fn remote(
994 session_id: SessionId,
995 client: AnyProtoClient,
996 upstream_project_id: u64,
997 ignore_breakpoints: bool,
998 ) -> Self {
999 Self {
1000 mode: Mode::Remote(RemoteConnection {
1001 _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
1002 _client: client,
1003 _upstream_project_id: upstream_project_id,
1004 }),
1005 id: session_id,
1006 child_session_ids: HashSet::default(),
1007 parent_id: None,
1008 capabilities: Capabilities::default(),
1009 ignore_breakpoints,
1010 variables: Default::default(),
1011 stack_frames: Default::default(),
1012 thread_states: ThreadStates::default(),
1013 output_token: OutputToken(0),
1014 output: circular_buffer::CircularBuffer::boxed(),
1015 requests: HashMap::default(),
1016 modules: Vec::default(),
1017 loaded_sources: Vec::default(),
1018 threads: IndexMap::default(),
1019 _background_tasks: Vec::default(),
1020 locations: Default::default(),
1021 is_session_terminated: false,
1022 exception_breakpoints: Default::default(),
1023 }
1024 }
1025
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
1029
    /// Returns a copy of the set of child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
1033
    /// Records `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
1037
    /// Removes `session_id` from this session's children, if present.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
1041
    /// Returns the id of the parent session, if this session has one.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
1045
    /// Returns the capabilities currently stored for the adapter.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
1049
1050 pub fn adapter_name(&self) -> SharedString {
1051 match &self.mode {
1052 Mode::Local(local_mode) => local_mode.adapter.name().into(),
1053 Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
1054 }
1055 }
1056
1057 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
1058 if let Mode::Local(local_mode) = &self.mode {
1059 Some(local_mode.config.clone())
1060 } else {
1061 None
1062 }
1063 }
1064
    /// Returns whether the session has been flagged as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
1068
    /// Returns whether this session talks to its adapter locally.
    pub fn is_local(&self) -> bool {
        matches!(self.mode, Mode::Local(_))
    }
1072
1073 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
1074 match &mut self.mode {
1075 Mode::Local(local_mode) => Some(local_mode),
1076 Mode::Remote(_) => None,
1077 }
1078 }
1079
1080 pub fn as_local(&self) -> Option<&LocalMode> {
1081 match &self.mode {
1082 Mode::Local(local_mode) => Some(local_mode),
1083 Mode::Remote(_) => None,
1084 }
1085 }
1086
1087 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1088 match &self.mode {
1089 Mode::Local(local_mode) => {
1090 let capabilities = local_mode.clone().request_initialization(cx);
1091
1092 cx.spawn(async move |this, cx| {
1093 let capabilities = capabilities.await?;
1094 this.update(cx, |session, _| {
1095 session.capabilities = capabilities;
1096 let filters = session
1097 .capabilities
1098 .exception_breakpoint_filters
1099 .clone()
1100 .unwrap_or_default();
1101 for filter in filters {
1102 let default = filter.default.unwrap_or_default();
1103 session
1104 .exception_breakpoints
1105 .entry(filter.filter.clone())
1106 .or_insert_with(|| (filter, default));
1107 }
1108 })?;
1109 Ok(())
1110 })
1111 }
1112 Mode::Remote(_) => Task::ready(Err(anyhow!(
1113 "Cannot send initialize request from remote session"
1114 ))),
1115 }
1116 }
1117
1118 pub(super) fn initialize_sequence(
1119 &mut self,
1120 initialize_rx: oneshot::Receiver<()>,
1121 cx: &mut Context<Self>,
1122 ) -> Task<Result<()>> {
1123 match &self.mode {
1124 Mode::Local(local_mode) => {
1125 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1126 }
1127 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1128 }
1129 }
1130
1131 pub fn run_to_position(
1132 &mut self,
1133 breakpoint: SourceBreakpoint,
1134 active_thread_id: ThreadId,
1135 cx: &mut Context<Self>,
1136 ) {
1137 match &mut self.mode {
1138 Mode::Local(local_mode) => {
1139 if !matches!(
1140 self.thread_states.thread_state(active_thread_id),
1141 Some(ThreadStatus::Stopped)
1142 ) {
1143 return;
1144 };
1145 let path = breakpoint.path.clone();
1146 local_mode.tmp_breakpoint = Some(breakpoint);
1147 let task = local_mode.send_breakpoints_from_path(
1148 path,
1149 BreakpointUpdatedReason::Toggled,
1150 cx,
1151 );
1152
1153 cx.spawn(async move |this, cx| {
1154 task.await;
1155 this.update(cx, |this, cx| {
1156 this.continue_thread(active_thread_id, cx);
1157 })
1158 })
1159 .detach();
1160 }
1161 Mode::Remote(_) => {}
1162 }
1163 }
1164
1165 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1166 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1167 }
1168
1169 pub fn output(
1170 &self,
1171 since: OutputToken,
1172 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1173 if self.output_token.0 == 0 {
1174 return (self.output.range(0..0), OutputToken(0));
1175 };
1176
1177 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1178
1179 let clamped_events_since = events_since.clamp(0, self.output.len());
1180 (
1181 self.output
1182 .range(self.output.len() - clamped_events_since..),
1183 self.output_token,
1184 )
1185 }
1186
1187 pub fn respond_to_client(
1188 &self,
1189 request_seq: u64,
1190 success: bool,
1191 command: String,
1192 body: Option<serde_json::Value>,
1193 cx: &mut Context<Self>,
1194 ) -> Task<Result<()>> {
1195 let Some(local_session) = self.as_local().cloned() else {
1196 unreachable!("Cannot respond to remote client");
1197 };
1198
1199 cx.background_spawn(async move {
1200 local_session
1201 .client
1202 .send_message(Message::Response(Response {
1203 body,
1204 success,
1205 command,
1206 seq: request_seq + 1,
1207 request_seq,
1208 message: None,
1209 }))
1210 .await
1211 })
1212 }
1213
1214 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1215 if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1216 let breakpoint = local.tmp_breakpoint.take()?;
1217 let path = breakpoint.path.clone();
1218 Some((local, path))
1219 }) {
1220 local
1221 .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1222 .detach();
1223 };
1224
1225 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1226 self.thread_states.stop_all_threads();
1227
1228 self.invalidate_command_type::<StackTraceCommand>();
1229 }
1230
1231 // Event if we stopped all threads we still need to insert the thread_id
1232 // to our own data
1233 if let Some(thread_id) = event.thread_id {
1234 self.thread_states.stop_thread(ThreadId(thread_id));
1235
1236 self.invalidate_state(
1237 &StackTraceCommand {
1238 thread_id,
1239 start_frame: None,
1240 levels: None,
1241 }
1242 .into(),
1243 );
1244 }
1245
1246 self.invalidate_generic();
1247 self.threads.clear();
1248 self.variables.clear();
1249 cx.emit(SessionEvent::Stopped(
1250 event
1251 .thread_id
1252 .map(Into::into)
1253 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1254 ));
1255 cx.notify();
1256 }
1257
/// Applies a DAP event to this session's state.
///
/// `Initialized` is not expected to reach this method (debug builds assert).
/// Telemetry output is dropped; other output is appended to the output buffer.
/// The events with empty arms at the bottom of the match are currently ignored.
pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
    match *event {
        Events::Initialized(_) => {
            debug_assert!(
                false,
                "Initialized event should have been handled in LocalMode"
            );
        }
        Events::Stopped(event) => self.handle_stopped_event(event, cx),
        Events::Continued(event) => {
            // Either every thread resumed, or only the one named in the event.
            if event.all_threads_continued.unwrap_or_default() {
                self.thread_states.continue_all_threads();
            } else {
                self.thread_states
                    .continue_thread(ThreadId(event.thread_id));
            }
            // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
            self.invalidate_generic();
        }
        Events::Exited(_event) => {
            self.clear_active_debug_line(cx);
        }
        Events::Terminated(_) => {
            // Once terminated, `fetch` stops issuing cached requests.
            self.is_session_terminated = true;
            self.clear_active_debug_line(cx);
        }
        Events::Thread(event) => {
            let thread_id = ThreadId(event.thread_id);

            match event.reason {
                dap::ThreadEventReason::Started => {
                    self.thread_states.continue_thread(thread_id);
                }
                dap::ThreadEventReason::Exited => {
                    self.thread_states.exit_thread(thread_id);
                }
                reason => {
                    log::error!("Unhandled thread event reason {:?}", reason);
                }
            }
            // Drop the cached threads request so the list is fetched again.
            self.invalidate_state(&ThreadsCommand.into());
            cx.notify();
        }
        Events::Output(event) => {
            // Telemetry output is never recorded.
            if event
                .category
                .as_ref()
                .is_some_and(|category| *category == OutputEventCategory::Telemetry)
            {
                return;
            }

            // The token counts every recorded event; `output` and `has_new_output`
            // compare against it.
            self.output.push_back(event);
            self.output_token.0 += 1;
            cx.notify();
        }
        Events::Breakpoint(_) => {}
        Events::Module(event) => {
            // Keep the local module list in sync without re-querying the adapter.
            match event.reason {
                dap::ModuleEventReason::New => {
                    self.modules.push(event.module);
                }
                dap::ModuleEventReason::Changed => {
                    if let Some(module) = self
                        .modules
                        .iter_mut()
                        .find(|other| event.module.id == other.id)
                    {
                        *module = event.module;
                    }
                }
                dap::ModuleEventReason::Removed => {
                    self.modules.retain(|other| event.module.id != other.id);
                }
            }

            // todo(debugger): We should only send the invalidate command to downstream clients.
            // self.invalidate_state(&ModulesCommand.into());
        }
        Events::LoadedSource(_) => {
            self.invalidate_state(&LoadedSourcesCommand.into());
        }
        Events::Capabilities(event) => {
            self.capabilities = self.capabilities.merge(event.capabilities);
            cx.notify();
        }
        Events::Memory(_) => {}
        Events::Process(_) => {}
        Events::ProgressEnd(_) => {}
        Events::ProgressStart(_) => {}
        Events::ProgressUpdate(_) => {}
        Events::Invalidated(_) => {}
        Events::Other(_) => {}
    }
}
1353
/// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
///
/// Requests are tracked in `self.requests`, keyed first by the command's `TypeId`
/// and then by the command value itself; a request is only sent when no entry
/// exists for this exact command. `process_result` is invoked with the response
/// (or error) once the request completes.
///
/// Nothing is sent when the session is terminated, or when no thread is stopped
/// and the command is not `ThreadsCommand`.
fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
    &mut self,
    request: T,
    process_result: impl FnOnce(
        &mut Self,
        Result<T::Response>,
        &mut Context<Self>,
    ) -> Option<T::Response>
    + 'static,
    cx: &mut Context<Self>,
) {
    // Compile-time check: only cacheable commands may go through `fetch`.
    const {
        assert!(
            T::CACHEABLE,
            "Only requests marked as cacheable should invoke `fetch`"
        );
    }

    // `&&` binds tighter than `||`:
    // (no stopped thread AND not a ThreadsCommand) OR session terminated.
    if !self.thread_states.any_stopped_thread()
        && request.type_id() != TypeId::of::<ThreadsCommand>()
        || self.is_session_terminated
    {
        return;
    }

    let request_map = self
        .requests
        .entry(std::any::TypeId::of::<T>())
        .or_default();

    // An occupied entry means this exact request was already sent; do nothing.
    if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
        // Recover the typed command from the type-erased key.
        let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();

        let task = Self::request_inner::<Arc<T>>(
            &self.capabilities,
            self.id,
            &self.mode,
            command,
            process_result,
            cx,
        );
        // Discard the response value; the stored task only records whether the
        // request produced one (`Some(())`) or not (`None`).
        let task = cx
            .background_executor()
            .spawn(async move {
                let _ = task.await?;
                Some(())
            })
            .shared();

        vacant.insert(task);
        cx.notify();
    }
}
1408
1409 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1410 capabilities: &Capabilities,
1411 session_id: SessionId,
1412 mode: &Mode,
1413 request: T,
1414 process_result: impl FnOnce(
1415 &mut Self,
1416 Result<T::Response>,
1417 &mut Context<Self>,
1418 ) -> Option<T::Response>
1419 + 'static,
1420 cx: &mut Context<Self>,
1421 ) -> Task<Option<T::Response>> {
1422 if !T::is_supported(&capabilities) {
1423 log::warn!(
1424 "Attempted to send a DAP request that isn't supported: {:?}",
1425 request
1426 );
1427 let error = Err(anyhow::Error::msg(
1428 "Couldn't complete request because it's not supported",
1429 ));
1430 return cx.spawn(async move |this, cx| {
1431 this.update(cx, |this, cx| process_result(this, error, cx))
1432 .log_err()
1433 .flatten()
1434 });
1435 }
1436
1437 let request = mode.request_dap(session_id, request, cx);
1438 cx.spawn(async move |this, cx| {
1439 let result = request.await;
1440 this.update(cx, |this, cx| process_result(this, result, cx))
1441 .log_err()
1442 .flatten()
1443 })
1444 }
1445
1446 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1447 &self,
1448 request: T,
1449 process_result: impl FnOnce(
1450 &mut Self,
1451 Result<T::Response>,
1452 &mut Context<Self>,
1453 ) -> Option<T::Response>
1454 + 'static,
1455 cx: &mut Context<Self>,
1456 ) -> Task<Option<T::Response>> {
1457 Self::request_inner(
1458 &self.capabilities,
1459 self.id,
1460 &self.mode,
1461 request,
1462 process_result,
1463 cx,
1464 )
1465 }
1466
1467 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1468 self.requests.remove(&std::any::TypeId::of::<Command>());
1469 }
1470
1471 fn invalidate_generic(&mut self) {
1472 self.invalidate_command_type::<ModulesCommand>();
1473 self.invalidate_command_type::<LoadedSourcesCommand>();
1474 self.invalidate_command_type::<ThreadsCommand>();
1475 }
1476
1477 fn invalidate_state(&mut self, key: &RequestSlot) {
1478 self.requests
1479 .entry((&*key.0 as &dyn Any).type_id())
1480 .and_modify(|request_map| {
1481 request_map.remove(&key);
1482 });
1483 }
1484
1485 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1486 self.thread_states.thread_status(thread_id)
1487 }
1488
1489 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1490 self.fetch(
1491 dap_command::ThreadsCommand,
1492 |this, result, cx| {
1493 let result = result.log_err()?;
1494
1495 this.threads = result
1496 .iter()
1497 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1498 .collect();
1499
1500 this.invalidate_command_type::<StackTraceCommand>();
1501 cx.emit(SessionEvent::Threads);
1502 cx.notify();
1503
1504 Some(result)
1505 },
1506 cx,
1507 );
1508
1509 self.threads
1510 .values()
1511 .map(|thread| {
1512 (
1513 thread.dap.clone(),
1514 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1515 )
1516 })
1517 .collect()
1518 }
1519
1520 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1521 self.fetch(
1522 dap_command::ModulesCommand,
1523 |this, result, cx| {
1524 let result = result.log_err()?;
1525
1526 this.modules = result.iter().cloned().collect();
1527 cx.emit(SessionEvent::Modules);
1528 cx.notify();
1529
1530 Some(result)
1531 },
1532 cx,
1533 );
1534
1535 &self.modules
1536 }
1537
1538 pub fn ignore_breakpoints(&self) -> bool {
1539 self.ignore_breakpoints
1540 }
1541
1542 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1543 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1544 }
1545
1546 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1547 if self.ignore_breakpoints == ignore {
1548 return Task::ready(());
1549 }
1550
1551 self.ignore_breakpoints = ignore;
1552
1553 if let Some(local) = self.as_local() {
1554 local.send_source_breakpoints(ignore, cx)
1555 } else {
1556 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1557 unimplemented!()
1558 }
1559 }
1560
1561 pub fn exception_breakpoints(
1562 &self,
1563 ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1564 self.exception_breakpoints.values()
1565 }
1566
1567 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1568 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1569 *is_enabled = !*is_enabled;
1570 self.send_exception_breakpoints(cx);
1571 }
1572 }
1573
1574 fn send_exception_breakpoints(&mut self, cx: &App) {
1575 if let Some(local) = self.as_local() {
1576 let exception_filters = self
1577 .exception_breakpoints
1578 .values()
1579 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1580 .collect();
1581
1582 let supports_exception_filters = self
1583 .capabilities
1584 .supports_exception_filter_options
1585 .unwrap_or_default();
1586 local
1587 .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1588 .detach_and_log_err(cx);
1589 } else {
1590 debug_assert!(false, "Not implemented");
1591 }
1592 }
1593
1594 pub fn breakpoints_enabled(&self) -> bool {
1595 self.ignore_breakpoints
1596 }
1597
1598 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1599 self.fetch(
1600 dap_command::LoadedSourcesCommand,
1601 |this, result, cx| {
1602 let result = result.log_err()?;
1603 this.loaded_sources = result.iter().cloned().collect();
1604 cx.emit(SessionEvent::LoadedSources);
1605 cx.notify();
1606 Some(result)
1607 },
1608 cx,
1609 );
1610
1611 &self.loaded_sources
1612 }
1613
1614 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1615 res.log_err()?;
1616 Some(())
1617 }
1618
1619 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1620 thread_id: ThreadId,
1621 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1622 {
1623 move |this, response, cx| match response.log_err() {
1624 Some(response) => Some(response),
1625 None => {
1626 this.thread_states.stop_thread(thread_id);
1627 cx.notify();
1628 None
1629 }
1630 }
1631 }
1632
1633 fn clear_active_debug_line_response(
1634 &mut self,
1635 response: Result<()>,
1636 cx: &mut Context<Session>,
1637 ) -> Option<()> {
1638 response.log_err()?;
1639 self.clear_active_debug_line(cx);
1640 Some(())
1641 }
1642
1643 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1644 self.as_local()
1645 .expect("Message handler will only run in local mode")
1646 .breakpoint_store
1647 .update(cx, |store, cx| {
1648 store.remove_active_position(Some(self.id), cx)
1649 });
1650 }
1651
1652 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1653 self.request(
1654 PauseCommand {
1655 thread_id: thread_id.0,
1656 },
1657 Self::empty_response,
1658 cx,
1659 )
1660 .detach();
1661 }
1662
1663 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1664 self.request(
1665 RestartStackFrameCommand { stack_frame_id },
1666 Self::empty_response,
1667 cx,
1668 )
1669 .detach();
1670 }
1671
1672 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1673 if self.capabilities.supports_restart_request.unwrap_or(false) {
1674 self.request(
1675 RestartCommand {
1676 raw: args.unwrap_or(Value::Null),
1677 },
1678 Self::empty_response,
1679 cx,
1680 )
1681 .detach();
1682 } else {
1683 self.request(
1684 DisconnectCommand {
1685 restart: Some(false),
1686 terminate_debuggee: Some(true),
1687 suspend_debuggee: Some(false),
1688 },
1689 Self::empty_response,
1690 cx,
1691 )
1692 .detach();
1693 }
1694 }
1695
1696 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1697 self.is_session_terminated = true;
1698 self.thread_states.exit_all_threads();
1699 cx.notify();
1700
1701 let task = if self
1702 .capabilities
1703 .supports_terminate_request
1704 .unwrap_or_default()
1705 {
1706 self.request(
1707 TerminateCommand {
1708 restart: Some(false),
1709 },
1710 Self::clear_active_debug_line_response,
1711 cx,
1712 )
1713 } else {
1714 self.request(
1715 DisconnectCommand {
1716 restart: Some(false),
1717 terminate_debuggee: Some(true),
1718 suspend_debuggee: Some(false),
1719 },
1720 Self::clear_active_debug_line_response,
1721 cx,
1722 )
1723 };
1724
1725 cx.emit(SessionStateEvent::Shutdown);
1726
1727 cx.background_spawn(async move {
1728 let _ = task.await;
1729 })
1730 }
1731
1732 pub fn completions(
1733 &mut self,
1734 query: CompletionsQuery,
1735 cx: &mut Context<Self>,
1736 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1737 let task = self.request(query, |_, result, _| result.log_err(), cx);
1738
1739 cx.background_executor().spawn(async move {
1740 anyhow::Ok(
1741 task.await
1742 .map(|response| response.targets)
1743 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1744 )
1745 })
1746 }
1747
1748 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1749 self.thread_states.continue_thread(thread_id);
1750 self.request(
1751 ContinueCommand {
1752 args: ContinueArguments {
1753 thread_id: thread_id.0,
1754 single_thread: Some(true),
1755 },
1756 },
1757 Self::on_step_response::<ContinueCommand>(thread_id),
1758 cx,
1759 )
1760 .detach();
1761 }
1762
1763 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1764 match self.mode {
1765 Mode::Local(ref local) => Some(local.client.clone()),
1766 Mode::Remote(_) => None,
1767 }
1768 }
1769
1770 pub fn step_over(
1771 &mut self,
1772 thread_id: ThreadId,
1773 granularity: SteppingGranularity,
1774 cx: &mut Context<Self>,
1775 ) {
1776 let supports_single_thread_execution_requests =
1777 self.capabilities.supports_single_thread_execution_requests;
1778 let supports_stepping_granularity = self
1779 .capabilities
1780 .supports_stepping_granularity
1781 .unwrap_or_default();
1782
1783 let command = NextCommand {
1784 inner: StepCommand {
1785 thread_id: thread_id.0,
1786 granularity: supports_stepping_granularity.then(|| granularity),
1787 single_thread: supports_single_thread_execution_requests,
1788 },
1789 };
1790
1791 self.thread_states.process_step(thread_id);
1792 self.request(
1793 command,
1794 Self::on_step_response::<NextCommand>(thread_id),
1795 cx,
1796 )
1797 .detach();
1798 }
1799
1800 pub fn step_in(
1801 &mut self,
1802 thread_id: ThreadId,
1803 granularity: SteppingGranularity,
1804 cx: &mut Context<Self>,
1805 ) {
1806 let supports_single_thread_execution_requests =
1807 self.capabilities.supports_single_thread_execution_requests;
1808 let supports_stepping_granularity = self
1809 .capabilities
1810 .supports_stepping_granularity
1811 .unwrap_or_default();
1812
1813 let command = StepInCommand {
1814 inner: StepCommand {
1815 thread_id: thread_id.0,
1816 granularity: supports_stepping_granularity.then(|| granularity),
1817 single_thread: supports_single_thread_execution_requests,
1818 },
1819 };
1820
1821 self.thread_states.process_step(thread_id);
1822 self.request(
1823 command,
1824 Self::on_step_response::<StepInCommand>(thread_id),
1825 cx,
1826 )
1827 .detach();
1828 }
1829
1830 pub fn step_out(
1831 &mut self,
1832 thread_id: ThreadId,
1833 granularity: SteppingGranularity,
1834 cx: &mut Context<Self>,
1835 ) {
1836 let supports_single_thread_execution_requests =
1837 self.capabilities.supports_single_thread_execution_requests;
1838 let supports_stepping_granularity = self
1839 .capabilities
1840 .supports_stepping_granularity
1841 .unwrap_or_default();
1842
1843 let command = StepOutCommand {
1844 inner: StepCommand {
1845 thread_id: thread_id.0,
1846 granularity: supports_stepping_granularity.then(|| granularity),
1847 single_thread: supports_single_thread_execution_requests,
1848 },
1849 };
1850
1851 self.thread_states.process_step(thread_id);
1852 self.request(
1853 command,
1854 Self::on_step_response::<StepOutCommand>(thread_id),
1855 cx,
1856 )
1857 .detach();
1858 }
1859
1860 pub fn step_back(
1861 &mut self,
1862 thread_id: ThreadId,
1863 granularity: SteppingGranularity,
1864 cx: &mut Context<Self>,
1865 ) {
1866 let supports_single_thread_execution_requests =
1867 self.capabilities.supports_single_thread_execution_requests;
1868 let supports_stepping_granularity = self
1869 .capabilities
1870 .supports_stepping_granularity
1871 .unwrap_or_default();
1872
1873 let command = StepBackCommand {
1874 inner: StepCommand {
1875 thread_id: thread_id.0,
1876 granularity: supports_stepping_granularity.then(|| granularity),
1877 single_thread: supports_single_thread_execution_requests,
1878 },
1879 };
1880
1881 self.thread_states.process_step(thread_id);
1882
1883 self.request(
1884 command,
1885 Self::on_step_response::<StepBackCommand>(thread_id),
1886 cx,
1887 )
1888 .detach();
1889 }
1890
/// Returns the cached stack frames for `thread_id`, in the order of the thread's
/// recorded frame ids.
///
/// A stack trace request is only issued (via `fetch`) when the thread is stopped, a
/// threads request is tracked, and the thread is known. When the response arrives
/// the thread's frame ids and the frame cache are updated, tracked scopes and
/// variables requests are dropped, and `SessionEvent::StackTrace` is emitted.
/// Returns an empty vector for unknown threads.
pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
    if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
        && self.requests.contains_key(&ThreadsCommand.type_id())
        && self.threads.contains_key(&thread_id)
    // ^ todo(debugger): We need a better way to check that we're not querying stale data
    // We could still be using an old thread id and have sent a new thread's request
    // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
    // But it very well could cause a minor bug in the future that is hard to track down
    {
        self.fetch(
            super::dap_command::StackTraceCommand {
                thread_id: thread_id.0,
                start_frame: None,
                levels: None,
            },
            move |this, stack_frames, cx| {
                let stack_frames = stack_frames.log_err()?;

                // Record the frame ids on the thread; `and_modify` does not insert,
                // so a thread that disappeared in the meantime is left untouched.
                let entry = this.threads.entry(thread_id).and_modify(|thread| {
                    thread.stack_frame_ids =
                        stack_frames.iter().map(|frame| frame.id).collect();
                });
                debug_assert!(
                    matches!(entry, indexmap::map::Entry::Occupied(_)),
                    "Sent request for thread_id that doesn't exist"
                );

                // Frames are cached by id, independent of the owning thread.
                this.stack_frames.extend(
                    stack_frames
                        .iter()
                        .cloned()
                        .map(|frame| (frame.id, StackFrame::from(frame))),
                );

                // Drop the tracked scopes and variables requests so they are sent again.
                this.invalidate_command_type::<ScopesCommand>();
                this.invalidate_command_type::<VariablesCommand>();

                cx.emit(SessionEvent::StackTrace);
                cx.notify();
                Some(stack_frames)
            },
            cx,
        );
    }

    // Resolve the thread's frame ids against the frame cache, skipping missing ids.
    self.threads
        .get(&thread_id)
        .map(|thread| {
            thread
                .stack_frame_ids
                .iter()
                .filter_map(|id| self.stack_frames.get(id))
                .cloned()
                .collect()
        })
        .unwrap_or_default()
}
1948
1949 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1950 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1951 && self
1952 .requests
1953 .contains_key(&TypeId::of::<StackTraceCommand>())
1954 {
1955 self.fetch(
1956 ScopesCommand { stack_frame_id },
1957 move |this, scopes, cx| {
1958 let scopes = scopes.log_err()?;
1959
1960 for scope in scopes .iter(){
1961 this.variables(scope.variables_reference, cx);
1962 }
1963
1964 let entry = this
1965 .stack_frames
1966 .entry(stack_frame_id)
1967 .and_modify(|stack_frame| {
1968 stack_frame.scopes = scopes.clone();
1969 });
1970
1971 cx.emit(SessionEvent::Variables);
1972
1973 debug_assert!(
1974 matches!(entry, indexmap::map::Entry::Occupied(_)),
1975 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1976 );
1977
1978 Some(scopes)
1979 },
1980 cx,
1981 );
1982 }
1983
1984 self.stack_frames
1985 .get(&stack_frame_id)
1986 .map(|frame| frame.scopes.as_slice())
1987 .unwrap_or_default()
1988 }
1989
1990 pub fn variables(
1991 &mut self,
1992 variables_reference: VariableReference,
1993 cx: &mut Context<Self>,
1994 ) -> Vec<dap::Variable> {
1995 let command = VariablesCommand {
1996 variables_reference,
1997 filter: None,
1998 start: None,
1999 count: None,
2000 format: None,
2001 };
2002
2003 self.fetch(
2004 command,
2005 move |this, variables, cx| {
2006 let variables = variables.log_err()?;
2007 this.variables
2008 .insert(variables_reference, variables.clone());
2009
2010 cx.emit(SessionEvent::Variables);
2011 Some(variables)
2012 },
2013 cx,
2014 );
2015
2016 self.variables
2017 .get(&variables_reference)
2018 .cloned()
2019 .unwrap_or_default()
2020 }
2021
2022 pub fn set_variable_value(
2023 &mut self,
2024 variables_reference: u64,
2025 name: String,
2026 value: String,
2027 cx: &mut Context<Self>,
2028 ) {
2029 if self.capabilities.supports_set_variable.unwrap_or_default() {
2030 self.request(
2031 SetVariableValueCommand {
2032 name,
2033 value,
2034 variables_reference,
2035 },
2036 move |this, response, cx| {
2037 let response = response.log_err()?;
2038 this.invalidate_command_type::<VariablesCommand>();
2039 cx.notify();
2040 Some(response)
2041 },
2042 cx,
2043 )
2044 .detach()
2045 }
2046 }
2047
2048 pub fn evaluate(
2049 &mut self,
2050 expression: String,
2051 context: Option<EvaluateArgumentsContext>,
2052 frame_id: Option<u64>,
2053 source: Option<Source>,
2054 cx: &mut Context<Self>,
2055 ) {
2056 self.request(
2057 EvaluateCommand {
2058 expression,
2059 context,
2060 frame_id,
2061 source,
2062 },
2063 |this, response, cx| {
2064 let response = response.log_err()?;
2065 this.output_token.0 += 1;
2066 this.output.push_back(dap::OutputEvent {
2067 category: None,
2068 output: response.result.clone(),
2069 group: None,
2070 variables_reference: Some(response.variables_reference),
2071 source: None,
2072 line: None,
2073 column: None,
2074 data: None,
2075 location_reference: None,
2076 });
2077
2078 this.invalidate_command_type::<ScopesCommand>();
2079 cx.notify();
2080 Some(response)
2081 },
2082 cx,
2083 )
2084 .detach();
2085 }
2086
2087 pub fn location(
2088 &mut self,
2089 reference: u64,
2090 cx: &mut Context<Self>,
2091 ) -> Option<dap::LocationsResponse> {
2092 self.fetch(
2093 LocationsCommand { reference },
2094 move |this, response, _| {
2095 let response = response.log_err()?;
2096 this.locations.insert(reference, response.clone());
2097 Some(response)
2098 },
2099 cx,
2100 );
2101 self.locations.get(&reference).cloned()
2102 }
2103 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2104 let command = DisconnectCommand {
2105 restart: Some(false),
2106 terminate_debuggee: Some(true),
2107 suspend_debuggee: Some(false),
2108 };
2109
2110 self.request(command, Self::empty_response, cx).detach()
2111 }
2112
2113 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2114 if self
2115 .capabilities
2116 .supports_terminate_threads_request
2117 .unwrap_or_default()
2118 {
2119 self.request(
2120 TerminateThreadsCommand {
2121 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2122 },
2123 Self::clear_active_debug_line_response,
2124 cx,
2125 )
2126 .detach();
2127 } else {
2128 self.shutdown(cx).detach();
2129 }
2130 }
2131}
2132
/// Builds a local `Session` and wires up its background message handling.
///
/// Two things are set up before the struct is returned:
/// - a spawned task that drains `message_rx`: the first `Initialized` event fires
///   `initialized_tx`, other events go to `Session::handle_dap_event`, and non-event
///   messages are forwarded with `session_id` on `start_debugging_requests_tx`;
/// - a subscription to `breakpoint_store` that re-sends or unsets breakpoints on the
///   local mode unless the session is ignoring breakpoints.
///
/// The returned session starts with default capabilities and empty caches; its
/// `parent_id` is read from `parent_session` when one is given.
fn create_local_session(
    breakpoint_store: Entity<BreakpointStore>,
    session_id: SessionId,
    parent_session: Option<Entity<Session>>,
    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
    initialized_tx: oneshot::Sender<()>,
    mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
    mode: LocalMode,
    cx: &mut Context<Session>,
) -> Session {
    let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
        // Wrapped in an Option so the oneshot sender is used at most once.
        let mut initialized_tx = Some(initialized_tx);
        while let Some(message) = message_rx.next().await {
            if let Message::Event(event) = message {
                if let Events::Initialized(_) = *event {
                    if let Some(tx) = initialized_tx.take() {
                        tx.send(()).ok();
                    }
                } else {
                    // Stop pumping once the session entity can no longer be updated.
                    let Ok(_) = this.update(cx, |session, cx| {
                        session.handle_dap_event(event, cx);
                    }) else {
                        break;
                    };
                }
            } else {
                // Non-event messages are handed off together with this session's id;
                // stop if the receiving side is gone.
                let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
                else {
                    break;
                };
            }
        }
    })];

    cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
        BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
            // Skipped while breakpoints are ignored or when the session is not local.
            if let Some(local) = (!this.ignore_breakpoints)
                .then(|| this.as_local_mut())
                .flatten()
            {
                local
                    .send_breakpoints_from_path(path.clone(), *reason, cx)
                    .detach();
            };
        }
        BreakpointStoreEvent::BreakpointsCleared(paths) => {
            if let Some(local) = (!this.ignore_breakpoints)
                .then(|| this.as_local_mut())
                .flatten()
            {
                local.unset_breakpoints_from_paths(paths, cx).detach();
            }
        }
        BreakpointStoreEvent::ActiveDebugLineChanged => {}
    })
    .detach();

    Session {
        mode: Mode::Local(mode),
        id: session_id,
        child_session_ids: HashSet::default(),
        parent_id: parent_session.map(|session| session.read(cx).id),
        variables: Default::default(),
        capabilities: Capabilities::default(),
        thread_states: ThreadStates::default(),
        output_token: OutputToken(0),
        ignore_breakpoints: false,
        output: circular_buffer::CircularBuffer::boxed(),
        requests: HashMap::default(),
        modules: Vec::default(),
        loaded_sources: Vec::default(),
        threads: IndexMap::default(),
        stack_frames: IndexMap::default(),
        locations: Default::default(),
        exception_breakpoints: Default::default(),
        // Stored on the session alongside the rest of its state.
        _background_tasks,
        is_session_terminated: false,
    }
}