1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{
4 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
5};
6use super::dap_command::{
7 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
8 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
9 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
10 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
11 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
12 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
13};
14use super::dap_store::DapAdapterDelegate;
15use anyhow::{Context as _, Result, anyhow};
16use collections::{HashMap, HashSet, IndexMap, IndexSet};
17use dap::adapters::{DebugAdapter, DebugAdapterBinary};
18use dap::messages::Response;
19use dap::{
20 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
21 SteppingGranularity, StoppedEvent, VariableReference,
22 adapters::{DapDelegate, DapStatus},
23 client::{DebugAdapterClient, SessionId},
24 messages::{Events, Message},
25};
26use dap::{
27 DapRegistry, DebugRequestType, ExceptionBreakpointsFilter, ExceptionFilterOptions,
28 OutputEventCategory,
29};
30use futures::channel::oneshot;
31use futures::{FutureExt, future::Shared};
32use gpui::{
33 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
34};
35use rpc::AnyProtoClient;
36use serde_json::{Value, json};
37use settings::Settings;
38use smol::stream::StreamExt;
39use std::any::TypeId;
40use std::collections::BTreeMap;
41use std::path::PathBuf;
42use std::u64;
43use std::{
44 any::Any,
45 collections::hash_map::Entry,
46 hash::{Hash, Hasher},
47 path::Path,
48 sync::Arc,
49};
50use task::{DebugAdapterConfig, DebugTaskDefinition};
51use text::{PointUtf16, ToPointUtf16};
52use util::{ResultExt, merge_json_value_into};
53
/// Newtype around the raw numeric thread id (a `u64`).
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub u64);
57
58impl ThreadId {
59 pub const MIN: ThreadId = ThreadId(u64::MIN);
60 pub const MAX: ThreadId = ThreadId(u64::MAX);
61}
62
63impl From<u64> for ThreadId {
64 fn from(id: u64) -> Self {
65 Self(id)
66 }
67}
68
/// A stack frame as reported by the adapter, paired with a list of scopes for it.
#[derive(Clone, Debug)]
pub struct StackFrame {
    /// The protocol-level stack frame.
    pub dap: dap::StackFrame,
    /// Scopes attached to this frame; empty when built via `From<dap::StackFrame>`.
    pub scopes: Vec<dap::Scope>,
}
74
75impl From<dap::StackFrame> for StackFrame {
76 fn from(stack_frame: dap::StackFrame) -> Self {
77 Self {
78 scopes: vec![],
79 dap: stack_frame,
80 }
81 }
82}
83
/// Status of a debuggee thread as tracked by the session.
///
/// `Running` is the default status.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}
93
94impl ThreadStatus {
95 pub fn label(&self) -> &'static str {
96 match self {
97 ThreadStatus::Running => "Running",
98 ThreadStatus::Stopped => "Stopped",
99 ThreadStatus::Stepping => "Stepping",
100 ThreadStatus::Exited => "Exited",
101 ThreadStatus::Ended => "Ended",
102 }
103 }
104}
105
/// Session-side record of a debuggee thread.
#[derive(Debug)]
pub struct Thread {
    /// The protocol-level thread description.
    dap: dap::Thread,
    /// Ids of the stack frames associated with this thread, in insertion order.
    stack_frame_ids: IndexSet<StackFrameId>,
    /// Initialized to `false`; the underscore prefix suggests it is currently unused.
    _has_stopped: bool,
}
112
113impl From<dap::Thread> for Thread {
114 fn from(dap: dap::Thread) -> Self {
115 Self {
116 dap,
117 stack_frame_ids: Default::default(),
118 _has_stopped: false,
119 }
120 }
121}
122
/// Numeric id of the upstream project a remote session is attached to.
type UpstreamProjectId = u64;
124
/// Connection data for a session whose debug adapter lives on another peer.
///
/// Both fields are underscore-prefixed: the RPC path that would use them is
/// not implemented yet (see `send_proto_client_request`).
struct RemoteConnection {
    _client: AnyProtoClient,
    _upstream_project_id: UpstreamProjectId,
}
129
130impl RemoteConnection {
131 fn send_proto_client_request<R: DapCommand>(
132 &self,
133 _request: R,
134 _session_id: SessionId,
135 cx: &mut App,
136 ) -> Task<Result<R::Response>> {
137 // let message = request.to_proto(session_id, self.upstream_project_id);
138 // let upstream_client = self.client.clone();
139 cx.background_executor().spawn(async move {
140 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
141 // let response = upstream_client.request(message).await?;
142 // request.response_from_proto(response)
143 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
144 })
145 }
146
147 fn request<R: DapCommand>(
148 &self,
149 request: R,
150 session_id: SessionId,
151 cx: &mut App,
152 ) -> Task<Result<R::Response>>
153 where
154 <R::DapRequest as dap::requests::Request>::Response: 'static,
155 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
156 {
157 return self.send_proto_client_request::<R>(request, session_id, cx);
158 }
159}
160
/// How a session reaches its debug adapter.
enum Mode {
    /// The adapter client is owned by this process.
    Local(LocalMode),
    /// Requests would be forwarded over RPC (not supported yet).
    Remote(RemoteConnection),
}
165
/// State needed to drive a debug adapter that runs locally.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests to the adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration this session was started with.
    config: DebugAdapterConfig,
    /// The adapter implementation resolved from the registry.
    adapter: Arc<dyn DebugAdapter>,
    /// Source of the breakpoints that are sent to the adapter.
    breakpoint_store: Entity<BreakpointStore>,
    /// Temporary breakpoint set by `Session::run_to_position`; taken again when
    /// the next stopped event is handled.
    tmp_breakpoint: Option<SourceBreakpoint>,
}
174
175fn client_source(abs_path: &Path) -> dap::Source {
176 dap::Source {
177 name: abs_path
178 .file_name()
179 .map(|filename| filename.to_string_lossy().to_string()),
180 path: Some(abs_path.to_string_lossy().to_string()),
181 source_reference: None,
182 presentation_hint: None,
183 origin: None,
184 sources: None,
185 adapter_data: None,
186 checksums: None,
187 }
188}
189
190impl LocalMode {
/// Creates a `LocalMode` for a real debug adapter.
///
/// Thin wrapper over `new_inner` that passes a no-op initialization callback.
fn new(
    debug_adapters: Arc<DapRegistry>,
    session_id: SessionId,
    parent_session: Option<Entity<Session>>,
    breakpoint_store: Entity<BreakpointStore>,
    config: DebugAdapterConfig,
    delegate: DapAdapterDelegate,
    messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
    cx: AsyncApp,
) -> Task<Result<Self>> {
    Self::new_inner(
        debug_adapters,
        session_id,
        parent_session,
        breakpoint_store,
        config,
        delegate,
        messages_tx,
        // Nothing extra to set up for a real adapter.
        async |_, _| {},
        cx,
    )
}
/// Test-only constructor: builds a `LocalMode` from the fake adapter registry and
/// registers canned request handlers on the client.
///
/// * `caps` - capabilities returned from the fake `Initialize` handler.
/// * `fail` - when `true`, the fake `Launch`/`Attach` handler replies with an error.
#[cfg(any(test, feature = "test-support"))]
fn new_fake(
    session_id: SessionId,
    parent_session: Option<Entity<Session>>,
    breakpoint_store: Entity<BreakpointStore>,
    config: DebugAdapterConfig,
    delegate: DapAdapterDelegate,
    messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
    caps: Capabilities,
    fail: bool,
    cx: AsyncApp,
) -> Task<Result<Self>> {
    use task::DebugRequestDisposition;

    // Reverse requests carry no user configuration, so a placeholder launch/attach
    // config is synthesized for them.
    let request = match config.request.clone() {
        DebugRequestDisposition::UserConfigured(request) => request,
        DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
            match reverse_request_args.request {
                dap::StartDebuggingRequestArgumentsRequest::Launch => {
                    DebugRequestType::Launch(task::LaunchConfig {
                        program: "".to_owned(),
                        cwd: None,
                        args: Default::default(),
                    })
                }
                dap::StartDebuggingRequestArgumentsRequest::Attach => {
                    DebugRequestType::Attach(task::AttachConfig {
                        process_id: Some(0),
                    })
                }
            }
        }
    };

    // Passed to `new_inner` and awaited there once the client exists.
    let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
        session
            .client
            .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
            .await;

        let paths = cx
            .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
            .expect("Breakpoint store should exist in all tests that start debuggers");

        // Only paths known to the breakpoint store may receive a SetBreakpoints request.
        session
            .client
            .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
                let p = Arc::from(Path::new(&args.source.path.unwrap()));
                if !paths.contains(&p) {
                    panic!("Sent breakpoints for path without any")
                }

                Ok(dap::SetBreakpointsResponse {
                    breakpoints: Vec::default(),
                })
            })
            .await;

        match request {
            dap::DebugRequestType::Launch(_) => {
                if fail {
                    session
                        .client
                        .on_request::<dap::requests::Launch, _>(move |_, _| {
                            Err(dap::ErrorResponse {
                                error: Some(dap::Message {
                                    id: 1,
                                    format: "error".into(),
                                    variables: None,
                                    send_telemetry: None,
                                    show_user: None,
                                    url: None,
                                    url_label: None,
                                }),
                            })
                        })
                        .await;
                } else {
                    session
                        .client
                        .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
                        .await;
                }
            }
            dap::DebugRequestType::Attach(attach_config) => {
                if fail {
                    session
                        .client
                        .on_request::<dap::requests::Attach, _>(move |_, _| {
                            Err(dap::ErrorResponse {
                                error: Some(dap::Message {
                                    id: 1,
                                    format: "error".into(),
                                    variables: None,
                                    send_telemetry: None,
                                    show_user: None,
                                    url: None,
                                    url_label: None,
                                }),
                            })
                        })
                        .await;
                } else {
                    session
                        .client
                        .on_request::<dap::requests::Attach, _>(move |_, args| {
                            // The attach arguments must contain exactly the configured process id.
                            assert_eq!(
                                json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
                                args.raw
                            );

                            Ok(())
                        })
                        .await;
                }
            }
        }

        session
            .client
            .on_request::<dap::requests::SetExceptionBreakpoints, _>(move |_, _| {
                Ok(dap::SetExceptionBreakpointsResponse { breakpoints: None })
            })
            .await;

        session
            .client
            .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
            .await;
        // Emitted last, after every handler above has been registered.
        session.client.fake_event(Events::Initialized(None)).await;
    };
    Self::new_inner(
        DapRegistry::fake().into(),
        session_id,
        parent_session,
        breakpoint_store,
        config,
        delegate,
        messages_tx,
        callback,
        cx,
    )
}
356 fn new_inner(
357 registry: Arc<DapRegistry>,
358 session_id: SessionId,
359 parent_session: Option<Entity<Session>>,
360 breakpoint_store: Entity<BreakpointStore>,
361 config: DebugAdapterConfig,
362 delegate: DapAdapterDelegate,
363 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
364 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
365 cx: AsyncApp,
366 ) -> Task<Result<Self>> {
367 cx.spawn(async move |cx| {
368 let (adapter, binary) =
369 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
370
371 let message_handler = Box::new(move |message| {
372 messages_tx.unbounded_send(message).ok();
373 });
374
375 let client = Arc::new(
376 if let Some(client) = parent_session
377 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
378 .flatten()
379 {
380 client
381 .reconnect(session_id, binary, message_handler, cx.clone())
382 .await?
383 } else {
384 DebugAdapterClient::start(
385 session_id,
386 adapter.name(),
387 binary,
388 message_handler,
389 cx.clone(),
390 )
391 .await
392 .with_context(|| "Failed to start communication with debug adapter")?
393 },
394 );
395
396 let mut session = Self {
397 client,
398 adapter,
399 breakpoint_store,
400 tmp_breakpoint: None,
401 config: config.clone(),
402 };
403
404 on_initialized(&mut session, cx.clone()).await;
405
406 Ok(session)
407 })
408 }
409
410 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
411 let tasks: Vec<_> = paths
412 .into_iter()
413 .map(|path| {
414 self.request(
415 dap_command::SetBreakpoints {
416 source: client_source(path),
417 source_modified: None,
418 breakpoints: vec![],
419 },
420 cx.background_executor().clone(),
421 )
422 })
423 .collect();
424
425 cx.background_spawn(async move {
426 futures::future::join_all(tasks)
427 .await
428 .iter()
429 .for_each(|res| match res {
430 Ok(_) => {}
431 Err(err) => {
432 log::warn!("Set breakpoints request failed: {}", err);
433 }
434 });
435 })
436 }
437
438 fn send_breakpoints_from_path(
439 &self,
440 abs_path: Arc<Path>,
441 reason: BreakpointUpdatedReason,
442 cx: &mut App,
443 ) -> Task<()> {
444 let breakpoints = self
445 .breakpoint_store
446 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
447 .into_iter()
448 .filter(|bp| bp.state.is_enabled())
449 .chain(self.tmp_breakpoint.clone())
450 .map(Into::into)
451 .collect();
452
453 let task = self.request(
454 dap_command::SetBreakpoints {
455 source: client_source(&abs_path),
456 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
457 breakpoints,
458 },
459 cx.background_executor().clone(),
460 );
461
462 cx.background_spawn(async move {
463 match task.await {
464 Ok(_) => {}
465 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
466 }
467 })
468 }
469
470 fn send_exception_breakpoints(
471 &self,
472 filters: Vec<ExceptionBreakpointsFilter>,
473 supports_filter_options: bool,
474 cx: &App,
475 ) -> Task<Result<Vec<dap::Breakpoint>>> {
476 let arg = if supports_filter_options {
477 SetExceptionBreakpoints::WithOptions {
478 filters: filters
479 .into_iter()
480 .map(|filter| ExceptionFilterOptions {
481 filter_id: filter.filter,
482 condition: None,
483 mode: None,
484 })
485 .collect(),
486 }
487 } else {
488 SetExceptionBreakpoints::Plain {
489 filters: filters.into_iter().map(|filter| filter.filter).collect(),
490 }
491 };
492 self.request(arg, cx.background_executor().clone())
493 }
494 fn send_source_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
495 let mut breakpoint_tasks = Vec::new();
496 let breakpoints = self
497 .breakpoint_store
498 .read_with(cx, |store, cx| store.all_breakpoints(cx));
499
500 for (path, breakpoints) in breakpoints {
501 let breakpoints = if ignore_breakpoints {
502 vec![]
503 } else {
504 breakpoints
505 .into_iter()
506 .filter(|bp| bp.state.is_enabled())
507 .map(Into::into)
508 .collect()
509 };
510
511 breakpoint_tasks.push(self.request(
512 dap_command::SetBreakpoints {
513 source: client_source(&path),
514 source_modified: Some(false),
515 breakpoints,
516 },
517 cx.background_executor().clone(),
518 ));
519 }
520
521 cx.background_spawn(async move {
522 futures::future::join_all(breakpoint_tasks)
523 .await
524 .iter()
525 .for_each(|res| match res {
526 Ok(_) => {}
527 Err(err) => {
528 log::warn!("Set breakpoints request failed: {}", err);
529 }
530 });
531 })
532 }
533
534 async fn get_adapter_binary(
535 registry: &Arc<DapRegistry>,
536 config: &DebugAdapterConfig,
537 delegate: &DapAdapterDelegate,
538 cx: &mut AsyncApp,
539 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
540 let adapter = registry
541 .adapter(&config.adapter)
542 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
543
544 let binary = cx.update(|cx| {
545 ProjectSettings::get_global(cx)
546 .dap
547 .get(&adapter.name())
548 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
549 })?;
550
551 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
552 Err(error) => {
553 delegate.update_status(
554 adapter.name(),
555 DapStatus::Failed {
556 error: error.to_string(),
557 },
558 );
559
560 return Err(error);
561 }
562 Ok(mut binary) => {
563 delegate.update_status(adapter.name(), DapStatus::None);
564
565 let shell_env = delegate.shell_env().await;
566 let mut envs = binary.envs.unwrap_or_default();
567 envs.extend(shell_env);
568 binary.envs = Some(envs);
569
570 binary
571 }
572 };
573
574 Ok((adapter, binary))
575 }
576
/// Returns a copy of the label stored in this session's configuration.
pub fn label(&self) -> String {
    self.config.label.clone()
}
580
581 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
582 let adapter_id = self.adapter.name().to_string();
583
584 self.request(Initialize { adapter_id }, cx.background_executor().clone())
585 }
586
/// Runs the launch/attach handshake with the adapter.
///
/// Two things are driven concurrently and joined at the end:
/// * the `Launch` or `Attach` request, built from the user configuration or from
///   the reverse-request arguments, with `initialize_args` merged in;
/// * the configuration sequence, which waits for `initialized_rx`, then sends
///   source breakpoints, the default exception filters and, when the adapter
///   supports it, `ConfigurationDone`.
///
/// The returned task fails if either half fails.
fn initialize_sequence(
    &self,
    capabilities: &Capabilities,
    initialized_rx: oneshot::Receiver<()>,
    cx: &App,
) -> Task<Result<()>> {
    let (mut raw, is_launch) = match &self.config.request {
        task::DebugRequestDisposition::UserConfigured(_) => {
            let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
                debug_assert!(false, "This part of code should be unreachable in practice");
                return Task::ready(Err(anyhow!(
                    "Expected debug config conversion to succeed"
                )));
            };
            let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
            let raw = self.adapter.request_args(&raw);
            (raw, is_launch)
        }
        task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
            start_debugging_request_arguments.configuration.clone(),
            matches!(
                start_debugging_request_arguments.request,
                dap::StartDebuggingRequestArgumentsRequest::Launch
            ),
        ),
    };

    // Merge the user-supplied `initialize_args` into the request arguments.
    merge_json_value_into(
        self.config.initialize_args.clone().unwrap_or(json!({})),
        &mut raw,
    );
    // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
    let launch = if is_launch {
        self.request(Launch { raw }, cx.background_executor().clone())
    } else {
        self.request(Attach { raw }, cx.background_executor().clone())
    };

    let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
    // Only filters the adapter marks as enabled by default are sent initially.
    let exception_filters = capabilities
        .exception_breakpoint_filters
        .as_ref()
        .map(|exception_filters| {
            exception_filters
                .iter()
                .filter(|filter| filter.default == Some(true))
                .cloned()
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();
    let supports_exception_filters = capabilities
        .supports_exception_filter_options
        .unwrap_or_default();
    let configuration_sequence = cx.spawn({
        let this = self.clone();
        async move |cx| {
            // Configuration requests are sent only after this receiver resolves.
            initialized_rx.await?;
            // todo(debugger) figure out if we want to handle a breakpoint response error
            // This will probably consist of letting a user know that breakpoints failed to be set
            cx.update(|cx| this.send_source_breakpoints(false, cx))?
                .await;
            // A failure to set exception breakpoints is deliberately ignored.
            cx.update(|cx| {
                this.send_exception_breakpoints(
                    exception_filters,
                    supports_exception_filters,
                    cx,
                )
            })?
            .await
            .ok();
            if configuration_done_supported {
                this.request(ConfigurationDone {}, cx.background_executor().clone())
            } else {
                Task::ready(Ok(()))
            }
            .await
        }
    });

    cx.background_spawn(async move {
        futures::future::try_join(launch, configuration_sequence).await?;
        Ok(())
    })
}
671
672 fn request<R: LocalDapCommand>(
673 &self,
674 request: R,
675 executor: BackgroundExecutor,
676 ) -> Task<Result<R::Response>>
677 where
678 <R::DapRequest as dap::requests::Request>::Response: 'static,
679 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
680 {
681 let request = Arc::new(request);
682
683 let request_clone = request.clone();
684 let connection = self.client.clone();
685 let request_task = executor.spawn(async move {
686 let args = request_clone.to_dap();
687 connection.request::<R::DapRequest>(args).await
688 });
689
690 executor.spawn(async move {
691 let response = request.response_from_dap(request_task.await?);
692 response
693 })
694 }
695}
696impl From<RemoteConnection> for Mode {
697 fn from(value: RemoteConnection) -> Self {
698 Self::Remote(value)
699 }
700}
701
702impl Mode {
703 fn request_dap<R: DapCommand>(
704 &self,
705 session_id: SessionId,
706 request: R,
707 cx: &mut Context<Session>,
708 ) -> Task<Result<R::Response>>
709 where
710 <R::DapRequest as dap::requests::Request>::Response: 'static,
711 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
712 {
713 match self {
714 Mode::Local(debug_adapter_client) => {
715 debug_adapter_client.request(request, cx.background_executor().clone())
716 }
717 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
718 }
719 }
720}
721
/// Tracks thread statuses: a session-wide fallback plus per-thread overrides.
#[derive(Default)]
struct ThreadStates {
    /// Status applied to any thread without an entry in `known_thread_states`.
    global_state: Option<ThreadStatus>,
    /// Per-thread statuses; cleared whenever a status is applied to all threads.
    known_thread_states: IndexMap<ThreadId, ThreadStatus>,
}
727
728impl ThreadStates {
729 fn stop_all_threads(&mut self) {
730 self.global_state = Some(ThreadStatus::Stopped);
731 self.known_thread_states.clear();
732 }
733
734 fn exit_all_threads(&mut self) {
735 self.global_state = Some(ThreadStatus::Exited);
736 self.known_thread_states.clear();
737 }
738
739 fn continue_all_threads(&mut self) {
740 self.global_state = Some(ThreadStatus::Running);
741 self.known_thread_states.clear();
742 }
743
744 fn stop_thread(&mut self, thread_id: ThreadId) {
745 self.known_thread_states
746 .insert(thread_id, ThreadStatus::Stopped);
747 }
748
749 fn continue_thread(&mut self, thread_id: ThreadId) {
750 self.known_thread_states
751 .insert(thread_id, ThreadStatus::Running);
752 }
753
754 fn process_step(&mut self, thread_id: ThreadId) {
755 self.known_thread_states
756 .insert(thread_id, ThreadStatus::Stepping);
757 }
758
759 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
760 self.thread_state(thread_id)
761 .unwrap_or(ThreadStatus::Running)
762 }
763
764 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
765 self.known_thread_states
766 .get(&thread_id)
767 .copied()
768 .or(self.global_state)
769 }
770
771 fn exit_thread(&mut self, thread_id: ThreadId) {
772 self.known_thread_states
773 .insert(thread_id, ThreadStatus::Exited);
774 }
775
776 fn any_stopped_thread(&self) -> bool {
777 self.global_state
778 .is_some_and(|state| state == ThreadStatus::Stopped)
779 || self
780 .known_thread_states
781 .values()
782 .any(|status| *status == ThreadStatus::Stopped)
783 }
784}
/// Capacity of the circular buffer that stores a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
786
/// Flag stored next to each exception breakpoint filter in `Session::exception_breakpoints`.
type IsEnabled = bool;
788
/// Counter value handed out by `Session::output`; a caller passes it back to ask
/// whether, or which, output events arrived since.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents the current state of a single debug adapter session and provides ways to mutate it.
pub struct Session {
    /// Whether requests go to a local adapter client or a remote connection.
    mode: Mode,
    /// Capabilities reported by the adapter; replaced by `request_initialize`.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Compared against caller-held tokens in `has_new_output` / `output`.
    output_token: OutputToken,
    /// Ring buffer holding at most `MAX_TRACKED_OUTPUT_EVENTS` output events.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    /// Known threads; cleared when a stopped event is handled.
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    /// Variables by reference; cleared when a stopped event is handled.
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    // NOTE(review): looks like a per-command-type cache of in-flight or completed
    // requests keyed by `RequestSlot` — confirm against the code that fills it.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Exception filters keyed by filter id, each with its enabled flag
    /// (seeded from the filter's `default` in `request_initialize`).
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    _background_tasks: Vec<Task<()>>,
}
813
/// Object-safe view of a command that can be compared and hashed through a
/// trait object (used as the payload of `RequestSlot`).
trait CacheableCommand: Any + Send + Sync {
    /// Equality against another, possibly differently-typed, command.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds this command into a type-erased hasher.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the shared pointer into an `Arc<dyn Any>`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
819
820impl<T> CacheableCommand for T
821where
822 T: DapCommand + PartialEq + Eq + Hash,
823{
824 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
825 (rhs as &dyn Any)
826 .downcast_ref::<Self>()
827 .map_or(false, |rhs| self == rhs)
828 }
829
830 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
831 T::hash(self, &mut hasher);
832 }
833
834 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
835 self
836 }
837}
838
/// Type-erased command that can be used as a hash-map key (see `Session::requests`).
pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
840
841impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
842 fn from(request: T) -> Self {
843 Self(Arc::new(request))
844 }
845}
846
847impl PartialEq for RequestSlot {
848 fn eq(&self, other: &Self) -> bool {
849 self.0.dyn_eq(other.0.as_ref())
850 }
851}
852
// Commands stored in a slot are required to be `Eq` (see `From<T> for RequestSlot`).
impl Eq for RequestSlot {}
854
855impl Hash for RequestSlot {
856 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
857 self.0.dyn_hash(state);
858 (&*self.0 as &dyn Any).type_id().hash(state)
859 }
860}
861
/// Input for a completions request.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CompletionsQuery {
    /// Text to complete; `new` stores the buffer's whole text here.
    pub query: String,
    /// Cursor column, as produced by converting the cursor to a UTF-16 point.
    pub column: u64,
    /// Cursor row, when known.
    pub line: Option<u64>,
    /// Stack frame the query relates to, if any.
    pub frame_id: Option<u64>,
}
869
870impl CompletionsQuery {
871 pub fn new(
872 buffer: &language::Buffer,
873 cursor_position: language::Anchor,
874 frame_id: Option<u64>,
875 ) -> Self {
876 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
877 Self {
878 query: buffer.text(),
879 column: column as u64,
880 frame_id,
881 line: Some(row as u64),
882 }
883 }
884}
885
/// Events emitted by a `Session` for its observers.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Emitted when a stopped event is handled; carries the stopped thread unless
    /// the event had no thread id or asked to preserve focus.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}
