1use super::breakpoint_store::{
2 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
3};
4use super::dap_command::{
5 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
6 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
7 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
8 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
9 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
10 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
11};
12use super::dap_store::DapStore;
13use anyhow::{Context as _, Result, anyhow};
14use collections::{HashMap, HashSet, IndexMap, IndexSet};
15use dap::adapters::DebugAdapterBinary;
16use dap::messages::Response;
17use dap::{
18 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
19 SteppingGranularity, StoppedEvent, VariableReference,
20 client::{DebugAdapterClient, SessionId},
21 messages::{Events, Message},
22};
23use dap::{ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEventCategory};
24use futures::channel::oneshot;
25use futures::{FutureExt, future::Shared};
26use gpui::{
27 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
28 Task, WeakEntity,
29};
30
31use rpc::AnyProtoClient;
32use serde_json::{Value, json};
33use smol::stream::StreamExt;
34use std::any::TypeId;
35use std::collections::BTreeMap;
36use std::u64;
37use std::{
38 any::Any,
39 collections::hash_map::Entry,
40 hash::{Hash, Hasher},
41 path::Path,
42 sync::Arc,
43};
44use task::DebugTaskDefinition;
45use text::{PointUtf16, ToPointUtf16};
46use util::{ResultExt, merge_json_value_into};
47use worktree::Worktree;
48
/// Numeric identifier of a debuggee thread.
///
/// A transparent wrapper around the raw `u64`, so thread ids cannot be
/// confused with other numeric identifiers.
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// The smallest thread id that can be represented.
    pub const MIN: ThreadId = Self(u64::MIN);
    /// The largest thread id that can be represented.
    pub const MAX: ThreadId = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw id in a [`ThreadId`].
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
63
64#[derive(Clone, Debug)]
65pub struct StackFrame {
66 pub dap: dap::StackFrame,
67 pub scopes: Vec<dap::Scope>,
68}
69
70impl From<dap::StackFrame> for StackFrame {
71 fn from(stack_frame: dap::StackFrame) -> Self {
72 Self {
73 scopes: vec![],
74 dap: stack_frame,
75 }
76 }
77}
78
/// Execution state tracked for a debuggee thread. Defaults to `Running`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the display name of this status (identical to the variant name).
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
100
101#[derive(Debug)]
102pub struct Thread {
103 dap: dap::Thread,
104 stack_frame_ids: IndexSet<StackFrameId>,
105 _has_stopped: bool,
106}
107
108impl From<dap::Thread> for Thread {
109 fn from(dap: dap::Thread) -> Self {
110 Self {
111 dap,
112 stack_frame_ids: Default::default(),
113 _has_stopped: false,
114 }
115 }
116}
117
118type UpstreamProjectId = u64;
119
120struct RemoteConnection {
121 _client: AnyProtoClient,
122 _upstream_project_id: UpstreamProjectId,
123 _adapter_name: SharedString,
124}
125
126impl RemoteConnection {
127 fn send_proto_client_request<R: DapCommand>(
128 &self,
129 _request: R,
130 _session_id: SessionId,
131 cx: &mut App,
132 ) -> Task<Result<R::Response>> {
133 // let message = request.to_proto(session_id, self.upstream_project_id);
134 // let upstream_client = self.client.clone();
135 cx.background_executor().spawn(async move {
136 // debugger(todo): Properly send messages when we wrap dap_commands in envelopes again
137 // let response = upstream_client.request(message).await?;
138 // request.response_from_proto(response)
139 Err(anyhow!("Sending dap commands over RPC isn't supported yet"))
140 })
141 }
142
143 fn request<R: DapCommand>(
144 &self,
145 request: R,
146 session_id: SessionId,
147 cx: &mut App,
148 ) -> Task<Result<R::Response>>
149 where
150 <R::DapRequest as dap::requests::Request>::Response: 'static,
151 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
152 {
153 return self.send_proto_client_request::<R>(request, session_id, cx);
154 }
155}
156
/// How a session reaches its debug adapter.
enum Mode {
    /// The adapter is driven directly through a local client.
    Local(LocalMode),
    /// The adapter is reached through a remote connection.
    Remote(RemoteConnection),
}
161
/// State of a session that talks to a debug adapter through a local client.
#[derive(Clone)]
pub struct LocalMode {
    /// Client used to send requests to the debug adapter.
    client: Arc<DebugAdapterClient>,
    /// The debug task definition this session was created from.
    definition: DebugTaskDefinition,
    /// The adapter binary this session was started with.
    binary: DebugAdapterBinary,
    /// Root binary taken from the parent session; `None` when there is no parent.
    root_binary: Option<Arc<DebugAdapterBinary>>,
    /// Store the source breakpoints sent to the adapter are read from.
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// Temporary breakpoint set by "run to position"; taken back out on the next stopped event.
    tmp_breakpoint: Option<SourceBreakpoint>,
    /// Worktree used to shorten paths in breakpoint-failure notifications.
    worktree: WeakEntity<Worktree>,
}
172
173fn client_source(abs_path: &Path) -> dap::Source {
174 dap::Source {
175 name: abs_path
176 .file_name()
177 .map(|filename| filename.to_string_lossy().to_string()),
178 path: Some(abs_path.to_string_lossy().to_string()),
179 source_reference: None,
180 presentation_hint: None,
181 origin: None,
182 sources: None,
183 adapter_data: None,
184 checksums: None,
185 }
186}
187
188impl LocalMode {
189 fn new(
190 session_id: SessionId,
191 parent_session: Option<Entity<Session>>,
192 worktree: WeakEntity<Worktree>,
193 breakpoint_store: Entity<BreakpointStore>,
194 config: DebugTaskDefinition,
195 binary: DebugAdapterBinary,
196 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
197 cx: AsyncApp,
198 ) -> Task<Result<Self>> {
199 cx.spawn(async move |cx| {
200 let message_handler = Box::new(move |message| {
201 messages_tx.unbounded_send(message).ok();
202 });
203
204 let root_binary = if let Some(parent_session) = parent_session.as_ref() {
205 Some(parent_session.read_with(cx, |session, _| session.root_binary().clone())?)
206 } else {
207 None
208 };
209
210 let client = Arc::new(
211 if let Some(client) = parent_session
212 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
213 .flatten()
214 {
215 client
216 .reconnect(session_id, binary.clone(), message_handler, cx.clone())
217 .await?
218 } else {
219 DebugAdapterClient::start(
220 session_id,
221 binary.clone(),
222 message_handler,
223 cx.clone(),
224 )
225 .await
226 .with_context(|| "Failed to start communication with debug adapter")?
227 },
228 );
229
230 Ok(Self {
231 client,
232 breakpoint_store,
233 worktree,
234 tmp_breakpoint: None,
235 definition: config,
236 root_binary,
237 binary,
238 })
239 })
240 }
241
/// Returns the weak handle to the worktree stored for this session.
pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
    &self.worktree
}
245
246 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
247 let tasks: Vec<_> = paths
248 .into_iter()
249 .map(|path| {
250 self.request(
251 dap_command::SetBreakpoints {
252 source: client_source(path),
253 source_modified: None,
254 breakpoints: vec![],
255 },
256 cx.background_executor().clone(),
257 )
258 })
259 .collect();
260
261 cx.background_spawn(async move {
262 futures::future::join_all(tasks)
263 .await
264 .iter()
265 .for_each(|res| match res {
266 Ok(_) => {}
267 Err(err) => {
268 log::warn!("Set breakpoints request failed: {}", err);
269 }
270 });
271 })
272 }
273
274 fn send_breakpoints_from_path(
275 &self,
276 abs_path: Arc<Path>,
277 reason: BreakpointUpdatedReason,
278 cx: &mut App,
279 ) -> Task<()> {
280 let breakpoints = self
281 .breakpoint_store
282 .read_with(cx, |store, cx| store.breakpoints_from_path(&abs_path, cx))
283 .into_iter()
284 .filter(|bp| bp.state.is_enabled())
285 .chain(self.tmp_breakpoint.clone())
286 .map(Into::into)
287 .collect();
288
289 let task = self.request(
290 dap_command::SetBreakpoints {
291 source: client_source(&abs_path),
292 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
293 breakpoints,
294 },
295 cx.background_executor().clone(),
296 );
297
298 cx.background_spawn(async move {
299 match task.await {
300 Ok(_) => {}
301 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
302 }
303 })
304 }
305
306 fn send_exception_breakpoints(
307 &self,
308 filters: Vec<ExceptionBreakpointsFilter>,
309 supports_filter_options: bool,
310 cx: &App,
311 ) -> Task<Result<Vec<dap::Breakpoint>>> {
312 let arg = if supports_filter_options {
313 SetExceptionBreakpoints::WithOptions {
314 filters: filters
315 .into_iter()
316 .map(|filter| ExceptionFilterOptions {
317 filter_id: filter.filter,
318 condition: None,
319 mode: None,
320 })
321 .collect(),
322 }
323 } else {
324 SetExceptionBreakpoints::Plain {
325 filters: filters.into_iter().map(|filter| filter.filter).collect(),
326 }
327 };
328 self.request(arg, cx.background_executor().clone())
329 }
330
331 fn send_source_breakpoints(
332 &self,
333 ignore_breakpoints: bool,
334 cx: &App,
335 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
336 let mut breakpoint_tasks = Vec::new();
337 let breakpoints = self
338 .breakpoint_store
339 .read_with(cx, |store, cx| store.all_breakpoints(cx));
340
341 for (path, breakpoints) in breakpoints {
342 let breakpoints = if ignore_breakpoints {
343 vec![]
344 } else {
345 breakpoints
346 .into_iter()
347 .filter(|bp| bp.state.is_enabled())
348 .map(Into::into)
349 .collect()
350 };
351
352 breakpoint_tasks.push(
353 self.request(
354 dap_command::SetBreakpoints {
355 source: client_source(&path),
356 source_modified: Some(false),
357 breakpoints,
358 },
359 cx.background_executor().clone(),
360 )
361 .map(|result| result.map_err(|e| (path, e))),
362 );
363 }
364
365 cx.background_spawn(async move {
366 futures::future::join_all(breakpoint_tasks)
367 .await
368 .into_iter()
369 .filter_map(Result::err)
370 .collect::<HashMap<_, _>>()
371 })
372 }
373
/// Returns a copy of the label from the session's debug task definition.
pub fn label(&self) -> String {
    self.definition.label.clone()
}
377
378 fn request_initialization(&self, cx: &App) -> Task<Result<Capabilities>> {
379 let adapter_id = self.definition.adapter.clone();
380
381 self.request(Initialize { adapter_id }, cx.background_executor().clone())
382 }
383
384 fn initialize_sequence(
385 &self,
386 capabilities: &Capabilities,
387 initialized_rx: oneshot::Receiver<()>,
388 dap_store: WeakEntity<DapStore>,
389 cx: &App,
390 ) -> Task<Result<()>> {
391 let mut raw = self.binary.request_args.clone();
392
393 merge_json_value_into(
394 self.definition.initialize_args.clone().unwrap_or(json!({})),
395 &mut raw.configuration,
396 );
397
398 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
399 let launch = match raw.request {
400 dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(
401 Launch {
402 raw: raw.configuration,
403 },
404 cx.background_executor().clone(),
405 ),
406 dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(
407 Attach {
408 raw: raw.configuration,
409 },
410 cx.background_executor().clone(),
411 ),
412 };
413
414 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
415 let exception_filters = capabilities
416 .exception_breakpoint_filters
417 .as_ref()
418 .map(|exception_filters| {
419 exception_filters
420 .iter()
421 .filter(|filter| filter.default == Some(true))
422 .cloned()
423 .collect::<Vec<_>>()
424 })
425 .unwrap_or_default();
426 let supports_exception_filters = capabilities
427 .supports_exception_filter_options
428 .unwrap_or_default();
429 let configuration_sequence = cx.spawn({
430 let this = self.clone();
431 let worktree = self.worktree().clone();
432 async move |cx| {
433 initialized_rx.await?;
434 let errors_by_path = cx
435 .update(|cx| this.send_source_breakpoints(false, cx))?
436 .await;
437
438 dap_store.update(cx, |_, cx| {
439 let Some(worktree) = worktree.upgrade() else {
440 return;
441 };
442
443 for (path, error) in &errors_by_path {
444 log::error!("failed to set breakpoints for {path:?}: {error}");
445 }
446
447 if let Some(failed_path) = errors_by_path.keys().next() {
448 let failed_path = failed_path
449 .strip_prefix(worktree.read(cx).abs_path())
450 .unwrap_or(failed_path)
451 .display();
452 let message = format!(
453 "Failed to set breakpoints for {failed_path}{}",
454 match errors_by_path.len() {
455 0 => unreachable!(),
456 1 => "".into(),
457 2 => " and 1 other path".into(),
458 n => format!(" and {} other paths", n - 1),
459 }
460 );
461 cx.emit(super::dap_store::DapStoreEvent::Notification(message));
462 }
463 })?;
464
465 cx.update(|cx| {
466 this.send_exception_breakpoints(
467 exception_filters,
468 supports_exception_filters,
469 cx,
470 )
471 })?
472 .await
473 .ok();
474 let ret = if configuration_done_supported {
475 this.request(ConfigurationDone {}, cx.background_executor().clone())
476 } else {
477 Task::ready(Ok(()))
478 }
479 .await;
480 ret
481 }
482 });
483
484 cx.background_spawn(async move {
485 futures::future::try_join(launch, configuration_sequence).await?;
486 Ok(())
487 })
488 }
489
490 fn request<R: LocalDapCommand>(
491 &self,
492 request: R,
493 executor: BackgroundExecutor,
494 ) -> Task<Result<R::Response>>
495 where
496 <R::DapRequest as dap::requests::Request>::Response: 'static,
497 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
498 {
499 let request = Arc::new(request);
500
501 let request_clone = request.clone();
502 let connection = self.client.clone();
503 let request_task = executor.spawn(async move {
504 let args = request_clone.to_dap();
505 connection.request::<R::DapRequest>(args).await
506 });
507
508 executor.spawn(async move {
509 let response = request.response_from_dap(request_task.await?);
510 response
511 })
512 }
513}
514impl From<RemoteConnection> for Mode {
515 fn from(value: RemoteConnection) -> Self {
516 Self::Remote(value)
517 }
518}
519
520impl Mode {
521 fn request_dap<R: DapCommand>(
522 &self,
523 session_id: SessionId,
524 request: R,
525 cx: &mut Context<Session>,
526 ) -> Task<Result<R::Response>>
527 where
528 <R::DapRequest as dap::requests::Request>::Response: 'static,
529 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
530 {
531 match self {
532 Mode::Local(debug_adapter_client) => {
533 debug_adapter_client.request(request, cx.background_executor().clone())
534 }
535 Mode::Remote(remote_connection) => remote_connection.request(request, session_id, cx),
536 }
537 }
538}
539
540#[derive(Default)]
541struct ThreadStates {
542 global_state: Option<ThreadStatus>,
543 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
544}
545
546impl ThreadStates {
547 fn stop_all_threads(&mut self) {
548 self.global_state = Some(ThreadStatus::Stopped);
549 self.known_thread_states.clear();
550 }
551
552 fn exit_all_threads(&mut self) {
553 self.global_state = Some(ThreadStatus::Exited);
554 self.known_thread_states.clear();
555 }
556
557 fn continue_all_threads(&mut self) {
558 self.global_state = Some(ThreadStatus::Running);
559 self.known_thread_states.clear();
560 }
561
562 fn stop_thread(&mut self, thread_id: ThreadId) {
563 self.known_thread_states
564 .insert(thread_id, ThreadStatus::Stopped);
565 }
566
567 fn continue_thread(&mut self, thread_id: ThreadId) {
568 self.known_thread_states
569 .insert(thread_id, ThreadStatus::Running);
570 }
571
572 fn process_step(&mut self, thread_id: ThreadId) {
573 self.known_thread_states
574 .insert(thread_id, ThreadStatus::Stepping);
575 }
576
577 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
578 self.thread_state(thread_id)
579 .unwrap_or(ThreadStatus::Running)
580 }
581
582 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
583 self.known_thread_states
584 .get(&thread_id)
585 .copied()
586 .or(self.global_state)
587 }
588
589 fn exit_thread(&mut self, thread_id: ThreadId) {
590 self.known_thread_states
591 .insert(thread_id, ThreadStatus::Exited);
592 }
593
594 fn any_stopped_thread(&self) -> bool {
595 self.global_state
596 .is_some_and(|state| state == ThreadStatus::Stopped)
597 || self
598 .known_thread_states
599 .values()
600 .any(|status| *status == ThreadStatus::Stopped)
601 }
602}
/// Capacity of the circular buffer that stores a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Whether an exception breakpoint filter is enabled.
type IsEnabled = bool;

