1use crate::debugger::breakpoint_store::BreakpointSessionState;
2
3use super::breakpoint_store::{
4 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
5};
6use super::dap_command::{
7 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
8 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
9 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
10 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
11 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
12 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
13};
14use super::dap_store::DapStore;
15use anyhow::{Context as _, Result, anyhow};
16use collections::{HashMap, HashSet, IndexMap};
17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
18use dap::messages::Response;
19use dap::requests::{Request, RunInTerminal, StartDebugging};
20use dap::{
21 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
22 SteppingGranularity, StoppedEvent, VariableReference,
23 client::{DebugAdapterClient, SessionId},
24 messages::{Events, Message},
25};
26use dap::{
27 ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
28 RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
29 StartDebuggingRequestArgumentsRequest,
30};
31use futures::SinkExt;
32use futures::channel::mpsc::UnboundedSender;
33use futures::channel::{mpsc, oneshot};
34use futures::{FutureExt, future::Shared};
35use gpui::{
36 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
37 Task, WeakEntity,
38};
39
40use rpc::ErrorExt;
41use serde_json::Value;
42use smol::stream::StreamExt;
43use std::any::TypeId;
44use std::collections::BTreeMap;
45use std::u64;
46use std::{
47 any::Any,
48 collections::hash_map::Entry,
49 hash::{Hash, Hasher},
50 path::Path,
51 sync::Arc,
52};
53use task::TaskContext;
54use text::{PointUtf16, ToPointUtf16};
55use util::ResultExt;
56use worktree::Worktree;
57
/// Strongly-typed wrapper around the raw `u64` id of a thread.
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// Smallest representable thread id.
    pub const MIN: Self = Self(u64::MIN);
    /// Largest representable thread id.
    pub const MAX: Self = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw id without any validation.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
