1use crate::project_settings::ProjectSettings;
2
3use super::breakpoint_store::{
4 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
5};
6use super::dap_command::{
7 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
8 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
9 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
10 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
11 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
12 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
13};
14use super::dap_store::DapAdapterDelegate;
15use anyhow::{Context as _, Result, anyhow};
16use collections::{HashMap, HashSet, IndexMap, IndexSet};
17use dap::adapters::{DebugAdapter, DebugAdapterBinary};
18use dap::messages::Response;
19use dap::{
20 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
21 SteppingGranularity, StoppedEvent, VariableReference,
22 adapters::{DapDelegate, DapStatus},
23 client::{DebugAdapterClient, SessionId},
24 messages::{Events, Message},
25};
26use dap::{
27 DapRegistry, DebugRequestType, ExceptionBreakpointsFilter, ExceptionFilterOptions,
28 OutputEventCategory,
29};
30use futures::channel::oneshot;
31use futures::{FutureExt, future::Shared};
32use gpui::{
33 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, Task, WeakEntity,
34};
35use rpc::AnyProtoClient;
36use serde_json::{Value, json};
37use settings::Settings;
38use smol::stream::StreamExt;
39use std::any::TypeId;
40use std::collections::BTreeMap;
41use std::path::PathBuf;
42use std::u64;
43use std::{
44 any::Any,
45 collections::hash_map::Entry,
46 hash::{Hash, Hasher},
47 path::Path,
48 sync::Arc,
49};
50use task::{DebugAdapterConfig, DebugTaskDefinition};
51use text::{PointUtf16, ToPointUtf16};
52use util::{ResultExt, merge_json_value_into};
53
/// Identifier of a debuggee thread, wrapping the raw numeric id.
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// Smallest representable thread id.
    pub const MIN: Self = Self(u64::MIN);
    /// Largest representable thread id.
    pub const MAX: Self = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw numeric id.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