894
/// Crate-internal lifecycle events emitted by a `Session`.
pub(crate) enum SessionStateEvent {
    Shutdown,
}
898
// `Session` emits both the public `SessionEvent`s and the crate-internal
// `SessionStateEvent`s.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
901
902// local session will send breakpoint updates to DAP for all new breakpoints
903// remote side will only send breakpoint updates when it is a breakpoint created by that peer
904// BreakpointStore notifies session on breakpoint changes
905impl Session {
906 pub(crate) fn local(
907 breakpoint_store: Entity<BreakpointStore>,
908 session_id: SessionId,
909 parent_session: Option<Entity<Session>>,
910 delegate: DapAdapterDelegate,
911 config: DebugAdapterConfig,
912 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
913 initialized_tx: oneshot::Sender<()>,
914 debug_adapters: Arc<DapRegistry>,
915 cx: &mut App,
916 ) -> Task<Result<Entity<Self>>> {
917 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
918
919 cx.spawn(async move |cx| {
920 let mode = LocalMode::new(
921 debug_adapters,
922 session_id,
923 parent_session.clone(),
924 breakpoint_store.clone(),
925 config.clone(),
926 delegate,
927 message_tx,
928 cx.clone(),
929 )
930 .await?;
931
932 cx.new(|cx| {
933 create_local_session(
934 breakpoint_store,
935 session_id,
936 parent_session,
937 start_debugging_requests_tx,
938 initialized_tx,
939 message_rx,
940 mode,
941 cx,
942 )
943 })
944 })
945 }
946
/// Test-only counterpart of `Session::local` that is backed by
/// `LocalMode::new_fake`.
///
/// `caps` are the capabilities the fake adapter reports and `fails` makes its
/// launch/attach handler reply with an error.
#[cfg(any(test, feature = "test-support"))]
pub(crate) fn fake(
    breakpoint_store: Entity<BreakpointStore>,
    session_id: SessionId,
    parent_session: Option<Entity<Session>>,
    delegate: DapAdapterDelegate,
    config: DebugAdapterConfig,
    start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
    initialized_tx: oneshot::Sender<()>,
    caps: Capabilities,
    fails: bool,
    cx: &mut App,
) -> Task<Result<Entity<Session>>> {
    let (adapter_messages_tx, adapter_messages_rx) = futures::channel::mpsc::unbounded();

    cx.spawn(async move |cx| {
        let fake_mode = LocalMode::new_fake(
            session_id,
            parent_session.clone(),
            breakpoint_store.clone(),
            config.clone(),
            delegate,
            adapter_messages_tx,
            caps,
            fails,
            cx.clone(),
        )
        .await?;

        cx.new(|cx| {
            create_local_session(
                breakpoint_store,
                session_id,
                parent_session,
                start_debugging_requests_tx,
                initialized_tx,
                adapter_messages_rx,
                fake_mode,
                cx,
            )
        })
    })
}
990
991 pub(crate) fn remote(
992 session_id: SessionId,
993 client: AnyProtoClient,
994 upstream_project_id: u64,
995 ignore_breakpoints: bool,
996 ) -> Self {
997 Self {
998 mode: Mode::Remote(RemoteConnection {
999 _client: client,
1000 _upstream_project_id: upstream_project_id,
1001 }),
1002 id: session_id,
1003 child_session_ids: HashSet::default(),
1004 parent_id: None,
1005 capabilities: Capabilities::default(),
1006 ignore_breakpoints,
1007 variables: Default::default(),
1008 stack_frames: Default::default(),
1009 thread_states: ThreadStates::default(),
1010 output_token: OutputToken(0),
1011 output: circular_buffer::CircularBuffer::boxed(),
1012 requests: HashMap::default(),
1013 modules: Vec::default(),
1014 loaded_sources: Vec::default(),
1015 threads: IndexMap::default(),
1016 _background_tasks: Vec::default(),
1017 locations: Default::default(),
1018 is_session_terminated: false,
1019 exception_breakpoints: Default::default(),
1020 }
1021 }
1022
/// Returns this session's id.
pub fn session_id(&self) -> SessionId {
    self.id
}
1026
/// Returns a copy of the set of child session ids.
pub fn child_session_ids(&self) -> HashSet<SessionId> {
    self.child_session_ids.clone()
}
1030
/// Registers `session_id` as a child of this session.
pub fn add_child_session_id(&mut self, session_id: SessionId) {
    self.child_session_ids.insert(session_id);
}
1034
/// Removes `session_id` from this session's children; a no-op if it is not one.
pub fn remove_child_session_id(&mut self, session_id: SessionId) {
    self.child_session_ids.remove(&session_id);
}
1038
/// Returns the id of the parent session, if any.
pub fn parent_id(&self) -> Option<SessionId> {
    self.parent_id
}
1042
/// Returns the adapter capabilities currently stored for this session.
pub fn capabilities(&self) -> &Capabilities {
    &self.capabilities
}
1046
1047 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
1048 if let Mode::Local(local_mode) = &self.mode {
1049 Some(local_mode.config.clone())
1050 } else {
1051 None
1052 }
1053 }
1054
/// Returns whether the session has been flagged as terminated.
pub fn is_terminated(&self) -> bool {
    self.is_session_terminated
}
1058
1059 pub fn is_local(&self) -> bool {
1060 matches!(self.mode, Mode::Local(_))
1061 }
1062
1063 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
1064 match &mut self.mode {
1065 Mode::Local(local_mode) => Some(local_mode),
1066 Mode::Remote(_) => None,
1067 }
1068 }
1069
1070 pub fn as_local(&self) -> Option<&LocalMode> {
1071 match &self.mode {
1072 Mode::Local(local_mode) => Some(local_mode),
1073 Mode::Remote(_) => None,
1074 }
1075 }
1076
1077 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1078 match &self.mode {
1079 Mode::Local(local_mode) => {
1080 let capabilities = local_mode.clone().request_initialization(cx);
1081
1082 cx.spawn(async move |this, cx| {
1083 let capabilities = capabilities.await?;
1084 this.update(cx, |session, _| {
1085 session.capabilities = capabilities;
1086 let filters = session
1087 .capabilities
1088 .exception_breakpoint_filters
1089 .clone()
1090 .unwrap_or_default();
1091 for filter in filters {
1092 let default = filter.default.unwrap_or_default();
1093 session
1094 .exception_breakpoints
1095 .entry(filter.filter.clone())
1096 .or_insert_with(|| (filter, default));
1097 }
1098 })?;
1099 Ok(())
1100 })
1101 }
1102 Mode::Remote(_) => Task::ready(Err(anyhow!(
1103 "Cannot send initialize request from remote session"
1104 ))),
1105 }
1106 }
1107
1108 pub(super) fn initialize_sequence(
1109 &mut self,
1110 initialize_rx: oneshot::Receiver<()>,
1111 cx: &mut Context<Self>,
1112 ) -> Task<Result<()>> {
1113 match &self.mode {
1114 Mode::Local(local_mode) => {
1115 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1116 }
1117 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1118 }
1119 }
1120
1121 pub fn run_to_position(
1122 &mut self,
1123 breakpoint: SourceBreakpoint,
1124 active_thread_id: ThreadId,
1125 cx: &mut Context<Self>,
1126 ) {
1127 match &mut self.mode {
1128 Mode::Local(local_mode) => {
1129 if !matches!(
1130 self.thread_states.thread_state(active_thread_id),
1131 Some(ThreadStatus::Stopped)
1132 ) {
1133 return;
1134 };
1135 let path = breakpoint.path.clone();
1136 local_mode.tmp_breakpoint = Some(breakpoint);
1137 let task = local_mode.send_breakpoints_from_path(
1138 path,
1139 BreakpointUpdatedReason::Toggled,
1140 cx,
1141 );
1142
1143 cx.spawn(async move |this, cx| {
1144 task.await;
1145 this.update(cx, |this, cx| {
1146 this.continue_thread(active_thread_id, cx);
1147 })
1148 })
1149 .detach();
1150 }
1151 Mode::Remote(_) => {}
1152 }
1153 }
1154
1155 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1156 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1157 }
1158
1159 pub fn output(
1160 &self,
1161 since: OutputToken,
1162 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1163 if self.output_token.0 == 0 {
1164 return (self.output.range(0..0), OutputToken(0));
1165 };
1166
1167 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1168
1169 let clamped_events_since = events_since.clamp(0, self.output.len());
1170 (
1171 self.output
1172 .range(self.output.len() - clamped_events_since..),
1173 self.output_token,
1174 )
1175 }
1176
1177 pub fn respond_to_client(
1178 &self,
1179 request_seq: u64,
1180 success: bool,
1181 command: String,
1182 body: Option<serde_json::Value>,
1183 cx: &mut Context<Self>,
1184 ) -> Task<Result<()>> {
1185 let Some(local_session) = self.as_local().cloned() else {
1186 unreachable!("Cannot respond to remote client");
1187 };
1188
1189 cx.background_spawn(async move {
1190 local_session
1191 .client
1192 .send_message(Message::Response(Response {
1193 body,
1194 success,
1195 command,
1196 seq: request_seq + 1,
1197 request_seq,
1198 message: None,
1199 }))
1200 .await
1201 })
1202 }
1203
1204 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1205 if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1206 let breakpoint = local.tmp_breakpoint.take()?;
1207 let path = breakpoint.path.clone();
1208 Some((local, path))
1209 }) {
1210 local
1211 .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1212 .detach();
1213 };
1214
1215 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1216 self.thread_states.stop_all_threads();
1217
1218 self.invalidate_command_type::<StackTraceCommand>();
1219 }
1220
1221 // Event if we stopped all threads we still need to insert the thread_id
1222 // to our own data
1223 if let Some(thread_id) = event.thread_id {
1224 self.thread_states.stop_thread(ThreadId(thread_id));
1225
1226 self.invalidate_state(
1227 &StackTraceCommand {
1228 thread_id,
1229 start_frame: None,
1230 levels: None,
1231 }
1232 .into(),
1233 );
1234 }
1235
1236 self.invalidate_generic();
1237 self.threads.clear();
1238 self.variables.clear();
1239 cx.emit(SessionEvent::Stopped(
1240 event
1241 .thread_id
1242 .map(Into::into)
1243 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1244 ));
1245 cx.notify();
1246 }
1247
1248 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1249 match *event {
1250 Events::Initialized(_) => {
1251 debug_assert!(
1252 false,
1253 "Initialized event should have been handled in LocalMode"
1254 );
1255 }
1256 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1257 Events::Continued(event) => {
1258 if event.all_threads_continued.unwrap_or_default() {
1259 self.thread_states.continue_all_threads();
1260 } else {
1261 self.thread_states
1262 .continue_thread(ThreadId(event.thread_id));
1263 }
1264 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1265 self.invalidate_generic();
1266 }
1267 Events::Exited(_event) => {
1268 self.clear_active_debug_line(cx);
1269 }
1270 Events::Terminated(_) => {
1271 self.is_session_terminated = true;
1272 self.clear_active_debug_line(cx);
1273 }
1274 Events::Thread(event) => {
1275 let thread_id = ThreadId(event.thread_id);
1276
1277 match event.reason {
1278 dap::ThreadEventReason::Started => {
1279 self.thread_states.continue_thread(thread_id);
1280 }
1281 dap::ThreadEventReason::Exited => {
1282 self.thread_states.exit_thread(thread_id);
1283 }
1284 reason => {
1285 log::error!("Unhandled thread event reason {:?}", reason);
1286 }
1287 }
1288 self.invalidate_state(&ThreadsCommand.into());
1289 cx.notify();
1290 }
1291 Events::Output(event) => {
1292 if event
1293 .category
1294 .as_ref()
1295 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1296 {
1297 return;
1298 }
1299
1300 self.output.push_back(event);
1301 self.output_token.0 += 1;
1302 cx.notify();
1303 }
1304 Events::Breakpoint(_) => {}
1305 Events::Module(event) => {
1306 match event.reason {
1307 dap::ModuleEventReason::New => {
1308 self.modules.push(event.module);
1309 }
1310 dap::ModuleEventReason::Changed => {
1311 if let Some(module) = self
1312 .modules
1313 .iter_mut()
1314 .find(|other| event.module.id == other.id)
1315 {
1316 *module = event.module;
1317 }
1318 }
1319 dap::ModuleEventReason::Removed => {
1320 self.modules.retain(|other| event.module.id != other.id);
1321 }
1322 }
1323
1324 // todo(debugger): We should only send the invalidate command to downstream clients.
1325 // self.invalidate_state(&ModulesCommand.into());
1326 }
1327 Events::LoadedSource(_) => {
1328 self.invalidate_state(&LoadedSourcesCommand.into());
1329 }
1330 Events::Capabilities(event) => {
1331 self.capabilities = self.capabilities.merge(event.capabilities);
1332 cx.notify();
1333 }
1334 Events::Memory(_) => {}
1335 Events::Process(_) => {}
1336 Events::ProgressEnd(_) => {}
1337 Events::ProgressStart(_) => {}
1338 Events::ProgressUpdate(_) => {}
1339 Events::Invalidated(_) => {}
1340 Events::Other(_) => {}
1341 }
1342 }
1343
1344 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1345 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1346 &mut self,
1347 request: T,
1348 process_result: impl FnOnce(
1349 &mut Self,
1350 Result<T::Response>,
1351 &mut Context<Self>,
1352 ) -> Option<T::Response>
1353 + 'static,
1354 cx: &mut Context<Self>,
1355 ) {
1356 const {
1357 assert!(
1358 T::CACHEABLE,
1359 "Only requests marked as cacheable should invoke `fetch`"
1360 );
1361 }
1362
1363 if !self.thread_states.any_stopped_thread()
1364 && request.type_id() != TypeId::of::<ThreadsCommand>()
1365 || self.is_session_terminated
1366 {
1367 return;
1368 }
1369
1370 let request_map = self
1371 .requests
1372 .entry(std::any::TypeId::of::<T>())
1373 .or_default();
1374
1375 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1376 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1377
1378 let task = Self::request_inner::<Arc<T>>(
1379 &self.capabilities,
1380 self.id,
1381 &self.mode,
1382 command,
1383 process_result,
1384 cx,
1385 );
1386 let task = cx
1387 .background_executor()
1388 .spawn(async move {
1389 let _ = task.await?;
1390 Some(())
1391 })
1392 .shared();
1393
1394 vacant.insert(task);
1395 cx.notify();
1396 }
1397 }
1398
1399 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1400 capabilities: &Capabilities,
1401 session_id: SessionId,
1402 mode: &Mode,
1403 request: T,
1404 process_result: impl FnOnce(
1405 &mut Self,
1406 Result<T::Response>,
1407 &mut Context<Self>,
1408 ) -> Option<T::Response>
1409 + 'static,
1410 cx: &mut Context<Self>,
1411 ) -> Task<Option<T::Response>> {
1412 if !T::is_supported(&capabilities) {
1413 log::warn!(
1414 "Attempted to send a DAP request that isn't supported: {:?}",
1415 request
1416 );
1417 let error = Err(anyhow::Error::msg(
1418 "Couldn't complete request because it's not supported",
1419 ));
1420 return cx.spawn(async move |this, cx| {
1421 this.update(cx, |this, cx| process_result(this, error, cx))
1422 .log_err()
1423 .flatten()
1424 });
1425 }
1426
1427 let request = mode.request_dap(session_id, request, cx);
1428 cx.spawn(async move |this, cx| {
1429 let result = request.await;
1430 this.update(cx, |this, cx| process_result(this, result, cx))
1431 .log_err()
1432 .flatten()
1433 })
1434 }
1435
1436 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1437 &self,
1438 request: T,
1439 process_result: impl FnOnce(
1440 &mut Self,
1441 Result<T::Response>,
1442 &mut Context<Self>,
1443 ) -> Option<T::Response>
1444 + 'static,
1445 cx: &mut Context<Self>,
1446 ) -> Task<Option<T::Response>> {
1447 Self::request_inner(
1448 &self.capabilities,
1449 self.id,
1450 &self.mode,
1451 request,
1452 process_result,
1453 cx,
1454 )
1455 }
1456
    /// Removes every cached request stored under `Command`'s `TypeId`.
    fn invalidate_command_type<Command: DapCommand>(&mut self) {
        self.requests.remove(&std::any::TypeId::of::<Command>());
    }