72
73#[derive(Clone, Debug)]
74pub struct StackFrame {
75 pub dap: dap::StackFrame,
76 pub scopes: Vec<dap::Scope>,
77}
78
79impl From<dap::StackFrame> for StackFrame {
80 fn from(stack_frame: dap::StackFrame) -> Self {
81 Self {
82 scopes: vec![],
83 dap: stack_frame,
84 }
85 }
86}
87
/// Execution state tracked for a thread; `Running` is the default.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the human-readable name of this status.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
109
110#[derive(Debug)]
111pub struct Thread {
112 dap: dap::Thread,
113 stack_frames: Vec<StackFrame>,
114 stack_frames_error: Option<anyhow::Error>,
115 _has_stopped: bool,
116}
117
118impl From<dap::Thread> for Thread {
119 fn from(dap: dap::Thread) -> Self {
120 Self {
121 dap,
122 stack_frames: Default::default(),
123 stack_frames_error: None,
124 _has_stopped: false,
125 }
126 }
127}
128
129pub enum Mode {
130 Building,
131 Running(RunningMode),
132}
133
134#[derive(Clone)]
135pub struct RunningMode {
136 client: Arc<DebugAdapterClient>,
137 binary: DebugAdapterBinary,
138 tmp_breakpoint: Option<SourceBreakpoint>,
139 worktree: WeakEntity<Worktree>,
140 executor: BackgroundExecutor,
141 is_started: bool,
142 has_ever_stopped: bool,
143 messages_tx: UnboundedSender<Message>,
144}
145
146fn client_source(abs_path: &Path) -> dap::Source {
147 dap::Source {
148 name: abs_path
149 .file_name()
150 .map(|filename| filename.to_string_lossy().to_string()),
151 path: Some(abs_path.to_string_lossy().to_string()),
152 source_reference: None,
153 presentation_hint: None,
154 origin: None,
155 sources: None,
156 adapter_data: None,
157 checksums: None,
158 }
159}
160
161impl RunningMode {
162 async fn new(
163 session_id: SessionId,
164 parent_session: Option<Entity<Session>>,
165 worktree: WeakEntity<Worktree>,
166 binary: DebugAdapterBinary,
167 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
168 cx: &mut AsyncApp,
169 ) -> Result<Self> {
170 let message_handler = Box::new({
171 let messages_tx = messages_tx.clone();
172 move |message| {
173 messages_tx.unbounded_send(message).ok();
174 }
175 });
176
177 let client = if let Some(client) = parent_session
178 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
179 .flatten()
180 {
181 client
182 .create_child_connection(session_id, binary.clone(), message_handler, cx)
183 .await?
184 } else {
185 DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
186 };
187
188 Ok(Self {
189 client: Arc::new(client),
190 worktree,
191 tmp_breakpoint: None,
192 binary,
193 executor: cx.background_executor().clone(),
194 is_started: false,
195 has_ever_stopped: false,
196 messages_tx,
197 })
198 }
199
200 pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
201 &self.worktree
202 }
203
204 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
205 let tasks: Vec<_> = paths
206 .into_iter()
207 .map(|path| {
208 self.request(dap_command::SetBreakpoints {
209 source: client_source(path),
210 source_modified: None,
211 breakpoints: vec![],
212 })
213 })
214 .collect();
215
216 cx.background_spawn(async move {
217 futures::future::join_all(tasks)
218 .await
219 .iter()
220 .for_each(|res| match res {
221 Ok(_) => {}
222 Err(err) => {
223 log::warn!("Set breakpoints request failed: {}", err);
224 }
225 });
226 })
227 }
228
229 fn send_breakpoints_from_path(
230 &self,
231 abs_path: Arc<Path>,
232 reason: BreakpointUpdatedReason,
233 breakpoint_store: &Entity<BreakpointStore>,
234 cx: &mut App,
235 ) -> Task<()> {
236 let breakpoints =
237 breakpoint_store
238 .read(cx)
239 .source_breakpoints_from_path(&abs_path, cx)
240 .into_iter()
241 .filter(|bp| bp.state.is_enabled())
242 .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
243 breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
244 }))
245 .map(Into::into)
246 .collect();
247
248 let raw_breakpoints = breakpoint_store
249 .read(cx)
250 .breakpoints_from_path(&abs_path)
251 .into_iter()
252 .filter(|bp| bp.bp.state.is_enabled())
253 .collect::<Vec<_>>();
254
255 let task = self.request(dap_command::SetBreakpoints {
256 source: client_source(&abs_path),
257 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
258 breakpoints,
259 });
260 let session_id = self.client.id();
261 let breakpoint_store = breakpoint_store.downgrade();
262 cx.spawn(async move |cx| match cx.background_spawn(task).await {
263 Ok(breakpoints) => {
264 let breakpoints =
265 breakpoints
266 .into_iter()
267 .zip(raw_breakpoints)
268 .filter_map(|(dap_bp, zed_bp)| {
269 Some((
270 zed_bp,
271 BreakpointSessionState {
272 id: dap_bp.id?,
273 verified: dap_bp.verified,
274 },
275 ))
276 });
277 breakpoint_store
278 .update(cx, |this, _| {
279 this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
280 })
281 .ok();
282 }
283 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
284 })
285 }
286
287 fn send_exception_breakpoints(
288 &self,
289 filters: Vec<ExceptionBreakpointsFilter>,
290 supports_filter_options: bool,
291 ) -> Task<Result<Vec<dap::Breakpoint>>> {
292 let arg = if supports_filter_options {
293 SetExceptionBreakpoints::WithOptions {
294 filters: filters
295 .into_iter()
296 .map(|filter| ExceptionFilterOptions {
297 filter_id: filter.filter,
298 condition: None,
299 mode: None,
300 })
301 .collect(),
302 }
303 } else {
304 SetExceptionBreakpoints::Plain {
305 filters: filters.into_iter().map(|filter| filter.filter).collect(),
306 }
307 };
308 self.request(arg)
309 }
310
311 fn send_source_breakpoints(
312 &self,
313 ignore_breakpoints: bool,
314 breakpoint_store: &Entity<BreakpointStore>,
315 cx: &App,
316 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
317 let mut breakpoint_tasks = Vec::new();
318 let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
319 let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
320 debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
321 let session_id = self.client.id();
322 for (path, breakpoints) in breakpoints {
323 let breakpoints = if ignore_breakpoints {
324 vec![]
325 } else {
326 breakpoints
327 .into_iter()
328 .filter(|bp| bp.state.is_enabled())
329 .map(Into::into)
330 .collect()
331 };
332
333 let raw_breakpoints = raw_breakpoints
334 .remove(&path)
335 .unwrap_or_default()
336 .into_iter()
337 .filter(|bp| bp.bp.state.is_enabled());
338 let error_path = path.clone();
339 let send_request = self
340 .request(dap_command::SetBreakpoints {
341 source: client_source(&path),
342 source_modified: Some(false),
343 breakpoints,
344 })
345 .map(|result| result.map_err(move |e| (error_path, e)));
346
347 let task = cx.spawn({
348 let breakpoint_store = breakpoint_store.downgrade();
349 async move |cx| {
350 let breakpoints = cx.background_spawn(send_request).await?;
351
352 let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
353 |(dap_bp, zed_bp)| {
354 Some((
355 zed_bp,
356 BreakpointSessionState {
357 id: dap_bp.id?,
358 verified: dap_bp.verified,
359 },
360 ))
361 },
362 );
363 breakpoint_store
364 .update(cx, |this, _| {
365 this.mark_breakpoints_verified(session_id, &path, breakpoints);
366 })
367 .ok();
368
369 Ok(())
370 }
371 });
372 breakpoint_tasks.push(task);
373 }
374
375 cx.background_spawn(async move {
376 futures::future::join_all(breakpoint_tasks)
377 .await
378 .into_iter()
379 .filter_map(Result::err)
380 .collect::<HashMap<_, _>>()
381 })
382 }
383
384 fn initialize_sequence(
385 &self,
386 capabilities: &Capabilities,
387 initialized_rx: oneshot::Receiver<()>,
388 dap_store: WeakEntity<DapStore>,
389 cx: &mut Context<Session>,
390 ) -> Task<Result<()>> {
391 let raw = self.binary.request_args.clone();
392
393 // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
394 let launch = match raw.request {
395 dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
396 raw: raw.configuration,
397 }),
398 dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
399 raw: raw.configuration,
400 }),
401 };
402
403 let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
404 let exception_filters = capabilities
405 .exception_breakpoint_filters
406 .as_ref()
407 .map(|exception_filters| {
408 exception_filters
409 .iter()
410 .filter(|filter| filter.default == Some(true))
411 .cloned()
412 .collect::<Vec<_>>()
413 })
414 .unwrap_or_default();
415 let supports_exception_filters = capabilities
416 .supports_exception_filter_options
417 .unwrap_or_default();
418 let this = self.clone();
419 let worktree = self.worktree().clone();
420 let configuration_sequence = cx.spawn({
421 async move |_, cx| {
422 let breakpoint_store =
423 dap_store.read_with(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
424 initialized_rx.await?;
425 let errors_by_path = cx
426 .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
427 .await;
428
429 dap_store.update(cx, |_, cx| {
430 let Some(worktree) = worktree.upgrade() else {
431 return;
432 };
433
434 for (path, error) in &errors_by_path {
435 log::error!("failed to set breakpoints for {path:?}: {error}");
436 }
437
438 if let Some(failed_path) = errors_by_path.keys().next() {
439 let failed_path = failed_path
440 .strip_prefix(worktree.read(cx).abs_path())
441 .unwrap_or(failed_path)
442 .display();
443 let message = format!(
444 "Failed to set breakpoints for {failed_path}{}",
445 match errors_by_path.len() {
446 0 => unreachable!(),
447 1 => "".into(),
448 2 => " and 1 other path".into(),
449 n => format!(" and {} other paths", n - 1),
450 }
451 );
452 cx.emit(super::dap_store::DapStoreEvent::Notification(message));
453 }
454 })?;
455
456 this.send_exception_breakpoints(exception_filters, supports_exception_filters)
457 .await
458 .ok();
459 let ret = if configuration_done_supported {
460 this.request(ConfigurationDone {})
461 } else {
462 Task::ready(Ok(()))
463 }
464 .await;
465 ret
466 }
467 });
468
469 let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));
470
471 cx.spawn(async move |this, cx| {
472 let result = task.await;
473
474 this.update(cx, |this, cx| {
475 if let Some(this) = this.as_running_mut() {
476 this.is_started = true;
477 cx.notify();
478 }
479 })
480 .ok();
481
482 result?;
483 anyhow::Ok(())
484 })
485 }
486
487 fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
488 let client = self.client.clone();
489 let messages_tx = self.messages_tx.clone();
490 let message_handler = Box::new(move |message| {
491 messages_tx.unbounded_send(message).ok();
492 });
493 if client.should_reconnect_for_ssh() {
494 Some(cx.spawn(async move |cx| {
495 client.connect(message_handler, cx).await?;
496 anyhow::Ok(())
497 }))
498 } else {
499 None
500 }
501 }
502
503 fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
504 where
505 <R::DapRequest as dap::requests::Request>::Response: 'static,
506 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
507 {
508 let request = Arc::new(request);
509
510 let request_clone = request.clone();
511 let connection = self.client.clone();
512 self.executor.spawn(async move {
513 let args = request_clone.to_dap();
514 let response = connection.request::<R::DapRequest>(args).await?;
515 request.response_from_dap(response)
516 })
517 }
518}
519
520impl Mode {
521 pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
522 where
523 <R::DapRequest as dap::requests::Request>::Response: 'static,
524 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
525 {
526 match self {
527 Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
528 Mode::Building => Task::ready(Err(anyhow!(
529 "no adapter running to send request: {request:?}"
530 ))),
531 }
532 }
533
534 /// Did this debug session stop at least once?
535 pub(crate) fn has_ever_stopped(&self) -> bool {
536 match self {
537 Mode::Building => false,
538 Mode::Running(running_mode) => running_mode.has_ever_stopped,
539 }
540 }
541
542 fn stopped(&mut self) {
543 if let Mode::Running(running) = self {
544 running.has_ever_stopped = true;
545 }
546 }
547}
548
549#[derive(Default)]
550struct ThreadStates {
551 global_state: Option<ThreadStatus>,
552 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
553}
554
555impl ThreadStates {
556 fn stop_all_threads(&mut self) {
557 self.global_state = Some(ThreadStatus::Stopped);
558 self.known_thread_states.clear();
559 }
560
561 fn exit_all_threads(&mut self) {
562 self.global_state = Some(ThreadStatus::Exited);
563 self.known_thread_states.clear();
564 }
565
566 fn continue_all_threads(&mut self) {
567 self.global_state = Some(ThreadStatus::Running);
568 self.known_thread_states.clear();
569 }
570
571 fn stop_thread(&mut self, thread_id: ThreadId) {
572 self.known_thread_states
573 .insert(thread_id, ThreadStatus::Stopped);
574 }
575
576 fn continue_thread(&mut self, thread_id: ThreadId) {
577 self.known_thread_states
578 .insert(thread_id, ThreadStatus::Running);
579 }
580
581 fn process_step(&mut self, thread_id: ThreadId) {
582 self.known_thread_states
583 .insert(thread_id, ThreadStatus::Stepping);
584 }
585
586 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
587 self.thread_state(thread_id)
588 .unwrap_or(ThreadStatus::Running)
589 }
590
591 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
592 self.known_thread_states
593 .get(&thread_id)
594 .copied()
595 .or(self.global_state)
596 }
597
598 fn exit_thread(&mut self, thread_id: ThreadId) {
599 self.known_thread_states
600 .insert(thread_id, ThreadStatus::Exited);
601 }
602
603 fn any_stopped_thread(&self) -> bool {
604 self.global_state
605 .is_some_and(|state| state == ThreadStatus::Stopped)
606 || self
607 .known_thread_states
608 .values()
609 .any(|status| *status == ThreadStatus::Stopped)
610 }
611}
/// Capacity of the circular buffer in which a session keeps its output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5_000;

/// Whether an exception breakpoint filter is enabled.
type IsEnabled = bool;