68
69#[derive(Clone, Debug)]
70pub struct StackFrame {
71 pub dap: dap::StackFrame,
72 pub scopes: Vec<dap::Scope>,
73}
74
75impl From<dap::StackFrame> for StackFrame {
76 fn from(stack_frame: dap::StackFrame) -> Self {
77 Self {
78 scopes: vec![],
79 dap: stack_frame,
80 }
81 }
82}
83
/// Execution state of a debuggee thread as tracked by the session.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the display label for this status.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
105
106#[derive(Debug)]
107pub struct Thread {
108 dap: dap::Thread,
109 stack_frame_ids: IndexSet<StackFrameId>,
110 _has_stopped: bool,
111}
112
113impl From<dap::Thread> for Thread {
114 fn from(dap: dap::Thread) -> Self {
115 Self {
116 dap,
117 stack_frame_ids: Default::default(),
118 _has_stopped: false,
119 }
120 }
121}
122
123type UpstreamProjectId = u64;
124
125struct RemoteConnection {
126 _client: AnyProtoClient,
127 _upstream_project_id: UpstreamProjectId,
128}
129
130impl RemoteConnection {
131 fn send_proto_client_request<R: DapCommand>(
132 &self,
133 _request: R,
134 _session_id: SessionId,
135 cx: &mut App,
136 ) -> Task<Result<R::Response>> {
137 // let message = request.to_proto(session_id, self.upstream_project_id);
138 // let upstream_client = self.client.clone();
139 cx.background_executor().spawn(async move {
140 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
141 // let response = upstream_client.request(message).await?;
142 // request.response_from_proto(response)
143 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
144 })
145 }
146
147 fn request<R: DapCommand>(
148 &self,
149 request: R,
150 session_id: SessionId,
151 cx: &mut App,
152 ) -> Task<Result<R::Response>>
153 where
154 <R::DapRequest as dap::requests::Request>::Response: 'static,
155 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
156 {
157 return self.send_proto_client_request::<R>(request, session_id, cx);
158 }
159}
160
/// How a session reaches its debug adapter.
enum Mode {
    /// The adapter is driven directly through a `LocalMode`.
    Local(LocalMode),
    /// Requests are routed through a `RemoteConnection`.
    Remote(RemoteConnection),
}
165
/// State needed to talk to a debug adapter from this process.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests and messages to the debug adapter.
    client: Arc<DebugAdapterClient>,
    /// Configuration this session was started with.
    config: DebugAdapterConfig,
    /// The adapter implementation resolved from the registry.
    adapter: Arc<dyn DebugAdapter>,
    /// Store queried for the breakpoints that are sent to the adapter.
    breakpoint_store: Entity<BreakpointStore>,
    /// Temporary breakpoint set by `Session::run_to_position` and cleared again when a
    /// stopped event is handled.
    tmp_breakpoint: Option<SourceBreakpoint>,
}
174
175fn client_source(abs_path: &Path) -> dap::Source {
176 dap::Source {
177 name: abs_path
178 .file_name()
179 .map(|filename| filename.to_string_lossy().to_string()),
180 path: Some(abs_path.to_string_lossy().to_string()),
181 source_reference: None,
182 presentation_hint: None,
183 origin: None,
184 sources: None,
185 adapter_data: None,
186 checksums: None,
187 }
188}
189
190impl LocalMode {
    /// Creates a `LocalMode` by delegating to `new_inner` with a no-op
    /// `on_initialized` callback.
    fn new(
        debug_adapters: Arc<DapRegistry>,
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        breakpoint_store: Entity<BreakpointStore>,
        config: DebugAdapterConfig,
        delegate: DapAdapterDelegate,
        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
        cx: AsyncApp,
    ) -> Task<Result<Self>> {
        Self::new_inner(
            debug_adapters,
            session_id,
            parent_session,
            breakpoint_store,
            config,
            delegate,
            messages_tx,
            // Nothing extra to set up once the client exists.
            async |_, _| {},
            cx,
        )
    }
    /// Test-only constructor: builds a `LocalMode` from `DapRegistry::fake()` and registers
    /// canned request handlers on the client before the session is returned.
    ///
    /// `caps` is the response to `Initialize`; when `fail` is true the `Launch`/`Attach`
    /// handler answers with an error response instead of success.
    #[cfg(any(test, feature = "test-support"))]
    fn new_fake(
        session_id: SessionId,
        parent_session: Option<Entity<Session>>,
        breakpoint_store: Entity<BreakpointStore>,
        config: DebugAdapterConfig,
        delegate: DapAdapterDelegate,
        messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
        caps: Capabilities,
        fail: bool,
        cx: AsyncApp,
    ) -> Task<Result<Self>> {
        use task::DebugRequestDisposition;

        // Reverse requests carry no user configuration, so a placeholder launch/attach
        // config is synthesized just to select which handler gets registered below.
        let request = match config.request.clone() {
            DebugRequestDisposition::UserConfigured(request) => request,
            DebugRequestDisposition::ReverseRequest(reverse_request_args) => {
                match reverse_request_args.request {
                    dap::StartDebuggingRequestArgumentsRequest::Launch => {
                        DebugRequestType::Launch(task::LaunchConfig {
                            program: "".to_owned(),
                            cwd: None,
                            args: Default::default(),
                        })
                    }
                    dap::StartDebuggingRequestArgumentsRequest::Attach => {
                        DebugRequestType::Attach(task::AttachConfig {
                            process_id: Some(0),
                        })
                    }
                }
            }
        };

        let callback = async move |session: &mut LocalMode, cx: AsyncApp| {
            // Answer `Initialize` with the capabilities supplied by the test.
            session
                .client
                .on_request::<dap::requests::Initialize, _>(move |_, _| Ok(caps.clone()))
                .await;

            let paths = cx
                .update(|cx| session.breakpoint_store.read(cx).breakpoint_paths())
                .expect("Breakpoint store should exist in all tests that start debuggers");

            // A `SetBreakpoints` request for a path the store did not report panics the test.
            session
                .client
                .on_request::<dap::requests::SetBreakpoints, _>(move |_, args| {
                    let p = Arc::from(Path::new(&args.source.path.unwrap()));
                    if !paths.contains(&p) {
                        panic!("Sent breakpoints for path without any")
                    }

                    Ok(dap::SetBreakpointsResponse {
                        breakpoints: Vec::default(),
                    })
                })
                .await;

            match request {
                dap::DebugRequestType::Launch(_) => {
                    if fail {
                        session
                            .client
                            .on_request::<dap::requests::Launch, _>(move |_, _| {
                                Err(dap::ErrorResponse {
                                    error: Some(dap::Message {
                                        id: 1,
                                        format: "error".into(),
                                        variables: None,
                                        send_telemetry: None,
                                        show_user: None,
                                        url: None,
                                        url_label: None,
                                    }),
                                })
                            })
                            .await;
                    } else {
                        session
                            .client
                            .on_request::<dap::requests::Launch, _>(move |_, _| Ok(()))
                            .await;
                    }
                }
                dap::DebugRequestType::Attach(attach_config) => {
                    if fail {
                        session
                            .client
                            .on_request::<dap::requests::Attach, _>(move |_, _| {
                                Err(dap::ErrorResponse {
                                    error: Some(dap::Message {
                                        id: 1,
                                        format: "error".into(),
                                        variables: None,
                                        send_telemetry: None,
                                        show_user: None,
                                        url: None,
                                        url_label: None,
                                    }),
                                })
                            })
                            .await;
                    } else {
                        // The successful attach handler also checks the raw arguments it receives.
                        session
                            .client
                            .on_request::<dap::requests::Attach, _>(move |_, args| {
                                assert_eq!(
                                    json!({"request": "attach", "process_id": attach_config.process_id.unwrap()}),
                                    args.raw
                                );

                                Ok(())
                            })
                            .await;
                    }
                }
            }

            session
                .client
                .on_request::<dap::requests::SetExceptionBreakpoints, _>(move |_, _| {
                    Ok(dap::SetExceptionBreakpointsResponse { breakpoints: None })
                })
                .await;

            session
                .client
                .on_request::<dap::requests::Disconnect, _>(move |_, _| Ok(()))
                .await;
            // Emit the `Initialized` event only after every handler above is registered.
            session.client.fake_event(Events::Initialized(None)).await;
        };
        Self::new_inner(
            DapRegistry::fake().into(),
            session_id,
            parent_session,
            breakpoint_store,
            config,
            delegate,
            messages_tx,
            callback,
            cx,
        )
    }
356 fn new_inner(
357 registry: Arc<DapRegistry>,
358 session_id: SessionId,
359 parent_session: Option<Entity<Session>>,
360 breakpoint_store: Entity<BreakpointStore>,
361 config: DebugAdapterConfig,
362 delegate: DapAdapterDelegate,
363 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
364 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
365 cx: AsyncApp,
366 ) -> Task<Result<Self>> {
367 cx.spawn(async move |cx| {
368 let (adapter, binary) =
369 Self::get_adapter_binary(®istry, &config, &delegate, cx).await?;
370
371 let message_handler = Box::new(move |message| {
372 messages_tx.unbounded_send(message).ok();
373 });
374
375 let client = Arc::new(
376 if let Some(client) = parent_session
377 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
378 .flatten()
379 {
380 client
381 .reconnect(session_id, binary, message_handler, cx.clone())
382 .await?
383 } else {
384 DebugAdapterClient::start(
385 session_id,
386 adapter.name(),
387 binary,
388 message_handler,
389 cx.clone(),
390 )
391 .await
392 .with_context(|| "Failed to start communication with debug adapter")?
393 },
394 );
395
396 let mut session = Self {
397 client,
398 adapter,
399 breakpoint_store,
400 tmp_breakpoint: None,
401 config: config.clone(),
402 };
403
404 on_initialized(&mut session, cx.clone()).await;
405
406 Ok(session)
407 })
408 }
409
410 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
411 let tasks: Vec<_> = paths
412 .into_iter()
413 .map(|path| {
414 self.request(
415 dap_command::SetBreakpoints {
416 source: client_source(path),
417 source_modified: None,
418 breakpoints: vec![],
419 },
420 cx.background_executor().clone(),
421 )
422 })
423 .collect();
424
425 cx.background_spawn(async move {
426 futures::future::join_all(tasks)
427 .await
428 .iter()
429 .for_each(|res| match res {
430 Ok(_) => {}
431 Err(err) => {
432 log::warn!("Set breakpoints request failed: {}", err);
433 }
434 });
435 })
436 }
437
438 fn send_breakpoints_from_path(
439 &self,
440 abs_path: Arc<Path>,
441 reason: BreakpointUpdatedReason,
442 cx: &mut App,
443 ) -> Task<()> {
444 let breakpoints = self
445 .breakpoint_store
446 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
447 .into_iter()
448 .filter(|bp| bp.state.is_enabled())
449 .chain(self.tmp_breakpoint.clone())
450 .map(Into::into)
451 .collect();
452
453 let task = self.request(
454 dap_command::SetBreakpoints {
455 source: client_source(&abs_path),
456 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
457 breakpoints,
458 },
459 cx.background_executor().clone(),
460 );
461
462 cx.background_spawn(async move {
463 match task.await {
464 Ok(_) => {}
465 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
466 }
467 })
468 }
469
470 fn send_exception_breakpoints(
471 &self,
472 filters: Vec<ExceptionBreakpointsFilter>,
473 supports_filter_options: bool,
474 cx: &App,
475 ) -> Task<Result<Vec<dap::Breakpoint>>> {
476 let arg = if supports_filter_options {
477 SetExceptionBreakpoints::WithOptions {
478 filters: filters
479 .into_iter()
480 .map(|filter| ExceptionFilterOptions {
481 filter_id: filter.filter,
482 condition: None,
483 mode: None,
484 })
485 .collect(),
486 }
487 } else {
488 SetExceptionBreakpoints::Plain {
489 filters: filters.into_iter().map(|filter| filter.filter).collect(),
490 }
491 };
492 self.request(arg, cx.background_executor().clone())
493 }
494 fn send_source_breakpoints(&self, ignore_breakpoints: bool, cx: &App) -> Task<()> {
495 let mut breakpoint_tasks = Vec::new();
496 let breakpoints = self
497 .breakpoint_store
498 .read_with(cx, |store, cx| store.all_breakpoints(cx));
499
500 for (path, breakpoints) in breakpoints {
501 let breakpoints = if ignore_breakpoints {
502 vec![]
503 } else {
504 breakpoints
505 .into_iter()
506 .filter(|bp| bp.state.is_enabled())
507 .map(Into::into)
508 .collect()
509 };
510
511 breakpoint_tasks.push(self.request(
512 dap_command::SetBreakpoints {
513 source: client_source(&path),
514 source_modified: Some(false),
515 breakpoints,
516 },
517 cx.background_executor().clone(),
518 ));
519 }
520
521 cx.background_spawn(async move {
522 futures::future::join_all(breakpoint_tasks)
523 .await
524 .iter()
525 .for_each(|res| match res {
526 Ok(_) => {}
527 Err(err) => {
528 log::warn!("Set breakpoints request failed: {}", err);
529 }
530 });
531 })
532 }
533
534 async fn get_adapter_binary(
535 registry: &Arc<DapRegistry>,
536 config: &DebugAdapterConfig,
537 delegate: &DapAdapterDelegate,
538 cx: &mut AsyncApp,
539 ) -> Result<(Arc<dyn DebugAdapter>, DebugAdapterBinary)> {
540 let adapter = registry
541 .adapter(&config.adapter)
542 .ok_or_else(|| anyhow!("Debug adapter with name `{}` was not found", config.adapter))?;
543
544 let binary = cx.update(|cx| {
545 ProjectSettings::get_global(cx)
546 .dap
547 .get(&adapter.name())
548 .and_then(|s| s.binary.as_ref().map(PathBuf::from))
549 })?;
550
551 let binary = match adapter.get_binary(delegate, &config, binary, cx).await {
552 Err(error) => {
553 delegate.update_status(
554 adapter.name(),
555 DapStatus::Failed {
556 error: error.to_string(),
557 },
558 );
559
560 return Err(error);
561 }
562 Ok(mut binary) => {
563 delegate.update_status(adapter.name(), DapStatus::None);
564
565 let shell_env = delegate.shell_env().await;
566 let mut envs = binary.envs.unwrap_or_default();
567 envs.extend(shell_env);
568 binary.envs = Some(envs);
569
570 binary
571 }
572 };
573
574 Ok((adapter, binary))
575 }
576
    /// Returns a copy of the label from this session's configuration.
    pub fn label(&self) -> String {
        self.config.label.clone()
    }