1460
    /// Removes the cached modules, loaded-sources and threads requests.
    fn invalidate_generic(&mut self) {
        self.invalidate_command_type::<ModulesCommand>();
        self.invalidate_command_type::<LoadedSourcesCommand>();
        self.invalidate_command_type::<ThreadsCommand>();
    }
1466
1467 fn invalidate_state(&mut self, key: &RequestSlot) {
1468 self.requests
1469 .entry((&*key.0 as &dyn Any).type_id())
1470 .and_modify(|request_map| {
1471 request_map.remove(&key);
1472 });
1473 }
1474
    /// Returns the status that `thread_states` reports for `thread_id`.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1478
1479 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1480 self.fetch(
1481 dap_command::ThreadsCommand,
1482 |this, result, cx| {
1483 let result = result.log_err()?;
1484
1485 this.threads = result
1486 .iter()
1487 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1488 .collect();
1489
1490 this.invalidate_command_type::<StackTraceCommand>();
1491 cx.emit(SessionEvent::Threads);
1492 cx.notify();
1493
1494 Some(result)
1495 },
1496 cx,
1497 );
1498
1499 self.threads
1500 .values()
1501 .map(|thread| {
1502 (
1503 thread.dap.clone(),
1504 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1505 )
1506 })
1507 .collect()
1508 }
1509
1510 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1511 self.fetch(
1512 dap_command::ModulesCommand,
1513 |this, result, cx| {
1514 let result = result.log_err()?;
1515
1516 this.modules = result.iter().cloned().collect();
1517 cx.emit(SessionEvent::Modules);
1518 cx.notify();
1519
1520 Some(result)
1521 },
1522 cx,
1523 );
1524
1525 &self.modules
1526 }
1527
    /// Returns the session's `ignore_breakpoints` flag.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1531