/// Counter of the output events a session has recorded; callers pass one back
/// to ask for the output produced since that point.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents the current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether the adapter is driven locally or through a remote connection.
    mode: Mode,
    /// Capabilities reported by the adapter; merged with later `Capabilities` events.
    pub(super) capabilities: Capabilities,
    id: SessionId,
    child_session_ids: HashSet<SessionId>,
    parent_id: Option<SessionId>,
    ignore_breakpoints: bool,
    /// Modules kept in sync from `Module` events.
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Incremented once for every output event that is recorded.
    output_token: OutputToken,
    /// The most recent output events, at most `MAX_TRACKED_OUTPUT_EVENTS` of them.
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    /// Set once the adapter sends a `Terminated` event.
    is_session_terminated: bool,
    /// In-flight or completed cacheable requests, grouped by command type and keyed by the command itself.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    /// Exception breakpoint filters keyed by filter id, each with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    _background_tasks: Vec<Task<()>>,
}
631
632trait CacheableCommand: Any + Send + Sync {
633 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
634 fn dyn_hash(&self, hasher: &mut dyn Hasher);
635 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
636}
637
638impl<T> CacheableCommand for T
639where
640 T: DapCommand + PartialEq + Eq + Hash,
641{
642 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
643 (rhs as &dyn Any)
644 .downcast_ref::<Self>()
645 .map_or(false, |rhs| self == rhs)
646 }
647
648 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
649 T::hash(self, &mut hasher);
650 }
651
652 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
653 self
654 }
655}
656
657pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
658
659impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
660 fn from(request: T) -> Self {
661 Self(Arc::new(request))
662 }
663}
664
665impl PartialEq for RequestSlot {
666 fn eq(&self, other: &Self) -> bool {
667 self.0.dyn_eq(other.0.as_ref())
668 }
669}
670
671impl Eq for RequestSlot {}
672
673impl Hash for RequestSlot {
674 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
675 self.0.dyn_hash(state);
676 (&*self.0 as &dyn Any).type_id().hash(state)
677 }
678}
679
680#[derive(Debug, Clone, Hash, PartialEq, Eq)]
681pub struct CompletionsQuery {
682 pub query: String,
683 pub column: u64,
684 pub line: Option<u64>,
685 pub frame_id: Option<u64>,
686}
687
688impl CompletionsQuery {
689 pub fn new(
690 buffer: &language::Buffer,
691 cursor_position: language::Anchor,
692 frame_id: Option<u64>,
693 ) -> Self {
694 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
695 Self {
696 query: buffer.text(),
697 column: column as u64,
698 frame_id,
699 line: Some(row as u64),
700 }
701 }
702}
703
/// Events a `Session` emits to its observers.
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Emitted on a stopped event. Carries the stopped thread's id, or `None`
    /// when the event names no thread or sets `preserve_focus_hint`.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
}