/// Counter-like token used to ask a session whether output newer than a given point exists.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
618/// Represents a current state of a single debug adapter and provides ways to mutate it.
619pub struct Session {
620 pub mode: Mode,
621 id: SessionId,
622 label: SharedString,
623 adapter: DebugAdapterName,
624 pub(super) capabilities: Capabilities,
625 child_session_ids: HashSet<SessionId>,
626 parent_session: Option<Entity<Session>>,
627 modules: Vec<dap::Module>,
628 loaded_sources: Vec<dap::Source>,
629 output_token: OutputToken,
630 output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
631 threads: IndexMap<ThreadId, Thread>,
632 thread_states: ThreadStates,
633 variables: HashMap<VariableReference, Vec<dap::Variable>>,
634 stack_frames: IndexMap<StackFrameId, StackFrame>,
635 locations: HashMap<u64, dap::LocationsResponse>,
636 is_session_terminated: bool,
637 requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
638 pub(crate) breakpoint_store: Entity<BreakpointStore>,
639 ignore_breakpoints: bool,
640 exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
641 background_tasks: Vec<Task<()>>,
642 task_context: TaskContext,
643}
644
645trait CacheableCommand: Any + Send + Sync {
646 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
647 fn dyn_hash(&self, hasher: &mut dyn Hasher);
648 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
649}
650
651impl<T> CacheableCommand for T
652where
653 T: DapCommand + PartialEq + Eq + Hash,
654{
655 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
656 (rhs as &dyn Any)
657 .downcast_ref::<Self>()
658 .map_or(false, |rhs| self == rhs)
659 }
660
661 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
662 T::hash(self, &mut hasher);
663 }
664
665 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
666 self
667 }
668}
669
670pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
671
672impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
673 fn from(request: T) -> Self {
674 Self(Arc::new(request))
675 }
676}
677
678impl PartialEq for RequestSlot {
679 fn eq(&self, other: &Self) -> bool {
680 self.0.dyn_eq(other.0.as_ref())
681 }
682}
683
684impl Eq for RequestSlot {}
685
686impl Hash for RequestSlot {
687 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
688 self.0.dyn_hash(state);
689 (&*self.0 as &dyn Any).type_id().hash(state)
690 }
691}
692
693#[derive(Debug, Clone, Hash, PartialEq, Eq)]
694pub struct CompletionsQuery {
695 pub query: String,
696 pub column: u64,
697 pub line: Option<u64>,
698 pub frame_id: Option<u64>,
699}
700
701impl CompletionsQuery {
702 pub fn new(
703 buffer: &language::Buffer,
704 cursor_position: language::Anchor,
705 frame_id: Option<u64>,
706 ) -> Self {
707 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
708 Self {
709 query: buffer.text(),
710 column: column as u64,
711 frame_id,
712 line: Some(row as u64),
713 }
714 }
715}
716
717#[derive(Debug)]
718pub enum SessionEvent {
719 Modules,
720 LoadedSources,
721 Stopped(Option<ThreadId>),
722 StackTrace,
723 Variables,
724 Threads,
725 InvalidateInlineValue,
726 CapabilitiesLoaded,
727 RunInTerminal {
728 request: RunInTerminalRequestArguments,
729 sender: mpsc::Sender<Result<u32>>,
730 },
731 ConsoleOutput,
732}
733
734#[derive(Clone, Debug, PartialEq, Eq)]
735pub enum SessionStateEvent {
736 Running,
737 Shutdown,
738 Restart,
739 SpawnChildSession {
740 request: StartDebuggingRequestArguments,
741 },
742}
743
744impl EventEmitter<SessionEvent> for Session {}
745impl EventEmitter<SessionStateEvent> for Session {}
746
// A local session sends breakpoint updates to the debug adapter for all new breakpoints.
// The remote side only sends breakpoint updates for breakpoints created by that peer.
// The BreakpointStore notifies the session about breakpoint changes.
750impl Session {
751 pub(crate) fn new(
752 breakpoint_store: Entity<BreakpointStore>,
753 session_id: SessionId,
754 parent_session: Option<Entity<Session>>,
755 label: SharedString,
756 adapter: DebugAdapterName,
757 task_context: TaskContext,
758 cx: &mut App,
759 ) -> Entity<Self> {
760 cx.new::<Self>(|cx| {
761 cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
762 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
763 if let Some(local) = (!this.ignore_breakpoints)
764 .then(|| this.as_running_mut())
765 .flatten()
766 {
767 local
768 .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
769 .detach();
770 };
771 }
772 BreakpointStoreEvent::BreakpointsCleared(paths) => {
773 if let Some(local) = (!this.ignore_breakpoints)
774 .then(|| this.as_running_mut())
775 .flatten()
776 {
777 local.unset_breakpoints_from_paths(paths, cx).detach();
778 }
779 }
780 BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
781 })
782 .detach();
783 cx.on_app_quit(Self::on_app_quit).detach();
784
785 let this = Self {
786 mode: Mode::Building,
787 id: session_id,
788 child_session_ids: HashSet::default(),
789 parent_session,
790 capabilities: Capabilities::default(),
791 variables: Default::default(),
792 stack_frames: Default::default(),
793 thread_states: ThreadStates::default(),
794 output_token: OutputToken(0),
795 output: circular_buffer::CircularBuffer::boxed(),
796 requests: HashMap::default(),
797 modules: Vec::default(),
798 loaded_sources: Vec::default(),
799 threads: IndexMap::default(),
800 background_tasks: Vec::default(),
801 locations: Default::default(),
802 is_session_terminated: false,
803 ignore_breakpoints: false,
804 breakpoint_store,
805 exception_breakpoints: Default::default(),
806 label,
807 adapter,
808 task_context,
809 };
810
811 this
812 })
813 }
814
815 pub fn task_context(&self) -> &TaskContext {
816 &self.task_context
817 }
818
819 pub fn worktree(&self) -> Option<Entity<Worktree>> {
820 match &self.mode {
821 Mode::Building => None,
822 Mode::Running(local_mode) => local_mode.worktree.upgrade(),
823 }
824 }
825
826 pub fn boot(
827 &mut self,
828 binary: DebugAdapterBinary,
829 worktree: Entity<Worktree>,
830 dap_store: WeakEntity<DapStore>,
831 cx: &mut Context<Self>,
832 ) -> Task<Result<()>> {
833 let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
834 let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
835
836 let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
837 let mut initialized_tx = Some(initialized_tx);
838 while let Some(message) = message_rx.next().await {
839 if let Message::Event(event) = message {
840 if let Events::Initialized(_) = *event {
841 if let Some(tx) = initialized_tx.take() {
842 tx.send(()).ok();
843 }
844 } else {
845 let Ok(_) = this.update(cx, |session, cx| {
846 session.handle_dap_event(event, cx);
847 }) else {
848 break;
849 };
850 }
851 } else if let Message::Request(request) = message {
852 let Ok(_) = this.update(cx, |this, cx| {
853 if request.command == StartDebugging::COMMAND {
854 this.handle_start_debugging_request(request, cx)
855 .detach_and_log_err(cx);
856 } else if request.command == RunInTerminal::COMMAND {
857 this.handle_run_in_terminal_request(request, cx)
858 .detach_and_log_err(cx);
859 }
860 }) else {
861 break;
862 };
863 }
864 }
865 })];
866 self.background_tasks = background_tasks;
867 let id = self.id;
868 let parent_session = self.parent_session.clone();
869
870 cx.spawn(async move |this, cx| {
871 let mode = RunningMode::new(
872 id,
873 parent_session,
874 worktree.downgrade(),
875 binary.clone(),
876 message_tx,
877 cx,
878 )
879 .await?;
880 this.update(cx, |this, cx| {
881 this.mode = Mode::Running(mode);
882 cx.emit(SessionStateEvent::Running);
883 })?;
884
885 this.update(cx, |session, cx| session.request_initialize(cx))?
886 .await?;
887
888 let result = this
889 .update(cx, |session, cx| {
890 session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
891 })?
892 .await;
893
894 if result.is_err() {
895 let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
896
897 console
898 .send(format!(
899 "Tried to launch debugger with: {}",
900 serde_json::to_string_pretty(&binary.request_args.configuration)
901 .unwrap_or_default(),
902 ))
903 .await
904 .ok();
905 }
906
907 result
908 })
909 }
910
911 pub fn session_id(&self) -> SessionId {
912 self.id
913 }
914
915 pub fn child_session_ids(&self) -> HashSet<SessionId> {
916 self.child_session_ids.clone()
917 }
918
919 pub fn add_child_session_id(&mut self, session_id: SessionId) {
920 self.child_session_ids.insert(session_id);
921 }
922
923 pub fn remove_child_session_id(&mut self, session_id: SessionId) {
924 self.child_session_ids.remove(&session_id);
925 }
926
927 pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
928 self.parent_session
929 .as_ref()
930 .map(|session| session.read(cx).id)
931 }
932
933 pub fn parent_session(&self) -> Option<&Entity<Self>> {
934 self.parent_session.as_ref()
935 }
936
937 pub fn capabilities(&self) -> &Capabilities {
938 &self.capabilities
939 }
940
941 pub fn binary(&self) -> Option<&DebugAdapterBinary> {
942 match &self.mode {
943 Mode::Building => None,
944 Mode::Running(running_mode) => Some(&running_mode.binary),
945 }
946 }
947
948 pub fn adapter(&self) -> DebugAdapterName {
949 self.adapter.clone()
950 }
951
952 pub fn label(&self) -> SharedString {
953 self.label.clone()
954 }
955
956 pub fn is_terminated(&self) -> bool {
957 self.is_session_terminated
958 }
959
960 pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
961 let (tx, mut rx) = mpsc::unbounded();
962
963 cx.spawn(async move |this, cx| {
964 while let Some(output) = rx.next().await {
965 this.update(cx, |this, cx| {
966 let event = dap::OutputEvent {
967 category: None,
968 output,
969 group: None,
970 variables_reference: None,
971 source: None,
972 line: None,
973 column: None,
974 data: None,
975 location_reference: None,
976 };
977 this.push_output(event, cx);
978 })?;
979 }
980 anyhow::Ok(())
981 })
982 .detach();
983
984 return tx;
985 }
986
987 pub fn is_started(&self) -> bool {
988 match &self.mode {
989 Mode::Building => false,
990 Mode::Running(running) => running.is_started,
991 }
992 }
993
994 pub fn is_building(&self) -> bool {
995 matches!(self.mode, Mode::Building)
996 }
997
998 pub fn is_running(&self) -> bool {
999 matches!(self.mode, Mode::Running(_))
1000 }
1001
1002 pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1003 match &mut self.mode {
1004 Mode::Running(local_mode) => Some(local_mode),
1005 Mode::Building => None,
1006 }
1007 }
1008
1009 pub fn as_running(&self) -> Option<&RunningMode> {
1010 match &self.mode {
1011 Mode::Running(local_mode) => Some(local_mode),
1012 Mode::Building => None,
1013 }
1014 }
1015
1016 fn handle_start_debugging_request(
1017 &mut self,
1018 request: dap::messages::Request,
1019 cx: &mut Context<Self>,
1020 ) -> Task<Result<()>> {
1021 let request_seq = request.seq;
1022
1023 let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1024 .arguments
1025 .as_ref()
1026 .map(|value| serde_json::from_value(value.clone()));
1027
1028 let mut success = true;
1029 if let Some(Ok(request)) = launch_request {
1030 cx.emit(SessionStateEvent::SpawnChildSession { request });
1031 } else {
1032 log::error!(
1033 "Failed to parse launch request arguments: {:?}",
1034 request.arguments
1035 );
1036 success = false;
1037 }
1038
1039 cx.spawn(async move |this, cx| {
1040 this.update(cx, |this, cx| {
1041 this.respond_to_client(
1042 request_seq,
1043 success,
1044 StartDebugging::COMMAND.to_string(),
1045 None,
1046 cx,
1047 )
1048 })?
1049 .await
1050 })
1051 }
1052
1053 fn handle_run_in_terminal_request(
1054 &mut self,
1055 request: dap::messages::Request,
1056 cx: &mut Context<Self>,
1057 ) -> Task<Result<()>> {
1058 let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1059 request.arguments.unwrap_or_default(),
1060 ) {
1061 Ok(args) => args,
1062 Err(error) => {
1063 return cx.spawn(async move |session, cx| {
1064 let error = serde_json::to_value(dap::ErrorResponse {
1065 error: Some(dap::Message {
1066 id: request.seq,
1067 format: error.to_string(),
1068 variables: None,
1069 send_telemetry: None,
1070 show_user: None,
1071 url: None,
1072 url_label: None,
1073 }),
1074 })
1075 .ok();
1076
1077 session
1078 .update(cx, |this, cx| {
1079 this.respond_to_client(
1080 request.seq,
1081 false,
1082 StartDebugging::COMMAND.to_string(),
1083 error,
1084 cx,
1085 )
1086 })?
1087 .await?;
1088
1089 Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1090 });
1091 }
1092 };
1093
1094 let seq = request.seq;
1095
1096 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1097 cx.emit(SessionEvent::RunInTerminal {
1098 request: request_args,
1099 sender: tx,
1100 });
1101 cx.notify();
1102
1103 cx.spawn(async move |session, cx| {
1104 let result = util::maybe!(async move {
1105 rx.next().await.ok_or_else(|| {
1106 anyhow!("failed to receive response from spawn terminal".to_string())
1107 })?
1108 })
1109 .await;
1110 let (success, body) = match result {
1111 Ok(pid) => (
1112 true,
1113 serde_json::to_value(dap::RunInTerminalResponse {
1114 process_id: None,
1115 shell_process_id: Some(pid as u64),
1116 })
1117 .ok(),
1118 ),
1119 Err(error) => (
1120 false,
1121 serde_json::to_value(dap::ErrorResponse {
1122 error: Some(dap::Message {
1123 id: seq,
1124 format: error.to_string(),
1125 variables: None,
1126 send_telemetry: None,
1127 show_user: None,
1128 url: None,
1129 url_label: None,
1130 }),
1131 })
1132 .ok(),
1133 ),
1134 };
1135
1136 session
1137 .update(cx, |session, cx| {
1138 session.respond_to_client(
1139 seq,
1140 success,
1141 RunInTerminal::COMMAND.to_string(),
1142 body,
1143 cx,
1144 )
1145 })?
1146 .await
1147 })
1148 }
1149
    /// Sends the `Initialize` request and records the capabilities the adapter reports.
    ///
    /// Returns an error right away if the session is still building. When the request
    /// fails and the running mode yields a reconnect task from `reconnect_for_ssh`, the
    /// reconnect is awaited and the same request is sent again; otherwise the original
    /// error is returned.
    pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
        let adapter_id = self.adapter().to_string();
        let request = Initialize { adapter_id };

        let Mode::Running(running) = &self.mode else {
            return Task::ready(Err(anyhow!(
                "Cannot send initialize request, task still building"
            )));
        };
        let mut response = running.request(request.clone());

        cx.spawn(async move |this, cx| {
            // Loop until a response arrives or no further reconnect attempt is possible.
            loop {
                let capabilities = response.await;
                match capabilities {
                    Err(e) => {
                        // Give up with the original error when the update fails, the
                        // session is not running, or no reconnect task is offered.
                        let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
                            this.as_running()
                                .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
                        }) else {
                            return Err(e);
                        };
                        log::info!("Failed to connect to debug adapter: {}, retrying...", e);
                        reconnect.await?;

                        // Re-issue the same initialize request after reconnecting.
                        let Ok(Some(r)) = this.update(cx, |this, _| {
                            this.as_running()
                                .map(|running| running.request(request.clone()))
                        }) else {
                            return Err(e);
                        };
                        response = r
                    }
                    Ok(capabilities) => {
                        this.update(cx, |session, cx| {
                            session.capabilities = capabilities;
                            let filters = session
                                .capabilities
                                .exception_breakpoint_filters
                                .clone()
                                .unwrap_or_default();
                            // Register each advertised exception filter with its default
                            // enabled state; entries that already exist are left untouched.
                            for filter in filters {
                                let default = filter.default.unwrap_or_default();
                                session
                                    .exception_breakpoints
                                    .entry(filter.filter.clone())
                                    .or_insert_with(|| (filter, default));
                            }
                            cx.emit(SessionEvent::CapabilitiesLoaded);
                        })?;
                        return Ok(());
                    }
                }
            }
        })
    }