580
581 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
582 let adapter_id = self.adapter.name().to_string();
583
584 self.request(Initialize { adapter_id }, cx.background_executor().clone())
585 }
586
    /// Runs the launch/attach handshake with the debug adapter.
    ///
    /// The launch (or attach) request is issued right away. In parallel, a configuration
    /// sequence waits on `initialized_rx`, then sends source breakpoints, the default
    /// exception breakpoints and, if `capabilities` says it is supported,
    /// `ConfigurationDone`. The returned task succeeds only if both halves succeed.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        cx: &App,
    ) -> Task<Result<()>> {
        // Build the raw request arguments and decide between launch and attach.
        let (mut raw, is_launch) = match &self.config.request {
            task::DebugRequestDisposition::UserConfigured(_) => {
                let Ok(raw) = DebugTaskDefinition::try_from(self.config.clone()) else {
                    debug_assert!(false, "This part of code should be unreachable in practice");
                    return Task::ready(Err(anyhow!(
                        "Expected debug config conversion to succeed"
                    )));
                };
                let is_launch = matches!(raw.request, DebugRequestType::Launch(_));
                let raw = self.adapter.request_args(&raw);
                (raw, is_launch)
            }
            task::DebugRequestDisposition::ReverseRequest(start_debugging_request_arguments) => (
                start_debugging_request_arguments.configuration.clone(),
                matches!(
                    start_debugging_request_arguments.request,
                    dap::StartDebuggingRequestArgumentsRequest::Launch
                ),
            ),
        };

        // Merge the user's `initialize_args` (if any) into the request arguments.
        merge_json_value_into(
            self.config.initialize_args.clone().unwrap_or(json!({})),
            &mut raw,
        );
        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = if is_launch {
            self.request(Launch { raw }, cx.background_executor().clone())
        } else {
            self.request(Attach { raw }, cx.background_executor().clone())
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
        // Only exception filters the adapter marks as default are enabled at startup.
        let exception_filters = capabilities
            .exception_breakpoint_filters
            .as_ref()
            .map(|exception_filters| {
                exception_filters
                    .iter()
                    .filter(|filter| filter.default == Some(true))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        let supports_exception_filters = capabilities
            .supports_exception_filter_options
            .unwrap_or_default();
        let configuration_sequence = cx.spawn({
            let this = self.clone();
            async move |cx| {
                // Configuration requests are held back until `initialized_rx` fires.
                initialized_rx.await?;
                // todo(debugger) figure out if we want to handle a breakpoint response error
                // This will probably consist of letting a user know that breakpoints failed to be set
                cx.update(|cx| this.send_source_breakpoints(false, cx))?
                    .await;
                // A failure to set exception breakpoints is deliberately ignored.
                cx.update(|cx| {
                    this.send_exception_breakpoints(
                        exception_filters,
                        supports_exception_filters,
                        cx,
                    )
                })?
                .await
                .ok();
                if configuration_done_supported {
                    this.request(ConfigurationDone {}, cx.background_executor().clone())
                } else {
                    Task::ready(Ok(()))
                }
                .await
            }
        });

        cx.background_spawn(async move {
            futures::future::try_join(launch, configuration_sequence).await?;
            Ok(())
        })
    }
671
672 fn request<R: LocalDapCommand>(
673 &self,
674 request: R,
675 executor: BackgroundExecutor,
676 ) -> Task<Result<R::Response>>
677 where
678 <R::DapRequest as dap::requests::Request>::Response: 'static,
679 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
680 {
681 let request = Arc::new(request);
682
683 let request_clone = request.clone();
684 let connection = self.client.clone();
685 let request_task = executor.spawn(async move {
686 let args = request_clone.to_dap();
687 connection.request::<R::DapRequest>(args).await
688 });
689
690 executor.spawn(async move {
691 let response = request.response_from_dap(request_task.await?);
692 response
693 })
694 }
695}
696impl From<RemoteConnection> for Mode {
697 fn from(value: RemoteConnection) -> Self {
698 Self::Remote(value)
699 }
700}
701
702impl Mode {
703 fn request_dap<R: DapCommand>(
704 &self,
705 session_id: SessionId,
706 request: R,
707 cx: &mut Context<Session>,
708 ) -> Task<Result<R::Response>>
709 where
710 <R::DapRequest as dap::requests::Request>::Response: 'static,
711 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
712 {
713 match self {
714 Mode::Local(debug_adapter_client) => {
715 debug_adapter_client.request(request, cx.background_executor().clone())
716 }
717 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
718 }
719 }
720}
721
722#[derive(Default)]
723struct ThreadStates {
724 global_state: Option<ThreadStatus>,
725 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
726}
727
728impl ThreadStates {
729 fn stop_all_threads(&mut self) {
730 self.global_state = Some(ThreadStatus::Stopped);
731 self.known_thread_states.clear();
732 }
733
734 fn exit_all_threads(&mut self) {
735 self.global_state = Some(ThreadStatus::Exited);
736 self.known_thread_states.clear();
737 }
738
739 fn continue_all_threads(&mut self) {
740 self.global_state = Some(ThreadStatus::Running);
741 self.known_thread_states.clear();
742 }
743
744 fn stop_thread(&mut self, thread_id: ThreadId) {
745 self.known_thread_states
746 .insert(thread_id, ThreadStatus::Stopped);
747 }
748
749 fn continue_thread(&mut self, thread_id: ThreadId) {
750 self.known_thread_states
751 .insert(thread_id, ThreadStatus::Running);
752 }
753
754 fn process_step(&mut self, thread_id: ThreadId) {
755 self.known_thread_states
756 .insert(thread_id, ThreadStatus::Stepping);
757 }
758
759 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
760 self.thread_state(thread_id)
761 .unwrap_or(ThreadStatus::Running)
762 }
763
764 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
765 self.known_thread_states
766 .get(&thread_id)
767 .copied()
768 .or(self.global_state)
769 }
770
771 fn exit_thread(&mut self, thread_id: ThreadId) {
772 self.known_thread_states
773 .insert(thread_id, ThreadStatus::Exited);
774 }
775
776 fn any_stopped_thread(&self) -> bool {
777 self.global_state
778 .is_some_and(|state| state == ThreadStatus::Stopped)
779 || self
780 .known_thread_states
781 .values()
782 .any(|status| *status == ThreadStatus::Stopped)
783 }
784}
/// Capacity of the circular buffer in which a `Session` keeps debug adapter output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Whether an exception breakpoint filter is enabled.
type IsEnabled = bool;

