1use super::breakpoint_store::{
2 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
3};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
9 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
10 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
11};
12use super::dap_store::DapStore;
13use anyhow::{Context as _, Result, anyhow};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::DebugAdapterBinary;
16use dap::messages::Response;
17use dap::{
18 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
19 SteppingGranularity, StoppedEvent, VariableReference,
20 client::{DebugAdapterClient, SessionId},
21 messages::{Events, Message},
22};
23use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
24use futures::channel::oneshot;
25use futures::{FutureExt, future::Shared};
26use gpui::{
27 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
28 Task, WeakEntity,
29};
30
31use rpc::AnyProtoClient;
32use serde_json::{Value, json};
33use smol::stream::StreamExt;
34use std::any::TypeId;
35use std::collections::BTreeMap;
36use std::u64;
37use std::{
38 any::Any,
39 collections::hash_map::Entry,
40 hash::{Hash, Hasher},
41 path::Path,
42 sync::Arc,
43};
44use task::DebugTaskDefinition;
45use text::{PointUtf16, ToPointUtf16};
46use util::{ResultExt, merge_json_value_into};
47use worktree::Worktree;
48
49#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
50#[repr(transparent)]
51pub struct ThreadId(pub u64);
52
53impl ThreadId {
54 pub const MIN: ThreadId = ThreadId(u64::MIN);
55 pub const MAX: ThreadId = ThreadId(u64::MAX);
56}
57
58impl From<u64> for ThreadId {
59 fn from(id: u64) -> Self {
60 Self(id)
61 }
62}
63
64#[derive(Clone, Debug)]
65pub struct StackFrame {
66 pub dap: dap::StackFrame,
67 pub scopes: Vec<dap::Scope>,
68}
69
70impl From<dap::StackFrame> for StackFrame {
71 fn from(stack_frame: dap::StackFrame) -> Self {
72 Self {
73 scopes: vec![],
74 dap: stack_frame,
75 }
76 }
77}
78
79#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
80pub enum ThreadStatus {
81 #[default]
82 Running,
83 Stopped,
84 Stepping,
85 Exited,
86 Ended,
87}
88
89impl ThreadStatus {
90 pub fn label(&self) -> &'static str {
91 match self {
92 ThreadStatus::Running => "Running",
93 ThreadStatus::Stopped => "Stopped",
94 ThreadStatus::Stepping => "Stepping",
95 ThreadStatus::Exited => "Exited",
96 ThreadStatus::Ended => "Ended",
97 }
98 }
99}
100
101#[derive(Debug)]
102pub struct Thread {
103 dap: dap::Thread,
104 stack_frame_ids: IndexSet<StackFrameId>,
105 _has_stopped: bool,
106}
107
108impl From<dap::Thread> for Thread {
109 fn from(dap: dap::Thread) -> Self {
110 Self {
111 dap,
112 stack_frame_ids: Default::default(),
113 _has_stopped: false,
114 }
115 }
116}
117
118type UpstreamProjectId = u64;
119
120struct RemoteConnection {
121 _client: AnyProtoClient,
122 _upstream_project_id: UpstreamProjectId,
123 _adapter_name: SharedString,
124}
125
126impl RemoteConnection {
127 fn send_proto_client_request<R: DapCommand>(
128 &self,
129 _request: R,
130 _session_id: SessionId,
131 cx: &mut App,
132 ) -> Task<Result<R::Response>> {
133 // let message = request.to_proto(session_id, self.upstream_project_id);
134 // let upstream_client = self.client.clone();
135 cx.background_executor().spawn(async move {
136 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
137 // let response = upstream_client.request(message).await?;
138 // request.response_from_proto(response)
139 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
140 })
141 }
142
143 fn request<R: DapCommand>(
144 &self,
145 request: R,
146 session_id: SessionId,
147 cx: &mut App,
148 ) -> Task<Result<R::Response>>
149 where
150 <R::DapRequest as dap::requests::Request>::Response: 'static,
151 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
152 {
153 return self.send_proto_client_request::<R>(request, session_id, cx);
154 }
155}
156
157enum Mode {
158 Local(LocalMode),
159 Remote(RemoteConnection),
160}
161
162#[derive(Clone)]
163pub struct LocalMode {
164 client: Arc<DebugAdapterClient>,
165 definition: DebugTaskDefinition,
166 binary: DebugAdapterBinary,
167 pub(crate) breakpoint_store: Entity<BreakpointStore>,
168 tmp_breakpoint: Option<SourceBreakpoint>,
169 worktree: WeakEntity<Worktree>,
170}
171
172fn client_source(abs_path: &Path) -> dap::Source {
173 dap::Source {
174 name: abs_path
175 .file_name()
176 .map(|filename| filename.to_string_lossy().to_string()),
177 path: Some(abs_path.to_string_lossy().to_string()),
178 source_reference: None,
179 presentation_hint: None,
180 origin: None,
181 sources: None,
182 adapter_data: None,
183 checksums: None,
184 }
185}
186
187impl LocalMode {
188 fn new(
189 session_id: SessionId,
190 parent_session: Option<Entity<Session>>,
191 worktree: WeakEntity<Worktree>,
192 breakpoint_store: Entity<BreakpointStore>,
193 config: DebugTaskDefinition,
194 binary: DebugAdapterBinary,
195 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
196 cx: AsyncApp,
197 ) -> Task<Result<Self>> {
198 Self::new_inner(
199 session_id,
200 parent_session,
201 breakpoint_store,
202 worktree,
203 config,
204 binary,
205 messages_tx,
206 async |_, _| {},
207 cx,
208 )
209 }
210
211 fn new_inner(
212 session_id: SessionId,
213 parent_session: Option<Entity<Session>>,
214 breakpoint_store: Entity<BreakpointStore>,
215 worktree: WeakEntity<Worktree>,
216 config: DebugTaskDefinition,
217 binary: DebugAdapterBinary,
218 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
219 on_initialized: impl AsyncFnOnce(&mut LocalMode, AsyncApp) + 'static,
220 cx: AsyncApp,
221 ) -> Task<Result<Self>> {
222 cx.spawn(async move |cx| {
223 let message_handler = Box::new(move |message| {
224 messages_tx.unbounded_send(message).ok();
225 });
226
227 let client = Arc::new(
228 if let Some(client) = parent_session
229 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
230 .flatten()
231 {
232 client
233 .reconnect(session_id, binary.clone(), message_handler, cx.clone())
234 .await?
235 } else {
236 DebugAdapterClient::start(
237 session_id,
238 binary.clone(),
239 message_handler,
240 cx.clone(),
241 )
242 .await
243 .with_context(|| "Failed to start communication with debug adapter")?
244 },
245 );
246
247 let mut session = Self {
248 client,
249 breakpoint_store,
250 worktree,
251 tmp_breakpoint: None,
252 definition: config,
253 binary,
254 };
255
256 on_initialized(&mut session, cx.clone()).await;
257
258 Ok(session)
259 })
260 }
261
262 pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
263 &self.worktree
264 }
265
266 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
267 let tasks: Vec<_> = paths
268 .into_iter()
269 .map(|path| {
270 self.request(
271 dap_command::SetBreakpoints {
272 source: client_source(path),
273 source_modified: None,
274 breakpoints: vec![],
275 },
276 cx.background_executor().clone(),
277 )
278 })
279 .collect();
280
281 cx.background_spawn(async move {
282 futures::future::join_all(tasks)
283 .await
284 .iter()
285 .for_each(|res| match res {
286 Ok(_) => {}
287 Err(err) => {
288 log::warn!("Set breakpoints request failed: {}", err);
289 }
290 });
291 })
292 }
293
294 fn send_breakpoints_from_path(
295 &self,
296 abs_path: Arc<Path>,
297 reason: BreakpointUpdatedReason,
298 cx: &mut App,
299 ) -> Task<()> {
300 let breakpoints = self
301 .breakpoint_store
302 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
303 .into_iter()
304 .filter(|bp| bp.state.is_enabled())
305 .chain(self.tmp_breakpoint.clone())
306 .map(Into::into)
307 .collect();
308
309 let task = self.request(
310 dap_command::SetBreakpoints {
311 source: client_source(&abs_path),
312 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
313 breakpoints,
314 },
315 cx.background_executor().clone(),
316 );
317
318 cx.background_spawn(async move {
319 match task.await {
320 Ok(_) => {}
321 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
322 }
323 })
324 }
325
326 fn send_exception_breakpoints(
327 &self,
328 filters: Vec<ExceptionBreakpointsFilter>,
329 supports_filter_options: bool,
330 cx: &App,
331 ) -> Task<Result<Vec<dap::Breakpoint>>> {
332 let arg = if supports_filter_options {
333 SetExceptionBreakpoints::WithOptions {
334 filters: filters
335 .into_iter()
336 .map(|filter| ExceptionFilterOptions {
337 filter_id: filter.filter,
338 condition: None,
339 mode: None,
340 })
341 .collect(),
342 }
343 } else {
344 SetExceptionBreakpoints::Plain {
345 filters: filters.into_iter().map(|filter| filter.filter).collect(),
346 }
347 };
348 self.request(arg, cx.background_executor().clone())
349 }
350
351 fn send_source_breakpoints(
352 &self,
353 ignore_breakpoints: bool,
354 cx: &App,
355 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
356 let mut breakpoint_tasks = Vec::new();
357 let breakpoints = self
358 .breakpoint_store
359 .read_with(cx, |store, cx| store.all_breakpoints(cx));
360
361 for (path, breakpoints) in breakpoints {
362 let breakpoints = if ignore_breakpoints {
363 vec![]
364 } else {
365 breakpoints
366 .into_iter()
367 .filter(|bp| bp.state.is_enabled())
368 .map(Into::into)
369 .collect()
370 };
371
372 breakpoint_tasks.push(
373 self.request(
374 dap_command::SetBreakpoints {
375 source: client_source(&path),
376 source_modified: Some(false),
377 breakpoints,
378 },
379 cx.background_executor().clone(),
380 )
381 .map(|result| result.map_err(|e| (path, e))),
382 );
383 }
384
385 cx.background_spawn(async move {
386 futures::future::join_all(breakpoint_tasks)
387 .await
388 .into_iter()
389 .filter_map(Result::err)
390 .collect::<HashMap<_, _>>()
391 })
392 }
393
394 pub fn label(&self) -> String {
395 self.definition.label.clone()
396 }
397
398 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
399 let adapter_id = self.binary.adapter_name.to_string();
400
401 self.request(Initialize { adapter_id }, cx.background_executor().clone())
402 }
403
404 fn initialize_sequence(
405 &self,
406 capabilities: &Capabilities,
407 initialized_rx: oneshot::Receiver<()>,
408 dap_store: WeakEntity<DapStore>,
409 cx: &App,
410 ) -> Task<Result<()>> {
411 let mut raw = self.binary.request_args.clone();
412
413 merge_json_value_into(
414 self.definition.initialize_args.clone().unwrap_or(json!({})),
415 &mut raw.configuration,
416 );
417
418 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
419 let launch = match raw.request {
420 dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
421 Launch {
422 raw: raw.configuration,
423 },
424 cx.background_executor().clone(),
425 ),
426 dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
427 Attach {
428 raw: raw.configuration,
429 },
430 cx.background_executor().clone(),
431 ),
432 };
433
434 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
435 let exception_filters = capabilities
436 .exception_breakpoint_filters
437 .as_ref()
438 .map(|exception_filters| {
439 exception_filters
440 .iter()
441 .filter(|filter| filter.default == Some(true))
442 .cloned()
443 .collect::<Vec<_>>()
444 })
445 .unwrap_or_default();
446 let supports_exception_filters = capabilities
447 .supports_exception_filter_options
448 .unwrap_or_default();
449 let configuration_sequence = cx.spawn({
450 let this = self.clone();
451 let worktree = self.worktree().clone();
452 async move |cx| {
453 initialized_rx.await?;
454 let errors_by_path = cx
455 .update(|cx| this.send_source_breakpoints(false, cx))?
456 .await;
457
458 dap_store.update(cx, |_, cx| {
459 let Some(worktree) = worktree.upgrade() else {
460 return;
461 };
462
463 for (path, error) in &errors_by_path {
464 log::error!("failed to set breakpoints for {path:?}: {error}");
465 }
466
467 if let Some(failed_path) = errors_by_path.keys().next() {
468 let failed_path = failed_path
469 .strip_prefix(worktree.read(cx).abs_path())
470 .unwrap_or(failed_path)
471 .display();
472 let message = format!(
473 "Failed to set breakpoints for {failed_path}{}",
474 match errors_by_path.len() {
475 0 => unreachable!(),
476 1 => "".into(),
477 2 => " and 1 other path".into(),
478 n => format!(" and {} other paths", n - 1),
479 }
480 );
481 cx.emit(super::dap_store::DapStoreEvent::Notification(message));
482 }
483 })?;
484
485 cx.update(|cx| {
486 this.send_exception_breakpoints(
487 exception_filters,
488 supports_exception_filters,
489 cx,
490 )
491 })?
492 .await
493 .ok();
494 let ret = if configuration_done_supported {
495 this.request(ConfigurationDone {}, cx.background_executor().clone())
496 } else {
497 Task::ready(Ok(()))
498 }
499 .await;
500 ret
501 }
502 });
503
504 cx.background_spawn(async move {
505 futures::future::try_join(launch, configuration_sequence).await?;
506 Ok(())
507 })
508 }
509
510 fn request<R: LocalDapCommand>(
511 &self,
512 request: R,
513 executor: BackgroundExecutor,
514 ) -> Task<Result<R::Response>>
515 where
516 <R::DapRequest as dap::requests::Request>::Response: 'static,
517 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
518 {
519 let request = Arc::new(request);
520
521 let request_clone = request.clone();
522 let connection = self.client.clone();
523 let request_task = executor.spawn(async move {
524 let args = request_clone.to_dap();
525 connection.request::<R::DapRequest>(args).await
526 });
527
528 executor.spawn(async move {
529 let response = request.response_from_dap(request_task.await?);
530 response
531 })
532 }
533}
534impl From<RemoteConnection> for Mode {
535 fn from(value: RemoteConnection) -> Self {
536 Self::Remote(value)
537 }
538}
539
540impl Mode {
541 fn request_dap<R: DapCommand>(
542 &self,
543 session_id: SessionId,
544 request: R,
545 cx: &mut Context<Session>,
546 ) -> Task<Result<R::Response>>
547 where
548 <R::DapRequest as dap::requests::Request>::Response: 'static,
549 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
550 {
551 match self {
552 Mode::Local(debug_adapter_client) => {
553 debug_adapter_client.request(request, cx.background_executor().clone())
554 }
555 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
556 }
557 }
558}
559
560#[derive(Default)]
561struct ThreadStates {
562 global_state: Option<ThreadStatus>,
563 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
564}
565
566impl ThreadStates {
567 fn stop_all_threads(&mut self) {
568 self.global_state = Some(ThreadStatus::Stopped);
569 self.known_thread_states.clear();
570 }
571
572 fn exit_all_threads(&mut self) {
573 self.global_state = Some(ThreadStatus::Exited);
574 self.known_thread_states.clear();
575 }
576
577 fn continue_all_threads(&mut self) {
578 self.global_state = Some(ThreadStatus::Running);
579 self.known_thread_states.clear();
580 }
581
582 fn stop_thread(&mut self, thread_id: ThreadId) {
583 self.known_thread_states
584 .insert(thread_id, ThreadStatus::Stopped);
585 }
586
587 fn continue_thread(&mut self, thread_id: ThreadId) {
588 self.known_thread_states
589 .insert(thread_id, ThreadStatus::Running);
590 }
591
592 fn process_step(&mut self, thread_id: ThreadId) {
593 self.known_thread_states
594 .insert(thread_id, ThreadStatus::Stepping);
595 }
596
597 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
598 self.thread_state(thread_id)
599 .unwrap_or(ThreadStatus::Running)
600 }
601
602 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
603 self.known_thread_states
604 .get(&thread_id)
605 .copied()
606 .or(self.global_state)
607 }
608
609 fn exit_thread(&mut self, thread_id: ThreadId) {
610 self.known_thread_states
611 .insert(thread_id, ThreadStatus::Exited);
612 }
613
614 fn any_stopped_thread(&self) -> bool {
615 self.global_state
616 .is_some_and(|state| state == ThreadStatus::Stopped)
617 || self
618 .known_thread_states
619 .values()
620 .any(|status| *status == ThreadStatus::Stopped)
621 }
622}
623const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
624
625type IsEnabled = bool;
626
627#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
628pub struct OutputToken(pub usize);
629/// Represents a current state of a single debug adapter and provides ways to mutate it.
630pub struct Session {
631 mode: Mode,
632 pub(super) capabilities: Capabilities,
633 id: SessionId,
634 child_session_ids: HashSet<SessionId>,
635 parent_id: Option<SessionId>,
636 ignore_breakpoints: bool,
637 modules: Vec<dap::Module>,
638 loaded_sources: Vec<dap::Source>,
639 output_token: OutputToken,
640 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
641 threads: IndexMap<ThreadId, Thread>,
642 thread_states: ThreadStates,
643 variables: HashMap<VariableReference, Vec<dap::Variable>>,
644 stack_frames: IndexMap<StackFrameId, StackFrame>,
645 locations: HashMap<u64, dap::LocationsResponse>,
646 is_session_terminated: bool,
647 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
648 exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
649 _background_tasks: Vec<Task<()>>,
650}
651
652trait CacheableCommand: Any + Send + Sync {
653 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
654 fn dyn_hash(&self, hasher: &mut dyn Hasher);
655 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
656}
657
658impl<T> CacheableCommand for T
659where
660 T: DapCommand + PartialEq + Eq + Hash,
661{
662 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
663 (rhs as &dyn Any)
664 .downcast_ref::<Self>()
665 .map_or(false, |rhs| self == rhs)
666 }
667
668 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
669 T::hash(self, &mut hasher);
670 }
671
672 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
673 self
674 }
675}
676
677pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
678
679impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
680 fn from(request: T) -> Self {
681 Self(Arc::new(request))
682 }
683}
684
685impl PartialEq for RequestSlot {
686 fn eq(&self, other: &Self) -> bool {
687 self.0.dyn_eq(other.0.as_ref())
688 }
689}
690
691impl Eq for RequestSlot {}
692
693impl Hash for RequestSlot {
694 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
695 self.0.dyn_hash(state);
696 (&*self.0 as &dyn Any).type_id().hash(state)
697 }
698}
699
700#[derive(Debug, Clone, Hash, PartialEq, Eq)]
701pub struct CompletionsQuery {
702 pub query: String,
703 pub column: u64,
704 pub line: Option<u64>,
705 pub frame_id: Option<u64>,
706}
707
708impl CompletionsQuery {
709 pub fn new(
710 buffer: &language::Buffer,
711 cursor_position: language::Anchor,
712 frame_id: Option<u64>,
713 ) -> Self {
714 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
715 Self {
716 query: buffer.text(),
717 column: column as u64,
718 frame_id,
719 line: Some(row as u64),
720 }
721 }
722}
723
724pub enum SessionEvent {
725 Modules,
726 LoadedSources,
727 Stopped(Option<ThreadId>),
728 StackTrace,
729 Variables,
730 Threads,
731}
732
733pub(super) enum SessionStateEvent {
734 Shutdown,
735 Restart,
736}
737
738impl EventEmitter<SessionEvent> for Session {}
739impl EventEmitter<SessionStateEvent> for Session {}
740
741// local session will send breakpoint updates to DAP for all new breakpoints
742// remote side will only send breakpoint updates when it is a breakpoint created by that peer
743// BreakpointStore notifies session on breakpoint changes
744impl Session {
745 pub(crate) fn local(
746 breakpoint_store: Entity<BreakpointStore>,
747 worktree: WeakEntity<Worktree>,
748 session_id: SessionId,
749 parent_session: Option<Entity<Session>>,
750 binary: DebugAdapterBinary,
751 config: DebugTaskDefinition,
752 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
753 initialized_tx: oneshot::Sender<()>,
754 cx: &mut App,
755 ) -> Task<Result<Entity<Self>>> {
756 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
757
758 cx.spawn(async move |cx| {
759 let mode = LocalMode::new(
760 session_id,
761 parent_session.clone(),
762 worktree,
763 breakpoint_store.clone(),
764 config.clone(),
765 binary,
766 message_tx,
767 cx.clone(),
768 )
769 .await?;
770
771 cx.new(|cx| {
772 create_local_session(
773 breakpoint_store,
774 session_id,
775 parent_session,
776 start_debugging_requests_tx,
777 initialized_tx,
778 message_rx,
779 mode,
780 cx,
781 )
782 })
783 })
784 }
785
786 pub(crate) fn remote(
787 session_id: SessionId,
788 client: AnyProtoClient,
789 upstream_project_id: u64,
790 ignore_breakpoints: bool,
791 ) -> Self {
792 Self {
793 mode: Mode::Remote(RemoteConnection {
794 _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
795 _client: client,
796 _upstream_project_id: upstream_project_id,
797 }),
798 id: session_id,
799 child_session_ids: HashSet::default(),
800 parent_id: None,
801 capabilities: Capabilities::default(),
802 ignore_breakpoints,
803 variables: Default::default(),
804 stack_frames: Default::default(),
805 thread_states: ThreadStates::default(),
806 output_token: OutputToken(0),
807 output: circular_buffer::CircularBuffer::boxed(),
808 requests: HashMap::default(),
809 modules: Vec::default(),
810 loaded_sources: Vec::default(),
811 threads: IndexMap::default(),
812 _background_tasks: Vec::default(),
813 locations: Default::default(),
814 is_session_terminated: false,
815 exception_breakpoints: Default::default(),
816 }
817 }
818
819 pub fn session_id(&self) -> SessionId {
820 self.id
821 }
822
823 pub fn child_session_ids(&self) -> HashSet<SessionId> {
824 self.child_session_ids.clone()
825 }
826
827 pub fn add_child_session_id(&mut self, session_id: SessionId) {
828 self.child_session_ids.insert(session_id);
829 }
830
831 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
832 self.child_session_ids.remove(&session_id);
833 }
834
835 pub fn parent_id(&self) -> Option<SessionId> {
836 self.parent_id
837 }
838
839 pub fn capabilities(&self) -> &Capabilities {
840 &self.capabilities
841 }
842
843 pub fn binary(&self) -> &DebugAdapterBinary {
844 let Mode::Local(local_mode) = &self.mode else {
845 panic!("Session is not local");
846 };
847 &local_mode.binary
848 }
849
850 pub fn adapter_name(&self) -> SharedString {
851 match &self.mode {
852 Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(),
853 Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
854 }
855 }
856
857 pub fn configuration(&self) -> Option<DebugTaskDefinition> {
858 if let Mode::Local(local_mode) = &self.mode {
859 Some(local_mode.definition.clone())
860 } else {
861 None
862 }
863 }
864
865 pub fn is_terminated(&self) -> bool {
866 self.is_session_terminated
867 }
868
869 pub fn is_local(&self) -> bool {
870 matches!(self.mode, Mode::Local(_))
871 }
872
873 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
874 match &mut self.mode {
875 Mode::Local(local_mode) => Some(local_mode),
876 Mode::Remote(_) => None,
877 }
878 }
879
880 pub fn as_local(&self) -> Option<&LocalMode> {
881 match &self.mode {
882 Mode::Local(local_mode) => Some(local_mode),
883 Mode::Remote(_) => None,
884 }
885 }
886
887 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
888 match &self.mode {
889 Mode::Local(local_mode) => {
890 let capabilities = local_mode.clone().request_initialization(cx);
891
892 cx.spawn(async move |this, cx| {
893 let capabilities = capabilities.await?;
894 this.update(cx, |session, _| {
895 session.capabilities = capabilities;
896 let filters = session
897 .capabilities
898 .exception_breakpoint_filters
899 .clone()
900 .unwrap_or_default();
901 for filter in filters {
902 let default = filter.default.unwrap_or_default();
903 session
904 .exception_breakpoints
905 .entry(filter.filter.clone())
906 .or_insert_with(|| (filter, default));
907 }
908 })?;
909 Ok(())
910 })
911 }
912 Mode::Remote(_) => Task::ready(Err(anyhow!(
913 "Cannot send initialize request from remote session"
914 ))),
915 }
916 }
917
918 pub(super) fn initialize_sequence(
919 &mut self,
920 initialize_rx: oneshot::Receiver<()>,
921 dap_store: WeakEntity<DapStore>,
922 cx: &mut Context<Self>,
923 ) -> Task<Result<()>> {
924 match &self.mode {
925 Mode::Local(local_mode) => {
926 local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
927 }
928 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
929 }
930 }
931
932 pub fn run_to_position(
933 &mut self,
934 breakpoint: SourceBreakpoint,
935 active_thread_id: ThreadId,
936 cx: &mut Context<Self>,
937 ) {
938 match &mut self.mode {
939 Mode::Local(local_mode) => {
940 if !matches!(
941 self.thread_states.thread_state(active_thread_id),
942 Some(ThreadStatus::Stopped)
943 ) {
944 return;
945 };
946 let path = breakpoint.path.clone();
947 local_mode.tmp_breakpoint = Some(breakpoint);
948 let task = local_mode.send_breakpoints_from_path(
949 path,
950 BreakpointUpdatedReason::Toggled,
951 cx,
952 );
953
954 cx.spawn(async move |this, cx| {
955 task.await;
956 this.update(cx, |this, cx| {
957 this.continue_thread(active_thread_id, cx);
958 })
959 })
960 .detach();
961 }
962 Mode::Remote(_) => {}
963 }
964 }
965
966 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
967 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
968 }
969
970 pub fn output(
971 &self,
972 since: OutputToken,
973 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
974 if self.output_token.0 == 0 {
975 return (self.output.range(0..0), OutputToken(0));
976 };
977
978 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
979
980 let clamped_events_since = events_since.clamp(0, self.output.len());
981 (
982 self.output
983 .range(self.output.len() - clamped_events_since..),
984 self.output_token,
985 )
986 }
987
988 pub fn respond_to_client(
989 &self,
990 request_seq: u64,
991 success: bool,
992 command: String,
993 body: Option<serde_json::Value>,
994 cx: &mut Context<Self>,
995 ) -> Task<Result<()>> {
996 let Some(local_session) = self.as_local().cloned() else {
997 unreachable!("Cannot respond to remote client");
998 };
999
1000 cx.background_spawn(async move {
1001 local_session
1002 .client
1003 .send_message(Message::Response(Response {
1004 body,
1005 success,
1006 command,
1007 seq: request_seq + 1,
1008 request_seq,
1009 message: None,
1010 }))
1011 .await
1012 })
1013 }
1014
1015 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1016 if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1017 let breakpoint = local.tmp_breakpoint.take()?;
1018 let path = breakpoint.path.clone();
1019 Some((local, path))
1020 }) {
1021 local
1022 .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1023 .detach();
1024 };
1025
1026 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1027 self.thread_states.stop_all_threads();
1028
1029 self.invalidate_command_type::<StackTraceCommand>();
1030 }
1031
1032 // Event if we stopped all threads we still need to insert the thread_id
1033 // to our own data
1034 if let Some(thread_id) = event.thread_id {
1035 self.thread_states.stop_thread(ThreadId(thread_id));
1036
1037 self.invalidate_state(
1038 &StackTraceCommand {
1039 thread_id,
1040 start_frame: None,
1041 levels: None,
1042 }
1043 .into(),
1044 );
1045 }
1046
1047 self.invalidate_generic();
1048 self.threads.clear();
1049 self.variables.clear();
1050 cx.emit(SessionEvent::Stopped(
1051 event
1052 .thread_id
1053 .map(Into::into)
1054 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1055 ));
1056 cx.notify();
1057 }
1058
1059 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1060 match *event {
1061 Events::Initialized(_) => {
1062 debug_assert!(
1063 false,
1064 "Initialized event should have been handled in LocalMode"
1065 );
1066 }
1067 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1068 Events::Continued(event) => {
1069 if event.all_threads_continued.unwrap_or_default() {
1070 self.thread_states.continue_all_threads();
1071 } else {
1072 self.thread_states
1073 .continue_thread(ThreadId(event.thread_id));
1074 }
1075 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1076 self.invalidate_generic();
1077 }
1078 Events::Exited(_event) => {
1079 self.clear_active_debug_line(cx);
1080 }
1081 Events::Terminated(_) => {
1082 self.is_session_terminated = true;
1083 self.clear_active_debug_line(cx);
1084 }
1085 Events::Thread(event) => {
1086 let thread_id = ThreadId(event.thread_id);
1087
1088 match event.reason {
1089 dap::ThreadEventReason::Started => {
1090 self.thread_states.continue_thread(thread_id);
1091 }
1092 dap::ThreadEventReason::Exited => {
1093 self.thread_states.exit_thread(thread_id);
1094 }
1095 reason => {
1096 log::error!("Unhandled thread event reason {:?}", reason);
1097 }
1098 }
1099 self.invalidate_state(&ThreadsCommand.into());
1100 cx.notify();
1101 }
1102 Events::Output(event) => {
1103 if event
1104 .category
1105 .as_ref()
1106 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1107 {
1108 return;
1109 }
1110
1111 self.output.push_back(event);
1112 self.output_token.0 += 1;
1113 cx.notify();
1114 }
1115 Events::Breakpoint(_) => {}
1116 Events::Module(event) => {
1117 match event.reason {
1118 dap::ModuleEventReason::New => {
1119 self.modules.push(event.module);
1120 }
1121 dap::ModuleEventReason::Changed => {
1122 if let Some(module) = self
1123 .modules
1124 .iter_mut()
1125 .find(|other| event.module.id == other.id)
1126 {
1127 *module = event.module;
1128 }
1129 }
1130 dap::ModuleEventReason::Removed => {
1131 self.modules.retain(|other| event.module.id != other.id);
1132 }
1133 }
1134
1135 // todo(debugger): We should only send the invalidate command to downstream clients.
1136 // self.invalidate_state(&ModulesCommand.into());
1137 }
1138 Events::LoadedSource(_) => {
1139 self.invalidate_state(&LoadedSourcesCommand.into());
1140 }
1141 Events::Capabilities(event) => {
1142 self.capabilities = self.capabilities.merge(event.capabilities);
1143 cx.notify();
1144 }
1145 Events::Memory(_) => {}
1146 Events::Process(_) => {}
1147 Events::ProgressEnd(_) => {}
1148 Events::ProgressStart(_) => {}
1149 Events::ProgressUpdate(_) => {}
1150 Events::Invalidated(_) => {}
1151 Events::Other(_) => {}
1152 }
1153 }
1154
1155 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1156 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1157 &mut self,
1158 request: T,
1159 process_result: impl FnOnce(
1160 &mut Self,
1161 Result<T::Response>,
1162 &mut Context<Self>,
1163 ) -> Option<T::Response>
1164 + 'static,
1165 cx: &mut Context<Self>,
1166 ) {
1167 const {
1168 assert!(
1169 T::CACHEABLE,
1170 "Only requests marked as cacheable should invoke `fetch`"
1171 );
1172 }
1173
1174 if !self.thread_states.any_stopped_thread()
1175 && request.type_id() != TypeId::of::<ThreadsCommand>()
1176 || self.is_session_terminated
1177 {
1178 return;
1179 }
1180
1181 let request_map = self
1182 .requests
1183 .entry(std::any::TypeId::of::<T>())
1184 .or_default();
1185
1186 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1187 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1188
1189 let task = Self::request_inner::<Arc<T>>(
1190 &self.capabilities,
1191 self.id,
1192 &self.mode,
1193 command,
1194 process_result,
1195 cx,
1196 );
1197 let task = cx
1198 .background_executor()
1199 .spawn(async move {
1200 let _ = task.await?;
1201 Some(())
1202 })
1203 .shared();
1204
1205 vacant.insert(task);
1206 cx.notify();
1207 }
1208 }
1209
1210 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1211 capabilities: &Capabilities,
1212 session_id: SessionId,
1213 mode: &Mode,
1214 request: T,
1215 process_result: impl FnOnce(
1216 &mut Self,
1217 Result<T::Response>,
1218 &mut Context<Self>,
1219 ) -> Option<T::Response>
1220 + 'static,
1221 cx: &mut Context<Self>,
1222 ) -> Task<Option<T::Response>> {
1223 if !T::is_supported(&capabilities) {
1224 log::warn!(
1225 "Attempted to send a DAP request that isn't supported: {:?}",
1226 request
1227 );
1228 let error = Err(anyhow::Error::msg(
1229 "Couldn't complete request because it's not supported",
1230 ));
1231 return cx.spawn(async move |this, cx| {
1232 this.update(cx, |this, cx| process_result(this, error, cx))
1233 .log_err()
1234 .flatten()
1235 });
1236 }
1237
1238 let request = mode.request_dap(session_id, request, cx);
1239 cx.spawn(async move |this, cx| {
1240 let result = request.await;
1241 this.update(cx, |this, cx| process_result(this, result, cx))
1242 .log_err()
1243 .flatten()
1244 })
1245 }
1246
1247 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1248 &self,
1249 request: T,
1250 process_result: impl FnOnce(
1251 &mut Self,
1252 Result<T::Response>,
1253 &mut Context<Self>,
1254 ) -> Option<T::Response>
1255 + 'static,
1256 cx: &mut Context<Self>,
1257 ) -> Task<Option<T::Response>> {
1258 Self::request_inner(
1259 &self.capabilities,
1260 self.id,
1261 &self.mode,
1262 request,
1263 process_result,
1264 cx,
1265 )
1266 }
1267
1268 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1269 self.requests.remove(&std::any::TypeId::of::<Command>());
1270 }
1271
1272 fn invalidate_generic(&mut self) {
1273 self.invalidate_command_type::<ModulesCommand>();
1274 self.invalidate_command_type::<LoadedSourcesCommand>();
1275 self.invalidate_command_type::<ThreadsCommand>();
1276 }
1277
1278 fn invalidate_state(&mut self, key: &RequestSlot) {
1279 self.requests
1280 .entry((&*key.0 as &dyn Any).type_id())
1281 .and_modify(|request_map| {
1282 request_map.remove(&key);
1283 });
1284 }
1285
1286 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1287 self.thread_states.thread_status(thread_id)
1288 }
1289
1290 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1291 self.fetch(
1292 dap_command::ThreadsCommand,
1293 |this, result, cx| {
1294 let result = result.log_err()?;
1295
1296 this.threads = result
1297 .iter()
1298 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1299 .collect();
1300
1301 this.invalidate_command_type::<StackTraceCommand>();
1302 cx.emit(SessionEvent::Threads);
1303 cx.notify();
1304
1305 Some(result)
1306 },
1307 cx,
1308 );
1309
1310 self.threads
1311 .values()
1312 .map(|thread| {
1313 (
1314 thread.dap.clone(),
1315 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1316 )
1317 })
1318 .collect()
1319 }
1320
1321 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1322 self.fetch(
1323 dap_command::ModulesCommand,
1324 |this, result, cx| {
1325 let result = result.log_err()?;
1326
1327 this.modules = result.iter().cloned().collect();
1328 cx.emit(SessionEvent::Modules);
1329 cx.notify();
1330
1331 Some(result)
1332 },
1333 cx,
1334 );
1335
1336 &self.modules
1337 }
1338
1339 pub fn ignore_breakpoints(&self) -> bool {
1340 self.ignore_breakpoints
1341 }
1342
1343 pub fn toggle_ignore_breakpoints(
1344 &mut self,
1345 cx: &mut App,
1346 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1347 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1348 }
1349
1350 pub(crate) fn set_ignore_breakpoints(
1351 &mut self,
1352 ignore: bool,
1353 cx: &mut App,
1354 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1355 if self.ignore_breakpoints == ignore {
1356 return Task::ready(HashMap::default());
1357 }
1358
1359 self.ignore_breakpoints = ignore;
1360
1361 if let Some(local) = self.as_local() {
1362 local.send_source_breakpoints(ignore, cx)
1363 } else {
1364 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1365 unimplemented!()
1366 }
1367 }
1368
1369 pub fn exception_breakpoints(
1370 &self,
1371 ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1372 self.exception_breakpoints.values()
1373 }
1374
1375 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1376 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1377 *is_enabled = !*is_enabled;
1378 self.send_exception_breakpoints(cx);
1379 }
1380 }
1381
1382 fn send_exception_breakpoints(&mut self, cx: &App) {
1383 if let Some(local) = self.as_local() {
1384 let exception_filters = self
1385 .exception_breakpoints
1386 .values()
1387 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1388 .collect();
1389
1390 let supports_exception_filters = self
1391 .capabilities
1392 .supports_exception_filter_options
1393 .unwrap_or_default();
1394 local
1395 .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1396 .detach_and_log_err(cx);
1397 } else {
1398 debug_assert!(false, "Not implemented");
1399 }
1400 }
1401
1402 pub fn breakpoints_enabled(&self) -> bool {
1403 self.ignore_breakpoints
1404 }
1405
1406 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1407 self.fetch(
1408 dap_command::LoadedSourcesCommand,
1409 |this, result, cx| {
1410 let result = result.log_err()?;
1411 this.loaded_sources = result.iter().cloned().collect();
1412 cx.emit(SessionEvent::LoadedSources);
1413 cx.notify();
1414 Some(result)
1415 },
1416 cx,
1417 );
1418
1419 &self.loaded_sources
1420 }
1421
1422 fn fallback_to_manual_restart(
1423 &mut self,
1424 res: Result<()>,
1425 cx: &mut Context<Self>,
1426 ) -> Option<()> {
1427 if res.log_err().is_none() {
1428 cx.emit(SessionStateEvent::Restart);
1429 return None;
1430 }
1431 Some(())
1432 }
1433
1434 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1435 res.log_err()?;
1436 Some(())
1437 }
1438
1439 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1440 thread_id: ThreadId,
1441 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1442 {
1443 move |this, response, cx| match response.log_err() {
1444 Some(response) => Some(response),
1445 None => {
1446 this.thread_states.stop_thread(thread_id);
1447 cx.notify();
1448 None
1449 }
1450 }
1451 }
1452
1453 fn clear_active_debug_line_response(
1454 &mut self,
1455 response: Result<()>,
1456 cx: &mut Context<Session>,
1457 ) -> Option<()> {
1458 response.log_err()?;
1459 self.clear_active_debug_line(cx);
1460 Some(())
1461 }
1462
1463 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1464 self.as_local()
1465 .expect("Message handler will only run in local mode")
1466 .breakpoint_store
1467 .update(cx, |store, cx| {
1468 store.remove_active_position(Some(self.id), cx)
1469 });
1470 }
1471
1472 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1473 self.request(
1474 PauseCommand {
1475 thread_id: thread_id.0,
1476 },
1477 Self::empty_response,
1478 cx,
1479 )
1480 .detach();
1481 }
1482
1483 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1484 self.request(
1485 RestartStackFrameCommand { stack_frame_id },
1486 Self::empty_response,
1487 cx,
1488 )
1489 .detach();
1490 }
1491
1492 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1493 if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1494 self.request(
1495 RestartCommand {
1496 raw: args.unwrap_or(Value::Null),
1497 },
1498 Self::fallback_to_manual_restart,
1499 cx,
1500 )
1501 .detach();
1502 } else {
1503 cx.emit(SessionStateEvent::Restart);
1504 }
1505 }
1506
1507 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1508 self.is_session_terminated = true;
1509 self.thread_states.exit_all_threads();
1510 cx.notify();
1511
1512 let task = if self
1513 .capabilities
1514 .supports_terminate_request
1515 .unwrap_or_default()
1516 {
1517 self.request(
1518 TerminateCommand {
1519 restart: Some(false),
1520 },
1521 Self::clear_active_debug_line_response,
1522 cx,
1523 )
1524 } else {
1525 self.request(
1526 DisconnectCommand {
1527 restart: Some(false),
1528 terminate_debuggee: Some(true),
1529 suspend_debuggee: Some(false),
1530 },
1531 Self::clear_active_debug_line_response,
1532 cx,
1533 )
1534 };
1535
1536 cx.emit(SessionStateEvent::Shutdown);
1537
1538 let debug_client = self.adapter_client();
1539
1540 cx.background_spawn(async move {
1541 let _ = task.await;
1542
1543 if let Some(client) = debug_client {
1544 client.shutdown().await.log_err();
1545 }
1546 })
1547 }
1548
1549 pub fn completions(
1550 &mut self,
1551 query: CompletionsQuery,
1552 cx: &mut Context<Self>,
1553 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1554 let task = self.request(query, |_, result, _| result.log_err(), cx);
1555
1556 cx.background_executor().spawn(async move {
1557 anyhow::Ok(
1558 task.await
1559 .map(|response| response.targets)
1560 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1561 )
1562 })
1563 }
1564
1565 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1566 self.thread_states.continue_thread(thread_id);
1567 self.request(
1568 ContinueCommand {
1569 args: ContinueArguments {
1570 thread_id: thread_id.0,
1571 single_thread: Some(true),
1572 },
1573 },
1574 Self::on_step_response::<ContinueCommand>(thread_id),
1575 cx,
1576 )
1577 .detach();
1578 }
1579
1580 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1581 match self.mode {
1582 Mode::Local(ref local) => Some(local.client.clone()),
1583 Mode::Remote(_) => None,
1584 }
1585 }
1586
1587 pub fn step_over(
1588 &mut self,
1589 thread_id: ThreadId,
1590 granularity: SteppingGranularity,
1591 cx: &mut Context<Self>,
1592 ) {
1593 let supports_single_thread_execution_requests =
1594 self.capabilities.supports_single_thread_execution_requests;
1595 let supports_stepping_granularity = self
1596 .capabilities
1597 .supports_stepping_granularity
1598 .unwrap_or_default();
1599
1600 let command = NextCommand {
1601 inner: StepCommand {
1602 thread_id: thread_id.0,
1603 granularity: supports_stepping_granularity.then(|| granularity),
1604 single_thread: supports_single_thread_execution_requests,
1605 },
1606 };
1607
1608 self.thread_states.process_step(thread_id);
1609 self.request(
1610 command,
1611 Self::on_step_response::<NextCommand>(thread_id),
1612 cx,
1613 )
1614 .detach();
1615 }
1616
1617 pub fn step_in(
1618 &mut self,
1619 thread_id: ThreadId,
1620 granularity: SteppingGranularity,
1621 cx: &mut Context<Self>,
1622 ) {
1623 let supports_single_thread_execution_requests =
1624 self.capabilities.supports_single_thread_execution_requests;
1625 let supports_stepping_granularity = self
1626 .capabilities
1627 .supports_stepping_granularity
1628 .unwrap_or_default();
1629
1630 let command = StepInCommand {
1631 inner: StepCommand {
1632 thread_id: thread_id.0,
1633 granularity: supports_stepping_granularity.then(|| granularity),
1634 single_thread: supports_single_thread_execution_requests,
1635 },
1636 };
1637
1638 self.thread_states.process_step(thread_id);
1639 self.request(
1640 command,
1641 Self::on_step_response::<StepInCommand>(thread_id),
1642 cx,
1643 )
1644 .detach();
1645 }
1646
1647 pub fn step_out(
1648 &mut self,
1649 thread_id: ThreadId,
1650 granularity: SteppingGranularity,
1651 cx: &mut Context<Self>,
1652 ) {
1653 let supports_single_thread_execution_requests =
1654 self.capabilities.supports_single_thread_execution_requests;
1655 let supports_stepping_granularity = self
1656 .capabilities
1657 .supports_stepping_granularity
1658 .unwrap_or_default();
1659
1660 let command = StepOutCommand {
1661 inner: StepCommand {
1662 thread_id: thread_id.0,
1663 granularity: supports_stepping_granularity.then(|| granularity),
1664 single_thread: supports_single_thread_execution_requests,
1665 },
1666 };
1667
1668 self.thread_states.process_step(thread_id);
1669 self.request(
1670 command,
1671 Self::on_step_response::<StepOutCommand>(thread_id),
1672 cx,
1673 )
1674 .detach();
1675 }
1676
1677 pub fn step_back(
1678 &mut self,
1679 thread_id: ThreadId,
1680 granularity: SteppingGranularity,
1681 cx: &mut Context<Self>,
1682 ) {
1683 let supports_single_thread_execution_requests =
1684 self.capabilities.supports_single_thread_execution_requests;
1685 let supports_stepping_granularity = self
1686 .capabilities
1687 .supports_stepping_granularity
1688 .unwrap_or_default();
1689
1690 let command = StepBackCommand {
1691 inner: StepCommand {
1692 thread_id: thread_id.0,
1693 granularity: supports_stepping_granularity.then(|| granularity),
1694 single_thread: supports_single_thread_execution_requests,
1695 },
1696 };
1697
1698 self.thread_states.process_step(thread_id);
1699
1700 self.request(
1701 command,
1702 Self::on_step_response::<StepBackCommand>(thread_id),
1703 cx,
1704 )
1705 .detach();
1706 }
1707
1708 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1709 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1710 && self.requests.contains_key(&ThreadsCommand.type_id())
1711 && self.threads.contains_key(&thread_id)
1712 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1713 // We could still be using an old thread id and have sent a new thread's request
1714 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1715 // But it very well could cause a minor bug in the future that is hard to track down
1716 {
1717 self.fetch(
1718 super::dap_command::StackTraceCommand {
1719 thread_id: thread_id.0,
1720 start_frame: None,
1721 levels: None,
1722 },
1723 move |this, stack_frames, cx| {
1724 let stack_frames = stack_frames.log_err()?;
1725
1726 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1727 thread.stack_frame_ids =
1728 stack_frames.iter().map(|frame| frame.id).collect();
1729 });
1730 debug_assert!(
1731 matches!(entry, indexmap::map::Entry::Occupied(_)),
1732 "Sent request for thread_id that doesn't exist"
1733 );
1734
1735 this.stack_frames.extend(
1736 stack_frames
1737 .iter()
1738 .cloned()
1739 .map(|frame| (frame.id, StackFrame::from(frame))),
1740 );
1741
1742 this.invalidate_command_type::<ScopesCommand>();
1743 this.invalidate_command_type::<VariablesCommand>();
1744
1745 cx.emit(SessionEvent::StackTrace);
1746 cx.notify();
1747 Some(stack_frames)
1748 },
1749 cx,
1750 );
1751 }
1752
1753 self.threads
1754 .get(&thread_id)
1755 .map(|thread| {
1756 thread
1757 .stack_frame_ids
1758 .iter()
1759 .filter_map(|id| self.stack_frames.get(id))
1760 .cloned()
1761 .collect()
1762 })
1763 .unwrap_or_default()
1764 }
1765
1766 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1767 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1768 && self
1769 .requests
1770 .contains_key(&TypeId::of::<StackTraceCommand>())
1771 {
1772 self.fetch(
1773 ScopesCommand { stack_frame_id },
1774 move |this, scopes, cx| {
1775 let scopes = scopes.log_err()?;
1776
1777 for scope in scopes .iter(){
1778 this.variables(scope.variables_reference, cx);
1779 }
1780
1781 let entry = this
1782 .stack_frames
1783 .entry(stack_frame_id)
1784 .and_modify(|stack_frame| {
1785 stack_frame.scopes = scopes.clone();
1786 });
1787
1788 cx.emit(SessionEvent::Variables);
1789
1790 debug_assert!(
1791 matches!(entry, indexmap::map::Entry::Occupied(_)),
1792 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1793 );
1794
1795 Some(scopes)
1796 },
1797 cx,
1798 );
1799 }
1800
1801 self.stack_frames
1802 .get(&stack_frame_id)
1803 .map(|frame| frame.scopes.as_slice())
1804 .unwrap_or_default()
1805 }
1806
1807 pub fn variables(
1808 &mut self,
1809 variables_reference: VariableReference,
1810 cx: &mut Context<Self>,
1811 ) -> Vec<dap::Variable> {
1812 let command = VariablesCommand {
1813 variables_reference,
1814 filter: None,
1815 start: None,
1816 count: None,
1817 format: None,
1818 };
1819
1820 self.fetch(
1821 command,
1822 move |this, variables, cx| {
1823 let variables = variables.log_err()?;
1824 this.variables
1825 .insert(variables_reference, variables.clone());
1826
1827 cx.emit(SessionEvent::Variables);
1828 Some(variables)
1829 },
1830 cx,
1831 );
1832
1833 self.variables
1834 .get(&variables_reference)
1835 .cloned()
1836 .unwrap_or_default()
1837 }
1838
1839 pub fn set_variable_value(
1840 &mut self,
1841 variables_reference: u64,
1842 name: String,
1843 value: String,
1844 cx: &mut Context<Self>,
1845 ) {
1846 if self.capabilities.supports_set_variable.unwrap_or_default() {
1847 self.request(
1848 SetVariableValueCommand {
1849 name,
1850 value,
1851 variables_reference,
1852 },
1853 move |this, response, cx| {
1854 let response = response.log_err()?;
1855 this.invalidate_command_type::<VariablesCommand>();
1856 cx.notify();
1857 Some(response)
1858 },
1859 cx,
1860 )
1861 .detach()
1862 }
1863 }
1864
1865 pub fn evaluate(
1866 &mut self,
1867 expression: String,
1868 context: Option<EvaluateArgumentsContext>,
1869 frame_id: Option<u64>,
1870 source: Option<Source>,
1871 cx: &mut Context<Self>,
1872 ) {
1873 self.request(
1874 EvaluateCommand {
1875 expression,
1876 context,
1877 frame_id,
1878 source,
1879 },
1880 |this, response, cx| {
1881 let response = response.log_err()?;
1882 this.output_token.0 += 1;
1883 this.output.push_back(dap::OutputEvent {
1884 category: None,
1885 output: response.result.clone(),
1886 group: None,
1887 variables_reference: Some(response.variables_reference),
1888 source: None,
1889 line: None,
1890 column: None,
1891 data: None,
1892 location_reference: None,
1893 });
1894
1895 this.invalidate_command_type::<ScopesCommand>();
1896 cx.notify();
1897 Some(response)
1898 },
1899 cx,
1900 )
1901 .detach();
1902 }
1903
1904 pub fn location(
1905 &mut self,
1906 reference: u64,
1907 cx: &mut Context<Self>,
1908 ) -> Option<dap::LocationsResponse> {
1909 self.fetch(
1910 LocationsCommand { reference },
1911 move |this, response, _| {
1912 let response = response.log_err()?;
1913 this.locations.insert(reference, response.clone());
1914 Some(response)
1915 },
1916 cx,
1917 );
1918 self.locations.get(&reference).cloned()
1919 }
1920 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1921 let command = DisconnectCommand {
1922 restart: Some(false),
1923 terminate_debuggee: Some(true),
1924 suspend_debuggee: Some(false),
1925 };
1926
1927 self.request(command, Self::empty_response, cx).detach()
1928 }
1929
1930 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1931 if self
1932 .capabilities
1933 .supports_terminate_threads_request
1934 .unwrap_or_default()
1935 {
1936 self.request(
1937 TerminateThreadsCommand {
1938 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1939 },
1940 Self::clear_active_debug_line_response,
1941 cx,
1942 )
1943 .detach();
1944 } else {
1945 self.shutdown(cx).detach();
1946 }
1947 }
1948}
1949
1950fn create_local_session(
1951 breakpoint_store: Entity<BreakpointStore>,
1952 session_id: SessionId,
1953 parent_session: Option<Entity<Session>>,
1954 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1955 initialized_tx: oneshot::Sender<()>,
1956 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1957 mode: LocalMode,
1958 cx: &mut Context<Session>,
1959) -> Session {
1960 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1961 let mut initialized_tx = Some(initialized_tx);
1962 while let Some(message) = message_rx.next().await {
1963 if let Message::Event(event) = message {
1964 if let Events::Initialized(_) = *event {
1965 if let Some(tx) = initialized_tx.take() {
1966 tx.send(()).ok();
1967 }
1968 } else {
1969 let Ok(_) = this.update(cx, |session, cx| {
1970 session.handle_dap_event(event, cx);
1971 }) else {
1972 break;
1973 };
1974 }
1975 } else {
1976 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1977 else {
1978 break;
1979 };
1980 }
1981 }
1982 })];
1983
1984 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1985 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1986 if let Some(local) = (!this.ignore_breakpoints)
1987 .then(|| this.as_local_mut())
1988 .flatten()
1989 {
1990 local
1991 .send_breakpoints_from_path(path.clone(), *reason, cx)
1992 .detach();
1993 };
1994 }
1995 BreakpointStoreEvent::BreakpointsCleared(paths) => {
1996 if let Some(local) = (!this.ignore_breakpoints)
1997 .then(|| this.as_local_mut())
1998 .flatten()
1999 {
2000 local.unset_breakpoints_from_paths(paths, cx).detach();
2001 }
2002 }
2003 BreakpointStoreEvent::ActiveDebugLineChanged => {}
2004 })
2005 .detach();
2006
2007 Session {
2008 mode: Mode::Local(mode),
2009 id: session_id,
2010 child_session_ids: HashSet::default(),
2011 parent_id: parent_session.map(|session| session.read(cx).id),
2012 variables: Default::default(),
2013 capabilities: Capabilities::default(),
2014 thread_states: ThreadStates::default(),
2015 output_token: OutputToken(0),
2016 ignore_breakpoints: false,
2017 output: circular_buffer::CircularBuffer::boxed(),
2018 requests: HashMap::default(),
2019 modules: Vec::default(),
2020 loaded_sources: Vec::default(),
2021 threads: IndexMap::default(),
2022 stack_frames: IndexMap::default(),
2023 locations: Default::default(),
2024 exception_breakpoints: Default::default(),
2025 _background_tasks,
2026 is_session_terminated: false,
2027 }
2028}