/// Session lifecycle events, visible only to the parent module.
pub(super) enum SessionStateEvent {
    Shutdown,
    Restart,
}

impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
720
721// local session will send breakpoint updates to DAP for all new breakpoints
722// remote side will only send breakpoint updates when it is a breakpoint created by that peer
723// BreakpointStore notifies session on breakpoint changes
724impl Session {
725 pub(crate) fn local(
726 breakpoint_store: Entity<BreakpointStore>,
727 worktree: WeakEntity<Worktree>,
728 session_id: SessionId,
729 parent_session: Option<Entity<Session>>,
730 binary: DebugAdapterBinary,
731 config: DebugTaskDefinition,
732 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
733 initialized_tx: oneshot::Sender<()>,
734 cx: &mut App,
735 ) -> Task<Result<Entity<Self>>> {
736 let (message_tx, message_rx) = futures::channel::mpsc::unbounded();
737
738 cx.spawn(async move |cx| {
739 let mode = LocalMode::new(
740 session_id,
741 parent_session.clone(),
742 worktree,
743 breakpoint_store.clone(),
744 config.clone(),
745 binary,
746 message_tx,
747 cx.clone(),
748 )
749 .await?;
750
751 cx.new(|cx| {
752 create_local_session(
753 breakpoint_store,
754 session_id,
755 parent_session,
756 start_debugging_requests_tx,
757 initialized_tx,
758 message_rx,
759 mode,
760 cx,
761 )
762 })
763 })
764 }
765
766 pub(crate) fn remote(
767 session_id: SessionId,
768 client: AnyProtoClient,
769 upstream_project_id: u64,
770 ignore_breakpoints: bool,
771 ) -> Self {
772 Self {
773 mode: Mode::Remote(RemoteConnection {
774 _adapter_name: SharedString::new(""), // todo(debugger) we need to pipe in the right values to deserialize the debugger pane layout
775 _client: client,
776 _upstream_project_id: upstream_project_id,
777 }),
778 id: session_id,
779 child_session_ids: HashSet::default(),
780 parent_id: None,
781 capabilities: Capabilities::default(),
782 ignore_breakpoints,
783 variables: Default::default(),
784 stack_frames: Default::default(),
785 thread_states: ThreadStates::default(),
786 output_token: OutputToken(0),
787 output: circular_buffer::CircularBuffer::boxed(),
788 requests: HashMap::default(),
789 modules: Vec::default(),
790 loaded_sources: Vec::default(),
791 threads: IndexMap::default(),
792 _background_tasks: Vec::default(),
793 locations: Default::default(),
794 is_session_terminated: false,
795 exception_breakpoints: Default::default(),
796 }
797 }
798
/// Returns this session's id.
pub fn session_id(&self) -> SessionId {
    self.id
}
802
/// Returns a copy of the set of child session ids.
pub fn child_session_ids(&self) -> HashSet<SessionId> {
    self.child_session_ids.clone()
}
806
/// Registers `session_id` as a child of this session.
pub fn add_child_session_id(&mut self, session_id: SessionId) {
    self.child_session_ids.insert(session_id);
}
810
/// Removes `session_id` from this session's children; a no-op if it is not present.
pub fn remove_child_session_id(&mut self, session_id: SessionId) {
    self.child_session_ids.remove(&session_id);
}
814
/// Returns the id of the parent session, if there is one.
pub fn parent_id(&self) -> Option<SessionId> {
    self.parent_id
}
818
/// Returns the capabilities currently stored for the adapter.
pub fn capabilities(&self) -> &Capabilities {
    &self.capabilities
}
822
823 pub(crate) fn root_binary(&self) -> Arc<DebugAdapterBinary> {
824 let Mode::Local(local_mode) = &self.mode else {
825 panic!("Session is not local");
826 };
827 local_mode
828 .root_binary
829 .clone()
830 .unwrap_or_else(|| Arc::new(local_mode.binary.clone()))
831 }
832
833 pub fn binary(&self) -> &DebugAdapterBinary {
834 let Mode::Local(local_mode) = &self.mode else {
835 panic!("Session is not local");
836 };
837 &local_mode.binary
838 }
839
840 pub fn adapter_name(&self) -> SharedString {
841 match &self.mode {
842 Mode::Local(local_mode) => local_mode.definition.adapter.clone().into(),
843 Mode::Remote(remote_mode) => remote_mode._adapter_name.clone(),
844 }
845 }
846
847 pub fn configuration(&self) -> Option<DebugTaskDefinition> {
848 if let Mode::Local(local_mode) = &self.mode {
849 Some(local_mode.definition.clone())
850 } else {
851 None
852 }
853 }
854
/// Returns true once the adapter has sent a `Terminated` event.
pub fn is_terminated(&self) -> bool {
    self.is_session_terminated
}
858
859 pub fn is_local(&self) -> bool {
860 matches!(self.mode, Mode::Local(_))
861 }
862
863 pub fn as_local_mut(&mut self) -> Option<&mut LocalMode> {
864 match &mut self.mode {
865 Mode::Local(local_mode) => Some(local_mode),
866 Mode::Remote(_) => None,
867 }
868 }
869
870 pub fn as_local(&self) -> Option<&LocalMode> {
871 match &self.mode {
872 Mode::Local(local_mode) => Some(local_mode),
873 Mode::Remote(_) => None,
874 }
875 }
876
877 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
878 match &self.mode {
879 Mode::Local(local_mode) => {
880 let capabilities = local_mode.clone().request_initialization(cx);
881
882 cx.spawn(async move |this, cx| {
883 let capabilities = capabilities.await?;
884 this.update(cx, |session, _| {
885 session.capabilities = capabilities;
886 let filters = session
887 .capabilities
888 .exception_breakpoint_filters
889 .clone()
890 .unwrap_or_default();
891 for filter in filters {
892 let default = filter.default.unwrap_or_default();
893 session
894 .exception_breakpoints
895 .entry(filter.filter.clone())
896 .or_insert_with(|| (filter, default));
897 }
898 })?;
899 Ok(())
900 })
901 }
902 Mode::Remote(_) => Task::ready(Err(anyhow!(
903 "Cannot send initialize request from remote session"
904 ))),
905 }
906 }
907
908 pub(super) fn initialize_sequence(
909 &mut self,
910 initialize_rx: oneshot::Receiver<()>,
911 dap_store: WeakEntity<DapStore>,
912 cx: &mut Context<Self>,
913 ) -> Task<Result<()>> {
914 match &self.mode {
915 Mode::Local(local_mode) => {
916 local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
917 }
918 Mode::Remote(_) => Task::ready(Err(anyhow!("cannot initialize remote session"))),
919 }
920 }
921
922 pub fn run_to_position(
923 &mut self,
924 breakpoint: SourceBreakpoint,
925 active_thread_id: ThreadId,
926 cx: &mut Context<Self>,
927 ) {
928 match &mut self.mode {
929 Mode::Local(local_mode) => {
930 if !matches!(
931 self.thread_states.thread_state(active_thread_id),
932 Some(ThreadStatus::Stopped)
933 ) {
934 return;
935 };
936 let path = breakpoint.path.clone();
937 local_mode.tmp_breakpoint = Some(breakpoint);
938 let task = local_mode.send_breakpoints_from_path(
939 path,
940 BreakpointUpdatedReason::Toggled,
941 cx,
942 );
943
944 cx.spawn(async move |this, cx| {
945 task.await;
946 this.update(cx, |this, cx| {
947 this.continue_thread(active_thread_id, cx);
948 })
949 })
950 .detach();
951 }
952 Mode::Remote(_) => {}
953 }
954 }
955
956 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
957 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
958 }
959
960 pub fn output(
961 &self,
962 since: OutputToken,
963 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
964 if self.output_token.0 == 0 {
965 return (self.output.range(0..0), OutputToken(0));
966 };
967
968 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
969
970 let clamped_events_since = events_since.clamp(0, self.output.len());
971 (
972 self.output
973 .range(self.output.len() - clamped_events_since..),
974 self.output_token,
975 )
976 }
977
978 pub fn respond_to_client(
979 &self,
980 request_seq: u64,
981 success: bool,
982 command: String,
983 body: Option<serde_json::Value>,
984 cx: &mut Context<Self>,
985 ) -> Task<Result<()>> {
986 let Some(local_session) = self.as_local().cloned() else {
987 unreachable!("Cannot respond to remote client");
988 };
989
990 cx.background_spawn(async move {
991 local_session
992 .client
993 .send_message(Message::Response(Response {
994 body,
995 success,
996 command,
997 seq: request_seq + 1,
998 request_seq,
999 message: None,
1000 }))
1001 .await
1002 })
1003 }
1004
1005 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1006 if let Some((local, path)) = self.as_local_mut().and_then(|local| {
1007 let breakpoint = local.tmp_breakpoint.take()?;
1008 let path = breakpoint.path.clone();
1009 Some((local, path))
1010 }) {
1011 local
1012 .send_breakpoints_from_path(path, BreakpointUpdatedReason::Toggled, cx)
1013 .detach();
1014 };
1015
1016 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1017 self.thread_states.stop_all_threads();
1018
1019 self.invalidate_command_type::<StackTraceCommand>();
1020 }
1021
1022 // Event if we stopped all threads we still need to insert the thread_id
1023 // to our own data
1024 if let Some(thread_id) = event.thread_id {
1025 self.thread_states.stop_thread(ThreadId(thread_id));
1026
1027 self.invalidate_state(
1028 &StackTraceCommand {
1029 thread_id,
1030 start_frame: None,
1031 levels: None,
1032 }
1033 .into(),
1034 );
1035 }
1036
1037 self.invalidate_generic();
1038 self.threads.clear();
1039 self.variables.clear();
1040 cx.emit(SessionEvent::Stopped(
1041 event
1042 .thread_id
1043 .map(Into::into)
1044 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1045 ));
1046 cx.notify();
1047 }
1048
1049 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1050 match *event {
1051 Events::Initialized(_) => {
1052 debug_assert!(
1053 false,
1054 "Initialized event should have been handled in LocalMode"
1055 );
1056 }
1057 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1058 Events::Continued(event) => {
1059 if event.all_threads_continued.unwrap_or_default() {
1060 self.thread_states.continue_all_threads();
1061 } else {
1062 self.thread_states
1063 .continue_thread(ThreadId(event.thread_id));
1064 }
1065 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1066 self.invalidate_generic();
1067 }
1068 Events::Exited(_event) => {
1069 self.clear_active_debug_line(cx);
1070 }
1071 Events::Terminated(_) => {
1072 self.is_session_terminated = true;
1073 self.clear_active_debug_line(cx);
1074 }
1075 Events::Thread(event) => {
1076 let thread_id = ThreadId(event.thread_id);
1077
1078 match event.reason {
1079 dap::ThreadEventReason::Started => {
1080 self.thread_states.continue_thread(thread_id);
1081 }
1082 dap::ThreadEventReason::Exited => {
1083 self.thread_states.exit_thread(thread_id);
1084 }
1085 reason => {
1086 log::error!("Unhandled thread event reason {:?}", reason);
1087 }
1088 }
1089 self.invalidate_state(&ThreadsCommand.into());
1090 cx.notify();
1091 }
1092 Events::Output(event) => {
1093 if event
1094 .category
1095 .as_ref()
1096 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1097 {
1098 return;
1099 }
1100
1101 self.output.push_back(event);
1102 self.output_token.0 += 1;
1103 cx.notify();
1104 }
1105 Events::Breakpoint(_) => {}
1106 Events::Module(event) => {
1107 match event.reason {
1108 dap::ModuleEventReason::New => {
1109 self.modules.push(event.module);
1110 }
1111 dap::ModuleEventReason::Changed => {
1112 if let Some(module) = self
1113 .modules
1114 .iter_mut()
1115 .find(|other| event.module.id == other.id)
1116 {
1117 *module = event.module;
1118 }
1119 }
1120 dap::ModuleEventReason::Removed => {
1121 self.modules.retain(|other| event.module.id != other.id);
1122 }
1123 }
1124
1125 // todo(debugger): We should only send the invalidate command to downstream clients.
1126 // self.invalidate_state(&ModulesCommand.into());
1127 }
1128 Events::LoadedSource(_) => {
1129 self.invalidate_state(&LoadedSourcesCommand.into());
1130 }
1131 Events::Capabilities(event) => {
1132 self.capabilities = self.capabilities.merge(event.capabilities);
1133 cx.notify();
1134 }
1135 Events::Memory(_) => {}
1136 Events::Process(_) => {}
1137 Events::ProgressEnd(_) => {}
1138 Events::ProgressStart(_) => {}
1139 Events::ProgressUpdate(_) => {}
1140 Events::Invalidated(_) => {}
1141 Events::Other(_) => {}
1142 }
1143 }
1144
1145 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1146 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1147 &mut self,
1148 request: T,
1149 process_result: impl FnOnce(
1150 &mut Self,
1151 Result<T::Response>,
1152 &mut Context<Self>,
1153 ) -> Option<T::Response>
1154 + 'static,
1155 cx: &mut Context<Self>,
1156 ) {
1157 const {
1158 assert!(
1159 T::CACHEABLE,
1160 "Only requests marked as cacheable should invoke `fetch`"
1161 );
1162 }
1163
1164 if !self.thread_states.any_stopped_thread()
1165 && request.type_id() != TypeId::of::<ThreadsCommand>()
1166 || self.is_session_terminated
1167 {
1168 return;
1169 }
1170
1171 let request_map = self
1172 .requests
1173 .entry(std::any::TypeId::of::<T>())
1174 .or_default();
1175
1176 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1177 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1178
1179 let task = Self::request_inner::<Arc<T>>(
1180 &self.capabilities,
1181 self.id,
1182 &self.mode,
1183 command,
1184 process_result,
1185 cx,
1186 );
1187 let task = cx
1188 .background_executor()
1189 .spawn(async move {
1190 let _ = task.await?;
1191 Some(())
1192 })
1193 .shared();
1194
1195 vacant.insert(task);
1196 cx.notify();
1197 }
1198 }
1199
1200 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1201 capabilities: &Capabilities,
1202 session_id: SessionId,
1203 mode: &Mode,
1204 request: T,
1205 process_result: impl FnOnce(
1206 &mut Self,
1207 Result<T::Response>,
1208 &mut Context<Self>,
1209 ) -> Option<T::Response>
1210 + 'static,
1211 cx: &mut Context<Self>,
1212 ) -> Task<Option<T::Response>> {
1213 if !T::is_supported(&capabilities) {
1214 log::warn!(
1215 "Attempted to send a DAP request that isn't supported: {:?}",
1216 request
1217 );
1218 let error = Err(anyhow::Error::msg(
1219 "Couldn't complete request because it's not supported",
1220 ));
1221 return cx.spawn(async move |this, cx| {
1222 this.update(cx, |this, cx| process_result(this, error, cx))
1223 .log_err()
1224 .flatten()
1225 });
1226 }
1227
1228 let request = mode.request_dap(session_id, request, cx);
1229 cx.spawn(async move |this, cx| {
1230 let result = request.await;
1231 this.update(cx, |this, cx| process_result(this, result, cx))
1232 .log_err()
1233 .flatten()
1234 })
1235 }
1236
1237 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1238 &self,
1239 request: T,
1240 process_result: impl FnOnce(
1241 &mut Self,
1242 Result<T::Response>,
1243 &mut Context<Self>,
1244 ) -> Option<T::Response>
1245 + 'static,
1246 cx: &mut Context<Self>,
1247 ) -> Task<Option<T::Response>> {
1248 Self::request_inner(
1249 &self.capabilities,
1250 self.id,
1251 &self.mode,
1252 request,
1253 process_result,
1254 cx,
1255 )
1256 }
1257
1258 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1259 self.requests.remove(&std::any::TypeId::of::<Command>());
1260 }
1261
1262 fn invalidate_generic(&mut self) {
1263 self.invalidate_command_type::<ModulesCommand>();
1264 self.invalidate_command_type::<LoadedSourcesCommand>();
1265 self.invalidate_command_type::<ThreadsCommand>();
1266 }
1267
1268 fn invalidate_state(&mut self, key: &RequestSlot) {
1269 self.requests
1270 .entry((&*key.0 as &dyn Any).type_id())
1271 .and_modify(|request_map| {
1272 request_map.remove(&key);
1273 });
1274 }
1275
1276 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1277 self.thread_states.thread_status(thread_id)
1278 }
1279
1280 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1281 self.fetch(
1282 dap_command::ThreadsCommand,
1283 |this, result, cx| {
1284 let result = result.log_err()?;
1285
1286 this.threads = result
1287 .iter()
1288 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1289 .collect();
1290
1291 this.invalidate_command_type::<StackTraceCommand>();
1292 cx.emit(SessionEvent::Threads);
1293 cx.notify();
1294
1295 Some(result)
1296 },
1297 cx,
1298 );
1299
1300 self.threads
1301 .values()
1302 .map(|thread| {
1303 (
1304 thread.dap.clone(),
1305 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1306 )
1307 })
1308 .collect()
1309 }
1310
1311 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1312 self.fetch(
1313 dap_command::ModulesCommand,
1314 |this, result, cx| {
1315 let result = result.log_err()?;
1316
1317 this.modules = result.iter().cloned().collect();
1318 cx.emit(SessionEvent::Modules);
1319 cx.notify();
1320
1321 Some(result)
1322 },
1323 cx,
1324 );
1325
1326 &self.modules
1327 }
1328
1329 pub fn ignore_breakpoints(&self) -> bool {
1330 self.ignore_breakpoints
1331 }
1332
1333 pub fn toggle_ignore_breakpoints(
1334 &mut self,
1335 cx: &mut App,
1336 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1337 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1338 }
1339
1340 pub(crate) fn set_ignore_breakpoints(
1341 &mut self,
1342 ignore: bool,
1343 cx: &mut App,
1344 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1345 if self.ignore_breakpoints == ignore {
1346 return Task::ready(HashMap::default());
1347 }
1348
1349 self.ignore_breakpoints = ignore;
1350
1351 if let Some(local) = self.as_local() {
1352 local.send_source_breakpoints(ignore, cx)
1353 } else {
1354 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1355 unimplemented!()
1356 }
1357 }
1358
1359 pub fn exception_breakpoints(
1360 &self,
1361 ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1362 self.exception_breakpoints.values()
1363 }
1364
1365 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1366 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1367 *is_enabled = !*is_enabled;
1368 self.send_exception_breakpoints(cx);
1369 }
1370 }
1371
1372 fn send_exception_breakpoints(&mut self, cx: &App) {
1373 if let Some(local) = self.as_local() {
1374 let exception_filters = self
1375 .exception_breakpoints
1376 .values()
1377 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1378 .collect();
1379
1380 let supports_exception_filters = self
1381 .capabilities
1382 .supports_exception_filter_options
1383 .unwrap_or_default();
1384 local
1385 .send_exception_breakpoints(exception_filters, supports_exception_filters, cx)
1386 .detach_and_log_err(cx);
1387 } else {
1388 debug_assert!(false, "Not implemented");
1389 }
1390 }
1391
1392 pub fn breakpoints_enabled(&self) -> bool {
1393 self.ignore_breakpoints
1394 }
1395
1396 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1397 self.fetch(
1398 dap_command::LoadedSourcesCommand,
1399 |this, result, cx| {
1400 let result = result.log_err()?;
1401 this.loaded_sources = result.iter().cloned().collect();
1402 cx.emit(SessionEvent::LoadedSources);
1403 cx.notify();
1404 Some(result)
1405 },
1406 cx,
1407 );
1408
1409 &self.loaded_sources
1410 }
1411
1412 fn fallback_to_manual_restart(
1413 &mut self,
1414 res: Result<()>,
1415 cx: &mut Context<Self>,
1416 ) -> Option<()> {
1417 if res.log_err().is_none() {
1418 cx.emit(SessionStateEvent::Restart);
1419 return None;
1420 }
1421 Some(())
1422 }
1423
1424 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1425 res.log_err()?;
1426 Some(())
1427 }
1428
1429 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1430 thread_id: ThreadId,
1431 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1432 {
1433 move |this, response, cx| match response.log_err() {
1434 Some(response) => Some(response),
1435 None => {
1436 this.thread_states.stop_thread(thread_id);
1437 cx.notify();
1438 None
1439 }
1440 }
1441 }
1442
1443 fn clear_active_debug_line_response(
1444 &mut self,
1445 response: Result<()>,
1446 cx: &mut Context<Session>,
1447 ) -> Option<()> {
1448 response.log_err()?;
1449 self.clear_active_debug_line(cx);
1450 Some(())
1451 }
1452
1453 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1454 self.as_local()
1455 .expect("Message handler will only run in local mode")
1456 .breakpoint_store
1457 .update(cx, |store, cx| {
1458 store.remove_active_position(Some(self.id), cx)
1459 });
1460 }
1461
1462 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1463 self.request(
1464 PauseCommand {
1465 thread_id: thread_id.0,
1466 },
1467 Self::empty_response,
1468 cx,
1469 )
1470 .detach();
1471 }
1472
1473 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1474 self.request(
1475 RestartStackFrameCommand { stack_frame_id },
1476 Self::empty_response,
1477 cx,
1478 )
1479 .detach();
1480 }
1481
1482 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1483 if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1484 self.request(
1485 RestartCommand {
1486 raw: args.unwrap_or(Value::Null),
1487 },
1488 Self::fallback_to_manual_restart,
1489 cx,
1490 )
1491 .detach();
1492 } else {
1493 cx.emit(SessionStateEvent::Restart);
1494 }
1495 }
1496
1497 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1498 self.is_session_terminated = true;
1499 self.thread_states.exit_all_threads();
1500 cx.notify();
1501
1502 let task = if self
1503 .capabilities
1504 .supports_terminate_request
1505 .unwrap_or_default()
1506 {
1507 self.request(
1508 TerminateCommand {
1509 restart: Some(false),
1510 },
1511 Self::clear_active_debug_line_response,
1512 cx,
1513 )
1514 } else {
1515 self.request(
1516 DisconnectCommand {
1517 restart: Some(false),
1518 terminate_debuggee: Some(true),
1519 suspend_debuggee: Some(false),
1520 },
1521 Self::clear_active_debug_line_response,
1522 cx,
1523 )
1524 };
1525
1526 cx.emit(SessionStateEvent::Shutdown);
1527
1528 let debug_client = self.adapter_client();
1529
1530 cx.background_spawn(async move {
1531 let _ = task.await;
1532
1533 if let Some(client) = debug_client {
1534 client.shutdown().await.log_err();
1535 }
1536 })
1537 }
1538
1539 pub fn completions(
1540 &mut self,
1541 query: CompletionsQuery,
1542 cx: &mut Context<Self>,
1543 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1544 let task = self.request(query, |_, result, _| result.log_err(), cx);
1545
1546 cx.background_executor().spawn(async move {
1547 anyhow::Ok(
1548 task.await
1549 .map(|response| response.targets)
1550 .ok_or_else(|| anyhow!("failed to fetch completions"))?,
1551 )
1552 })
1553 }
1554
1555 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1556 self.thread_states.continue_thread(thread_id);
1557 self.request(
1558 ContinueCommand {
1559 args: ContinueArguments {
1560 thread_id: thread_id.0,
1561 single_thread: Some(true),
1562 },
1563 },
1564 Self::on_step_response::<ContinueCommand>(thread_id),
1565 cx,
1566 )
1567 .detach();
1568 }
1569
1570 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1571 match self.mode {
1572 Mode::Local(ref local) => Some(local.client.clone()),
1573 Mode::Remote(_) => None,
1574 }
1575 }
1576
1577 pub fn step_over(
1578 &mut self,
1579 thread_id: ThreadId,
1580 granularity: SteppingGranularity,
1581 cx: &mut Context<Self>,
1582 ) {
1583 let supports_single_thread_execution_requests =
1584 self.capabilities.supports_single_thread_execution_requests;
1585 let supports_stepping_granularity = self
1586 .capabilities
1587 .supports_stepping_granularity
1588 .unwrap_or_default();
1589
1590 let command = NextCommand {
1591 inner: StepCommand {
1592 thread_id: thread_id.0,
1593 granularity: supports_stepping_granularity.then(|| granularity),
1594 single_thread: supports_single_thread_execution_requests,
1595 },
1596 };
1597
1598 self.thread_states.process_step(thread_id);
1599 self.request(
1600 command,
1601 Self::on_step_response::<NextCommand>(thread_id),
1602 cx,
1603 )
1604 .detach();
1605 }
1606
1607 pub fn step_in(
1608 &mut self,
1609 thread_id: ThreadId,
1610 granularity: SteppingGranularity,
1611 cx: &mut Context<Self>,
1612 ) {
1613 let supports_single_thread_execution_requests =
1614 self.capabilities.supports_single_thread_execution_requests;
1615 let supports_stepping_granularity = self
1616 .capabilities
1617 .supports_stepping_granularity
1618 .unwrap_or_default();
1619
1620 let command = StepInCommand {
1621 inner: StepCommand {
1622 thread_id: thread_id.0,
1623 granularity: supports_stepping_granularity.then(|| granularity),
1624 single_thread: supports_single_thread_execution_requests,
1625 },
1626 };
1627
1628 self.thread_states.process_step(thread_id);
1629 self.request(
1630 command,
1631 Self::on_step_response::<StepInCommand>(thread_id),
1632 cx,
1633 )
1634 .detach();
1635 }
1636
1637 pub fn step_out(
1638 &mut self,
1639 thread_id: ThreadId,
1640 granularity: SteppingGranularity,
1641 cx: &mut Context<Self>,
1642 ) {
1643 let supports_single_thread_execution_requests =
1644 self.capabilities.supports_single_thread_execution_requests;
1645 let supports_stepping_granularity = self
1646 .capabilities
1647 .supports_stepping_granularity
1648 .unwrap_or_default();
1649
1650 let command = StepOutCommand {
1651 inner: StepCommand {
1652 thread_id: thread_id.0,
1653 granularity: supports_stepping_granularity.then(|| granularity),
1654 single_thread: supports_single_thread_execution_requests,
1655 },
1656 };
1657
1658 self.thread_states.process_step(thread_id);
1659 self.request(
1660 command,
1661 Self::on_step_response::<StepOutCommand>(thread_id),
1662 cx,
1663 )
1664 .detach();
1665 }
1666
1667 pub fn step_back(
1668 &mut self,
1669 thread_id: ThreadId,
1670 granularity: SteppingGranularity,
1671 cx: &mut Context<Self>,
1672 ) {
1673 let supports_single_thread_execution_requests =
1674 self.capabilities.supports_single_thread_execution_requests;
1675 let supports_stepping_granularity = self
1676 .capabilities
1677 .supports_stepping_granularity
1678 .unwrap_or_default();
1679
1680 let command = StepBackCommand {
1681 inner: StepCommand {
1682 thread_id: thread_id.0,
1683 granularity: supports_stepping_granularity.then(|| granularity),
1684 single_thread: supports_single_thread_execution_requests,
1685 },
1686 };
1687
1688 self.thread_states.process_step(thread_id);
1689
1690 self.request(
1691 command,
1692 Self::on_step_response::<StepBackCommand>(thread_id),
1693 cx,
1694 )
1695 .detach();
1696 }
1697
1698 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1699 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1700 && self.requests.contains_key(&ThreadsCommand.type_id())
1701 && self.threads.contains_key(&thread_id)
1702 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1703 // We could still be using an old thread id and have sent a new thread's request
1704 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1705 // But it very well could cause a minor bug in the future that is hard to track down
1706 {
1707 self.fetch(
1708 super::dap_command::StackTraceCommand {
1709 thread_id: thread_id.0,
1710 start_frame: None,
1711 levels: None,
1712 },
1713 move |this, stack_frames, cx| {
1714 let stack_frames = stack_frames.log_err()?;
1715
1716 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1717 thread.stack_frame_ids =
1718 stack_frames.iter().map(|frame| frame.id).collect();
1719 });
1720 debug_assert!(
1721 matches!(entry, indexmap::map::Entry::Occupied(_)),
1722 "Sent request for thread_id that doesn't exist"
1723 );
1724
1725 this.stack_frames.extend(
1726 stack_frames
1727 .iter()
1728 .cloned()
1729 .map(|frame| (frame.id, StackFrame::from(frame))),
1730 );
1731
1732 this.invalidate_command_type::<ScopesCommand>();
1733 this.invalidate_command_type::<VariablesCommand>();
1734
1735 cx.emit(SessionEvent::StackTrace);
1736 cx.notify();
1737 Some(stack_frames)
1738 },
1739 cx,
1740 );
1741 }
1742
1743 self.threads
1744 .get(&thread_id)
1745 .map(|thread| {
1746 thread
1747 .stack_frame_ids
1748 .iter()
1749 .filter_map(|id| self.stack_frames.get(id))
1750 .cloned()
1751 .collect()
1752 })
1753 .unwrap_or_default()
1754 }
1755
1756 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1757 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1758 && self
1759 .requests
1760 .contains_key(&TypeId::of::<StackTraceCommand>())
1761 {
1762 self.fetch(
1763 ScopesCommand { stack_frame_id },
1764 move |this, scopes, cx| {
1765 let scopes = scopes.log_err()?;
1766
1767 for scope in scopes .iter(){
1768 this.variables(scope.variables_reference, cx);
1769 }
1770
1771 let entry = this
1772 .stack_frames
1773 .entry(stack_frame_id)
1774 .and_modify(|stack_frame| {
1775 stack_frame.scopes = scopes.clone();
1776 });
1777
1778 cx.emit(SessionEvent::Variables);
1779
1780 debug_assert!(
1781 matches!(entry, indexmap::map::Entry::Occupied(_)),
1782 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1783 );
1784
1785 Some(scopes)
1786 },
1787 cx,
1788 );
1789 }
1790
1791 self.stack_frames
1792 .get(&stack_frame_id)
1793 .map(|frame| frame.scopes.as_slice())
1794 .unwrap_or_default()
1795 }
1796
1797 pub fn variables(
1798 &mut self,
1799 variables_reference: VariableReference,
1800 cx: &mut Context<Self>,
1801 ) -> Vec<dap::Variable> {
1802 let command = VariablesCommand {
1803 variables_reference,
1804 filter: None,
1805 start: None,
1806 count: None,
1807 format: None,
1808 };
1809
1810 self.fetch(
1811 command,
1812 move |this, variables, cx| {
1813 let variables = variables.log_err()?;
1814 this.variables
1815 .insert(variables_reference, variables.clone());
1816
1817 cx.emit(SessionEvent::Variables);
1818 Some(variables)
1819 },
1820 cx,
1821 );
1822
1823 self.variables
1824 .get(&variables_reference)
1825 .cloned()
1826 .unwrap_or_default()
1827 }
1828
1829 pub fn set_variable_value(
1830 &mut self,
1831 variables_reference: u64,
1832 name: String,
1833 value: String,
1834 cx: &mut Context<Self>,
1835 ) {
1836 if self.capabilities.supports_set_variable.unwrap_or_default() {
1837 self.request(
1838 SetVariableValueCommand {
1839 name,
1840 value,
1841 variables_reference,
1842 },
1843 move |this, response, cx| {
1844 let response = response.log_err()?;
1845 this.invalidate_command_type::<VariablesCommand>();
1846 cx.notify();
1847 Some(response)
1848 },
1849 cx,
1850 )
1851 .detach()
1852 }
1853 }
1854
1855 pub fn evaluate(
1856 &mut self,
1857 expression: String,
1858 context: Option<EvaluateArgumentsContext>,
1859 frame_id: Option<u64>,
1860 source: Option<Source>,
1861 cx: &mut Context<Self>,
1862 ) {
1863 self.request(
1864 EvaluateCommand {
1865 expression,
1866 context,
1867 frame_id,
1868 source,
1869 },
1870 |this, response, cx| {
1871 let response = response.log_err()?;
1872 this.output_token.0 += 1;
1873 this.output.push_back(dap::OutputEvent {
1874 category: None,
1875 output: response.result.clone(),
1876 group: None,
1877 variables_reference: Some(response.variables_reference),
1878 source: None,
1879 line: None,
1880 column: None,
1881 data: None,
1882 location_reference: None,
1883 });
1884
1885 this.invalidate_command_type::<ScopesCommand>();
1886 cx.notify();
1887 Some(response)
1888 },
1889 cx,
1890 )
1891 .detach();
1892 }
1893
1894 pub fn location(
1895 &mut self,
1896 reference: u64,
1897 cx: &mut Context<Self>,
1898 ) -> Option<dap::LocationsResponse> {
1899 self.fetch(
1900 LocationsCommand { reference },
1901 move |this, response, _| {
1902 let response = response.log_err()?;
1903 this.locations.insert(reference, response.clone());
1904 Some(response)
1905 },
1906 cx,
1907 );
1908 self.locations.get(&reference).cloned()
1909 }
1910 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
1911 let command = DisconnectCommand {
1912 restart: Some(false),
1913 terminate_debuggee: Some(true),
1914 suspend_debuggee: Some(false),
1915 };
1916
1917 self.request(command, Self::empty_response, cx).detach()
1918 }
1919
1920 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
1921 if self
1922 .capabilities
1923 .supports_terminate_threads_request
1924 .unwrap_or_default()
1925 {
1926 self.request(
1927 TerminateThreadsCommand {
1928 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
1929 },
1930 Self::clear_active_debug_line_response,
1931 cx,
1932 )
1933 .detach();
1934 } else {
1935 self.shutdown(cx).detach();
1936 }
1937 }
1938}
1939
1940fn create_local_session(
1941 breakpoint_store: Entity<BreakpointStore>,
1942 session_id: SessionId,
1943 parent_session: Option<Entity<Session>>,
1944 start_debugging_requests_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
1945 initialized_tx: oneshot::Sender<()>,
1946 mut message_rx: futures::channel::mpsc::UnboundedReceiver<Message>,
1947 mode: LocalMode,
1948 cx: &mut Context<Session>,
1949) -> Session {
1950 let _background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
1951 let mut initialized_tx = Some(initialized_tx);
1952 while let Some(message) = message_rx.next().await {
1953 if let Message::Event(event) = message {
1954 if let Events::Initialized(_) = *event {
1955 if let Some(tx) = initialized_tx.take() {
1956 tx.send(()).ok();
1957 }
1958 } else {
1959 let Ok(_) = this.update(cx, |session, cx| {
1960 session.handle_dap_event(event, cx);
1961 }) else {
1962 break;
1963 };
1964 }
1965 } else {
1966 let Ok(_) = start_debugging_requests_tx.unbounded_send((session_id, message))
1967 else {
1968 break;
1969 };
1970 }
1971 }
1972 })];
1973
1974 cx.subscribe(&breakpoint_store, |this, _, event, cx| match event {
1975 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
1976 if let Some(local) = (!this.ignore_breakpoints)
1977 .then(|| this.as_local_mut())
1978 .flatten()
1979 {
1980 local
1981 .send_breakpoints_from_path(path.clone(), *reason, cx)
1982 .detach();
1983 };
1984 }
1985 BreakpointStoreEvent::BreakpointsCleared(paths) => {
1986 if let Some(local) = (!this.ignore_breakpoints)
1987 .then(|| this.as_local_mut())
1988 .flatten()
1989 {
1990 local.unset_breakpoints_from_paths(paths, cx).detach();
1991 }
1992 }
1993 BreakpointStoreEvent::ActiveDebugLineChanged => {}
1994 })
1995 .detach();
1996
1997 Session {
1998 mode: Mode::Local(mode),
1999 id: session_id,
2000 child_session_ids: HashSet::default(),
2001 parent_id: parent_session.map(|session| session.read(cx).id),
2002 variables: Default::default(),
2003 capabilities: Capabilities::default(),
2004 thread_states: ThreadStates::default(),
2005 output_token: OutputToken(0),
2006 ignore_breakpoints: false,
2007 output: circular_buffer::CircularBuffer::boxed(),
2008 requests: HashMap::default(),
2009 modules: Vec::default(),
2010 loaded_sources: Vec::default(),
2011 threads: IndexMap::default(),
2012 stack_frames: IndexMap::default(),
2013 locations: Default::default(),
2014 exception_breakpoints: Default::default(),
2015 _background_tasks,
2016 is_session_terminated: false,
2017 }
2018}