/// Position marker in a session's output stream.
///
/// `Session::output` uses the difference between the session's current token and a
/// caller-supplied one as the number of most recent events to return.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents the current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Transport used to reach the debug adapter (local client or remote connection).
    mode: Mode,
    /// Capabilities reported by the adapter; replaced after a successful `request_initialize`.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Current position in the output stream; see `output`.
    output_token: OutputToken,
    /// The most recent output events, capped at `MAX_TRACKED_OUTPUT_EVENTS`.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    // NOTE(review): looks like a per-command-type cache of in-flight/completed requests,
    // keyed first by the command's `TypeId` and then by the command value — confirm.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Exception breakpoint filters keyed by filter id, each paired with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    _background_tasks: Vec<Task<()>>,
}
813
/// Object-safe view of a command that supports equality and hashing through a trait
/// object; `RequestSlot` builds its `PartialEq` and `Hash` impls on top of it.
trait CacheableCommand: Any + Send + Sync {
    /// Compares `self` with `rhs`, which may be of a different concrete type.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds `self` into `hasher`.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the shared command into an `Any` trait object.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
819
820impl<T> CacheableCommand for T
821where
822 T: DapCommand + PartialEq + Eq + Hash,
823{
824 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
825 (rhs as &dyn Any)
826 .downcast_ref::<Self>()
827 .map_or(false, |rhs| self == rhs)
828 }
829
830 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
831 T::hash(self, &mut hasher);
832 }
833
834 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
835 self
836 }
837}
838
839pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
840
841impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
842 fn from(request: T) -> Self {
843 Self(Arc::new(request))
844 }
845}
846
847impl PartialEq for RequestSlot {
848 fn eq(&self, other: &Self) -> bool {
849 self.0.dyn_eq(other.0.as_ref())
850 }
851}
852
853impl Eq for RequestSlot {}
854
855impl Hash for RequestSlot {
856 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
857 self.0.dyn_hash(state);
858 (&*self.0 as &dyn Any).type_id().hash(state)
859 }
860}
861
862#[derive(Debug, Clone, Hash, PartialEq, Eq)]
863pub struct CompletionsQuery {
864 pub query: String,
865 pub column: u64,
866 pub line: Option<u64>,
867 pub frame_id: Option<u64>,
868}
869
870impl CompletionsQuery {
871 pub fn new(
872 buffer: &language::Buffer,
873 cursor_position: language::Anchor,
874 frame_id: Option<u64>,
875 ) -> Self {
876 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
877 Self {
878 query: buffer.text(),
879 column: column as u64,
880 frame_id,
881 line: Some(row as u64),
882 }
883 }
884}
885
/// Events a `Session` emits to its observers.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Emitted when a stopped event is handled. Carries the stopped thread, or `None`
    /// when the event named no thread or asked to preserve focus.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

/// Crate-internal lifecycle events of a `Session`.
pub(crate) enum SessionStateEvent {
    Shutdown,
}

impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
901
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session about breakpoint changes.
905impl Session {
906 pub(crate) fn local(
907 breakpoint_store: Entity<BreakpointStore>,
908 session_id: SessionId,
909 parent_session: Option<Entity<Session>>,
910 delegate: DapAdapterDelegate,
911 config: DebugAdapterConfig,
912 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
913 initialized_tx: oneshot::Sender<()>,
914 debug_adapters: Arc<DapRegistry>,
915 cx: &mut App,
916 ) -> Task<Result<Entity<Self>>> {
917 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
918
919 cx.spawn(async move |cx| {
920 let mode = LocalMode::new(
921 debug_adapters,
922 session_id,
923 parent_session.clone(),
924 breakpoint_store.clone(),
925 config.clone(),
926 delegate,
927 message_tx,
928 cx.clone(),
929 )
930 .await?;
931
932 cx.new(|cx| {
933 create_local_session(
934 breakpoint_store,
935 session_id,
936 parent_session,
937 start_debugging_requests_tx,
938 initialized_tx,
939 message_rx,
940 mode,
941 cx,
942 )
943 })
944 })
945 }
946
947 #[cfg(any(test, feature = "test-support"))]
948 pub(crate) fn fake(
949 breakpoint_store: Entity<BreakpointStore>,
950 session_id: SessionId,
951 parent_session: Option<Entity<Session>>,
952 delegate: DapAdapterDelegate,
953 config: DebugAdapterConfig,
954 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
955 initialized_tx: oneshot::Sender<()>,
956 caps: Capabilities,
957 fails: bool,
958 cx: &mut App,
959 ) -> Task<Result<Entity<Session>>> {
960 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
961
962 cx.spawn(async move |cx| {
963 let mode = LocalMode::new_fake(
964 session_id,
965 parent_session.clone(),
966 breakpoint_store.clone(),
967 config.clone(),
968 delegate,
969 message_tx,
970 caps,
971 fails,
972 cx.clone(),
973 )
974 .await?;
975
976 cx.new(|cx| {
977 create_local_session(
978 breakpoint_store,
979 session_id,
980 parent_session,
981 start_debugging_requests_tx,
982 initialized_tx,
983 message_rx,
984 mode,
985 cx,
986 )
987 })
988 })
989 }
990
991 pub(crate) fn remote(
992 session_id: SessionId,
993 client: AnyProtoClient,
994 upstream_project_id: u64,
995 ignore_breakpoints: bool,
996 ) -> Self {
997 Self {
998 mode: Mode::Remote(RemoteConnection {
999 _client: client,
1000 _upstream_project_id: upstream_project_id,
1001 }),
1002 id: session_id,
1003 child_session_ids: HashSet::default(),
1004 parent_id: None,
1005 capabilities: Capabilities::default(),
1006 ignore_breakpoints,
1007 variables: Default::default(),
1008 stack_frames: Default::default(),
1009 thread_states: ThreadStates::default(),
1010 output_token: OutputToken(0),
1011 output: circular_buffer::CircularBuffer::boxed(),
1012 requests: HashMap::default(),
1013 modules: Vec::default(),
1014 loaded_sources: Vec::default(),
1015 threads: IndexMap::default(),
1016 _background_tasks: Vec::default(),
1017 locations: Default::default(),
1018 is_session_terminated: false,
1019 exception_breakpoints: Default::default(),
1020 }
1021 }
1022
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
1026
    /// Returns a copy of the set of child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
1030
    /// Records `session_id` as a child of this session.
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
1034
    /// Removes `session_id` from the set of child sessions, if present.
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
1038
    /// Returns the id of the parent session, if there is one.
    pub fn parent_id(&self) -> Option<SessionId> {
        self.parent_id
    }
1042
    /// Returns the capabilities currently stored for the debug adapter.
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
1046
1047 pub fn configuration(&self) -> Option<DebugAdapterConfig> {
1048 if let Mode::Local(local_mode) = &self.mode {
1049 Some(local_mode.config.clone())
1050 } else {
1051 None
1052 }
1053 }
1054
    /// Returns whether the session has been marked as terminated.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
1058
    /// Returns true when the session talks to its debug adapter locally.
    pub fn is_local(&self) -> bool {
        matches!(self.mode, Mode::Local(_))
    }
1062
1063 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
1064 match &mut self.mode {
1065 Mode::Local(local_mode) => Some(local_mode),
1066 Mode::Remote(_) => None,
1067 }
1068 }
1069
1070 pub fn as_local(&self) -> Option<&LocalMode> {
1071 match &self.mode {
1072 Mode::Local(local_mode) => Some(local_mode),
1073 Mode::Remote(_) => None,
1074 }
1075 }
1076
1077 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1078 match &self.mode {
1079 Mode::Local(local_mode) => {
1080 let capabilities = local_mode.clone().request_initialization(cx);
1081
1082 cx.spawn(async move |this, cx| {
1083 let capabilities = capabilities.await?;
1084 this.update(cx, |session, _| {
1085 session.capabilities = capabilities;
1086 let filters = session
1087 .capabilities
1088 .exception_breakpoint_filters
1089 .clone()
1090 .unwrap_or_default();
1091 for filter in filters {
1092 let default = filter.default.unwrap_or_default();
1093 session
1094 .exception_breakpoints
1095 .entry(filter.filter.clone())
1096 .or_insert_with(|| (filter, default));
1097 }
1098 })?;
1099 Ok(())
1100 })
1101 }
1102 Mode::Remote(_) => Task::ready(Err(anyhow!(
1103 "Cannot send initialize request from remote session"
1104 ))),
1105 }
1106 }
1107
1108 pub(super) fn initialize_sequence(
1109 &mut self,
1110 initialize_rx: oneshot::Receiver<()>,
1111 cx: &mut Context<Self>,
1112 ) -> Task<Result<()>> {
1113 match &self.mode {
1114 Mode::Local(local_mode) => {
1115 local_mode.initialize_sequence(&self.capabilities, initialize_rx, cx)
1116 }
1117 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
1118 }
1119 }
1120
1121 pub fn run_to_position(
1122 &mut self,
1123 breakpoint: SourceBreakpoint,
1124 active_thread_id: ThreadId,
1125 cx: &mut Context<Self>,
1126 ) {
1127 match &mut self.mode {
1128 Mode::Local(local_mode) => {
1129 if !matches!(
1130 self.thread_states.thread_state(active_thread_id),
1131 Some(ThreadStatus::Stopped)
1132 ) {
1133 return;
1134 };
1135 let path = breakpoint.path.clone();
1136 local_mode.tmp_breakpoint = Some(breakpoint);
1137 let task = local_mode.send_breakpoints_from_path(
1138 path,
1139 BreakpointUpdatedReason::Toggled,
1140 cx,
1141 );
1142
1143 cx.spawn(async move |this, cx| {
1144 task.await;
1145 this.update(cx, |this, cx| {
1146 this.continue_thread(active_thread_id, cx);
1147 })
1148 })
1149 .detach();
1150 }
1151 Mode::Remote(_) => {}
1152 }
1153 }
1154
1155 pub fn output(
1156 &self,
1157 since: OutputToken,
1158 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1159 if self.output_token.0 == 0 {
1160 return (self.output.range(0..0), OutputToken(0));
1161 };
1162
1163 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1164
1165 let clamped_events_since = events_since.clamp(0, self.output.len());
1166 (
1167 self.output
1168 .range(self.output.len() - clamped_events_since..),
1169 self.output_token,
1170 )
1171 }
1172
1173 pub fn respond_to_client(
1174 &self,
1175 request_seq: u64,
1176 success: bool,
1177 command: String,
1178 body: Option<serde_json::Value>,
1179 cx: &mut Context<Self>,
1180 ) -> Task<Result<()>> {
1181 let Some(local_session) = self.as_local().cloned() else {
1182 unreachable!("Cannot respond to remote client");
1183 };
1184
1185 cx.background_spawn(async move {
1186 local_session
1187 .client
1188 .send_message(Message::Response(Response {
1189 body,
1190 success,
1191 command,
1192 seq: request_seq + 1,
1193 request_seq,
1194 message: None,
1195 }))
1196 .await
1197 })
1198 }
1199
1200 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1201 if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1202 let breakpoint = local.tmp_breakpoint.take()?;
1203 let path = breakpoint.path.clone();
1204 Some((local, path))
1205 }) {
1206 local
1207 .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1208 .detach();
1209 };
1210
1211 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1212 self.thread_states.stop_all_threads();
1213
1214 self.invalidate_command_type::<StackTraceCommand>();
1215 }
1216
1217 // Event if we stopped all threads we still need to insert the thread_id
1218 // to our own data
1219 if let Some(thread_id) = event.thread_id {
1220 self.thread_states.stop_thread(ThreadId(thread_id));
1221
1222 self.invalidate_state(
1223 &StackTraceCommand {
1224 thread_id,
1225 start_frame: None,
1226 levels: None,
1227 }
1228 .into(),
1229 );
1230 }
1231
1232 self.invalidate_generic();
1233 self.threads.clear();
1234 self.variables.clear();
1235 cx.emit(SessionEvent::Stopped(
1236 event
1237 .thread_id
1238 .map(Into::into)
1239 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1240 ));
1241 cx.notify();
1242 }
1243
1244 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1245 match *event {
1246 Events::Initialized(_) => {
1247 debug_assert!(
1248 false,
1249 "Initialized event should have been handled in LocalMode"
1250 );
1251 }
1252 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1253 Events::Continued(event) => {
1254 if event.all_threads_continued.unwrap_or_default() {
1255 self.thread_states.continue_all_threads();
1256 } else {
1257 self.thread_states
1258 .continue_thread(ThreadId(event.thread_id));
1259 }
1260 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1261 self.invalidate_generic();
1262 }
1263 Events::Exited(_event) => {
1264 self.clear_active_debug_line(cx);
1265 }
1266 Events::Terminated(_) => {
1267 self.is_session_terminated = true;
1268 self.clear_active_debug_line(cx);
1269 }
1270 Events::Thread(event) => {
1271 let thread_id = ThreadId(event.thread_id);
1272
1273 match event.reason {
1274 dap::ThreadEventReason::Started => {
1275 self.thread_states.continue_thread(thread_id);
1276 }
1277 dap::ThreadEventReason::Exited => {
1278 self.thread_states.exit_thread(thread_id);
1279 }
1280 reason => {
1281 log::error!("Unhandled thread event reason {:?}", reason);
1282 }
1283 }
1284 self.invalidate_state(&ThreadsCommand.into());
1285 cx.notify();
1286 }
1287 Events::Output(event) => {
1288 if event
1289 .category
1290 .as_ref()
1291 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1292 {
1293 return;
1294 }
1295
1296 self.output.push_back(event);
1297 self.output_token.0 += 1;
1298 cx.notify();
1299 }
1300 Events::Breakpoint(_) => {}
1301 Events::Module(event) => {
1302 match event.reason {
1303 dap::ModuleEventReason::New => {
1304 self.modules.push(event.module);
1305 }
1306 dap::ModuleEventReason::Changed => {
1307 if let Some(module) = self
1308 .modules
1309 .iter_mut()
1310 .find(|other| event.module.id == other.id)
1311 {
1312 *module = event.module;
1313 }
1314 }
1315 dap::ModuleEventReason::Removed => {
1316 self.modules.retain(|other| event.module.id != other.id);
1317 }
1318 }
1319
1320 // todo(debugger): We should only send the invalidate command to downstream clients.
1321 // self.invalidate_state(&ModulesCommand.into());
1322 }
1323 Events::LoadedSource(_) => {
1324 self.invalidate_state(&LoadedSourcesCommand.into());
1325 }
1326 Events::Capabilities(event) => {
1327 self.capabilities = self.capabilities.merge(event.capabilities);
1328 cx.notify();
1329 }
1330 Events::Memory(_) => {}
1331 Events::Process(_) => {}
1332 Events::ProgressEnd(_) => {}
1333 Events::ProgressStart(_) => {}
1334 Events::ProgressUpdate(_) => {}
1335 Events::Invalidated(_) => {}
1336 Events::Other(_) => {}
1337 }
1338 }
1339
1340 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1341 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1342 &mut self,
1343 request: T,
1344 process_result: impl FnOnce(
1345 &mut Self,
1346 Result<T::Response>,
1347 &mut Context<Self>,
1348 ) -> Option<T::Response>
1349 + 'static,
1350 cx: &mut Context<Self>,
1351 ) {
1352 const {
1353 assert!(
1354 T::CACHEABLE,
1355 "Only requests marked as cacheable should invoke `fetch`"
1356 );
1357 }
1358
1359 if !self.thread_states.any_stopped_thread()
1360 && request.type_id() != TypeId::of::<ThreadsCommand>()
1361 || self.is_session_terminated
1362 {
1363 return;
1364 }
1365
1366 let request_map = self
1367 .requests
1368 .entry(std::any::TypeId::of::<T>())
1369 .or_default();
1370
1371 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1372 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1373
1374 let task = Self::request_inner::<Arc<T>>(
1375 &self.capabilities,
1376 self.id,
1377 &self.mode,
1378 command,
1379 process_result,
1380 cx,
1381 );
1382 let task = cx
1383 .background_executor()
1384 .spawn(async move {
1385 let _ = task.await?;
1386 Some(())
1387 })
1388 .shared();
1389
1390 vacant.insert(task);
1391 cx.notify();
1392 }
1393 }
1394
1395 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1396 capabilities: &Capabilities,
1397 session_id: SessionId,
1398 mode: &Mode,
1399 request: T,
1400 process_result: impl FnOnce(
1401 &mut Self,
1402 Result<T::Response>,
1403 &mut Context<Self>,
1404 ) -> Option<T::Response>
1405 + 'static,
1406 cx: &mut Context<Self>,
1407 ) -> Task<Option<T::Response>> {
1408 if !T::is_supported(&capabilities) {
1409 log::warn!(
1410 "Attempted to send a DAP request that isn't supported: {:?}",
1411 request
1412 );
1413 let error = Err(anyhow::Error::msg(
1414 "Couldn't complete request because it's not supported",
1415 ));
1416 return cx.spawn(async move |this, cx| {
1417 this.update(cx, |this, cx| process_result(this, error, cx))
1418 .log_err()
1419 .flatten()
1420 });
1421 }
1422
1423 let request = mode.request_dap(session_id, request, cx);
1424 cx.spawn(async move |this, cx| {
1425 let result = request.await;
1426 this.update(cx, |this, cx| process_result(this, result, cx))
1427 .log_err()
1428 .flatten()
1429 })
1430 }
1431
1432 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1433 &self,
1434 request: T,
1435 process_result: impl FnOnce(
1436 &mut Self,
1437 Result<T::Response>,
1438 &mut Context<Self>,
1439 ) -> Option<T::Response>
1440 + 'static,
1441 cx: &mut Context<Self>,
1442 ) -> Task<Option<T::Response>> {
1443 Self::request_inner(
1444 &self.capabilities,
1445 self.id,
1446 &self.mode,
1447 request,
1448 process_result,
1449 cx,
1450 )
1451 }
1452
1453 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1454 self.requests.remove(&std::any::TypeId::of::<Command>());
1455 }
1456
1457 fn invalidate_generic(&mut self) {
1458 self.invalidate_command_type::<ModulesCommand>();
1459 self.invalidate_command_type::<LoadedSourcesCommand>();
1460 self.invalidate_command_type::<ThreadsCommand>();
1461 }
1462
1463 fn invalidate_state(&mut self, key: &RequestSlot) {
1464 self.requests
1465 .entry((&*key.0 as &dyn Any).type_id())
1466 .and_modify(|request_map| {
1467 request_map.remove(&key);
1468 });
1469 }
1470
1471 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1472 self.thread_states.thread_status(thread_id)
1473 }
1474
1475 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1476 self.fetch(
1477 dap_command::ThreadsCommand,
1478 |this, result, cx| {
1479 let result = result.log_err()?;
1480
1481 this.threads = result
1482 .iter()
1483 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1484 .collect();
1485
1486 this.invalidate_command_type::<StackTraceCommand>();
1487 cx.emit(SessionEvent::Threads);
1488 cx.notify();
1489
1490 Some(result)
1491 },
1492 cx,
1493 );
1494
1495 self.threads
1496 .values()
1497 .map(|thread| {
1498 (
1499 thread.dap.clone(),
1500 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1501 )
1502 })
1503 .collect()
1504 }
1505
1506 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1507 self.fetch(
1508 dap_command::ModulesCommand,
1509 |this, result, cx| {
1510 let result = result.log_err()?;
1511
1512 this.modules = result.iter().cloned().collect();
1513 cx.emit(SessionEvent::Modules);
1514 cx.notify();
1515
1516 Some(result)
1517 },
1518 cx,
1519 );
1520
1521 &self.modules
1522 }
1523
1524 pub fn ignore_breakpoints(&self) -> bool {
1525 self.ignore_breakpoints
1526 }
1527
1528 pub fn toggle_ignore_breakpoints(&mut self, cx: &mut App) -> Task<()> {
1529 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1530 }
1531
1532 pub(crate) fn set_ignore_breakpoints(&mut self, ignore: bool, cx: &mut App) -> Task<()> {
1533 if self.ignore_breakpoints == ignore {
1534 return Task::ready(());
1535 }
1536
1537 self.ignore_breakpoints = ignore;
1538
1539 if let Some(local) = self.as_local() {
1540 local.send_source_breakpoints(ignore, cx)
1541 } else {
1542 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1543 unimplemented!()
1544 }
1545 }
1546
1547 pub fn exception_breakpoints(
1548 &self,
1549 ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1550 self.exception_breakpoints.values()
1551 }
1552
1553 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1554 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1555 *is_enabled = !*is_enabled;
1556 self.send_exception_breakpoints(cx);
1557 }
1558 }
1559
1560 fn send_exception_breakpoints(&mut self, cx: &App) {
1561 if let Some(local) = self.as_local() {
1562 let exception_filters = self
1563 .exception_breakpoints
1564 .values()
1565 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1566 .collect();
1567
1568 let supports_exception_filters = self
1569 .capabilities
1570 .supports_exception_filter_options
1571 .unwrap_or_default();
1572 local
1573 .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1574 .detach_and_log_err(cx);
1575 } else {
1576 debug_assert!(false, "Not implemented");
1577 }
1578 }
1579
1580 pub fn breakpoints_enabled(&self) -> bool {
1581 self.ignore_breakpoints
1582 }
1583
1584 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1585 self.fetch(
1586 dap_command::LoadedSourcesCommand,
1587 |this, result, cx| {
1588 let result = result.log_err()?;
1589 this.loaded_sources = result.iter().cloned().collect();
1590 cx.emit(SessionEvent::LoadedSources);
1591 cx.notify();
1592 Some(result)
1593 },
1594 cx,
1595 );
1596
1597 &self.loaded_sources
1598 }
1599
1600 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1601 res.log_err()?;
1602 Some(())
1603 }
1604
1605 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1606 thread_id: ThreadId,
1607 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1608 {
1609 move |this, response, cx| match response.log_err() {
1610 Some(response) => Some(response),
1611 None => {
1612 this.thread_states.stop_thread(thread_id);
1613 cx.notify();
1614 None
1615 }
1616 }
1617 }
1618
1619 fn clear_active_debug_line_response(
1620 &mut self,
1621 response: Result<()>,
1622 cx: &mut Context<Session>,
1623 ) -> Option<()> {
1624 response.log_err()?;
1625 self.clear_active_debug_line(cx);
1626 Some(())
1627 }
1628
1629 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1630 self.as_local()
1631 .expect("Message handler will only run in local mode")
1632 .breakpoint_store
1633 .update(cx, |store, cx| {
1634 store.remove_active_position(Some(self.id), cx)
1635 });
1636 }
1637
1638 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1639 self.request(
1640 PauseCommand {
1641 thread_id: thread_id.0,
1642 },
1643 Self::empty_response,
1644 cx,
1645 )
1646 .detach();
1647 }
1648
1649 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1650 self.request(
1651 RestartStackFrameCommand { stack_frame_id },
1652 Self::empty_response,
1653 cx,
1654 )
1655 .detach();
1656 }
1657
1658 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1659 if self.capabilities.supports_restart_request.unwrap_or(false) {
1660 self.request(
1661 RestartCommand {
1662 raw: args.unwrap_or(Value::Null),
1663 },
1664 Self::empty_response,
1665 cx,
1666 )
1667 .detach();
1668 } else {
1669 self.request(
1670 DisconnectCommand {
1671 restart: Some(false),
1672 terminate_debuggee: Some(true),
1673 suspend_debuggee: Some(false),
1674 },
1675 Self::empty_response,
1676 cx,
1677 )
1678 .detach();
1679 }
1680 }
1681
1682 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1683 self.is_session_terminated = true;
1684 self.thread_states.exit_all_threads();
1685 cx.notify();
1686
1687 let task = if self
1688 .capabilities
1689 .supports_terminate_request
1690 .unwrap_or_default()
1691 {
1692 self.request(
1693 TerminateCommand {
1694 restart: Some(false),
1695 },
1696 Self::clear_active_debug_line_response,
1697 cx,
1698 )
1699 } else {
1700 self.request(
1701 DisconnectCommand {
1702 restart: Some(false),
1703 terminate_debuggee: Some(true),
1704 suspend_debuggee: Some(false),
1705 },
1706 Self::clear_active_debug_line_response,
1707 cx,
1708 )
1709 };
1710
1711 cx.emit(SessionStateEvent::Shutdown);
1712
1713 cx.background_spawn(async move {
1714 let _ = task.await;
1715 })
1716 }
1717
1718 pub fn completions(
1719 &mut self,
1720 query: CompletionsQuery,
1721 cx: &mut Context<Self>,
1722 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1723 let task = self.request(query, |_, result, _| result.log_err(), cx);
1724
1725 cx.background_executor().spawn(async move {
1726 anyhow::Ok(
1727 task.await
1728 .map(|response| response.targets)
1729 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1730 )
1731 })
1732 }
1733
1734 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1735 self.thread_states.continue_thread(thread_id);
1736 self.request(
1737 ContinueCommand {
1738 args: ContinueArguments {
1739 thread_id: thread_id.0,
1740 single_thread: Some(true),
1741 },
1742 },
1743 Self::on_step_response::<ContinueCommand>(thread_id),
1744 cx,
1745 )
1746 .detach();
1747 }
1748
1749 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1750 match self.mode {
1751 Mode::Local(ref local) => Some(local.client.clone()),
1752 Mode::Remote(_) => None,
1753 }
1754 }
1755
1756 pub fn step_over(
1757 &mut self,
1758 thread_id: ThreadId,
1759 granularity: SteppingGranularity,
1760 cx: &mut Context<Self>,
1761 ) {
1762 let supports_single_thread_execution_requests =
1763 self.capabilities.supports_single_thread_execution_requests;
1764 let supports_stepping_granularity = self
1765 .capabilities
1766 .supports_stepping_granularity
1767 .unwrap_or_default();
1768
1769 let command = NextCommand {
1770 inner: StepCommand {
1771 thread_id: thread_id.0,
1772 granularity: supports_stepping_granularity.then(|| granularity),
1773 single_thread: supports_single_thread_execution_requests,
1774 },
1775 };
1776
1777 self.thread_states.process_step(thread_id);
1778 self.request(
1779 command,
1780 Self::on_step_response::<NextCommand>(thread_id),
1781 cx,
1782 )
1783 .detach();
1784 }
1785
1786 pub fn step_in(
1787 &mut self,
1788 thread_id: ThreadId,
1789 granularity: SteppingGranularity,
1790 cx: &mut Context<Self>,
1791 ) {
1792 let supports_single_thread_execution_requests =
1793 self.capabilities.supports_single_thread_execution_requests;
1794 let supports_stepping_granularity = self
1795 .capabilities
1796 .supports_stepping_granularity
1797 .unwrap_or_default();
1798
1799 let command = StepInCommand {
1800 inner: StepCommand {
1801 thread_id: thread_id.0,
1802 granularity: supports_stepping_granularity.then(|| granularity),
1803 single_thread: supports_single_thread_execution_requests,
1804 },
1805 };
1806
1807 self.thread_states.process_step(thread_id);
1808 self.request(
1809 command,
1810 Self::on_step_response::<StepInCommand>(thread_id),
1811 cx,
1812 )
1813 .detach();
1814 }
1815
1816 pub fn step_out(
1817 &mut self,
1818 thread_id: ThreadId,
1819 granularity: SteppingGranularity,
1820 cx: &mut Context<Self>,
1821 ) {
1822 let supports_single_thread_execution_requests =
1823 self.capabilities.supports_single_thread_execution_requests;
1824 let supports_stepping_granularity = self
1825 .capabilities
1826 .supports_stepping_granularity
1827 .unwrap_or_default();
1828
1829 let command = StepOutCommand {
1830 inner: StepCommand {
1831 thread_id: thread_id.0,
1832 granularity: supports_stepping_granularity.then(|| granularity),
1833 single_thread: supports_single_thread_execution_requests,
1834 },
1835 };
1836
1837 self.thread_states.process_step(thread_id);
1838 self.request(
1839 command,
1840 Self::on_step_response::<StepOutCommand>(thread_id),
1841 cx,
1842 )
1843 .detach();
1844 }
1845
1846 pub fn step_back(
1847 &mut self,
1848 thread_id: ThreadId,
1849 granularity: SteppingGranularity,
1850 cx: &mut Context<Self>,
1851 ) {
1852 let supports_single_thread_execution_requests =
1853 self.capabilities.supports_single_thread_execution_requests;
1854 let supports_stepping_granularity = self
1855 .capabilities
1856 .supports_stepping_granularity
1857 .unwrap_or_default();
1858
1859 let command = StepBackCommand {
1860 inner: StepCommand {
1861 thread_id: thread_id.0,
1862 granularity: supports_stepping_granularity.then(|| granularity),
1863 single_thread: supports_single_thread_execution_requests,
1864 },
1865 };
1866
1867 self.thread_states.process_step(thread_id);
1868
1869 self.request(
1870 command,
1871 Self::on_step_response::<StepBackCommand>(thread_id),
1872 cx,
1873 )
1874 .detach();
1875 }
1876
1877 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1878 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1879 && self.requests.contains_key(&ThreadsCommand.type_id())
1880 && self.threads.contains_key(&thread_id)
1881 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1882 // We could still be using an old thread id and have sent a new thread's request
1883 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1884 // But it very well could cause a minor bug in the future that is hard to track down
1885 {
1886 self.fetch(
1887 super::dap_command::StackTraceCommand {
1888 thread_id: thread_id.0,
1889 start_frame: None,
1890 levels: None,
1891 },
1892 move |this, stack_frames, cx| {
1893 let stack_frames = stack_frames.log_err()?;
1894
1895 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1896 thread.stack_frame_ids =
1897 stack_frames.iter().map(|frame| frame.id).collect();
1898 });
1899 debug_assert!(
1900 matches!(entry, indexmap::map::Entry::Occupied(_)),
1901 "Sent request for thread_id that doesn't exist"
1902 );
1903
1904 this.stack_frames.extend(
1905 stack_frames
1906 .iter()
1907 .cloned()
1908 .map(|frame| (frame.id, StackFrame::from(frame))),
1909 );
1910
1911 this.invalidate_command_type::<ScopesCommand>();
1912 this.invalidate_command_type::<VariablesCommand>();
1913
1914 cx.emit(SessionEvent::StackTrace);
1915 cx.notify();
1916 Some(stack_frames)
1917 },
1918 cx,
1919 );
1920 }
1921
1922 self.threads
1923 .get(&thread_id)
1924 .map(|thread| {
1925 thread
1926 .stack_frame_ids
1927 .iter()
1928 .filter_map(|id| self.stack_frames.get(id))
1929 .cloned()
1930 .collect()
1931 })
1932 .unwrap_or_default()
1933 }
1934
1935 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1936 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1937 && self
1938 .requests
1939 .contains_key(&TypeId::of::<StackTraceCommand>())
1940 {
1941 self.fetch(
1942 ScopesCommand { stack_frame_id },
1943 move |this, scopes, cx| {
1944 let scopes = scopes.log_err()?;
1945
1946 for scope in scopes .iter(){
1947 this.variables(scope.variables_reference, cx);
1948 }
1949
1950 let entry = this
1951 .stack_frames
1952 .entry(stack_frame_id)
1953 .and_modify(|stack_frame| {
1954 stack_frame.scopes = scopes.clone();
1955 });
1956
1957 cx.emit(SessionEvent::Variables);
1958
1959 debug_assert!(
1960 matches!(entry, indexmap::map::Entry::Occupied(_)),
1961 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1962 );
1963
1964 Some(scopes)
1965 },
1966 cx,
1967 );
1968 }
1969
1970 self.stack_frames
1971 .get(&stack_frame_id)
1972 .map(|frame| frame.scopes.as_slice())
1973 .unwrap_or_default()
1974 }
1975
1976 pub fn variables(
1977 &mut self,
1978 variables_reference: VariableReference,
1979 cx: &mut Context<Self>,
1980 ) -> Vec<dap::Variable> {
1981 let command = VariablesCommand {
1982 variables_reference,
1983 filter: None,
1984 start: None,
1985 count: None,
1986 format: None,
1987 };
1988
1989 self.fetch(
1990 command,
1991 move |this, variables, cx| {
1992 let variables = variables.log_err()?;
1993 this.variables
1994 .insert(variables_reference, variables.clone());
1995
1996 cx.emit(SessionEvent::Variables);
1997 Some(variables)
1998 },
1999 cx,
2000 );
2001
2002 self.variables
2003 .get(&variables_reference)
2004 .cloned()
2005 .unwrap_or_default()
2006 }
2007
2008 pub fn set_variable_value(
2009 &mut self,
2010 variables_reference: u64,
2011 name: String,
2012 value: String,
2013 cx: &mut Context<Self>,
2014 ) {
2015 if self.capabilities.supports_set_variable.unwrap_or_default() {
2016 self.request(
2017 SetVariableValueCommand {
2018 name,
2019 value,
2020 variables_reference,
2021 },
2022 move |this, response, cx| {
2023 let response = response.log_err()?;
2024 this.invalidate_command_type::<VariablesCommand>();
2025 cx.notify();
2026 Some(response)
2027 },
2028 cx,
2029 )
2030 .detach()
2031 }
2032 }
2033
2034 pub fn evaluate(
2035 &mut self,
2036 expression: String,
2037 context: Option<EvaluateArgumentsContext>,
2038 frame_id: Option<u64>,
2039 source: Option<Source>,
2040 cx: &mut Context<Self>,
2041 ) {
2042 self.request(
2043 EvaluateCommand {
2044 expression,
2045 context,
2046 frame_id,
2047 source,
2048 },
2049 |this, response, cx| {
2050 let response = response.log_err()?;
2051 this.output_token.0 += 1;
2052 this.output.push_back(dap::OutputEvent {
2053 category: None,
2054 output: response.result.clone(),
2055 group: None,
2056 variables_reference: Some(response.variables_reference),
2057 source: None,
2058 line: None,
2059 column: None,
2060 data: None,
2061 location_reference: None,
2062 });
2063
2064 this.invalidate_command_type::<ScopesCommand>();
2065 cx.notify();
2066 Some(response)
2067 },
2068 cx,
2069 )
2070 .detach();
2071 }
2072
2073 pub fn location(
2074 &mut self,
2075 reference: u64,
2076 cx: &mut Context<Self>,
2077 ) -> Option<dap::LocationsResponse> {
2078 self.fetch(
2079 LocationsCommand { reference },
2080 move |this, response, _| {
2081 let response = response.log_err()?;
2082 this.locations.insert(reference, response.clone());
2083 Some(response)
2084 },
2085 cx,
2086 );
2087 self.locations.get(&reference).cloned()
2088 }
2089 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2090 let command = DisconnectCommand {
2091 restart: Some(false),
2092 terminate_debuggee: Some(true),
2093 suspend_debuggee: Some(false),
2094 };
2095
2096 self.request(command, Self::empty_response, cx).detach()
2097 }
2098
2099 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2100 if self
2101 .capabilities
2102 .supports_terminate_threads_request
2103 .unwrap_or_default()
2104 {
2105 self.request(
2106 TerminateThreadsCommand {
2107 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2108 },
2109 Self::clear_active_debug_line_response,
2110 cx,
2111 )
2112 .detach();
2113 } else {
2114 self.shutdown(cx).detach();
2115 }
2116 }
2117}
2118
2119fn create_local_session(
2120 breakpoint_store: Entity<BreakpointStore>,
2121 session_id: SessionId,
2122 parent_session: Option<Entity<Session>>,
2123 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
2124 initialized_tx: oneshot::Sender<()>,
2125 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
2126 mode: LocalMode,
2127 cx: &mut Context<Session>,
2128) -> Session {
2129 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
2130 let mut initialized_tx = Some(initialized_tx);
2131 while let Some(message) = message_rx.next().await {
2132 if let Message::Event(event) = message {
2133 if let Events::Initialized(_) = *event {
2134 if let Some(tx) = initialized_tx.take() {
2135 tx.send(()).ok();
2136 }
2137 } else {
2138 let Ok(_) = this.update(cx, |session, cx| {
2139 session.handle_dap_event(event, cx);
2140 }) else {
2141 break;
2142 };
2143 }
2144 } else {
2145 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
2146 else {
2147 break;
2148 };
2149 }
2150 }
2151 })];
2152
2153 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
2154 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
2155 if let Some(local) = (!this.ignore_breakpoints)
2156 .then(|| this.as_local_mut())
2157 .flatten()
2158 {
2159 local
2160 .send_breakpoints_from_path(path.clone(), *reason, cx)
2161 .detach();
2162 };
2163 }
2164 BreakpointStoreEvent::BreakpointsCleared(paths) => {
2165 if let Some(local) = (!this.ignore_breakpoints)
2166 .then(|| this.as_local_mut())
2167 .flatten()
2168 {
2169 local.unset_breakpoints_from_paths(paths, cx).detach();
2170 }
2171 }
2172 BreakpointStoreEvent::ActiveDebugLineChanged => {}
2173 })
2174 .detach();
2175
2176 Session {
2177 mode: Mode::Local(mode),
2178 id: session_id,
2179 child_session_ids: HashSet::default(),
2180 parent_id: parent_session.map(|session| session.read(cx).id),
2181 variables: Default::default(),
2182 capabilities: Capabilities::default(),
2183 thread_states: ThreadStates::default(),
2184 output_token: OutputToken(0),
2185 ignore_breakpoints: false,
2186 output: circular_buffer::CircularBuffer::boxed(),
2187 requests: HashMap::default(),
2188 modules: Vec::default(),
2189 loaded_sources: Vec::default(),
2190 threads: IndexMap::default(),
2191 stack_frames: IndexMap::default(),
2192 locations: Default::default(),
2193 exception_breakpoints: Default::default(),
2194 _background_tasks,
2195 is_session_terminated: false,
2196 }
2197}