1use crate::debugger::breakpoint_store::BreakpointSessionState;
2
3use super::breakpoint_store::{
4 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
5};
6use super::dap_command::{
7 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
8 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
9 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
10 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
11 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
12 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
13};
14use super::dap_store::DapStore;
15use anyhow::{Context as _, Result, anyhow};
16use collections::{HashMap, HashSet, IndexMap};
17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
18use dap::messages::Response;
19use dap::requests::{Request, RunInTerminal, StartDebugging};
20use dap::{
21 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
22 SteppingGranularity, StoppedEvent, VariableReference,
23 client::{DebugAdapterClient, SessionId},
24 messages::{Events, Message},
25};
26use dap::{
27 ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
28 RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
29 StartDebuggingRequestArgumentsRequest,
30};
31use futures::SinkExt;
32use futures::channel::mpsc::UnboundedSender;
33use futures::channel::{mpsc, oneshot};
34use futures::{FutureExt, future::Shared};
35use gpui::{
36 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
37 Task, WeakEntity,
38};
39
40use rpc::ErrorExt;
41use serde_json::Value;
42use smol::stream::StreamExt;
43use std::any::TypeId;
44use std::collections::BTreeMap;
45use std::u64;
46use std::{
47 any::Any,
48 collections::hash_map::Entry,
49 hash::{Hash, Hasher},
50 path::Path,
51 sync::Arc,
52};
53use task::TaskContext;
54use text::{PointUtf16, ToPointUtf16};
55use util::ResultExt;
56use worktree::Worktree;
57
58#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
59#[repr(transparent)]
60pub struct ThreadId(pub u64);
61
62impl ThreadId {
63 pub const MIN: ThreadId = ThreadId(u64::MIN);
64 pub const MAX: ThreadId = ThreadId(u64::MAX);
65}
66
67impl From<u64> for ThreadId {
68 fn from(id: u64) -> Self {
69 Self(id)
70 }
71}
72
73#[derive(Clone, Debug)]
74pub struct StackFrame {
75 pub dap: dap::StackFrame,
76 pub scopes: Vec<dap::Scope>,
77}
78
79impl From<dap::StackFrame> for StackFrame {
80 fn from(stack_frame: dap::StackFrame) -> Self {
81 Self {
82 scopes: vec![],
83 dap: stack_frame,
84 }
85 }
86}
87
88#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
89pub enum ThreadStatus {
90 #[default]
91 Running,
92 Stopped,
93 Stepping,
94 Exited,
95 Ended,
96}
97
98impl ThreadStatus {
99 pub fn label(&self) -> &'static str {
100 match self {
101 ThreadStatus::Running => "Running",
102 ThreadStatus::Stopped => "Stopped",
103 ThreadStatus::Stepping => "Stepping",
104 ThreadStatus::Exited => "Exited",
105 ThreadStatus::Ended => "Ended",
106 }
107 }
108}
109
110#[derive(Debug)]
111pub struct Thread {
112 dap: dap::Thread,
113 stack_frames: Vec<StackFrame>,
114 stack_frames_error: Option<anyhow::Error>,
115 _has_stopped: bool,
116}
117
118impl From<dap::Thread> for Thread {
119 fn from(dap: dap::Thread) -> Self {
120 Self {
121 dap,
122 stack_frames: Default::default(),
123 stack_frames_error: None,
124 _has_stopped: false,
125 }
126 }
127}
128
129pub enum Mode {
130 Building,
131 Running(RunningMode),
132}
133
134#[derive(Clone)]
135pub struct RunningMode {
136 client: Arc<DebugAdapterClient>,
137 binary: DebugAdapterBinary,
138 tmp_breakpoint: Option<SourceBreakpoint>,
139 worktree: WeakEntity<Worktree>,
140 executor: BackgroundExecutor,
141 is_started: bool,
142 has_ever_stopped: bool,
143 messages_tx: UnboundedSender<Message>,
144}
145
146fn client_source(abs_path: &Path) -> dap::Source {
147 dap::Source {
148 name: abs_path
149 .file_name()
150 .map(|filename| filename.to_string_lossy().to_string()),
151 path: Some(abs_path.to_string_lossy().to_string()),
152 source_reference: None,
153 presentation_hint: None,
154 origin: None,
155 sources: None,
156 adapter_data: None,
157 checksums: None,
158 }
159}
160
161impl RunningMode {
162 async fn new(
163 session_id: SessionId,
164 parent_session: Option<Entity<Session>>,
165 worktree: WeakEntity<Worktree>,
166 binary: DebugAdapterBinary,
167 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
168 cx: &mut AsyncApp,
169 ) -> Result<Self> {
170 let message_handler = Box::new({
171 let messages_tx = messages_tx.clone();
172 move |message| {
173 messages_tx.unbounded_send(message).ok();
174 }
175 });
176
177 let client = if let Some(client) = parent_session
178 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
179 .flatten()
180 {
181 client
182 .create_child_connection(session_id, binary.clone(), message_handler, cx)
183 .await?
184 } else {
185 DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
186 };
187
188 Ok(Self {
189 client: Arc::new(client),
190 worktree,
191 tmp_breakpoint: None,
192 binary,
193 executor: cx.background_executor().clone(),
194 is_started: false,
195 has_ever_stopped: false,
196 messages_tx,
197 })
198 }
199
200 pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
201 &self.worktree
202 }
203
204 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
205 let tasks: Vec<_> = paths
206 .into_iter()
207 .map(|path| {
208 self.request(dap_command::SetBreakpoints {
209 source: client_source(path),
210 source_modified: None,
211 breakpoints: vec![],
212 })
213 })
214 .collect();
215
216 cx.background_spawn(async move {
217 futures::future::join_all(tasks)
218 .await
219 .iter()
220 .for_each(|res| match res {
221 Ok(_) => {}
222 Err(err) => {
223 log::warn!("Set breakpoints request failed: {}", err);
224 }
225 });
226 })
227 }
228
229 fn send_breakpoints_from_path(
230 &self,
231 abs_path: Arc<Path>,
232 reason: BreakpointUpdatedReason,
233 breakpoint_store: &Entity<BreakpointStore>,
234 cx: &mut App,
235 ) -> Task<()> {
236 let breakpoints =
237 breakpoint_store
238 .read(cx)
239 .source_breakpoints_from_path(&abs_path, cx)
240 .into_iter()
241 .filter(|bp| bp.state.is_enabled())
242 .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
243 breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
244 }))
245 .map(Into::into)
246 .collect();
247
248 let raw_breakpoints = breakpoint_store
249 .read(cx)
250 .breakpoints_from_path(&abs_path)
251 .into_iter()
252 .filter(|bp| bp.bp.state.is_enabled())
253 .collect::<Vec<_>>();
254
255 let task = self.request(dap_command::SetBreakpoints {
256 source: client_source(&abs_path),
257 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
258 breakpoints,
259 });
260 let session_id = self.client.id();
261 let breakpoint_store = breakpoint_store.downgrade();
262 cx.spawn(async move |cx| match cx.background_spawn(task).await {
263 Ok(breakpoints) => {
264 let breakpoints =
265 breakpoints
266 .into_iter()
267 .zip(raw_breakpoints)
268 .filter_map(|(dap_bp, zed_bp)| {
269 Some((
270 zed_bp,
271 BreakpointSessionState {
272 id: dap_bp.id?,
273 verified: dap_bp.verified,
274 },
275 ))
276 });
277 breakpoint_store
278 .update(cx, |this, _| {
279 this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
280 })
281 .ok();
282 }
283 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
284 })
285 }
286
287 fn send_exception_breakpoints(
288 &self,
289 filters: Vec<ExceptionBreakpointsFilter>,
290 supports_filter_options: bool,
291 ) -> Task<Result<Vec<dap::Breakpoint>>> {
292 let arg = if supports_filter_options {
293 SetExceptionBreakpoints::WithOptions {
294 filters: filters
295 .into_iter()
296 .map(|filter| ExceptionFilterOptions {
297 filter_id: filter.filter,
298 condition: None,
299 mode: None,
300 })
301 .collect(),
302 }
303 } else {
304 SetExceptionBreakpoints::Plain {
305 filters: filters.into_iter().map(|filter| filter.filter).collect(),
306 }
307 };
308 self.request(arg)
309 }
310
311 fn send_source_breakpoints(
312 &self,
313 ignore_breakpoints: bool,
314 breakpoint_store: &Entity<BreakpointStore>,
315 cx: &App,
316 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
317 let mut breakpoint_tasks = Vec::new();
318 let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
319 let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
320 debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
321 let session_id = self.client.id();
322 for (path, breakpoints) in breakpoints {
323 let breakpoints = if ignore_breakpoints {
324 vec![]
325 } else {
326 breakpoints
327 .into_iter()
328 .filter(|bp| bp.state.is_enabled())
329 .map(Into::into)
330 .collect()
331 };
332
333 let raw_breakpoints = raw_breakpoints
334 .remove(&path)
335 .unwrap_or_default()
336 .into_iter()
337 .filter(|bp| bp.bp.state.is_enabled());
338 let error_path = path.clone();
339 let send_request = self
340 .request(dap_command::SetBreakpoints {
341 source: client_source(&path),
342 source_modified: Some(false),
343 breakpoints,
344 })
345 .map(|result| result.map_err(move |e| (error_path, e)));
346
347 let task = cx.spawn({
348 let breakpoint_store = breakpoint_store.downgrade();
349 async move |cx| {
350 let breakpoints = cx.background_spawn(send_request).await?;
351
352 let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
353 |(dap_bp, zed_bp)| {
354 Some((
355 zed_bp,
356 BreakpointSessionState {
357 id: dap_bp.id?,
358 verified: dap_bp.verified,
359 },
360 ))
361 },
362 );
363 breakpoint_store
364 .update(cx, |this, _| {
365 this.mark_breakpoints_verified(session_id, &path, breakpoints);
366 })
367 .ok();
368
369 Ok(())
370 }
371 });
372 breakpoint_tasks.push(task);
373 }
374
375 cx.background_spawn(async move {
376 futures::future::join_all(breakpoint_tasks)
377 .await
378 .into_iter()
379 .filter_map(Result::err)
380 .collect::<HashMap<_, _>>()
381 })
382 }
383
384 fn initialize_sequence(
385 &self,
386 capabilities: &Capabilities,
387 initialized_rx: oneshot::Receiver<()>,
388 dap_store: WeakEntity<DapStore>,
389 cx: &mut Context<Session>,
390 ) -> Task<Result<()>> {
391 let raw = self.binary.request_args.clone();
392
393 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
394 let launch = match raw.request {
395 dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
396 raw: raw.configuration,
397 }),
398 dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
399 raw: raw.configuration,
400 }),
401 };
402
403 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
404 let exception_filters = capabilities
405 .exception_breakpoint_filters
406 .as_ref()
407 .map(|exception_filters| {
408 exception_filters
409 .iter()
410 .filter(|filter| filter.default == Some(true))
411 .cloned()
412 .collect::<Vec<_>>()
413 })
414 .unwrap_or_default();
415 let supports_exception_filters = capabilities
416 .supports_exception_filter_options
417 .unwrap_or_default();
418 let this = self.clone();
419 let worktree = self.worktree().clone();
420 let configuration_sequence = cx.spawn({
421 async move |_, cx| {
422 let breakpoint_store =
423 dap_store.read_with(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
424 initialized_rx.await?;
425 let errors_by_path = cx
426 .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
427 .await;
428
429 dap_store.update(cx, |_, cx| {
430 let Some(worktree) = worktree.upgrade() else {
431 return;
432 };
433
434 for (path, error) in &errors_by_path {
435 log::error!("failed to set breakpoints for {path:?}: {error}");
436 }
437
438 if let Some(failed_path) = errors_by_path.keys().next() {
439 let failed_path = failed_path
440 .strip_prefix(worktree.read(cx).abs_path())
441 .unwrap_or(failed_path)
442 .display();
443 let message = format!(
444 "Failed to set breakpoints for {failed_path}{}",
445 match errors_by_path.len() {
446 0 => unreachable!(),
447 1 => "".into(),
448 2 => " and 1 other path".into(),
449 n => format!(" and {} other paths", n - 1),
450 }
451 );
452 cx.emit(super::dap_store::DapStoreEvent::Notification(message));
453 }
454 })?;
455
456 this.send_exception_breakpoints(exception_filters, supports_exception_filters)
457 .await
458 .ok();
459 let ret = if configuration_done_supported {
460 this.request(ConfigurationDone {})
461 } else {
462 Task::ready(Ok(()))
463 }
464 .await;
465 ret
466 }
467 });
468
469 let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
470
471 cx.spawn(async move |this, cx| {
472 let result = task.await;
473
474 this.update(cx, |this, cx| {
475 if let Some(this) = this.as_running_mut() {
476 this.is_started = true;
477 cx.notify();
478 }
479 })
480 .ok();
481
482 result?;
483 anyhow::Ok(())
484 })
485 }
486
487 fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
488 let client = self.client.clone();
489 let messages_tx = self.messages_tx.clone();
490 let message_handler = Box::new(move |message| {
491 messages_tx.unbounded_send(message).ok();
492 });
493 if client.should_reconnect_for_ssh() {
494 Some(cx.spawn(async move |cx| {
495 client.connect(message_handler, cx).await?;
496 anyhow::Ok(())
497 }))
498 } else {
499 None
500 }
501 }
502
503 fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
504 where
505 <R::DapRequest as dap::requests::Request>::Response: 'static,
506 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
507 {
508 let request = Arc::new(request);
509
510 let request_clone = request.clone();
511 let connection = self.client.clone();
512 self.executor.spawn(async move {
513 let args = request_clone.to_dap();
514 let response = connection.request::<R::DapRequest>(args).await?;
515 request.response_from_dap(response)
516 })
517 }
518}
519
520impl Mode {
521 pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
522 where
523 <R::DapRequest as dap::requests::Request>::Response: 'static,
524 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
525 {
526 match self {
527 Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
528 Mode::Building => Task::ready(Err(anyhow!(
529 "no adapter running to send request: {request:?}"
530 ))),
531 }
532 }
533
534 /// Did this debug session stop at least once?
535 pub(crate) fn has_ever_stopped(&self) -> bool {
536 match self {
537 Mode::Building => false,
538 Mode::Running(running_mode) => running_mode.has_ever_stopped,
539 }
540 }
541
542 fn stopped(&mut self) {
543 if let Mode::Running(running) = self {
544 running.has_ever_stopped = true;
545 }
546 }
547}
548
549#[derive(Default)]
550struct ThreadStates {
551 global_state: Option<ThreadStatus>,
552 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
553}
554
555impl ThreadStates {
556 fn stop_all_threads(&mut self) {
557 self.global_state = Some(ThreadStatus::Stopped);
558 self.known_thread_states.clear();
559 }
560
561 fn exit_all_threads(&mut self) {
562 self.global_state = Some(ThreadStatus::Exited);
563 self.known_thread_states.clear();
564 }
565
566 fn continue_all_threads(&mut self) {
567 self.global_state = Some(ThreadStatus::Running);
568 self.known_thread_states.clear();
569 }
570
571 fn stop_thread(&mut self, thread_id: ThreadId) {
572 self.known_thread_states
573 .insert(thread_id, ThreadStatus::Stopped);
574 }
575
576 fn continue_thread(&mut self, thread_id: ThreadId) {
577 self.known_thread_states
578 .insert(thread_id, ThreadStatus::Running);
579 }
580
581 fn process_step(&mut self, thread_id: ThreadId) {
582 self.known_thread_states
583 .insert(thread_id, ThreadStatus::Stepping);
584 }
585
586 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
587 self.thread_state(thread_id)
588 .unwrap_or(ThreadStatus::Running)
589 }
590
591 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
592 self.known_thread_states
593 .get(&thread_id)
594 .copied()
595 .or(self.global_state)
596 }
597
598 fn exit_thread(&mut self, thread_id: ThreadId) {
599 self.known_thread_states
600 .insert(thread_id, ThreadStatus::Exited);
601 }
602
603 fn any_stopped_thread(&self) -> bool {
604 self.global_state
605 .is_some_and(|state| state == ThreadStatus::Stopped)
606 || self
607 .known_thread_states
608 .values()
609 .any(|status| *status == ThreadStatus::Stopped)
610 }
611}
612const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;
613
614type IsEnabled = bool;
615
616#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
617pub struct OutputToken(pub usize);
618/// Represents a current state of a single debug adapter and provides ways to mutate it.
619pub struct Session {
620 pub mode: Mode,
621 id: SessionId,
622 label: SharedString,
623 adapter: DebugAdapterName,
624 pub(super) capabilities: Capabilities,
625 child_session_ids: HashSet<SessionId>,
626 parent_session: Option<Entity<Session>>,
627 modules: Vec<dap::Module>,
628 loaded_sources: Vec<dap::Source>,
629 output_token: OutputToken,
630 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
631 threads: IndexMap<ThreadId, Thread>,
632 thread_states: ThreadStates,
633 variables: HashMap<VariableReference, Vec<dap::Variable>>,
634 stack_frames: IndexMap<StackFrameId, StackFrame>,
635 locations: HashMap<u64, dap::LocationsResponse>,
636 is_session_terminated: bool,
637 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
638 pub(crate) breakpoint_store: Entity<BreakpointStore>,
639 ignore_breakpoints: bool,
640 exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
641 background_tasks: Vec<Task<()>>,
642 task_context: TaskContext,
643}
644
645trait CacheableCommand: Any + Send + Sync {
646 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
647 fn dyn_hash(&self, hasher: &mut dyn Hasher);
648 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
649}
650
651impl<T> CacheableCommand for T
652where
653 T: DapCommand + PartialEq + Eq + Hash,
654{
655 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
656 (rhs as &dyn Any)
657 .downcast_ref::<Self>()
658 .map_or(false, |rhs| self == rhs)
659 }
660
661 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
662 T::hash(self, &mut hasher);
663 }
664
665 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
666 self
667 }
668}
669
670pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
671
672impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
673 fn from(request: T) -> Self {
674 Self(Arc::new(request))
675 }
676}
677
678impl PartialEq for RequestSlot {
679 fn eq(&self, other: &Self) -> bool {
680 self.0.dyn_eq(other.0.as_ref())
681 }
682}
683
684impl Eq for RequestSlot {}
685
686impl Hash for RequestSlot {
687 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
688 self.0.dyn_hash(state);
689 (&*self.0 as &dyn Any).type_id().hash(state)
690 }
691}
692
693#[derive(Debug, Clone, Hash, PartialEq, Eq)]
694pub struct CompletionsQuery {
695 pub query: String,
696 pub column: u64,
697 pub line: Option<u64>,
698 pub frame_id: Option<u64>,
699}
700
701impl CompletionsQuery {
702 pub fn new(
703 buffer: &language::Buffer,
704 cursor_position: language::Anchor,
705 frame_id: Option<u64>,
706 ) -> Self {
707 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
708 Self {
709 query: buffer.text(),
710 column: column as u64,
711 frame_id,
712 line: Some(row as u64),
713 }
714 }
715}
716
717#[derive(Debug)]
718pub enum SessionEvent {
719 Modules,
720 LoadedSources,
721 Stopped(Option<ThreadId>),
722 StackTrace,
723 Variables,
724 Threads,
725 InvalidateInlineValue,
726 CapabilitiesLoaded,
727 RunInTerminal {
728 request: RunInTerminalRequestArguments,
729 sender: mpsc::Sender<Result<u32>>,
730 },
731 ConsoleOutput,
732}
733
734#[derive(Clone, Debug, PartialEq, Eq)]
735pub enum SessionStateEvent {
736 Running,
737 Shutdown,
738 Restart,
739 SpawnChildSession {
740 request: StartDebuggingRequestArguments,
741 },
742}
743
744impl EventEmitter<SessionEvent> for Session {}
745impl EventEmitter<SessionStateEvent> for Session {}
746
747// local session will send breakpoint updates to DAP for all new breakpoints
748// remote side will only send breakpoint updates when it is a breakpoint created by that peer
749// BreakpointStore notifies session on breakpoint changes
750impl Session {
751 pub(crate) fn new(
752 breakpoint_store: Entity<BreakpointStore>,
753 session_id: SessionId,
754 parent_session: Option<Entity<Session>>,
755 label: SharedString,
756 adapter: DebugAdapterName,
757 task_context: TaskContext,
758 cx: &mut App,
759 ) -> Entity<Self> {
760 cx.new::<Self>(|cx| {
761 cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
762 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
763 if let Some(local) = (!this.ignore_breakpoints)
764 .then(|| this.as_running_mut())
765 .flatten()
766 {
767 local
768 .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
769 .detach();
770 };
771 }
772 BreakpointStoreEvent::BreakpointsCleared(paths) => {
773 if let Some(local) = (!this.ignore_breakpoints)
774 .then(|| this.as_running_mut())
775 .flatten()
776 {
777 local.unset_breakpoints_from_paths(paths, cx).detach();
778 }
779 }
780 BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
781 })
782 .detach();
783 cx.on_app_quit(Self::on_app_quit).detach();
784
785 let this = Self {
786 mode: Mode::Building,
787 id: session_id,
788 child_session_ids: HashSet::default(),
789 parent_session,
790 capabilities: Capabilities::default(),
791 variables: Default::default(),
792 stack_frames: Default::default(),
793 thread_states: ThreadStates::default(),
794 output_token: OutputToken(0),
795 output: circular_buffer::CircularBuffer::boxed(),
796 requests: HashMap::default(),
797 modules: Vec::default(),
798 loaded_sources: Vec::default(),
799 threads: IndexMap::default(),
800 background_tasks: Vec::default(),
801 locations: Default::default(),
802 is_session_terminated: false,
803 ignore_breakpoints: false,
804 breakpoint_store,
805 exception_breakpoints: Default::default(),
806 label,
807 adapter,
808 task_context,
809 };
810
811 this
812 })
813 }
814
815 pub fn task_context(&self) -> &TaskContext {
816 &self.task_context
817 }
818
819 pub fn worktree(&self) -> Option<Entity<Worktree>> {
820 match &self.mode {
821 Mode::Building => None,
822 Mode::Running(local_mode) => local_mode.worktree.upgrade(),
823 }
824 }
825
826 pub fn boot(
827 &mut self,
828 binary: DebugAdapterBinary,
829 worktree: Entity<Worktree>,
830 dap_store: WeakEntity<DapStore>,
831 cx: &mut Context<Self>,
832 ) -> Task<Result<()>> {
833 let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
834 let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
835
836 let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
837 let mut initialized_tx = Some(initialized_tx);
838 while let Some(message) = message_rx.next().await {
839 if let Message::Event(event) = message {
840 if let Events::Initialized(_) = *event {
841 if let Some(tx) = initialized_tx.take() {
842 tx.send(()).ok();
843 }
844 } else {
845 let Ok(_) = this.update(cx, |session, cx| {
846 session.handle_dap_event(event, cx);
847 }) else {
848 break;
849 };
850 }
851 } else if let Message::Request(request) = message {
852 let Ok(_) = this.update(cx, |this, cx| {
853 if request.command == StartDebugging::COMMAND {
854 this.handle_start_debugging_request(request, cx)
855 .detach_and_log_err(cx);
856 } else if request.command == RunInTerminal::COMMAND {
857 this.handle_run_in_terminal_request(request, cx)
858 .detach_and_log_err(cx);
859 }
860 }) else {
861 break;
862 };
863 }
864 }
865 })];
866 self.background_tasks = background_tasks;
867 let id = self.id;
868 let parent_session = self.parent_session.clone();
869
870 cx.spawn(async move |this, cx| {
871 let mode = RunningMode::new(
872 id,
873 parent_session,
874 worktree.downgrade(),
875 binary.clone(),
876 message_tx,
877 cx,
878 )
879 .await?;
880 this.update(cx, |this, cx| {
881 this.mode = Mode::Running(mode);
882 cx.emit(SessionStateEvent::Running);
883 })?;
884
885 this.update(cx, |session, cx| session.request_initialize(cx))?
886 .await?;
887
888 let result = this
889 .update(cx, |session, cx| {
890 session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
891 })?
892 .await;
893
894 if result.is_err() {
895 let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
896
897 console
898 .send(format!(
899 "Tried to launch debugger with: {}",
900 serde_json::to_string_pretty(&binary.request_args.configuration)
901 .unwrap_or_default(),
902 ))
903 .await
904 .ok();
905 }
906
907 result
908 })
909 }
910
911 pub fn session_id(&self) -> SessionId {
912 self.id
913 }
914
915 pub fn child_session_ids(&self) -> HashSet<SessionId> {
916 self.child_session_ids.clone()
917 }
918
919 pub fn add_child_session_id(&mut self, session_id: SessionId) {
920 self.child_session_ids.insert(session_id);
921 }
922
923 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
924 self.child_session_ids.remove(&session_id);
925 }
926
927 pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
928 self.parent_session
929 .as_ref()
930 .map(|session| session.read(cx).id)
931 }
932
933 pub fn parent_session(&self) -> Option<&Entity<Self>> {
934 self.parent_session.as_ref()
935 }
936
937 pub fn capabilities(&self) -> &Capabilities {
938 &self.capabilities
939 }
940
941 pub fn binary(&self) -> Option<&DebugAdapterBinary> {
942 match &self.mode {
943 Mode::Building => None,
944 Mode::Running(running_mode) => Some(&running_mode.binary),
945 }
946 }
947
948 pub fn adapter(&self) -> DebugAdapterName {
949 self.adapter.clone()
950 }
951
952 pub fn label(&self) -> SharedString {
953 self.label.clone()
954 }
955
956 pub fn is_terminated(&self) -> bool {
957 self.is_session_terminated
958 }
959
960 pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
961 let (tx, mut rx) = mpsc::unbounded();
962
963 cx.spawn(async move |this, cx| {
964 while let Some(output) = rx.next().await {
965 this.update(cx, |this, cx| {
966 let event = dap::OutputEvent {
967 category: None,
968 output,
969 group: None,
970 variables_reference: None,
971 source: None,
972 line: None,
973 column: None,
974 data: None,
975 location_reference: None,
976 };
977 this.push_output(event, cx);
978 })?;
979 }
980 anyhow::Ok(())
981 })
982 .detach();
983
984 return tx;
985 }
986
987 pub fn is_started(&self) -> bool {
988 match &self.mode {
989 Mode::Building => false,
990 Mode::Running(running) => running.is_started,
991 }
992 }
993
994 pub fn is_building(&self) -> bool {
995 matches!(self.mode, Mode::Building)
996 }
997
998 pub fn is_running(&self) -> bool {
999 matches!(self.mode, Mode::Running(_))
1000 }
1001
1002 pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1003 match &mut self.mode {
1004 Mode::Running(local_mode) => Some(local_mode),
1005 Mode::Building => None,
1006 }
1007 }
1008
1009 pub fn as_running(&self) -> Option<&RunningMode> {
1010 match &self.mode {
1011 Mode::Running(local_mode) => Some(local_mode),
1012 Mode::Building => None,
1013 }
1014 }
1015
1016 fn handle_start_debugging_request(
1017 &mut self,
1018 request: dap::messages::Request,
1019 cx: &mut Context<Self>,
1020 ) -> Task<Result<()>> {
1021 let request_seq = request.seq;
1022
1023 let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1024 .arguments
1025 .as_ref()
1026 .map(|value| serde_json::from_value(value.clone()));
1027
1028 let mut success = true;
1029 if let Some(Ok(request)) = launch_request {
1030 cx.emit(SessionStateEvent::SpawnChildSession { request });
1031 } else {
1032 log::error!(
1033 "Failed to parse launch request arguments: {:?}",
1034 request.arguments
1035 );
1036 success = false;
1037 }
1038
1039 cx.spawn(async move |this, cx| {
1040 this.update(cx, |this, cx| {
1041 this.respond_to_client(
1042 request_seq,
1043 success,
1044 StartDebugging::COMMAND.to_string(),
1045 None,
1046 cx,
1047 )
1048 })?
1049 .await
1050 })
1051 }
1052
1053 fn handle_run_in_terminal_request(
1054 &mut self,
1055 request: dap::messages::Request,
1056 cx: &mut Context<Self>,
1057 ) -> Task<Result<()>> {
1058 let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1059 request.arguments.unwrap_or_default(),
1060 ) {
1061 Ok(args) => args,
1062 Err(error) => {
1063 return cx.spawn(async move |session, cx| {
1064 let error = serde_json::to_value(dap::ErrorResponse {
1065 error: Some(dap::Message {
1066 id: request.seq,
1067 format: error.to_string(),
1068 variables: None,
1069 send_telemetry: None,
1070 show_user: None,
1071 url: None,
1072 url_label: None,
1073 }),
1074 })
1075 .ok();
1076
1077 session
1078 .update(cx, |this, cx| {
1079 this.respond_to_client(
1080 request.seq,
1081 false,
1082 StartDebugging::COMMAND.to_string(),
1083 error,
1084 cx,
1085 )
1086 })?
1087 .await?;
1088
1089 Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1090 });
1091 }
1092 };
1093
1094 let seq = request.seq;
1095
1096 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1097 cx.emit(SessionEvent::RunInTerminal {
1098 request: request_args,
1099 sender: tx,
1100 });
1101 cx.notify();
1102
1103 cx.spawn(async move |session, cx| {
1104 let result = util::maybe!(async move {
1105 rx.next().await.ok_or_else(|| {
1106 anyhow!("failed to receive response from spawn terminal".to_string())
1107 })?
1108 })
1109 .await;
1110 let (success, body) = match result {
1111 Ok(pid) => (
1112 true,
1113 serde_json::to_value(dap::RunInTerminalResponse {
1114 process_id: None,
1115 shell_process_id: Some(pid as u64),
1116 })
1117 .ok(),
1118 ),
1119 Err(error) => (
1120 false,
1121 serde_json::to_value(dap::ErrorResponse {
1122 error: Some(dap::Message {
1123 id: seq,
1124 format: error.to_string(),
1125 variables: None,
1126 send_telemetry: None,
1127 show_user: None,
1128 url: None,
1129 url_label: None,
1130 }),
1131 })
1132 .ok(),
1133 ),
1134 };
1135
1136 session
1137 .update(cx, |session, cx| {
1138 session.respond_to_client(
1139 seq,
1140 success,
1141 RunInTerminal::COMMAND.to_string(),
1142 body,
1143 cx,
1144 )
1145 })?
1146 .await
1147 })
1148 }
1149
1150 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1151 let adapter_id = self.adapter().to_string();
1152 let request = Initialize { adapter_id };
1153
1154 let Mode::Running(running) = &self.mode else {
1155 return Task::ready(Err(anyhow!(
1156 "Cannot send initialize request, task still building"
1157 )));
1158 };
1159 let mut response = running.request(request.clone());
1160
1161 cx.spawn(async move |this, cx| {
1162 loop {
1163 let capabilities = response.await;
1164 match capabilities {
1165 Err(e) => {
1166 let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1167 this.as_running()
1168 .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1169 }) else {
1170 return Err(e);
1171 };
1172 log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1173 reconnect.await?;
1174
1175 let Ok(Some(r)) = this.update(cx, |this, _| {
1176 this.as_running()
1177 .map(|running| running.request(request.clone()))
1178 }) else {
1179 return Err(e);
1180 };
1181 response = r
1182 }
1183 Ok(capabilities) => {
1184 this.update(cx, |session, cx| {
1185 session.capabilities = capabilities;
1186 let filters = session
1187 .capabilities
1188 .exception_breakpoint_filters
1189 .clone()
1190 .unwrap_or_default();
1191 for filter in filters {
1192 let default = filter.default.unwrap_or_default();
1193 session
1194 .exception_breakpoints
1195 .entry(filter.filter.clone())
1196 .or_insert_with(|| (filter, default));
1197 }
1198 cx.emit(SessionEvent::CapabilitiesLoaded);
1199 })?;
1200 return Ok(());
1201 }
1202 }
1203 }
1204 })
1205 }
1206
1207 pub(super) fn initialize_sequence(
1208 &mut self,
1209 initialize_rx: oneshot::Receiver<()>,
1210 dap_store: WeakEntity<DapStore>,
1211 cx: &mut Context<Self>,
1212 ) -> Task<Result<()>> {
1213 match &self.mode {
1214 Mode::Running(local_mode) => {
1215 local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1216 }
1217 Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1218 }
1219 }
1220
1221 pub fn run_to_position(
1222 &mut self,
1223 breakpoint: SourceBreakpoint,
1224 active_thread_id: ThreadId,
1225 cx: &mut Context<Self>,
1226 ) {
1227 match &mut self.mode {
1228 Mode::Running(local_mode) => {
1229 if !matches!(
1230 self.thread_states.thread_state(active_thread_id),
1231 Some(ThreadStatus::Stopped)
1232 ) {
1233 return;
1234 };
1235 let path = breakpoint.path.clone();
1236 local_mode.tmp_breakpoint = Some(breakpoint);
1237 let task = local_mode.send_breakpoints_from_path(
1238 path,
1239 BreakpointUpdatedReason::Toggled,
1240 &self.breakpoint_store,
1241 cx,
1242 );
1243
1244 cx.spawn(async move |this, cx| {
1245 task.await;
1246 this.update(cx, |this, cx| {
1247 this.continue_thread(active_thread_id, cx);
1248 })
1249 })
1250 .detach();
1251 }
1252 Mode::Building => {}
1253 }
1254 }
1255
1256 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1257 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1258 }
1259
1260 pub fn output(
1261 &self,
1262 since: OutputToken,
1263 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1264 if self.output_token.0 == 0 {
1265 return (self.output.range(0..0), OutputToken(0));
1266 };
1267
1268 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1269
1270 let clamped_events_since = events_since.clamp(0, self.output.len());
1271 (
1272 self.output
1273 .range(self.output.len() - clamped_events_since..),
1274 self.output_token,
1275 )
1276 }
1277
1278 pub fn respond_to_client(
1279 &self,
1280 request_seq: u64,
1281 success: bool,
1282 command: String,
1283 body: Option<serde_json::Value>,
1284 cx: &mut Context<Self>,
1285 ) -> Task<Result<()>> {
1286 let Some(local_session) = self.as_running() else {
1287 unreachable!("Cannot respond to remote client");
1288 };
1289 let client = local_session.client.clone();
1290
1291 cx.background_spawn(async move {
1292 client
1293 .send_message(Message::Response(Response {
1294 body,
1295 success,
1296 command,
1297 seq: request_seq + 1,
1298 request_seq,
1299 message: None,
1300 }))
1301 .await
1302 })
1303 }
1304
1305 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1306 self.mode.stopped();
1307 // todo(debugger): Find a clean way to get around the clone
1308 let breakpoint_store = self.breakpoint_store.clone();
1309 if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1310 let breakpoint = local.tmp_breakpoint.take()?;
1311 let path = breakpoint.path.clone();
1312 Some((local, path))
1313 }) {
1314 local
1315 .send_breakpoints_from_path(
1316 path,
1317 BreakpointUpdatedReason::Toggled,
1318 &breakpoint_store,
1319 cx,
1320 )
1321 .detach();
1322 };
1323
1324 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1325 self.thread_states.stop_all_threads();
1326 self.invalidate_command_type::<StackTraceCommand>();
1327 }
1328
1329 // Event if we stopped all threads we still need to insert the thread_id
1330 // to our own data
1331 if let Some(thread_id) = event.thread_id {
1332 self.thread_states.stop_thread(ThreadId(thread_id));
1333
1334 self.invalidate_state(
1335 &StackTraceCommand {
1336 thread_id,
1337 start_frame: None,
1338 levels: None,
1339 }
1340 .into(),
1341 );
1342 }
1343
1344 self.invalidate_generic();
1345 self.threads.clear();
1346 self.variables.clear();
1347 cx.emit(SessionEvent::Stopped(
1348 event
1349 .thread_id
1350 .map(Into::into)
1351 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1352 ));
1353 cx.emit(SessionEvent::InvalidateInlineValue);
1354 cx.notify();
1355 }
1356
1357 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1358 match *event {
1359 Events::Initialized(_) => {
1360 debug_assert!(
1361 false,
1362 "Initialized event should have been handled in LocalMode"
1363 );
1364 }
1365 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1366 Events::Continued(event) => {
1367 if event.all_threads_continued.unwrap_or_default() {
1368 self.thread_states.continue_all_threads();
1369 self.breakpoint_store.update(cx, |store, cx| {
1370 store.remove_active_position(Some(self.session_id()), cx)
1371 });
1372 } else {
1373 self.thread_states
1374 .continue_thread(ThreadId(event.thread_id));
1375 }
1376 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1377 self.invalidate_generic();
1378 }
1379 Events::Exited(_event) => {
1380 self.clear_active_debug_line(cx);
1381 }
1382 Events::Terminated(_) => {
1383 self.shutdown(cx).detach();
1384 }
1385 Events::Thread(event) => {
1386 let thread_id = ThreadId(event.thread_id);
1387
1388 match event.reason {
1389 dap::ThreadEventReason::Started => {
1390 self.thread_states.continue_thread(thread_id);
1391 }
1392 dap::ThreadEventReason::Exited => {
1393 self.thread_states.exit_thread(thread_id);
1394 }
1395 reason => {
1396 log::error!("Unhandled thread event reason {:?}", reason);
1397 }
1398 }
1399 self.invalidate_state(&ThreadsCommand.into());
1400 cx.notify();
1401 }
1402 Events::Output(event) => {
1403 if event
1404 .category
1405 .as_ref()
1406 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1407 {
1408 return;
1409 }
1410
1411 self.push_output(event, cx);
1412 cx.notify();
1413 }
1414 Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1415 store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1416 }),
1417 Events::Module(event) => {
1418 match event.reason {
1419 dap::ModuleEventReason::New => {
1420 self.modules.push(event.module);
1421 }
1422 dap::ModuleEventReason::Changed => {
1423 if let Some(module) = self
1424 .modules
1425 .iter_mut()
1426 .find(|other| event.module.id == other.id)
1427 {
1428 *module = event.module;
1429 }
1430 }
1431 dap::ModuleEventReason::Removed => {
1432 self.modules.retain(|other| event.module.id != other.id);
1433 }
1434 }
1435
1436 // todo(debugger): We should only send the invalidate command to downstream clients.
1437 // self.invalidate_state(&ModulesCommand.into());
1438 }
1439 Events::LoadedSource(_) => {
1440 self.invalidate_state(&LoadedSourcesCommand.into());
1441 }
1442 Events::Capabilities(event) => {
1443 self.capabilities = self.capabilities.merge(event.capabilities);
1444 cx.notify();
1445 }
1446 Events::Memory(_) => {}
1447 Events::Process(_) => {}
1448 Events::ProgressEnd(_) => {}
1449 Events::ProgressStart(_) => {}
1450 Events::ProgressUpdate(_) => {}
1451 Events::Invalidated(_) => {}
1452 Events::Other(_) => {}
1453 }
1454 }
1455
1456 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1457 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1458 &mut self,
1459 request: T,
1460 process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1461 cx: &mut Context<Self>,
1462 ) {
1463 const {
1464 assert!(
1465 T::CACHEABLE,
1466 "Only requests marked as cacheable should invoke `fetch`"
1467 );
1468 }
1469
1470 if !self.thread_states.any_stopped_thread()
1471 && request.type_id() != TypeId::of::<ThreadsCommand>()
1472 || self.is_session_terminated
1473 {
1474 return;
1475 }
1476
1477 let request_map = self
1478 .requests
1479 .entry(std::any::TypeId::of::<T>())
1480 .or_default();
1481
1482 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1483 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1484
1485 let task = Self::request_inner::<Arc<T>>(
1486 &self.capabilities,
1487 &self.mode,
1488 command,
1489 |this, result, cx| {
1490 process_result(this, result, cx);
1491 None
1492 },
1493 cx,
1494 );
1495 let task = cx
1496 .background_executor()
1497 .spawn(async move {
1498 let _ = task.await?;
1499 Some(())
1500 })
1501 .shared();
1502
1503 vacant.insert(task);
1504 cx.notify();
1505 }
1506 }
1507
1508 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1509 capabilities: &Capabilities,
1510 mode: &Mode,
1511 request: T,
1512 process_result: impl FnOnce(
1513 &mut Self,
1514 Result<T::Response>,
1515 &mut Context<Self>,
1516 ) -> Option<T::Response>
1517 + 'static,
1518 cx: &mut Context<Self>,
1519 ) -> Task<Option<T::Response>> {
1520 if !T::is_supported(&capabilities) {
1521 log::warn!(
1522 "Attempted to send a DAP request that isn't supported: {:?}",
1523 request
1524 );
1525 let error = Err(anyhow::Error::msg(
1526 "Couldn't complete request because it's not supported",
1527 ));
1528 return cx.spawn(async move |this, cx| {
1529 this.update(cx, |this, cx| process_result(this, error, cx))
1530 .ok()
1531 .flatten()
1532 });
1533 }
1534
1535 let request = mode.request_dap(request);
1536 cx.spawn(async move |this, cx| {
1537 let result = request.await;
1538 this.update(cx, |this, cx| process_result(this, result, cx))
1539 .ok()
1540 .flatten()
1541 })
1542 }
1543
1544 fn request<T: DapCommand + PartialEq + Eq + Hash>(
1545 &self,
1546 request: T,
1547 process_result: impl FnOnce(
1548 &mut Self,
1549 Result<T::Response>,
1550 &mut Context<Self>,
1551 ) -> Option<T::Response>
1552 + 'static,
1553 cx: &mut Context<Self>,
1554 ) -> Task<Option<T::Response>> {
1555 Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1556 }
1557
1558 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1559 self.requests.remove(&std::any::TypeId::of::<Command>());
1560 }
1561
1562 fn invalidate_generic(&mut self) {
1563 self.invalidate_command_type::<ModulesCommand>();
1564 self.invalidate_command_type::<LoadedSourcesCommand>();
1565 self.invalidate_command_type::<ThreadsCommand>();
1566 }
1567
1568 fn invalidate_state(&mut self, key: &RequestSlot) {
1569 self.requests
1570 .entry((&*key.0 as &dyn Any).type_id())
1571 .and_modify(|request_map| {
1572 request_map.remove(&key);
1573 });
1574 }
1575
1576 fn push_output(&mut self, event: OutputEvent, cx: &mut Context<Self>) {
1577 self.output.push_back(event);
1578 self.output_token.0 += 1;
1579 cx.emit(SessionEvent::ConsoleOutput);
1580 }
1581
1582 pub fn any_stopped_thread(&self) -> bool {
1583 self.thread_states.any_stopped_thread()
1584 }
1585
1586 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1587 self.thread_states.thread_status(thread_id)
1588 }
1589
1590 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1591 self.fetch(
1592 dap_command::ThreadsCommand,
1593 |this, result, cx| {
1594 let Some(result) = result.log_err() else {
1595 return;
1596 };
1597
1598 this.threads = result
1599 .into_iter()
1600 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1601 .collect();
1602
1603 this.invalidate_command_type::<StackTraceCommand>();
1604 cx.emit(SessionEvent::Threads);
1605 cx.notify();
1606 },
1607 cx,
1608 );
1609
1610 self.threads
1611 .values()
1612 .map(|thread| {
1613 (
1614 thread.dap.clone(),
1615 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1616 )
1617 })
1618 .collect()
1619 }
1620
1621 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1622 self.fetch(
1623 dap_command::ModulesCommand,
1624 |this, result, cx| {
1625 let Some(result) = result.log_err() else {
1626 return;
1627 };
1628
1629 this.modules = result;
1630 cx.emit(SessionEvent::Modules);
1631 cx.notify();
1632 },
1633 cx,
1634 );
1635
1636 &self.modules
1637 }
1638
1639 pub fn ignore_breakpoints(&self) -> bool {
1640 self.ignore_breakpoints
1641 }
1642
1643 pub fn toggle_ignore_breakpoints(
1644 &mut self,
1645 cx: &mut App,
1646 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1647 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1648 }
1649
1650 pub(crate) fn set_ignore_breakpoints(
1651 &mut self,
1652 ignore: bool,
1653 cx: &mut App,
1654 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1655 if self.ignore_breakpoints == ignore {
1656 return Task::ready(HashMap::default());
1657 }
1658
1659 self.ignore_breakpoints = ignore;
1660
1661 if let Some(local) = self.as_running() {
1662 local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1663 } else {
1664 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1665 unimplemented!()
1666 }
1667 }
1668
1669 pub fn exception_breakpoints(
1670 &self,
1671 ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1672 self.exception_breakpoints.values()
1673 }
1674
1675 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1676 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1677 *is_enabled = !*is_enabled;
1678 self.send_exception_breakpoints(cx);
1679 }
1680 }
1681
1682 fn send_exception_breakpoints(&mut self, cx: &App) {
1683 if let Some(local) = self.as_running() {
1684 let exception_filters = self
1685 .exception_breakpoints
1686 .values()
1687 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1688 .collect();
1689
1690 let supports_exception_filters = self
1691 .capabilities
1692 .supports_exception_filter_options
1693 .unwrap_or_default();
1694 local
1695 .send_exception_breakpoints(exception_filters, supports_exception_filters)
1696 .detach_and_log_err(cx);
1697 } else {
1698 debug_assert!(false, "Not implemented");
1699 }
1700 }
1701
1702 pub fn breakpoints_enabled(&self) -> bool {
1703 self.ignore_breakpoints
1704 }
1705
1706 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1707 self.fetch(
1708 dap_command::LoadedSourcesCommand,
1709 |this, result, cx| {
1710 let Some(result) = result.log_err() else {
1711 return;
1712 };
1713 this.loaded_sources = result;
1714 cx.emit(SessionEvent::LoadedSources);
1715 cx.notify();
1716 },
1717 cx,
1718 );
1719
1720 &self.loaded_sources
1721 }
1722
1723 fn fallback_to_manual_restart(
1724 &mut self,
1725 res: Result<()>,
1726 cx: &mut Context<Self>,
1727 ) -> Option<()> {
1728 if res.log_err().is_none() {
1729 cx.emit(SessionStateEvent::Restart);
1730 return None;
1731 }
1732 Some(())
1733 }
1734
1735 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1736 res.log_err()?;
1737 Some(())
1738 }
1739
1740 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1741 thread_id: ThreadId,
1742 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1743 {
1744 move |this, response, cx| match response.log_err() {
1745 Some(response) => {
1746 this.breakpoint_store.update(cx, |store, cx| {
1747 store.remove_active_position(Some(this.session_id()), cx)
1748 });
1749 Some(response)
1750 }
1751 None => {
1752 this.thread_states.stop_thread(thread_id);
1753 cx.notify();
1754 None
1755 }
1756 }
1757 }
1758
1759 fn clear_active_debug_line_response(
1760 &mut self,
1761 response: Result<()>,
1762 cx: &mut Context<Session>,
1763 ) -> Option<()> {
1764 response.log_err()?;
1765 self.clear_active_debug_line(cx);
1766 Some(())
1767 }
1768
1769 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1770 self.breakpoint_store.update(cx, |store, cx| {
1771 store.remove_active_position(Some(self.id), cx)
1772 });
1773 }
1774
1775 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1776 self.request(
1777 PauseCommand {
1778 thread_id: thread_id.0,
1779 },
1780 Self::empty_response,
1781 cx,
1782 )
1783 .detach();
1784 }
1785
1786 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1787 self.request(
1788 RestartStackFrameCommand { stack_frame_id },
1789 Self::empty_response,
1790 cx,
1791 )
1792 .detach();
1793 }
1794
1795 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1796 if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1797 self.request(
1798 RestartCommand {
1799 raw: args.unwrap_or(Value::Null),
1800 },
1801 Self::fallback_to_manual_restart,
1802 cx,
1803 )
1804 .detach();
1805 } else {
1806 cx.emit(SessionStateEvent::Restart);
1807 }
1808 }
1809
1810 fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1811 let debug_adapter = self.adapter_client();
1812
1813 cx.background_spawn(async move {
1814 if let Some(client) = debug_adapter {
1815 client.shutdown().await.log_err();
1816 }
1817 })
1818 }
1819
1820 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1821 self.is_session_terminated = true;
1822 self.thread_states.exit_all_threads();
1823 cx.notify();
1824
1825 let task = if self
1826 .capabilities
1827 .supports_terminate_request
1828 .unwrap_or_default()
1829 {
1830 self.request(
1831 TerminateCommand {
1832 restart: Some(false),
1833 },
1834 Self::clear_active_debug_line_response,
1835 cx,
1836 )
1837 } else {
1838 self.request(
1839 DisconnectCommand {
1840 restart: Some(false),
1841 terminate_debuggee: Some(true),
1842 suspend_debuggee: Some(false),
1843 },
1844 Self::clear_active_debug_line_response,
1845 cx,
1846 )
1847 };
1848
1849 cx.emit(SessionStateEvent::Shutdown);
1850
1851 let debug_client = self.adapter_client();
1852
1853 cx.background_spawn(async move {
1854 let _ = task.await;
1855
1856 if let Some(client) = debug_client {
1857 client.shutdown().await.log_err();
1858 }
1859 })
1860 }
1861
1862 pub fn completions(
1863 &mut self,
1864 query: CompletionsQuery,
1865 cx: &mut Context<Self>,
1866 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1867 let task = self.request(query, |_, result, _| result.log_err(), cx);
1868
1869 cx.background_executor().spawn(async move {
1870 anyhow::Ok(
1871 task.await
1872 .map(|response| response.targets)
1873 .context("failed to fetch completions")?,
1874 )
1875 })
1876 }
1877
1878 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1879 self.thread_states.continue_thread(thread_id);
1880 self.request(
1881 ContinueCommand {
1882 args: ContinueArguments {
1883 thread_id: thread_id.0,
1884 single_thread: Some(true),
1885 },
1886 },
1887 Self::on_step_response::<ContinueCommand>(thread_id),
1888 cx,
1889 )
1890 .detach();
1891 }
1892
1893 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1894 match self.mode {
1895 Mode::Running(ref local) => Some(local.client.clone()),
1896 Mode::Building => None,
1897 }
1898 }
1899
1900 pub fn has_ever_stopped(&self) -> bool {
1901 self.mode.has_ever_stopped()
1902 }
1903 pub fn step_over(
1904 &mut self,
1905 thread_id: ThreadId,
1906 granularity: SteppingGranularity,
1907 cx: &mut Context<Self>,
1908 ) {
1909 let supports_single_thread_execution_requests =
1910 self.capabilities.supports_single_thread_execution_requests;
1911 let supports_stepping_granularity = self
1912 .capabilities
1913 .supports_stepping_granularity
1914 .unwrap_or_default();
1915
1916 let command = NextCommand {
1917 inner: StepCommand {
1918 thread_id: thread_id.0,
1919 granularity: supports_stepping_granularity.then(|| granularity),
1920 single_thread: supports_single_thread_execution_requests,
1921 },
1922 };
1923
1924 self.thread_states.process_step(thread_id);
1925 self.request(
1926 command,
1927 Self::on_step_response::<NextCommand>(thread_id),
1928 cx,
1929 )
1930 .detach();
1931 }
1932
1933 pub fn step_in(
1934 &mut self,
1935 thread_id: ThreadId,
1936 granularity: SteppingGranularity,
1937 cx: &mut Context<Self>,
1938 ) {
1939 let supports_single_thread_execution_requests =
1940 self.capabilities.supports_single_thread_execution_requests;
1941 let supports_stepping_granularity = self
1942 .capabilities
1943 .supports_stepping_granularity
1944 .unwrap_or_default();
1945
1946 let command = StepInCommand {
1947 inner: StepCommand {
1948 thread_id: thread_id.0,
1949 granularity: supports_stepping_granularity.then(|| granularity),
1950 single_thread: supports_single_thread_execution_requests,
1951 },
1952 };
1953
1954 self.thread_states.process_step(thread_id);
1955 self.request(
1956 command,
1957 Self::on_step_response::<StepInCommand>(thread_id),
1958 cx,
1959 )
1960 .detach();
1961 }
1962
1963 pub fn step_out(
1964 &mut self,
1965 thread_id: ThreadId,
1966 granularity: SteppingGranularity,
1967 cx: &mut Context<Self>,
1968 ) {
1969 let supports_single_thread_execution_requests =
1970 self.capabilities.supports_single_thread_execution_requests;
1971 let supports_stepping_granularity = self
1972 .capabilities
1973 .supports_stepping_granularity
1974 .unwrap_or_default();
1975
1976 let command = StepOutCommand {
1977 inner: StepCommand {
1978 thread_id: thread_id.0,
1979 granularity: supports_stepping_granularity.then(|| granularity),
1980 single_thread: supports_single_thread_execution_requests,
1981 },
1982 };
1983
1984 self.thread_states.process_step(thread_id);
1985 self.request(
1986 command,
1987 Self::on_step_response::<StepOutCommand>(thread_id),
1988 cx,
1989 )
1990 .detach();
1991 }
1992
1993 pub fn step_back(
1994 &mut self,
1995 thread_id: ThreadId,
1996 granularity: SteppingGranularity,
1997 cx: &mut Context<Self>,
1998 ) {
1999 let supports_single_thread_execution_requests =
2000 self.capabilities.supports_single_thread_execution_requests;
2001 let supports_stepping_granularity = self
2002 .capabilities
2003 .supports_stepping_granularity
2004 .unwrap_or_default();
2005
2006 let command = StepBackCommand {
2007 inner: StepCommand {
2008 thread_id: thread_id.0,
2009 granularity: supports_stepping_granularity.then(|| granularity),
2010 single_thread: supports_single_thread_execution_requests,
2011 },
2012 };
2013
2014 self.thread_states.process_step(thread_id);
2015
2016 self.request(
2017 command,
2018 Self::on_step_response::<StepBackCommand>(thread_id),
2019 cx,
2020 )
2021 .detach();
2022 }
2023
2024 pub fn stack_frames(
2025 &mut self,
2026 thread_id: ThreadId,
2027 cx: &mut Context<Self>,
2028 ) -> Result<Vec<StackFrame>> {
2029 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2030 && self.requests.contains_key(&ThreadsCommand.type_id())
2031 && self.threads.contains_key(&thread_id)
2032 // ^ todo(debugger): We need a better way to check that we're not querying stale data
2033 // We could still be using an old thread id and have sent a new thread's request
2034 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2035 // But it very well could cause a minor bug in the future that is hard to track down
2036 {
2037 self.fetch(
2038 super::dap_command::StackTraceCommand {
2039 thread_id: thread_id.0,
2040 start_frame: None,
2041 levels: None,
2042 },
2043 move |this, stack_frames, cx| {
2044 let entry =
2045 this.threads
2046 .entry(thread_id)
2047 .and_modify(|thread| match &stack_frames {
2048 Ok(stack_frames) => {
2049 thread.stack_frames = stack_frames
2050 .iter()
2051 .cloned()
2052 .map(StackFrame::from)
2053 .collect();
2054 thread.stack_frames_error = None;
2055 }
2056 Err(error) => {
2057 thread.stack_frames.clear();
2058 thread.stack_frames_error = Some(error.cloned());
2059 }
2060 });
2061 debug_assert!(
2062 matches!(entry, indexmap::map::Entry::Occupied(_)),
2063 "Sent request for thread_id that doesn't exist"
2064 );
2065 if let Ok(stack_frames) = stack_frames {
2066 this.stack_frames.extend(
2067 stack_frames
2068 .into_iter()
2069 .filter(|frame| {
2070 // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2071 // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2072 !(frame.id == 0
2073 && frame.line == 0
2074 && frame.column == 0
2075 && frame.presentation_hint
2076 == Some(StackFramePresentationHint::Label))
2077 })
2078 .map(|frame| (frame.id, StackFrame::from(frame))),
2079 );
2080 }
2081
2082 this.invalidate_command_type::<ScopesCommand>();
2083 this.invalidate_command_type::<VariablesCommand>();
2084
2085 cx.emit(SessionEvent::StackTrace);
2086 },
2087 cx,
2088 );
2089 }
2090
2091 match self.threads.get(&thread_id) {
2092 Some(thread) => {
2093 if let Some(error) = &thread.stack_frames_error {
2094 Err(error.cloned())
2095 } else {
2096 Ok(thread.stack_frames.clone())
2097 }
2098 }
2099 None => Ok(Vec::new()),
2100 }
2101 }
2102
2103 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2104 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2105 && self
2106 .requests
2107 .contains_key(&TypeId::of::<StackTraceCommand>())
2108 {
2109 self.fetch(
2110 ScopesCommand { stack_frame_id },
2111 move |this, scopes, cx| {
2112 let Some(scopes) = scopes.log_err() else {
2113 return
2114 };
2115
2116 for scope in scopes.iter() {
2117 this.variables(scope.variables_reference, cx);
2118 }
2119
2120 let entry = this
2121 .stack_frames
2122 .entry(stack_frame_id)
2123 .and_modify(|stack_frame| {
2124 stack_frame.scopes = scopes;
2125 });
2126
2127 cx.emit(SessionEvent::Variables);
2128
2129 debug_assert!(
2130 matches!(entry, indexmap::map::Entry::Occupied(_)),
2131 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2132 );
2133 },
2134 cx,
2135 );
2136 }
2137
2138 self.stack_frames
2139 .get(&stack_frame_id)
2140 .map(|frame| frame.scopes.as_slice())
2141 .unwrap_or_default()
2142 }
2143
2144 pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
2145 let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2146 return Vec::new();
2147 };
2148
2149 stack_frame
2150 .scopes
2151 .iter()
2152 .filter_map(|scope| self.variables.get(&scope.variables_reference))
2153 .flatten()
2154 .cloned()
2155 .collect()
2156 }
2157
2158 pub fn variables(
2159 &mut self,
2160 variables_reference: VariableReference,
2161 cx: &mut Context<Self>,
2162 ) -> Vec<dap::Variable> {
2163 let command = VariablesCommand {
2164 variables_reference,
2165 filter: None,
2166 start: None,
2167 count: None,
2168 format: None,
2169 };
2170
2171 self.fetch(
2172 command,
2173 move |this, variables, cx| {
2174 let Some(variables) = variables.log_err() else {
2175 return;
2176 };
2177
2178 this.variables.insert(variables_reference, variables);
2179
2180 cx.emit(SessionEvent::Variables);
2181 cx.emit(SessionEvent::InvalidateInlineValue);
2182 },
2183 cx,
2184 );
2185
2186 self.variables
2187 .get(&variables_reference)
2188 .cloned()
2189 .unwrap_or_default()
2190 }
2191
2192 pub fn set_variable_value(
2193 &mut self,
2194 variables_reference: u64,
2195 name: String,
2196 value: String,
2197 cx: &mut Context<Self>,
2198 ) {
2199 if self.capabilities.supports_set_variable.unwrap_or_default() {
2200 self.request(
2201 SetVariableValueCommand {
2202 name,
2203 value,
2204 variables_reference,
2205 },
2206 move |this, response, cx| {
2207 let response = response.log_err()?;
2208 this.invalidate_command_type::<VariablesCommand>();
2209 cx.emit(SessionEvent::Variables);
2210 Some(response)
2211 },
2212 cx,
2213 )
2214 .detach()
2215 }
2216 }
2217
2218 pub fn evaluate(
2219 &mut self,
2220 expression: String,
2221 context: Option<EvaluateArgumentsContext>,
2222 frame_id: Option<u64>,
2223 source: Option<Source>,
2224 cx: &mut Context<Self>,
2225 ) -> Task<()> {
2226 let event = dap::OutputEvent {
2227 category: None,
2228 output: format!("> {expression}"),
2229 group: None,
2230 variables_reference: None,
2231 source: None,
2232 line: None,
2233 column: None,
2234 data: None,
2235 location_reference: None,
2236 };
2237 self.push_output(event, cx);
2238 let request = self.mode.request_dap(EvaluateCommand {
2239 expression,
2240 context,
2241 frame_id,
2242 source,
2243 });
2244 cx.spawn(async move |this, cx| {
2245 let response = request.await;
2246 this.update(cx, |this, cx| {
2247 match response {
2248 Ok(response) => {
2249 let event = dap::OutputEvent {
2250 category: None,
2251 output: format!("< {}", &response.result),
2252 group: None,
2253 variables_reference: Some(response.variables_reference),
2254 source: None,
2255 line: None,
2256 column: None,
2257 data: None,
2258 location_reference: None,
2259 };
2260 this.push_output(event, cx);
2261 }
2262 Err(e) => {
2263 let event = dap::OutputEvent {
2264 category: None,
2265 output: format!("{}", e),
2266 group: None,
2267 variables_reference: None,
2268 source: None,
2269 line: None,
2270 column: None,
2271 data: None,
2272 location_reference: None,
2273 };
2274 this.push_output(event, cx);
2275 }
2276 };
2277 cx.notify();
2278 })
2279 .ok();
2280 })
2281 }
2282
2283 pub fn location(
2284 &mut self,
2285 reference: u64,
2286 cx: &mut Context<Self>,
2287 ) -> Option<dap::LocationsResponse> {
2288 self.fetch(
2289 LocationsCommand { reference },
2290 move |this, response, _| {
2291 let Some(response) = response.log_err() else {
2292 return;
2293 };
2294 this.locations.insert(reference, response);
2295 },
2296 cx,
2297 );
2298 self.locations.get(&reference).cloned()
2299 }
2300
2301 pub fn is_attached(&self) -> bool {
2302 let Mode::Running(local_mode) = &self.mode else {
2303 return false;
2304 };
2305 local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2306 }
2307
2308 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2309 let command = DisconnectCommand {
2310 restart: Some(false),
2311 terminate_debuggee: Some(false),
2312 suspend_debuggee: Some(false),
2313 };
2314
2315 self.request(command, Self::empty_response, cx).detach()
2316 }
2317
2318 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2319 if self
2320 .capabilities
2321 .supports_terminate_threads_request
2322 .unwrap_or_default()
2323 {
2324 self.request(
2325 TerminateThreadsCommand {
2326 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2327 },
2328 Self::clear_active_debug_line_response,
2329 cx,
2330 )
2331 .detach();
2332 } else {
2333 self.shutdown(cx).detach();
2334 }
2335 }
2336
2337 pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2338 self.thread_states.thread_state(thread_id)
2339 }
2340}