1206
1207 pub(super) fn initialize_sequence(
1208 &mut self,
1209 initialize_rx: oneshot::Receiver<()>,
1210 dap_store: WeakEntity<DapStore>,
1211 cx: &mut Context<Self>,
1212 ) -> Task<Result<()>> {
1213 match &self.mode {
1214 Mode::Running(local_mode) => {
1215 local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1216 }
1217 Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1218 }
1219 }
1220
1221 pub fn run_to_position(
1222 &mut self,
1223 breakpoint: SourceBreakpoint,
1224 active_thread_id: ThreadId,
1225 cx: &mut Context<Self>,
1226 ) {
1227 match &mut self.mode {
1228 Mode::Running(local_mode) => {
1229 if !matches!(
1230 self.thread_states.thread_state(active_thread_id),
1231 Some(ThreadStatus::Stopped)
1232 ) {
1233 return;
1234 };
1235 let path = breakpoint.path.clone();
1236 local_mode.tmp_breakpoint = Some(breakpoint);
1237 let task = local_mode.send_breakpoints_from_path(
1238 path,
1239 BreakpointUpdatedReason::Toggled,
1240 &self.breakpoint_store,
1241 cx,
1242 );
1243
1244 cx.spawn(async move |this, cx| {
1245 task.await;
1246 this.update(cx, |this, cx| {
1247 this.continue_thread(active_thread_id, cx);
1248 })
1249 })
1250 .detach();
1251 }
1252 Mode::Building => {}
1253 }
1254 }
1255
1256 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1257 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1258 }
1259
1260 pub fn output(
1261 &self,
1262 since: OutputToken,
1263 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1264 if self.output_token.0 == 0 {
1265 return (self.output.range(0..0), OutputToken(0));
1266 };
1267
1268 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1269
1270 let clamped_events_since = events_since.clamp(0, self.output.len());
1271 (
1272 self.output
1273 .range(self.output.len() - clamped_events_since..),
1274 self.output_token,
1275 )
1276 }
1277
1278 pub fn respond_to_client(
1279 &self,
1280 request_seq: u64,
1281 success: bool,
1282 command: String,
1283 body: Option<serde_json::Value>,
1284 cx: &mut Context<Self>,
1285 ) -> Task<Result<()>> {
1286 let Some(local_session) = self.as_running() else {
1287 unreachable!("Cannot respond to remote client");
1288 };
1289 let client = local_session.client.clone();
1290
1291 cx.background_spawn(async move {
1292 client
1293 .send_message(Message::Response(Response {
1294 body,
1295 success,
1296 command,
1297 seq: request_seq + 1,
1298 request_seq,
1299 message: None,
1300 }))
1301 .await
1302 })
1303 }
1304
    /// Updates session state in response to a `StoppedEvent`.
    ///
    /// Marks the affected thread(s) as stopped, drops cached requests and the cached
    /// thread/variable data, and emits `SessionEvent::Stopped` followed by
    /// `SessionEvent::InvalidateInlineValue`.
    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
        self.mode.stopped();
        // todo(debugger): Find a clean way to get around the clone
        let breakpoint_store = self.breakpoint_store.clone();
        // If a temporary breakpoint was pending, take it out of the running mode and
        // re-send the breakpoints for its path.
        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
            let breakpoint = local.tmp_breakpoint.take()?;
            let path = breakpoint.path.clone();
            Some((local, path))
        }) {
            local
                .send_breakpoints_from_path(
                    path,
                    BreakpointUpdatedReason::Toggled,
                    &breakpoint_store,
                    cx,
                )
                .detach();
        };

        // An event without a thread id is treated the same as "all threads stopped".
        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
            self.thread_states.stop_all_threads();

            self.invalidate_command_type::<StackTraceCommand>();
        }

        // Even if we stopped all threads we still need to insert the thread_id
        // to our own data
        if let Some(thread_id) = event.thread_id {
            self.thread_states.stop_thread(ThreadId(thread_id));

            self.invalidate_state(
                &StackTraceCommand {
                    thread_id,
                    start_frame: None,
                    levels: None,
                }
                .into(),
            );
        }

        self.invalidate_generic();
        self.threads.clear();
        self.variables.clear();
        // The thread id is withheld from the event when the adapter set
        // `preserve_focus_hint`.
        cx.emit(SessionEvent::Stopped(
            event
                .thread_id
                .map(Into::into)
                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
        ));
        cx.emit(SessionEvent::InvalidateInlineValue);
        cx.notify();
    }