1532 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1533 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1534 }
1535
1536 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1537 if self.ignore_breakpoints == ignore {
1538 return Task::ready(());
1539 }
1540
1541 self.ignore_breakpoints = ignore;
1542
1543 if let Some(local) = self.as_local() {
1544 local.send_source_breakpoints(ignore, cx)
1545 } else {
1546 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1547 unimplemented!()
1548 }
1549 }
1550
    /// Iterates over the stored exception breakpoint filters, each paired with
    /// its enabled flag.
    pub fn exception_breakpoints(
        &self,
    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
        self.exception_breakpoints.values()
    }
1556
1557 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1558 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1559 *is_enabled = !*is_enabled;
1560 self.send_exception_breakpoints(cx);
1561 }
1562 }
1563
1564 fn send_exception_breakpoints(&mut self, cx: &App) {
1565 if let Some(local) = self.as_local() {
1566 let exception_filters = self
1567 .exception_breakpoints
1568 .values()
1569 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1570 .collect();
1571
1572 let supports_exception_filters = self
1573 .capabilities
1574 .supports_exception_filter_options
1575 .unwrap_or_default();
1576 local
1577 .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1578 .detach_and_log_err(cx);
1579 } else {
1580 debug_assert!(false, "Not implemented");
1581 }
1582 }
1583
1584 pub fn breakpoints_enabled(&self) -> bool {
1585 self.ignore_breakpoints
1586 }
1587
1588 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1589 self.fetch(
1590 dap_command::LoadedSourcesCommand,
1591 |this, result, cx| {
1592 let result = result.log_err()?;
1593 this.loaded_sources = result.iter().cloned().collect();
1594 cx.emit(SessionEvent::LoadedSources);
1595 cx.notify();
1596 Some(result)
1597 },
1598 cx,
1599 );
1600
1601 &self.loaded_sources
1602 }
1603
1604 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1605 res.log_err()?;
1606 Some(())
1607 }
1608
1609 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1610 thread_id: ThreadId,
1611 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1612 {
1613 move |this, response, cx| match response.log_err() {
1614 Some(response) => Some(response),
1615 None => {
1616 this.thread_states.stop_thread(thread_id);
1617 cx.notify();
1618 None
1619 }
1620 }
1621 }
1622
1623 fn clear_active_debug_line_response(
1624 &mut self,
1625 response: Result<()>,
1626 cx: &mut Context<Session>,
1627 ) -> Option<()> {
1628 response.log_err()?;
1629 self.clear_active_debug_line(cx);
1630 Some(())
1631 }
1632
1633 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1634 self.as_local()
1635 .expect("Message handler will only run in local mode")
1636 .breakpoint_store
1637 .update(cx, |store, cx| {
1638 store.remove_active_position(Some(self.id), cx)
1639 });
1640 }
1641
1642 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1643 self.request(
1644 PauseCommand {
1645 thread_id: thread_id.0,
1646 },
1647 Self::empty_response,
1648 cx,
1649 )
1650 .detach();
1651 }
1652
1653 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1654 self.request(
1655 RestartStackFrameCommand { stack_frame_id },
1656 Self::empty_response,
1657 cx,
1658 )
1659 .detach();
1660 }
1661
1662 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1663 if self.capabilities.supports_restart_request.unwrap_or(false) {
1664 self.request(
1665 RestartCommand {
1666 raw: args.unwrap_or(Value::Null),
1667 },
1668 Self::empty_response,
1669 cx,
1670 )
1671 .detach();
1672 } else {
1673 self.request(
1674 DisconnectCommand {
1675 restart: Some(false),
1676 terminate_debuggee: Some(true),
1677 suspend_debuggee: Some(false),
1678 },
1679 Self::empty_response,
1680 cx,
1681 )
1682 .detach();
1683 }
1684 }
1685
1686 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1687 self.is_session_terminated = true;
1688 self.thread_states.exit_all_threads();
1689 cx.notify();
1690
1691 let task = if self
1692 .capabilities
1693 .supports_terminate_request
1694 .unwrap_or_default()
1695 {
1696 self.request(
1697 TerminateCommand {
1698 restart: Some(false),
1699 },
1700 Self::clear_active_debug_line_response,
1701 cx,
1702 )
1703 } else {
1704 self.request(
1705 DisconnectCommand {
1706 restart: Some(false),
1707 terminate_debuggee: Some(true),
1708 suspend_debuggee: Some(false),
1709 },
1710 Self::clear_active_debug_line_response,
1711 cx,
1712 )
1713 };
1714
1715 cx.emit(SessionStateEvent::Shutdown);
1716
1717 cx.background_spawn(async move {
1718 let _ = task.await;
1719 })
1720 }
1721
1722 pub fn completions(
1723 &mut self,
1724 query: CompletionsQuery,
1725 cx: &mut Context<Self>,
1726 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1727 let task = self.request(query, |_, result, _| result.log_err(), cx);
1728
1729 cx.background_executor().spawn(async move {
1730 anyhow::Ok(
1731 task.await
1732 .map(|response| response.targets)
1733 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1734 )
1735 })
1736 }
1737
1738 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1739 self.thread_states.continue_thread(thread_id);
1740 self.request(
1741 ContinueCommand {
1742 args: ContinueArguments {
1743 thread_id: thread_id.0,
1744 single_thread: Some(true),
1745 },
1746 },
1747 Self::on_step_response::<ContinueCommand>(thread_id),
1748 cx,
1749 )
1750 .detach();
1751 }
1752
1753 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1754 match self.mode {
1755 Mode::Local(ref local) => Some(local.client.clone()),
1756 Mode::Remote(_) => None,
1757 }
1758 }
1759
1760 pub fn step_over(
1761 &mut self,
1762 thread_id: ThreadId,
1763 granularity: SteppingGranularity,
1764 cx: &mut Context<Self>,
1765 ) {
1766 let supports_single_thread_execution_requests =
1767 self.capabilities.supports_single_thread_execution_requests;
1768 let supports_stepping_granularity = self
1769 .capabilities
1770 .supports_stepping_granularity
1771 .unwrap_or_default();
1772
1773 let command = NextCommand {
1774 inner: StepCommand {
1775 thread_id: thread_id.0,
1776 granularity: supports_stepping_granularity.then(|| granularity),
1777 single_thread: supports_single_thread_execution_requests,
1778 },
1779 };
1780
1781 self.thread_states.process_step(thread_id);
1782 self.request(
1783 command,
1784 Self::on_step_response::<NextCommand>(thread_id),
1785 cx,
1786 )
1787 .detach();
1788 }
1789
1790 pub fn step_in(
1791 &mut self,
1792 thread_id: ThreadId,
1793 granularity: SteppingGranularity,
1794 cx: &mut Context<Self>,
1795 ) {
1796 let supports_single_thread_execution_requests =
1797 self.capabilities.supports_single_thread_execution_requests;
1798 let supports_stepping_granularity = self
1799 .capabilities
1800 .supports_stepping_granularity
1801 .unwrap_or_default();
1802
1803 let command = StepInCommand {
1804 inner: StepCommand {
1805 thread_id: thread_id.0,
1806 granularity: supports_stepping_granularity.then(|| granularity),
1807 single_thread: supports_single_thread_execution_requests,
1808 },
1809 };
1810
1811 self.thread_states.process_step(thread_id);
1812 self.request(
1813 command,
1814 Self::on_step_response::<StepInCommand>(thread_id),
1815 cx,
1816 )
1817 .detach();
1818 }
1819
1820 pub fn step_out(
1821 &mut self,
1822 thread_id: ThreadId,
1823 granularity: SteppingGranularity,
1824 cx: &mut Context<Self>,
1825 ) {
1826 let supports_single_thread_execution_requests =
1827 self.capabilities.supports_single_thread_execution_requests;
1828 let supports_stepping_granularity = self
1829 .capabilities
1830 .supports_stepping_granularity
1831 .unwrap_or_default();
1832
1833 let command = StepOutCommand {
1834 inner: StepCommand {
1835 thread_id: thread_id.0,
1836 granularity: supports_stepping_granularity.then(|| granularity),
1837 single_thread: supports_single_thread_execution_requests,
1838 },
1839 };
1840
1841 self.thread_states.process_step(thread_id);
1842 self.request(
1843 command,
1844 Self::on_step_response::<StepOutCommand>(thread_id),
1845 cx,
1846 )
1847 .detach();
1848 }
1849
1850 pub fn step_back(
1851 &mut self,
1852 thread_id: ThreadId,
1853 granularity: SteppingGranularity,
1854 cx: &mut Context<Self>,
1855 ) {
1856 let supports_single_thread_execution_requests =
1857 self.capabilities.supports_single_thread_execution_requests;
1858 let supports_stepping_granularity = self
1859 .capabilities
1860 .supports_stepping_granularity
1861 .unwrap_or_default();
1862
1863 let command = StepBackCommand {
1864 inner: StepCommand {
1865 thread_id: thread_id.0,
1866 granularity: supports_stepping_granularity.then(|| granularity),
1867 single_thread: supports_single_thread_execution_requests,
1868 },
1869 };
1870
1871 self.thread_states.process_step(thread_id);
1872
1873 self.request(
1874 command,
1875 Self::on_step_response::<StepBackCommand>(thread_id),
1876 cx,
1877 )
1878 .detach();
1879 }
1880
1881 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1882 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1883 && self.requests.contains_key(&ThreadsCommand.type_id())
1884 && self.threads.contains_key(&thread_id)
1885 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1886 // We could still be using an old thread id and have sent a new thread's request
1887 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1888 // But it very well could cause a minor bug in the future that is hard to track down
1889 {
1890 self.fetch(
1891 super::dap_command::StackTraceCommand {
1892 thread_id: thread_id.0,
1893 start_frame: None,
1894 levels: None,
1895 },
1896 move |this, stack_frames, cx| {
1897 let stack_frames = stack_frames.log_err()?;
1898
1899 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1900 thread.stack_frame_ids =
1901 stack_frames.iter().map(|frame| frame.id).collect();
1902 });
1903 debug_assert!(
1904 matches!(entry, indexmap::map::Entry::Occupied(_)),
1905 "Sent request for thread_id that doesn't exist"
1906 );
1907
1908 this.stack_frames.extend(
1909 stack_frames
1910 .iter()
1911 .cloned()
1912 .map(|frame| (frame.id, StackFrame::from(frame))),
1913 );
1914
1915 this.invalidate_command_type::<ScopesCommand>();
1916 this.invalidate_command_type::<VariablesCommand>();
1917
1918 cx.emit(SessionEvent::StackTrace);
1919 cx.notify();
1920 Some(stack_frames)
1921 },
1922 cx,
1923 );
1924 }
1925
1926 self.threads
1927 .get(&thread_id)
1928 .map(|thread| {
1929 thread
1930 .stack_frame_ids
1931 .iter()
1932 .filter_map(|id| self.stack_frames.get(id))
1933 .cloned()
1934 .collect()
1935 })
1936 .unwrap_or_default()
1937 }
1938
1939 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1940 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1941 && self
1942 .requests
1943 .contains_key(&TypeId::of::<StackTraceCommand>())
1944 {
1945 self.fetch(
1946 ScopesCommand { stack_frame_id },
1947 move |this, scopes, cx| {
1948 let scopes = scopes.log_err()?;
1949
1950 for scope in scopes .iter(){
1951 this.variables(scope.variables_reference, cx);
1952 }
1953
1954 let entry = this
1955 .stack_frames
1956 .entry(stack_frame_id)
1957 .and_modify(|stack_frame| {
1958 stack_frame.scopes = scopes.clone();
1959 });
1960
1961 cx.emit(SessionEvent::Variables);
1962
1963 debug_assert!(
1964 matches!(entry, indexmap::map::Entry::Occupied(_)),
1965 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1966 );
1967
1968 Some(scopes)
1969 },
1970 cx,
1971 );
1972 }
1973
1974 self.stack_frames
1975 .get(&stack_frame_id)
1976 .map(|frame| frame.scopes.as_slice())
1977 .unwrap_or_default()
1978 }
1979
1980 pub fn variables(
1981 &mut self,
1982 variables_reference: VariableReference,
1983 cx: &mut Context<Self>,
1984 ) -> Vec<dap::Variable> {
1985 let command = VariablesCommand {
1986 variables_reference,
1987 filter: None,
1988 start: None,
1989 count: None,
1990 format: None,
1991 };
1992
1993 self.fetch(
1994 command,
1995 move |this, variables, cx| {
1996 let variables = variables.log_err()?;
1997 this.variables
1998 .insert(variables_reference, variables.clone());
1999
2000 cx.emit(SessionEvent::Variables);
2001 Some(variables)
2002 },
2003 cx,
2004 );
2005
2006 self.variables
2007 .get(&variables_reference)
2008 .cloned()
2009 .unwrap_or_default()
2010 }
2011
2012 pub fn set_variable_value(
2013 &mut self,
2014 variables_reference: u64,
2015 name: String,
2016 value: String,
2017 cx: &mut Context<Self>,
2018 ) {
2019 if self.capabilities.supports_set_variable.unwrap_or_default() {
2020 self.request(
2021 SetVariableValueCommand {
2022 name,
2023 value,
2024 variables_reference,
2025 },
2026 move |this, response, cx| {
2027 let response = response.log_err()?;
2028 this.invalidate_command_type::<VariablesCommand>();
2029 cx.notify();
2030 Some(response)
2031 },
2032 cx,
2033 )
2034 .detach()
2035 }
2036 }
2037
2038 pub fn evaluate(
2039 &mut self,
2040 expression: String,
2041 context: Option<EvaluateArgumentsContext>,
2042 frame_id: Option<u64>,
2043 source: Option<Source>,
2044 cx: &mut Context<Self>,
2045 ) {
2046 self.request(
2047 EvaluateCommand {
2048 expression,
2049 context,
2050 frame_id,
2051 source,
2052 },
2053 |this, response, cx| {
2054 let response = response.log_err()?;
2055 this.output_token.0 += 1;
2056 this.output.push_back(dap::OutputEvent {
2057 category: None,
2058 output: response.result.clone(),
2059 group: None,
2060 variables_reference: Some(response.variables_reference),
2061 source: None,
2062 line: None,
2063 column: None,
2064 data: None,
2065 location_reference: None,
2066 });
2067
2068 this.invalidate_command_type::<ScopesCommand>();
2069 cx.notify();
2070 Some(response)
2071 },
2072 cx,
2073 )
2074 .detach();
2075 }
2076
2077 pub fn location(
2078 &mut self,
2079 reference: u64,
2080 cx: &mut Context<Self>,
2081 ) -> Option<dap::LocationsResponse> {
2082 self.fetch(
2083 LocationsCommand { reference },
2084 move |this, response, _| {
2085 let response = response.log_err()?;
2086 this.locations.insert(reference, response.clone());
2087 Some(response)
2088 },
2089 cx,
2090 );
2091 self.locations.get(&reference).cloned()
2092 }
2093 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2094 let command = DisconnectCommand {
2095 restart: Some(false),
2096 terminate_debuggee: Some(true),
2097 suspend_debuggee: Some(false),
2098 };
2099
2100 self.request(command, Self::empty_response, cx).detach()
2101 }
2102
2103 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2104 if self
2105 .capabilities
2106 .supports_terminate_threads_request
2107 .unwrap_or_default()
2108 {
2109 self.request(
2110 TerminateThreadsCommand {
2111 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2112 },
2113 Self::clear_active_debug_line_response,
2114 cx,
2115 )
2116 .detach();
2117 } else {
2118 self.shutdown(cx).detach();
2119 }
2120 }
2121}
2122
2123fn create_local_session(
2124 breakpoint_store: Entity<BreakpointStore>,
2125 session_id: SessionId,
2126 parent_session: Option<Entity<Session>>,
2127 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
2128 initialized_tx: oneshot::Sender<()>,
2129 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
2130 mode: LocalMode,
2131 cx: &mut Context<Session>,
2132) -> Session {
2133 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
2134 let mut initialized_tx = Some(initialized_tx);
2135 while let Some(message) = message_rx.next().await {
2136 if let Message::Event(event) = message {
2137 if let Events::Initialized(_) = *event {
2138 if let Some(tx) = initialized_tx.take() {
2139 tx.send(()).ok();
2140 }
2141 } else {
2142 let Ok(_) = this.update(cx, |session, cx| {
2143 session.handle_dap_event(event, cx);
2144 }) else {
2145 break;
2146 };
2147 }
2148 } else {
2149 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
2150 else {
2151 break;
2152 };
2153 }
2154 }
2155 })];
2156
2157 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
2158 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2159 if let Some(local) = (!this.ignore_breakpoints)
2160 .then(|| this.as_local_mut())
2161 .flatten()
2162 {
2163 local
2164 .send_breakpoints_from_path(path.clone(), *reason, cx)
2165 .detach();
2166 };
2167 }
2168 BreakpointStoreEvent::BreakpointsCleared(paths) => {
2169 if let Some(local) = (!this.ignore_breakpoints)
2170 .then(|| this.as_local_mut())
2171 .flatten()
2172 {
2173 local.unset_breakpoints_from_paths(paths, cx).detach();
2174 }
2175 }
2176 BreakpointStoreEvent::ActiveDebugLineChanged => {}
2177 })
2178 .detach();
2179
2180 Session {
2181 mode: Mode::Local(mode),
2182 id: session_id,
2183 child_session_ids: HashSet::default(),
2184 parent_id: parent_session.map(|session| session.read(cx).id),
2185 variables: Default::default(),
2186 capabilities: Capabilities::default(),
2187 thread_states: ThreadStates::default(),
2188 output_token: OutputToken(0),
2189 ignore_breakpoints: false,
2190 output: circular_buffer::CircularBuffer::boxed(),
2191 requests: HashMap::default(),
2192 modules: Vec::default(),
2193 loaded_sources: Vec::default(),
2194 threads: IndexMap::default(),
2195 stack_frames: IndexMap::default(),
2196 locations: Default::default(),
2197 exception_breakpoints: Default::default(),
2198 _background_tasks,
2199 is_session_terminated: false,
2200 }
2201}