1357
    /// Dispatches an event received from the debug adapter and updates session state
    /// accordingly.
    ///
    /// `Initialized` is not expected here (it trips a `debug_assert!`); `Memory`,
    /// `Process`, the progress events, `Invalidated` and `Other` are currently ignored.
    pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
        match *event {
            Events::Initialized(_) => {
                debug_assert!(
                    false,
                    "Initialized event should have been handled in LocalMode"
                );
            }
            Events::Stopped(event) => self.handle_stopped_event(event, cx),
            Events::Continued(event) => {
                if event.all_threads_continued.unwrap_or_default() {
                    self.thread_states.continue_all_threads();
                    // Every thread is running again, so drop this session's active position.
                    self.breakpoint_store.update(cx, |store, cx| {
                        store.remove_active_position(Some(self.session_id()), cx)
                    });
                } else {
                    self.thread_states
                        .continue_thread(ThreadId(event.thread_id));
                }
                // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
                self.invalidate_generic();
            }
            Events::Exited(_event) => {
                self.clear_active_debug_line(cx);
            }
            Events::Terminated(_) => {
                self.shutdown(cx).detach();
            }
            Events::Thread(event) => {
                let thread_id = ThreadId(event.thread_id);

                match event.reason {
                    dap::ThreadEventReason::Started => {
                        self.thread_states.continue_thread(thread_id);
                    }
                    dap::ThreadEventReason::Exited => {
                        self.thread_states.exit_thread(thread_id);
                    }
                    reason => {
                        log::error!("Unhandled thread event reason {:?}", reason);
                    }
                }
                // Drop the cached threads request so the next `fetch` re-sends it.
                self.invalidate_state(&ThreadsCommand.into());
                cx.notify();
            }
            Events::Output(event) => {
                // Telemetry output is dropped and never reaches the output buffer.
                if event
                    .category
                    .as_ref()
                    .is_some_and(|category| *category == OutputEventCategory::Telemetry)
                {
                    return;
                }

                self.push_output(event, cx);
                cx.notify();
            }
            Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
                store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
            }),
            Events::Module(event) => {
                // Keep the local module list in sync with the adapter's notifications.
                match event.reason {
                    dap::ModuleEventReason::New => {
                        self.modules.push(event.module);
                    }
                    dap::ModuleEventReason::Changed => {
                        if let Some(module) = self
                            .modules
                            .iter_mut()
                            .find(|other| event.module.id == other.id)
                        {
                            *module = event.module;
                        }
                    }
                    dap::ModuleEventReason::Removed => {
                        self.modules.retain(|other| event.module.id != other.id);
                    }
                }

                // todo(debugger): We should only send the invalidate command to downstream clients.
                // self.invalidate_state(&ModulesCommand.into());
            }
            Events::LoadedSource(_) => {
                self.invalidate_state(&LoadedSourcesCommand.into());
            }
            Events::Capabilities(event) => {
                self.capabilities = self.capabilities.merge(event.capabilities);
                cx.notify();
            }
            Events::Memory(_) => {}
            Events::Process(_) => {}
            Events::ProgressEnd(_) => {}
            Events::ProgressStart(_) => {}
            Events::ProgressUpdate(_) => {}
            Events::Invalidated(_) => {}
            Events::Other(_) => {}
        }
    }
1456
    /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
    ///
    /// Requests are tracked in `self.requests`, keyed first by command type and then by
    /// the request value; a request whose slot is already occupied is not sent again.
    /// `process_result` is invoked on the session with the response (or error).
    ///
    /// Nothing is sent when the session is terminated, or when no thread is stopped
    /// (`ThreadsCommand` is exempt from the stopped-thread requirement).
    fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
        &mut self,
        request: T,
        process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
        cx: &mut Context<Self>,
    ) {
        // Compile-time guard: only cacheable commands may go through `fetch`.
        const {
            assert!(
                T::CACHEABLE,
                "Only requests marked as cacheable should invoke `fetch`"
            );
        }

        // `&&` binds tighter than `||`: (no stopped thread AND not a threads request) OR terminated.
        if !self.thread_states.any_stopped_thread()
            && request.type_id() != TypeId::of::<ThreadsCommand>()
            || self.is_session_terminated
        {
            return;
        }

        let request_map = self
            .requests
            .entry(std::any::TypeId::of::<T>())
            .or_default();

        // Only a vacant slot triggers a new request.
        if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
            let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();

            let task = Self::request_inner::<Arc<T>>(
                &self.capabilities,
                &self.mode,
                command,
                |this, result, cx| {
                    process_result(this, result, cx);
                    None
                },
                cx,
            );
            // Store a shared handle to the in-flight task in the request slot.
            let task = cx
                .background_executor()
                .spawn(async move {
                    let _ = task.await?;
                    Some(())
                })
                .shared();

            vacant.insert(task);
            cx.notify();
        }
    }
1508
1509 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1510 capabilities: &Capabilities,
1511 mode: &Mode,
1512 request: T,
1513 process_result: impl FnOnce(
1514 &mut Self,
1515 Result<T::Response>,
1516 &mut Context<Self>,
1517 ) -> Option<T::Response>
1518 + 'static,
1519 cx: &mut Context<Self>,
1520 ) -> Task<Option<T::Response>> {
1521 if !T::is_supported(&capabilities) {
1522 log::warn!(
1523 "Attempted to send a DAP request that isn't supported: {:?}",
1524 request
1525 );
1526 let error = Err(anyhow::Error::msg(
1527 "Couldn't complete request because it's not supported",
1528 ));
1529 return cx.spawn(async move |this, cx| {
1530 this.update(cx, |this, cx| process_result(this, error, cx))
1531 .ok()
1532 .flatten()
1533 });
1534 }
1535
1536 let request = mode.request_dap(request);
1537 cx.spawn(async move |this, cx| {
1538 let result = request.await;
1539 this.update(cx, |this, cx| process_result(this, result, cx))
1540 .ok()
1541 .flatten()
1542 })
1543 }
1544
    /// Sends `request` using this session's capabilities and mode.
    ///
    /// Thin wrapper around `Self::request_inner`; unlike `fetch`, it does not consult or
    /// populate `self.requests`.
    fn request<T: DapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(
            &mut Self,
            Result<T::Response>,
            &mut Context<Self>,
        ) -> Option<T::Response>
        + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
    }
1558
    /// Removes every tracked request of type `Command` from `self.requests`.
    fn invalidate_command_type<Command: DapCommand>(&mut self) {
        self.requests.remove(&std::any::TypeId::of::<Command>());
    }
1562
    /// Removes the tracked modules, loaded-sources and threads requests.
    fn invalidate_generic(&mut self) {
        self.invalidate_command_type::<ModulesCommand>();
        self.invalidate_command_type::<LoadedSourcesCommand>();
        self.invalidate_command_type::<ThreadsCommand>();
    }
1568
1569 fn invalidate_state(&mut self, key: &RequestSlot) {
1570 self.requests
1571 .entry((&*key.0 as &dyn Any).type_id())
1572 .and_modify(|request_map| {
1573 request_map.remove(&key);
1574 });
1575 }
1576
    /// Appends `event` to the output buffer, advances the output token by one, and
    /// emits `SessionEvent::ConsoleOutput`.
    fn push_output(&mut self, event: OutputEvent, cx: &mut Context<Self>) {
        self.output.push_back(event);
        self.output_token.0 += 1;
        cx.emit(SessionEvent::ConsoleOutput);
    }
1582
    /// Reports whether the thread-state tracker has any stopped thread.
    pub fn any_stopped_thread(&self) -> bool {
        self.thread_states.any_stopped_thread()
    }
1586
    /// Returns the tracked status of `thread_id`, as reported by the thread-state tracker.
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1590
1591 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1592 self.fetch(
1593 dap_command::ThreadsCommand,
1594 |this, result, cx| {
1595 let Some(result) = result.log_err() else {
1596 return;
1597 };
1598
1599 this.threads = result
1600 .into_iter()
1601 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1602 .collect();
1603
1604 this.invalidate_command_type::<StackTraceCommand>();
1605 cx.emit(SessionEvent::Threads);
1606 cx.notify();
1607 },
1608 cx,
1609 );
1610
1611 self.threads
1612 .values()
1613 .map(|thread| {
1614 (
1615 thread.dap.clone(),
1616 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1617 )
1618 })
1619 .collect()
1620 }
1621
1622 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1623 self.fetch(
1624 dap_command::ModulesCommand,
1625 |this, result, cx| {
1626 let Some(result) = result.log_err() else {
1627 return;
1628 };
1629
1630 this.modules = result;
1631 cx.emit(SessionEvent::Modules);
1632 cx.notify();
1633 },
1634 cx,
1635 );
1636
1637 &self.modules
1638 }
1639
    /// Returns the current value of the session's `ignore_breakpoints` flag.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1643
    /// Flips the `ignore_breakpoints` flag by delegating to `set_ignore_breakpoints`
    /// with the negated current value, and returns that call's task.
    pub fn toggle_ignore_breakpoints(
        &mut self,
        cx: &mut App,
    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
        self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
    }
1650
1651 pub(crate) fn set_ignore_breakpoints(
1652 &mut self,
1653 ignore: bool,
1654 cx: &mut App,
1655 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1656 if self.ignore_breakpoints == ignore {
1657 return Task::ready(HashMap::default());
1658 }
1659
1660 self.ignore_breakpoints = ignore;
1661
1662 if let Some(local) = self.as_running() {
1663 local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1664 } else {
1665 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1666 unimplemented!()
1667 }
1668 }
1669
    /// Iterates over the registered exception breakpoint filters, each paired with its
    /// enabled flag.
    pub fn exception_breakpoints(
        &self,
    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
        self.exception_breakpoints.values()
    }
1675
1676 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1677 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1678 *is_enabled = !*is_enabled;
1679 self.send_exception_breakpoints(cx);
1680 }
1681 }
1682
1683 fn send_exception_breakpoints(&mut self, cx: &App) {
1684 if let Some(local) = self.as_running() {
1685 let exception_filters = self
1686 .exception_breakpoints
1687 .values()
1688 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1689 .collect();
1690
1691 let supports_exception_filters = self
1692 .capabilities
1693 .supports_exception_filter_options
1694 .unwrap_or_default();
1695 local
1696 .send_exception_breakpoints(exception_filters, supports_exception_filters)
1697 .detach_and_log_err(cx);
1698 } else {
1699 debug_assert!(false, "Not implemented");
1700 }
1701 }
1702
1703 pub fn breakpoints_enabled(&self) -> bool {
1704 self.ignore_breakpoints
1705 }
1706
1707 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1708 self.fetch(
1709 dap_command::LoadedSourcesCommand,
1710 |this, result, cx| {
1711 let Some(result) = result.log_err() else {
1712 return;
1713 };
1714 this.loaded_sources = result;
1715 cx.emit(SessionEvent::LoadedSources);
1716 cx.notify();
1717 },
1718 cx,
1719 );
1720
1721 &self.loaded_sources
1722 }
1723
1724 fn fallback_to_manual_restart(
1725 &mut self,
1726 res: Result<()>,
1727 cx: &mut Context<Self>,
1728 ) -> Option<()> {
1729 if res.log_err().is_none() {
1730 cx.emit(SessionStateEvent::Restart);
1731 return None;
1732 }
1733 Some(())
1734 }
1735
1736 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1737 res.log_err()?;
1738 Some(())
1739 }
1740
1741 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1742 thread_id: ThreadId,
1743 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1744 {
1745 move |this, response, cx| match response.log_err() {
1746 Some(response) => {
1747 this.breakpoint_store.update(cx, |store, cx| {
1748 store.remove_active_position(Some(this.session_id()), cx)
1749 });
1750 Some(response)
1751 }
1752 None => {
1753 this.thread_states.stop_thread(thread_id);
1754 cx.notify();
1755 None
1756 }
1757 }
1758 }
1759
1760 fn clear_active_debug_line_response(
1761 &mut self,
1762 response: Result<()>,
1763 cx: &mut Context<Session>,
1764 ) -> Option<()> {
1765 response.log_err()?;
1766 self.clear_active_debug_line(cx);
1767 Some(())
1768 }
1769
    /// Asks the breakpoint store to remove the active position recorded for this session.
    fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
        self.breakpoint_store.update(cx, |store, cx| {
            store.remove_active_position(Some(self.id), cx)
        });
    }
1775
1776 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1777 self.request(
1778 PauseCommand {
1779 thread_id: thread_id.0,
1780 },
1781 Self::empty_response,
1782 cx,
1783 )
1784 .detach();
1785 }
1786
1787 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1788 self.request(
1789 RestartStackFrameCommand { stack_frame_id },
1790 Self::empty_response,
1791 cx,
1792 )
1793 .detach();
1794 }
1795
1796 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1797 if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1798 self.request(
1799 RestartCommand {
1800 raw: args.unwrap_or(Value::Null),
1801 },
1802 Self::fallback_to_manual_restart,
1803 cx,
1804 )
1805 .detach();
1806 } else {
1807 cx.emit(SessionStateEvent::Restart);
1808 }
1809 }
1810
1811 fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1812 let debug_adapter = self.adapter_client();
1813
1814 cx.background_spawn(async move {
1815 if let Some(client) = debug_adapter {
1816 client.shutdown().await.log_err();
1817 }
1818 })
1819 }
1820
    /// Marks the session as terminated and shuts the debug adapter down.
    ///
    /// Sends a `TerminateCommand` when the adapter advertises
    /// `supports_terminate_request`, otherwise a `DisconnectCommand` that asks for the
    /// debuggee to be terminated. `SessionStateEvent::Shutdown` is emitted immediately;
    /// the returned task waits for the request and then shuts down the adapter client.
    pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
        // Flag the session first so `fetch` stops issuing new requests.
        self.is_session_terminated = true;
        self.thread_states.exit_all_threads();
        cx.notify();

        let task = if self
            .capabilities
            .supports_terminate_request
            .unwrap_or_default()
        {
            self.request(
                TerminateCommand {
                    restart: Some(false),
                },
                Self::clear_active_debug_line_response,
                cx,
            )
        } else {
            self.request(
                DisconnectCommand {
                    restart: Some(false),
                    terminate_debuggee: Some(true),
                    suspend_debuggee: Some(false),
                },
                Self::clear_active_debug_line_response,
                cx,
            )
        };

        cx.emit(SessionStateEvent::Shutdown);

        let debug_client = self.adapter_client();

        cx.background_spawn(async move {
            // The request's result is intentionally ignored; the client is shut down regardless.
            let _ = task.await;

            if let Some(client) = debug_client {
                client.shutdown().await.log_err();
            }
        })
    }
1862
1863 pub fn completions(
1864 &mut self,
1865 query: CompletionsQuery,
1866 cx: &mut Context<Self>,
1867 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1868 let task = self.request(query, |_, result, _| result.log_err(), cx);
1869
1870 cx.background_executor().spawn(async move {
1871 anyhow::Ok(
1872 task.await
1873 .map(|response| response.targets)
1874 .context("failed to fetch completions")?,
1875 )
1876 })
1877 }
1878
1879 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1880 self.thread_states.continue_thread(thread_id);
1881 self.request(
1882 ContinueCommand {
1883 args: ContinueArguments {
1884 thread_id: thread_id.0,
1885 single_thread: Some(true),
1886 },
1887 },
1888 Self::on_step_response::<ContinueCommand>(thread_id),
1889 cx,
1890 )
1891 .detach();
1892 }
1893
1894 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1895 match self.mode {
1896 Mode::Running(ref local) => Some(local.client.clone()),
1897 Mode::Building => None,
1898 }
1899 }
1900
    /// Forwards to the mode's `has_ever_stopped`.
    pub fn has_ever_stopped(&self) -> bool {
        self.mode.has_ever_stopped()
    }
1904 pub fn step_over(
1905 &mut self,
1906 thread_id: ThreadId,
1907 granularity: SteppingGranularity,
1908 cx: &mut Context<Self>,
1909 ) {
1910 let supports_single_thread_execution_requests =
1911 self.capabilities.supports_single_thread_execution_requests;
1912 let supports_stepping_granularity = self
1913 .capabilities
1914 .supports_stepping_granularity
1915 .unwrap_or_default();
1916
1917 let command = NextCommand {
1918 inner: StepCommand {
1919 thread_id: thread_id.0,
1920 granularity: supports_stepping_granularity.then(|| granularity),
1921 single_thread: supports_single_thread_execution_requests,
1922 },
1923 };
1924
1925 self.thread_states.process_step(thread_id);
1926 self.request(
1927 command,
1928 Self::on_step_response::<NextCommand>(thread_id),
1929 cx,
1930 )
1931 .detach();
1932 }
1933
1934 pub fn step_in(
1935 &mut self,
1936 thread_id: ThreadId,
1937 granularity: SteppingGranularity,
1938 cx: &mut Context<Self>,
1939 ) {
1940 let supports_single_thread_execution_requests =
1941 self.capabilities.supports_single_thread_execution_requests;
1942 let supports_stepping_granularity = self
1943 .capabilities
1944 .supports_stepping_granularity
1945 .unwrap_or_default();
1946
1947 let command = StepInCommand {
1948 inner: StepCommand {
1949 thread_id: thread_id.0,
1950 granularity: supports_stepping_granularity.then(|| granularity),
1951 single_thread: supports_single_thread_execution_requests,
1952 },
1953 };
1954
1955 self.thread_states.process_step(thread_id);
1956 self.request(
1957 command,
1958 Self::on_step_response::<StepInCommand>(thread_id),
1959 cx,
1960 )
1961 .detach();
1962 }
1963
1964 pub fn step_out(
1965 &mut self,
1966 thread_id: ThreadId,
1967 granularity: SteppingGranularity,
1968 cx: &mut Context<Self>,
1969 ) {
1970 let supports_single_thread_execution_requests =
1971 self.capabilities.supports_single_thread_execution_requests;
1972 let supports_stepping_granularity = self
1973 .capabilities
1974 .supports_stepping_granularity
1975 .unwrap_or_default();
1976
1977 let command = StepOutCommand {
1978 inner: StepCommand {
1979 thread_id: thread_id.0,
1980 granularity: supports_stepping_granularity.then(|| granularity),
1981 single_thread: supports_single_thread_execution_requests,
1982 },
1983 };
1984
1985 self.thread_states.process_step(thread_id);
1986 self.request(
1987 command,
1988 Self::on_step_response::<StepOutCommand>(thread_id),
1989 cx,
1990 )
1991 .detach();
1992 }
1993
1994 pub fn step_back(
1995 &mut self,
1996 thread_id: ThreadId,
1997 granularity: SteppingGranularity,
1998 cx: &mut Context<Self>,
1999 ) {
2000 let supports_single_thread_execution_requests =
2001 self.capabilities.supports_single_thread_execution_requests;
2002 let supports_stepping_granularity = self
2003 .capabilities
2004 .supports_stepping_granularity
2005 .unwrap_or_default();
2006
2007 let command = StepBackCommand {
2008 inner: StepCommand {
2009 thread_id: thread_id.0,
2010 granularity: supports_stepping_granularity.then(|| granularity),
2011 single_thread: supports_single_thread_execution_requests,
2012 },
2013 };
2014
2015 self.thread_states.process_step(thread_id);
2016
2017 self.request(
2018 command,
2019 Self::on_step_response::<StepBackCommand>(thread_id),
2020 cx,
2021 )
2022 .detach();
2023 }
2024
    /// Returns the cached stack frames for `thread_id`, requesting them if appropriate.
    ///
    /// A stack-trace request is only issued when the thread is stopped, a threads
    /// request is currently tracked, and the thread is present in the cached thread map.
    /// Returns the stored error if the last stack-trace request for this thread failed,
    /// and an empty list when the thread is unknown.
    pub fn stack_frames(
        &mut self,
        thread_id: ThreadId,
        cx: &mut Context<Self>,
    ) -> Result<Vec<StackFrame>> {
        if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
            && self.requests.contains_key(&ThreadsCommand.type_id())
            && self.threads.contains_key(&thread_id)
        // ^ todo(debugger): We need a better way to check that we're not querying stale data
        // We could still be using an old thread id and have sent a new thread's request
        // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
        // But it very well could cause a minor bug in the future that is hard to track down
        {
            self.fetch(
                super::dap_command::StackTraceCommand {
                    thread_id: thread_id.0,
                    start_frame: None,
                    levels: None,
                },
                move |this, stack_frames, cx| {
                    // Store either the frames or the error on the thread entry.
                    let entry =
                        this.threads
                            .entry(thread_id)
                            .and_modify(|thread| match &stack_frames {
                                Ok(stack_frames) => {
                                    thread.stack_frames = stack_frames
                                        .iter()
                                        .cloned()
                                        .map(StackFrame::from)
                                        .collect();
                                    thread.stack_frames_error = None;
                                }
                                Err(error) => {
                                    thread.stack_frames.clear();
                                    thread.stack_frames_error = Some(error.cloned());
                                }
                            });
                    debug_assert!(
                        matches!(entry, indexmap::map::Entry::Occupied(_)),
                        "Sent request for thread_id that doesn't exist"
                    );
                    // Also index the frames by id in the session-wide frame map.
                    if let Ok(stack_frames) = stack_frames {
                        this.stack_frames.extend(
                            stack_frames
                                .into_iter()
                                .filter(|frame| {
                                    // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
                                    // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
                                    !(frame.id == 0
                                        && frame.line == 0
                                        && frame.column == 0
                                        && frame.presentation_hint
                                            == Some(StackFramePresentationHint::Label))
                                })
                                .map(|frame| (frame.id, StackFrame::from(frame))),
                        );
                    }

                    // Drop tracked scopes and variables requests now that the frames changed.
                    this.invalidate_command_type::<ScopesCommand>();
                    this.invalidate_command_type::<VariablesCommand>();

                    cx.emit(SessionEvent::StackTrace);
                },
                cx,
            );
        }

        match self.threads.get(&thread_id) {
            Some(thread) => {
                if let Some(error) = &thread.stack_frames_error {
                    Err(error.cloned())
                } else {
                    Ok(thread.stack_frames.clone())
                }
            }
            None => Ok(Vec::new()),
        }
    }
2103
    /// Returns the cached scopes of `stack_frame_id`, requesting them if appropriate.
    ///
    /// A scopes request is only issued while both a threads request and a stack-trace
    /// request are tracked. When the response arrives, the variables of every returned
    /// scope are requested as well and `SessionEvent::Variables` is emitted. Returns an
    /// empty slice when the frame is unknown.
    pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
        if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
            && self
                .requests
                .contains_key(&TypeId::of::<StackTraceCommand>())
        {
            self.fetch(
                ScopesCommand { stack_frame_id },
                move |this, scopes, cx| {
                    let Some(scopes) = scopes.log_err() else {
                        return
                    };

                    // Kick off a variables request for each scope.
                    for scope in scopes.iter() {
                        this.variables(scope.variables_reference, cx);
                    }

                    // Attach the scopes to the frame, if the frame is known.
                    let entry = this
                        .stack_frames
                        .entry(stack_frame_id)
                        .and_modify(|stack_frame| {
                            stack_frame.scopes = scopes;
                        });

                    cx.emit(SessionEvent::Variables);

                    debug_assert!(
                        matches!(entry, indexmap::map::Entry::Occupied(_)),
                        "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
                    );
                },
                cx,
            );
        }

        self.stack_frames
            .get(&stack_frame_id)
            .map(|frame| frame.scopes.as_slice())
            .unwrap_or_default()
    }
2144
2145 pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
2146 let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2147 return Vec::new();
2148 };
2149
2150 stack_frame
2151 .scopes
2152 .iter()
2153 .filter_map(|scope| self.variables.get(&scope.variables_reference))
2154 .flatten()
2155 .cloned()
2156 .collect()
2157 }
2158
2159 pub fn variables(
2160 &mut self,
2161 variables_reference: VariableReference,
2162 cx: &mut Context<Self>,
2163 ) -> Vec<dap::Variable> {
2164 let command = VariablesCommand {
2165 variables_reference,
2166 filter: None,
2167 start: None,
2168 count: None,
2169 format: None,
2170 };
2171
2172 self.fetch(
2173 command,
2174 move |this, variables, cx| {
2175 let Some(variables) = variables.log_err() else {
2176 return;
2177 };
2178
2179 this.variables.insert(variables_reference, variables);
2180
2181 cx.emit(SessionEvent::Variables);
2182 cx.emit(SessionEvent::InvalidateInlineValue);
2183 },
2184 cx,
2185 );
2186
2187 self.variables
2188 .get(&variables_reference)
2189 .cloned()
2190 .unwrap_or_default()
2191 }
2192
2193 pub fn set_variable_value(
2194 &mut self,
2195 variables_reference: u64,
2196 name: String,
2197 value: String,
2198 cx: &mut Context<Self>,
2199 ) {
2200 if self.capabilities.supports_set_variable.unwrap_or_default() {
2201 self.request(
2202 SetVariableValueCommand {
2203 name,
2204 value,
2205 variables_reference,
2206 },
2207 move |this, response, cx| {
2208 let response = response.log_err()?;
2209 this.invalidate_command_type::<VariablesCommand>();
2210 cx.notify();
2211 Some(response)
2212 },
2213 cx,
2214 )
2215 .detach()
2216 }
2217 }
2218
2219 pub fn evaluate(
2220 &mut self,
2221 expression: String,
2222 context: Option<EvaluateArgumentsContext>,
2223 frame_id: Option<u64>,
2224 source: Option<Source>,
2225 cx: &mut Context<Self>,
2226 ) -> Task<()> {
2227 let event = dap::OutputEvent {
2228 category: None,
2229 output: format!("> {expression}"),
2230 group: None,
2231 variables_reference: None,
2232 source: None,
2233 line: None,
2234 column: None,
2235 data: None,
2236 location_reference: None,
2237 };
2238 self.push_output(event, cx);
2239 let request = self.mode.request_dap(EvaluateCommand {
2240 expression,
2241 context,
2242 frame_id,
2243 source,
2244 });
2245 cx.spawn(async move |this, cx| {
2246 let response = request.await;
2247 this.update(cx, |this, cx| {
2248 match response {
2249 Ok(response) => {
2250 let event = dap::OutputEvent {
2251 category: None,
2252 output: format!("< {}", &response.result),
2253 group: None,
2254 variables_reference: Some(response.variables_reference),
2255 source: None,
2256 line: None,
2257 column: None,
2258 data: None,
2259 location_reference: None,
2260 };
2261 this.push_output(event, cx);
2262 }
2263 Err(e) => {
2264 let event = dap::OutputEvent {
2265 category: None,
2266 output: format!("{}", e),
2267 group: None,
2268 variables_reference: None,
2269 source: None,
2270 line: None,
2271 column: None,
2272 data: None,
2273 location_reference: None,
2274 };
2275 this.push_output(event, cx);
2276 }
2277 };
2278 this.invalidate_command_type::<ScopesCommand>();
2279 cx.notify();
2280 })
2281 .ok();
2282 })
2283 }
2284
2285 pub fn location(
2286 &mut self,
2287 reference: u64,
2288 cx: &mut Context<Self>,
2289 ) -> Option<dap::LocationsResponse> {
2290 self.fetch(
2291 LocationsCommand { reference },
2292 move |this, response, _| {
2293 let Some(response) = response.log_err() else {
2294 return;
2295 };
2296 this.locations.insert(reference, response);
2297 },
2298 cx,
2299 );
2300 self.locations.get(&reference).cloned()
2301 }
2302
2303 pub fn is_attached(&self) -> bool {
2304 let Mode::Running(local_mode) = &self.mode else {
2305 return false;
2306 };
2307 local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2308 }
2309
2310 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2311 let command = DisconnectCommand {
2312 restart: Some(false),
2313 terminate_debuggee: Some(false),
2314 suspend_debuggee: Some(false),
2315 };
2316
2317 self.request(command, Self::empty_response, cx).detach()
2318 }
2319
2320 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2321 if self
2322 .capabilities
2323 .supports_terminate_threads_request
2324 .unwrap_or_default()
2325 {
2326 self.request(
2327 TerminateThreadsCommand {
2328 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2329 },
2330 Self::clear_active_debug_line_response,
2331 cx,
2332 )
2333 .detach();
2334 } else {
2335 self.shutdown(cx).detach();
2336 }
2337 }
2338
    /// Returns the tracked state of `thread_id`, or `None` when the thread-state tracker
    /// reports none for it.
    pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
        self.thread_states.thread_state(thread_id)
    }
2342}