1use crate::debugger::breakpoint_store::BreakpointSessionState;
2
3use super::breakpoint_store::{
4 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
5};
6use super::dap_command::{
7 self, Attach, ConfigurationDone, ContinueCommand, DapCommand, DisconnectCommand,
8 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
9 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
10 ScopesCommand, SetExceptionBreakpoints, SetVariableValueCommand, StackTraceCommand,
11 StepBackCommand, StepCommand, StepInCommand, StepOutCommand, TerminateCommand,
12 TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
13};
14use super::dap_store::DapStore;
15use anyhow::{Context as _, Result, anyhow};
16use collections::{HashMap, HashSet, IndexMap, IndexSet};
17use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
18use dap::messages::Response;
19use dap::requests::{Request, RunInTerminal, StartDebugging};
20use dap::{
21 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
22 SteppingGranularity, StoppedEvent, VariableReference,
23 client::{DebugAdapterClient, SessionId},
24 messages::{Events, Message},
25};
26use dap::{
27 ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
28 RunInTerminalRequestArguments, StartDebuggingRequestArguments,
29};
30use futures::channel::{mpsc, oneshot};
31use futures::{FutureExt, future::Shared};
32use gpui::{
33 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
34 Task, WeakEntity,
35};
36
37use serde_json::Value;
38use smol::stream::StreamExt;
39use std::any::TypeId;
40use std::collections::BTreeMap;
41use std::u64;
42use std::{
43 any::Any,
44 collections::hash_map::Entry,
45 hash::{Hash, Hasher},
46 path::Path,
47 sync::Arc,
48};
49use text::{PointUtf16, ToPointUtf16};
50use util::ResultExt;
51use worktree::Worktree;
52
/// Identifier of a debuggee thread, wrapping the raw `u64` id.
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub u64);

impl ThreadId {
    /// Smallest representable thread id.
    pub const MIN: Self = Self(u64::MIN);
    /// Largest representable thread id.
    pub const MAX: Self = Self(u64::MAX);
}

impl From<u64> for ThreadId {
    /// Wraps a raw id without any validation.
    fn from(raw: u64) -> Self {
        ThreadId(raw)
    }
}
67
68#[derive(Clone, Debug)]
69pub struct StackFrame {
70 pub dap: dap::StackFrame,
71 pub scopes: Vec<dap::Scope>,
72}
73
74impl From<dap::StackFrame> for StackFrame {
75 fn from(stack_frame: dap::StackFrame) -> Self {
76 Self {
77 scopes: vec![],
78 dap: stack_frame,
79 }
80 }
81}
82
/// Execution status tracked for a debuggee thread. Defaults to `Running`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the display label for this status (identical to the variant name).
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
104
105#[derive(Debug)]
106pub struct Thread {
107 dap: dap::Thread,
108 stack_frame_ids: IndexSet<StackFrameId>,
109 _has_stopped: bool,
110}
111
112impl From<dap::Thread> for Thread {
113 fn from(dap: dap::Thread) -> Self {
114 Self {
115 dap,
116 stack_frame_ids: Default::default(),
117 _has_stopped: false,
118 }
119 }
120}
121
/// Lifecycle mode of a [`Session`].
pub enum Mode {
    /// No adapter client is available yet; requests issued in this mode fail.
    Building,
    /// An adapter client exists and requests can be sent through it.
    Running(RunningMode),
}
126
/// State held by a session once its debug adapter client has been created.
#[derive(Clone)]
pub struct RunningMode {
    /// Client used to send requests to the debug adapter.
    client: Arc<DebugAdapterClient>,
    /// Adapter binary description; its `request_args` drive launch/attach.
    binary: DebugAdapterBinary,
    /// Temporary breakpoint set by `Session::run_to_position` and taken again
    /// when the next stopped event is handled.
    tmp_breakpoint: Option<SourceBreakpoint>,
    /// Worktree this session was booted with (held weakly).
    worktree: WeakEntity<Worktree>,
    /// Executor on which adapter requests are spawned.
    executor: BackgroundExecutor,
    /// Set to `true` once launch/attach and the configuration sequence both succeeded.
    is_started: bool,
}
136
137fn client_source(abs_path: &Path) -> dap::Source {
138 dap::Source {
139 name: abs_path
140 .file_name()
141 .map(|filename| filename.to_string_lossy().to_string()),
142 path: Some(abs_path.to_string_lossy().to_string()),
143 source_reference: None,
144 presentation_hint: None,
145 origin: None,
146 sources: None,
147 adapter_data: None,
148 checksums: None,
149 }
150}
151
152impl RunningMode {
153 async fn new(
154 session_id: SessionId,
155 parent_session: Option<Entity<Session>>,
156 worktree: WeakEntity<Worktree>,
157 binary: DebugAdapterBinary,
158 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
159 cx: AsyncApp,
160 ) -> Result<Self> {
161 let message_handler = Box::new(move |message| {
162 messages_tx.unbounded_send(message).ok();
163 });
164
165 let client = Arc::new(
166 if let Some(client) = parent_session
167 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
168 .flatten()
169 {
170 client
171 .reconnect(session_id, binary.clone(), message_handler, cx.clone())
172 .await?
173 } else {
174 DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx.clone())
175 .await?
176 },
177 );
178
179 Ok(Self {
180 client,
181 worktree,
182 tmp_breakpoint: None,
183 binary,
184 executor: cx.background_executor().clone(),
185 is_started: false,
186 })
187 }
188
    /// Returns the weak handle to the worktree this running session is tied to.
    pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
        &self.worktree
    }
192
193 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
194 let tasks: Vec<_> = paths
195 .into_iter()
196 .map(|path| {
197 self.request(dap_command::SetBreakpoints {
198 source: client_source(path),
199 source_modified: None,
200 breakpoints: vec![],
201 })
202 })
203 .collect();
204
205 cx.background_spawn(async move {
206 futures::future::join_all(tasks)
207 .await
208 .iter()
209 .for_each(|res| match res {
210 Ok(_) => {}
211 Err(err) => {
212 log::warn!("Set breakpoints request failed: {}", err);
213 }
214 });
215 })
216 }
217
218 fn send_breakpoints_from_path(
219 &self,
220 abs_path: Arc<Path>,
221 reason: BreakpointUpdatedReason,
222 breakpoint_store: &Entity<BreakpointStore>,
223 cx: &mut App,
224 ) -> Task<()> {
225 let breakpoints =
226 breakpoint_store
227 .read(cx)
228 .source_breakpoints_from_path(&abs_path, cx)
229 .into_iter()
230 .filter(|bp| bp.state.is_enabled())
231 .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
232 breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
233 }))
234 .map(Into::into)
235 .collect();
236
237 let raw_breakpoints = breakpoint_store
238 .read(cx)
239 .breakpoints_from_path(&abs_path)
240 .into_iter()
241 .filter(|bp| bp.bp.state.is_enabled())
242 .collect::<Vec<_>>();
243
244 let task = self.request(dap_command::SetBreakpoints {
245 source: client_source(&abs_path),
246 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
247 breakpoints,
248 });
249 let session_id = self.client.id();
250 let breakpoint_store = breakpoint_store.downgrade();
251 cx.spawn(async move |cx| match cx.background_spawn(task).await {
252 Ok(breakpoints) => {
253 let breakpoints =
254 breakpoints
255 .into_iter()
256 .zip(raw_breakpoints)
257 .filter_map(|(dap_bp, zed_bp)| {
258 Some((
259 zed_bp,
260 BreakpointSessionState {
261 id: dap_bp.id?,
262 verified: dap_bp.verified,
263 },
264 ))
265 });
266 breakpoint_store
267 .update(cx, |this, _| {
268 this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
269 })
270 .ok();
271 }
272 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
273 })
274 }
275
276 fn send_exception_breakpoints(
277 &self,
278 filters: Vec<ExceptionBreakpointsFilter>,
279 supports_filter_options: bool,
280 ) -> Task<Result<Vec<dap::Breakpoint>>> {
281 let arg = if supports_filter_options {
282 SetExceptionBreakpoints::WithOptions {
283 filters: filters
284 .into_iter()
285 .map(|filter| ExceptionFilterOptions {
286 filter_id: filter.filter,
287 condition: None,
288 mode: None,
289 })
290 .collect(),
291 }
292 } else {
293 SetExceptionBreakpoints::Plain {
294 filters: filters.into_iter().map(|filter| filter.filter).collect(),
295 }
296 };
297 self.request(arg)
298 }
299
    /// Sends the source breakpoints of every path known to the breakpoint store.
    ///
    /// One `SetBreakpoints` request is issued per path. When `ignore_breakpoints`
    /// is `true` an empty list is sent for each path; otherwise only enabled
    /// breakpoints are sent. On success the adapter's breakpoints are paired
    /// positionally with the store's enabled breakpoints and recorded via
    /// `mark_breakpoints_verified`.
    ///
    /// The returned task resolves to a map from path to the error of every
    /// request that failed (empty when all succeeded).
    fn send_source_breakpoints(
        &self,
        ignore_breakpoints: bool,
        breakpoint_store: &Entity<BreakpointStore>,
        cx: &App,
    ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
        let mut breakpoint_tasks = Vec::new();
        let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
        let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
        // Both views are keyed by path and are expected to cover the same set of paths.
        debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
        let session_id = self.client.id();
        for (path, breakpoints) in breakpoints {
            let breakpoints = if ignore_breakpoints {
                vec![]
            } else {
                breakpoints
                    .into_iter()
                    .filter(|bp| bp.state.is_enabled())
                    .map(Into::into)
                    .collect()
            };

            // Same enabled-only filter as above, so positions line up with the request.
            let raw_breakpoints = raw_breakpoints
                .remove(&path)
                .unwrap_or_default()
                .into_iter()
                .filter(|bp| bp.bp.state.is_enabled());
            let error_path = path.clone();
            // Tag a failure with its path so the caller can build the error map.
            let send_request = self
                .request(dap_command::SetBreakpoints {
                    source: client_source(&path),
                    source_modified: Some(false),
                    breakpoints,
                })
                .map(|result| result.map_err(move |e| (error_path, e)));

            let task = cx.spawn({
                let breakpoint_store = breakpoint_store.downgrade();
                async move |cx| {
                    let breakpoints = cx.background_spawn(send_request).await?;

                    // Adapter breakpoints without an id are skipped.
                    let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
                        |(dap_bp, zed_bp)| {
                            Some((
                                zed_bp,
                                BreakpointSessionState {
                                    id: dap_bp.id?,
                                    verified: dap_bp.verified,
                                },
                            ))
                        },
                    );
                    // The store may already be gone; that is not an error here.
                    breakpoint_store
                        .update(cx, |this, _| {
                            this.mark_breakpoints_verified(session_id, &path, breakpoints);
                        })
                        .ok();

                    Ok(())
                }
            });
            breakpoint_tasks.push(task);
        }

        cx.background_spawn(async move {
            // Keep only the failures, keyed by path.
            futures::future::join_all(breakpoint_tasks)
                .await
                .into_iter()
                .filter_map(Result::err)
                .collect::<HashMap<_, _>>()
        })
    }
372
    /// Runs the launch/attach request together with the configuration sequence.
    ///
    /// The launch or attach request (chosen from the binary's `request_args`) is
    /// issued first. Concurrently, the configuration sequence waits for
    /// `initialized_rx`, sends all source breakpoints, reports failures through a
    /// `DapStoreEvent::Notification`, sends the default exception breakpoint
    /// filters (errors ignored) and finally sends `ConfigurationDone` if the
    /// adapter supports it. Once both parts succeed, the session's running mode
    /// is marked as started.
    ///
    /// # Errors
    /// Fails if the launch/attach request, the `initialized` signal, the DAP
    /// store access, or the `ConfigurationDone` request fails.
    fn initialize_sequence(
        &self,
        capabilities: &Capabilities,
        initialized_rx: oneshot::Receiver<()>,
        dap_store: WeakEntity<DapStore>,
        cx: &mut Context<Session>,
    ) -> Task<Result<()>> {
        let raw = self.binary.request_args.clone();

        // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
        let launch = match raw.request {
            dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
                raw: raw.configuration,
            }),
            dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
                raw: raw.configuration,
            }),
        };

        let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
        // Only filters the adapter marks as enabled by default are sent initially.
        let exception_filters = capabilities
            .exception_breakpoint_filters
            .as_ref()
            .map(|exception_filters| {
                exception_filters
                    .iter()
                    .filter(|filter| filter.default == Some(true))
                    .cloned()
                    .collect::<Vec<_>>()
            })
            .unwrap_or_default();
        let supports_exception_filters = capabilities
            .supports_exception_filter_options
            .unwrap_or_default();
        let this = self.clone();
        let worktree = self.worktree().clone();
        let configuration_sequence = cx.spawn({
            async move |_, cx| {
                let breakpoint_store =
                    dap_store.read_with(cx, |dap_store, _| dap_store.breakpoint_store().clone())?;
                // Configuration requests are only sent after the adapter's `initialized` event.
                initialized_rx.await?;
                let errors_by_path = cx
                    .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
                    .await;

                dap_store.update(cx, |_, cx| {
                    // Without a worktree nothing is logged or reported.
                    let Some(worktree) = worktree.upgrade() else {
                        return;
                    };

                    for (path, error) in &errors_by_path {
                        log::error!("failed to set breakpoints for {path:?}: {error}");
                    }

                    // Surface a single notification naming one failed path and
                    // the number of remaining ones.
                    if let Some(failed_path) = errors_by_path.keys().next() {
                        let failed_path = failed_path
                            .strip_prefix(worktree.read(cx).abs_path())
                            .unwrap_or(failed_path)
                            .display();
                        let message = format!(
                            "Failed to set breakpoints for {failed_path}{}",
                            match errors_by_path.len() {
                                0 => unreachable!(),
                                1 => "".into(),
                                2 => " and 1 other path".into(),
                                n => format!(" and {} other paths", n - 1),
                            }
                        );
                        cx.emit(super::dap_store::DapStoreEvent::Notification(message));
                    }
                })?;

                // Failing to set exception breakpoints does not abort the sequence.
                this.send_exception_breakpoints(exception_filters, supports_exception_filters)
                    .await
                    .ok();
                let ret = if configuration_done_supported {
                    this.request(ConfigurationDone {})
                } else {
                    Task::ready(Ok(()))
                }
                .await;
                ret
            }
        });

        let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));

        cx.spawn(async move |this, cx| {
            task.await?;

            // The session may have been dropped or left running mode meanwhile.
            this.update(cx, |this, cx| {
                if let Some(this) = this.as_running_mut() {
                    this.is_started = true;
                    cx.notify();
                }
            })
            .ok();

            anyhow::Ok(())
        })
    }
474
475 fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
476 where
477 <R::DapRequest as dap::requests::Request>::Response: 'static,
478 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
479 {
480 let request = Arc::new(request);
481
482 let request_clone = request.clone();
483 let connection = self.client.clone();
484 self.executor.spawn(async move {
485 let args = request_clone.to_dap();
486 let response = connection.request::<R::DapRequest>(args).await?;
487 request.response_from_dap(response)
488 })
489 }
490}
491
492impl Mode {
493 pub(super) fn request_dap<R: DapCommand>(&self, request: R) -> Task<Result<R::Response>>
494 where
495 <R::DapRequest as dap::requests::Request>::Response: 'static,
496 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
497 {
498 match self {
499 Mode::Running(debug_adapter_client) => debug_adapter_client.request(request),
500 Mode::Building => Task::ready(Err(anyhow!(
501 "no adapter running to send request: {request:?}"
502 ))),
503 }
504 }
505}
506
507#[derive(Default)]
508struct ThreadStates {
509 global_state: Option<ThreadStatus>,
510 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
511}
512
513impl ThreadStates {
514 fn stop_all_threads(&mut self) {
515 self.global_state = Some(ThreadStatus::Stopped);
516 self.known_thread_states.clear();
517 }
518
519 fn exit_all_threads(&mut self) {
520 self.global_state = Some(ThreadStatus::Exited);
521 self.known_thread_states.clear();
522 }
523
524 fn continue_all_threads(&mut self) {
525 self.global_state = Some(ThreadStatus::Running);
526 self.known_thread_states.clear();
527 }
528
529 fn stop_thread(&mut self, thread_id: ThreadId) {
530 self.known_thread_states
531 .insert(thread_id, ThreadStatus::Stopped);
532 }
533
534 fn continue_thread(&mut self, thread_id: ThreadId) {
535 self.known_thread_states
536 .insert(thread_id, ThreadStatus::Running);
537 }
538
539 fn process_step(&mut self, thread_id: ThreadId) {
540 self.known_thread_states
541 .insert(thread_id, ThreadStatus::Stepping);
542 }
543
544 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
545 self.thread_state(thread_id)
546 .unwrap_or(ThreadStatus::Running)
547 }
548
549 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
550 self.known_thread_states
551 .get(&thread_id)
552 .copied()
553 .or(self.global_state)
554 }
555
556 fn exit_thread(&mut self, thread_id: ThreadId) {
557 self.known_thread_states
558 .insert(thread_id, ThreadStatus::Exited);
559 }
560
561 fn any_stopped_thread(&self) -> bool {
562 self.global_state
563 .is_some_and(|state| state == ThreadStatus::Stopped)
564 || self
565 .known_thread_states
566 .values()
567 .any(|status| *status == ThreadStatus::Stopped)
568 }
569}
/// Capacity of the circular buffer that stores a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Flag stored next to each exception breakpoint filter; initialised from the
/// filter's `default` value (presumably "is this filter enabled" — confirm at use sites).
type IsEnabled = bool;

/// Marker compared against a session's current output token to determine how
/// many output events a consumer has not seen yet.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents the current state of a single debug adapter session and provides ways to mutate it.
pub struct Session {
    /// `Building` until `boot` has created the adapter client, then `Running`.
    pub mode: Mode,
    id: SessionId,
    label: SharedString,
    adapter: DebugAdapterName,
    /// Adapter capabilities; replaced with the `Initialize` response.
    pub(super) capabilities: Capabilities,
    child_session_ids: HashSet<SessionId>,
    /// Parent session, if any; its adapter client is reused when booting.
    parent_session: Option<Entity<Session>>,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    /// Current output token; compared with consumer tokens in `has_new_output`/`output`.
    output_token: OutputToken,
    /// Fixed-capacity buffer of output events (capacity `MAX_TRACKED_OUTPUT_EVENTS`).
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    /// Known threads; cleared whenever a stopped event is handled.
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    /// Variables by reference; cleared whenever a stopped event is handled.
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    // NOTE(review): looks like a per-command-type cache of request tasks keyed by
    // the request value — confirm against the fetch/invalidate helpers.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// While `true`, breakpoint store updates are not forwarded to the adapter.
    ignore_breakpoints: bool,
    /// Exception breakpoint filters by filter id, populated from the adapter's capabilities.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    /// Tasks kept alive for the lifetime of the session (set in `boot` to the message loop).
    background_tasks: Vec<Task<()>>,
}
601
602trait CacheableCommand: Any + Send + Sync {
603 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
604 fn dyn_hash(&self, hasher: &mut dyn Hasher);
605 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
606}
607
608impl<T> CacheableCommand for T
609where
610 T: DapCommand + PartialEq + Eq + Hash,
611{
612 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
613 (rhs as &dyn Any)
614 .downcast_ref::<Self>()
615 .map_or(false, |rhs| self == rhs)
616 }
617
618 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
619 T::hash(self, &mut hasher);
620 }
621
622 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
623 self
624 }
625}
626
627pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
628
629impl<T: DapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
630 fn from(request: T) -> Self {
631 Self(Arc::new(request))
632 }
633}
634
635impl PartialEq for RequestSlot {
636 fn eq(&self, other: &Self) -> bool {
637 self.0.dyn_eq(other.0.as_ref())
638 }
639}
640
641impl Eq for RequestSlot {}
642
643impl Hash for RequestSlot {
644 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
645 self.0.dyn_hash(state);
646 (&*self.0 as &dyn Any).type_id().hash(state)
647 }
648}
649
650#[derive(Debug, Clone, Hash, PartialEq, Eq)]
651pub struct CompletionsQuery {
652 pub query: String,
653 pub column: u64,
654 pub line: Option<u64>,
655 pub frame_id: Option<u64>,
656}
657
658impl CompletionsQuery {
659 pub fn new(
660 buffer: &language::Buffer,
661 cursor_position: language::Anchor,
662 frame_id: Option<u64>,
663 ) -> Self {
664 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
665 Self {
666 query: buffer.text(),
667 column: column as u64,
668 frame_id,
669 line: Some(row as u64),
670 }
671 }
672}
673
/// Events emitted by a [`Session`] about its debugging state.
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// A stopped event was handled. Carries the stopped thread's id, or `None`
    /// when the event had no thread id or set `preserve_focus_hint`.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Threads,
    /// Emitted after each stopped event.
    InvalidateInlineValue,
    /// The adapter's capabilities were stored after the `Initialize` response.
    CapabilitiesLoaded,
    /// The adapter asked for a command to be run in a terminal. The receiver is
    /// expected to report the result (the value is sent back to the adapter as
    /// the shell process id) through `sender`.
    RunInTerminal {
        request: RunInTerminalRequestArguments,
        sender: mpsc::Sender<Result<u32>>,
    },
    ConsoleOutput,
}
690
/// Events emitted by a [`Session`] about its own lifecycle.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    /// The adapter client was created and the session switched to `Mode::Running`.
    Running,
    Shutdown,
    Restart,
    /// The adapter sent a `startDebugging` request with these arguments.
    SpawnChildSession {
        request: StartDebuggingRequestArguments,
    },
}
700
// A session emits both debugging-state events and lifecycle events.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
703
704// local session will send breakpoint updates to DAP for all new breakpoints
705// remote side will only send breakpoint updates when it is a breakpoint created by that peer
706// BreakpointStore notifies session on breakpoint changes
707impl Session {
708 pub(crate) fn new(
709 breakpoint_store: Entity<BreakpointStore>,
710 session_id: SessionId,
711 parent_session: Option<Entity<Session>>,
712 label: SharedString,
713 adapter: DebugAdapterName,
714 cx: &mut App,
715 ) -> Entity<Self> {
716 cx.new::<Self>(|cx| {
717 cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
718 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
719 if let Some(local) = (!this.ignore_breakpoints)
720 .then(|| this.as_running_mut())
721 .flatten()
722 {
723 local
724 .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
725 .detach();
726 };
727 }
728 BreakpointStoreEvent::BreakpointsCleared(paths) => {
729 if let Some(local) = (!this.ignore_breakpoints)
730 .then(|| this.as_running_mut())
731 .flatten()
732 {
733 local.unset_breakpoints_from_paths(paths, cx).detach();
734 }
735 }
736 BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
737 })
738 .detach();
739 cx.on_app_quit(Self::on_app_quit).detach();
740
741 let this = Self {
742 mode: Mode::Building,
743 id: session_id,
744 child_session_ids: HashSet::default(),
745 parent_session,
746 capabilities: Capabilities::default(),
747 variables: Default::default(),
748 stack_frames: Default::default(),
749 thread_states: ThreadStates::default(),
750 output_token: OutputToken(0),
751 output: circular_buffer::CircularBuffer::boxed(),
752 requests: HashMap::default(),
753 modules: Vec::default(),
754 loaded_sources: Vec::default(),
755 threads: IndexMap::default(),
756 background_tasks: Vec::default(),
757 locations: Default::default(),
758 is_session_terminated: false,
759 ignore_breakpoints: false,
760 breakpoint_store,
761 exception_breakpoints: Default::default(),
762 label,
763 adapter,
764 };
765
766 this
767 })
768 }
769
770 pub fn worktree(&self) -> Option<Entity<Worktree>> {
771 match &self.mode {
772 Mode::Building => None,
773 Mode::Running(local_mode) => local_mode.worktree.upgrade(),
774 }
775 }
776
    /// Starts the debug adapter for this session and runs the startup handshake.
    ///
    /// A background task is installed that consumes adapter messages: the first
    /// `Initialized` event is forwarded to the initialization sequence, all other
    /// events go to `handle_dap_event`, and `startDebugging` / `runInTerminal`
    /// reverse requests are dispatched to their handlers. The loop ends when the
    /// message channel closes or the session entity can no longer be updated.
    ///
    /// The returned task creates the `RunningMode`, switches the session to
    /// `Mode::Running` (emitting `SessionStateEvent::Running`), sends the
    /// `Initialize` request and then runs the initialization sequence.
    ///
    /// # Errors
    /// Fails if the adapter client cannot be created, the session is dropped, or
    /// any step of the handshake fails.
    pub fn boot(
        &mut self,
        binary: DebugAdapterBinary,
        worktree: Entity<Worktree>,
        dap_store: WeakEntity<DapStore>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
        let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();

        let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
            // The sender is consumed by the first `Initialized` event; later ones are ignored.
            let mut initialized_tx = Some(initialized_tx);
            while let Some(message) = message_rx.next().await {
                if let Message::Event(event) = message {
                    if let Events::Initialized(_) = *event {
                        if let Some(tx) = initialized_tx.take() {
                            tx.send(()).ok();
                        }
                    } else {
                        // Stop processing once the session entity is gone.
                        let Ok(_) = this.update(cx, |session, cx| {
                            session.handle_dap_event(event, cx);
                        }) else {
                            break;
                        };
                    }
                } else if let Message::Request(request) = message {
                    // Reverse requests from the adapter; other commands are ignored.
                    let Ok(_) = this.update(cx, |this, cx| {
                        if request.command == StartDebugging::COMMAND {
                            this.handle_start_debugging_request(request, cx)
                                .detach_and_log_err(cx);
                        } else if request.command == RunInTerminal::COMMAND {
                            this.handle_run_in_terminal_request(request, cx)
                                .detach_and_log_err(cx);
                        }
                    }) else {
                        break;
                    };
                }
            }
        })];
        self.background_tasks = background_tasks;
        let id = self.id;
        let parent_session = self.parent_session.clone();

        cx.spawn(async move |this, cx| {
            let mode = RunningMode::new(
                id,
                parent_session,
                worktree.downgrade(),
                binary,
                message_tx,
                cx.clone(),
            )
            .await?;
            this.update(cx, |this, cx| {
                this.mode = Mode::Running(mode);
                cx.emit(SessionStateEvent::Running);
            })?;

            this.update(cx, |session, cx| session.request_initialize(cx))?
                .await?;

            this.update(cx, |session, cx| {
                session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
            })?
            .await
        })
    }
845
    /// Returns this session's id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
849
    /// Returns a copy of the set of registered child session ids.
    pub fn child_session_ids(&self) -> HashSet<SessionId> {
        self.child_session_ids.clone()
    }
853
    /// Registers `session_id` as a child of this session (no-op if already present).
    pub fn add_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.insert(session_id);
    }
857
    /// Removes `session_id` from this session's children (no-op if absent).
    pub fn remove_child_session_id(&mut self, session_id: SessionId) {
        self.child_session_ids.remove(&session_id);
    }
861
862 pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
863 self.parent_session
864 .as_ref()
865 .map(|session| session.read(cx).id)
866 }
867
    /// Returns the parent session entity, if this session has one.
    pub fn parent_session(&self) -> Option<&Entity<Self>> {
        self.parent_session.as_ref()
    }
871
    /// Returns the adapter capabilities currently stored for this session
    /// (the default value until the `Initialize` response has been processed).
    pub fn capabilities(&self) -> &Capabilities {
        &self.capabilities
    }
875
876 pub fn binary(&self) -> &DebugAdapterBinary {
877 let Mode::Running(local_mode) = &self.mode else {
878 panic!("Session is not running");
879 };
880 &local_mode.binary
881 }
882
    /// Returns a clone of the debug adapter's name.
    pub fn adapter(&self) -> DebugAdapterName {
        self.adapter.clone()
    }
886
    /// Returns a clone of the session's label.
    pub fn label(&self) -> SharedString {
        self.label.clone()
    }
890
    /// Returns the session's terminated flag.
    pub fn is_terminated(&self) -> bool {
        self.is_session_terminated
    }
894
895 pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
896 let (tx, mut rx) = mpsc::unbounded();
897
898 cx.spawn(async move |this, cx| {
899 while let Some(output) = rx.next().await {
900 this.update(cx, |this, cx| {
901 let event = dap::OutputEvent {
902 category: None,
903 output,
904 group: None,
905 variables_reference: None,
906 source: None,
907 line: None,
908 column: None,
909 data: None,
910 location_reference: None,
911 };
912 this.push_output(event, cx);
913 })?;
914 }
915 anyhow::Ok(())
916 })
917 .detach();
918
919 return tx;
920 }
921
922 pub fn is_started(&self) -> bool {
923 match &self.mode {
924 Mode::Building => false,
925 Mode::Running(running) => running.is_started,
926 }
927 }
928
    /// Returns `true` while the session is in `Mode::Building`.
    pub fn is_building(&self) -> bool {
        matches!(self.mode, Mode::Building)
    }
932
    /// Returns `true` while the session is in `Mode::Running`.
    pub fn is_running(&self) -> bool {
        matches!(self.mode, Mode::Running(_))
    }
936
937 pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
938 match &mut self.mode {
939 Mode::Running(local_mode) => Some(local_mode),
940 Mode::Building => None,
941 }
942 }
943
944 pub fn as_running(&self) -> Option<&RunningMode> {
945 match &self.mode {
946 Mode::Running(local_mode) => Some(local_mode),
947 Mode::Building => None,
948 }
949 }
950
951 fn handle_start_debugging_request(
952 &mut self,
953 request: dap::messages::Request,
954 cx: &mut Context<Self>,
955 ) -> Task<Result<()>> {
956 let request_seq = request.seq;
957
958 let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
959 .arguments
960 .as_ref()
961 .map(|value| serde_json::from_value(value.clone()));
962
963 let mut success = true;
964 if let Some(Ok(request)) = launch_request {
965 cx.emit(SessionStateEvent::SpawnChildSession { request });
966 } else {
967 log::error!(
968 "Failed to parse launch request arguments: {:?}",
969 request.arguments
970 );
971 success = false;
972 }
973
974 cx.spawn(async move |this, cx| {
975 this.update(cx, |this, cx| {
976 this.respond_to_client(
977 request_seq,
978 success,
979 StartDebugging::COMMAND.to_string(),
980 None,
981 cx,
982 )
983 })?
984 .await
985 })
986 }
987
988 fn handle_run_in_terminal_request(
989 &mut self,
990 request: dap::messages::Request,
991 cx: &mut Context<Self>,
992 ) -> Task<Result<()>> {
993 let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
994 request.arguments.unwrap_or_default(),
995 )
996 .expect("To parse StartDebuggingRequestArguments");
997
998 let seq = request.seq;
999
1000 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1001 cx.emit(SessionEvent::RunInTerminal {
1002 request: request_args,
1003 sender: tx,
1004 });
1005 cx.notify();
1006
1007 cx.spawn(async move |session, cx| {
1008 let result = util::maybe!(async move {
1009 rx.next().await.ok_or_else(|| {
1010 anyhow!("failed to receive response from spawn terminal".to_string())
1011 })?
1012 })
1013 .await;
1014 let (success, body) = match result {
1015 Ok(pid) => (
1016 true,
1017 serde_json::to_value(dap::RunInTerminalResponse {
1018 process_id: None,
1019 shell_process_id: Some(pid as u64),
1020 })
1021 .ok(),
1022 ),
1023 Err(error) => (
1024 false,
1025 serde_json::to_value(dap::ErrorResponse {
1026 error: Some(dap::Message {
1027 id: seq,
1028 format: error.to_string(),
1029 variables: None,
1030 send_telemetry: None,
1031 show_user: None,
1032 url: None,
1033 url_label: None,
1034 }),
1035 })
1036 .ok(),
1037 ),
1038 };
1039
1040 session
1041 .update(cx, |session, cx| {
1042 session.respond_to_client(
1043 seq,
1044 success,
1045 RunInTerminal::COMMAND.to_string(),
1046 body,
1047 cx,
1048 )
1049 })?
1050 .await
1051 })
1052 }
1053
1054 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1055 let adapter_id = self.adapter().to_string();
1056 let request = Initialize { adapter_id };
1057 match &self.mode {
1058 Mode::Running(local_mode) => {
1059 let capabilities = local_mode.request(request);
1060
1061 cx.spawn(async move |this, cx| {
1062 let capabilities = capabilities.await?;
1063 this.update(cx, |session, cx| {
1064 session.capabilities = capabilities;
1065 let filters = session
1066 .capabilities
1067 .exception_breakpoint_filters
1068 .clone()
1069 .unwrap_or_default();
1070 for filter in filters {
1071 let default = filter.default.unwrap_or_default();
1072 session
1073 .exception_breakpoints
1074 .entry(filter.filter.clone())
1075 .or_insert_with(|| (filter, default));
1076 }
1077 cx.emit(SessionEvent::CapabilitiesLoaded);
1078 })?;
1079 Ok(())
1080 })
1081 }
1082 Mode::Building => Task::ready(Err(anyhow!(
1083 "Cannot send initialize request, task still building"
1084 ))),
1085 }
1086 }
1087
1088 pub(super) fn initialize_sequence(
1089 &mut self,
1090 initialize_rx: oneshot::Receiver<()>,
1091 dap_store: WeakEntity<DapStore>,
1092 cx: &mut Context<Self>,
1093 ) -> Task<Result<()>> {
1094 match &self.mode {
1095 Mode::Running(local_mode) => {
1096 local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1097 }
1098 Mode::Building => Task::ready(Err(anyhow!("cannot initialize, still building"))),
1099 }
1100 }
1101
1102 pub fn run_to_position(
1103 &mut self,
1104 breakpoint: SourceBreakpoint,
1105 active_thread_id: ThreadId,
1106 cx: &mut Context<Self>,
1107 ) {
1108 match &mut self.mode {
1109 Mode::Running(local_mode) => {
1110 if !matches!(
1111 self.thread_states.thread_state(active_thread_id),
1112 Some(ThreadStatus::Stopped)
1113 ) {
1114 return;
1115 };
1116 let path = breakpoint.path.clone();
1117 local_mode.tmp_breakpoint = Some(breakpoint);
1118 let task = local_mode.send_breakpoints_from_path(
1119 path,
1120 BreakpointUpdatedReason::Toggled,
1121 &self.breakpoint_store,
1122 cx,
1123 );
1124
1125 cx.spawn(async move |this, cx| {
1126 task.await;
1127 this.update(cx, |this, cx| {
1128 this.continue_thread(active_thread_id, cx);
1129 })
1130 })
1131 .detach();
1132 }
1133 Mode::Building => {}
1134 }
1135 }
1136
1137 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1138 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1139 }
1140
1141 pub fn output(
1142 &self,
1143 since: OutputToken,
1144 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1145 if self.output_token.0 == 0 {
1146 return (self.output.range(0..0), OutputToken(0));
1147 };
1148
1149 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1150
1151 let clamped_events_since = events_since.clamp(0, self.output.len());
1152 (
1153 self.output
1154 .range(self.output.len() - clamped_events_since..),
1155 self.output_token,
1156 )
1157 }
1158
1159 pub fn respond_to_client(
1160 &self,
1161 request_seq: u64,
1162 success: bool,
1163 command: String,
1164 body: Option<serde_json::Value>,
1165 cx: &mut Context<Self>,
1166 ) -> Task<Result<()>> {
1167 let Some(local_session) = self.as_running() else {
1168 unreachable!("Cannot respond to remote client");
1169 };
1170 let client = local_session.client.clone();
1171
1172 cx.background_spawn(async move {
1173 client
1174 .send_message(Message::Response(Response {
1175 body,
1176 success,
1177 command,
1178 seq: request_seq + 1,
1179 request_seq,
1180 message: None,
1181 }))
1182 .await
1183 })
1184 }
1185
    /// Updates session state after the adapter reported that execution stopped.
    ///
    /// A pending temporary breakpoint is removed and the breakpoints of its path
    /// are re-sent. Thread states are updated (all threads when the event says so
    /// or names no thread), cached stack traces are invalidated, the thread and
    /// variable maps are cleared, and `Stopped` plus `InvalidateInlineValue`
    /// events are emitted.
    fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
        // todo(debugger): Find a clean way to get around the clone
        let breakpoint_store = self.breakpoint_store.clone();
        // Taking the temporary breakpoint first means the re-sent list no longer contains it.
        if let Some((local, path)) = self.as_running_mut().and_then(|local| {
            let breakpoint = local.tmp_breakpoint.take()?;
            let path = breakpoint.path.clone();
            Some((local, path))
        }) {
            local
                .send_breakpoints_from_path(
                    path,
                    BreakpointUpdatedReason::Toggled,
                    &breakpoint_store,
                    cx,
                )
                .detach();
        };

        if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
            self.thread_states.stop_all_threads();

            self.invalidate_command_type::<StackTraceCommand>();
        }

        // Even if we stopped all threads we still need to insert the thread_id
        // to our own data
        if let Some(thread_id) = event.thread_id {
            self.thread_states.stop_thread(ThreadId(thread_id));

            self.invalidate_state(
                &StackTraceCommand {
                    thread_id,
                    start_frame: None,
                    levels: None,
                }
                .into(),
            );
        }

        self.invalidate_generic();
        self.threads.clear();
        self.variables.clear();
        // The thread id is withheld when the adapter asks to preserve focus.
        cx.emit(SessionEvent::Stopped(
            event
                .thread_id
                .map(Into::into)
                .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
        ));
        cx.emit(SessionEvent::InvalidateInlineValue);
        cx.notify();
    }
1237
1238 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1239 match *event {
1240 Events::Initialized(_) => {
1241 debug_assert!(
1242 false,
1243 "Initialized event should have been handled in LocalMode"
1244 );
1245 }
1246 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1247 Events::Continued(event) => {
1248 if event.all_threads_continued.unwrap_or_default() {
1249 self.thread_states.continue_all_threads();
1250 self.breakpoint_store.update(cx, |store, cx| {
1251 store.remove_active_position(Some(self.session_id()), cx)
1252 });
1253 } else {
1254 self.thread_states
1255 .continue_thread(ThreadId(event.thread_id));
1256 }
1257 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1258 self.invalidate_generic();
1259 }
1260 Events::Exited(_event) => {
1261 self.clear_active_debug_line(cx);
1262 }
1263 Events::Terminated(_) => {
1264 self.shutdown(cx).detach();
1265 }
1266 Events::Thread(event) => {
1267 let thread_id = ThreadId(event.thread_id);
1268
1269 match event.reason {
1270 dap::ThreadEventReason::Started => {
1271 self.thread_states.continue_thread(thread_id);
1272 }
1273 dap::ThreadEventReason::Exited => {
1274 self.thread_states.exit_thread(thread_id);
1275 }
1276 reason => {
1277 log::error!("Unhandled thread event reason {:?}", reason);
1278 }
1279 }
1280 self.invalidate_state(&ThreadsCommand.into());
1281 cx.notify();
1282 }
1283 Events::Output(event) => {
1284 if event
1285 .category
1286 .as_ref()
1287 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1288 {
1289 return;
1290 }
1291
1292 self.push_output(event, cx);
1293 cx.notify();
1294 }
1295 Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1296 store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1297 }),
1298 Events::Module(event) => {
1299 match event.reason {
1300 dap::ModuleEventReason::New => {
1301 self.modules.push(event.module);
1302 }
1303 dap::ModuleEventReason::Changed => {
1304 if let Some(module) = self
1305 .modules
1306 .iter_mut()
1307 .find(|other| event.module.id == other.id)
1308 {
1309 *module = event.module;
1310 }
1311 }
1312 dap::ModuleEventReason::Removed => {
1313 self.modules.retain(|other| event.module.id != other.id);
1314 }
1315 }
1316
1317 // todo(debugger): We should only send the invalidate command to downstream clients.
1318 // self.invalidate_state(&ModulesCommand.into());
1319 }
1320 Events::LoadedSource(_) => {
1321 self.invalidate_state(&LoadedSourcesCommand.into());
1322 }
1323 Events::Capabilities(event) => {
1324 self.capabilities = self.capabilities.merge(event.capabilities);
1325 cx.notify();
1326 }
1327 Events::Memory(_) => {}
1328 Events::Process(_) => {}
1329 Events::ProgressEnd(_) => {}
1330 Events::ProgressStart(_) => {}
1331 Events::ProgressUpdate(_) => {}
1332 Events::Invalidated(_) => {}
1333 Events::Other(_) => {}
1334 }
1335 }
1336
1337 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1338 fn fetch<T: DapCommand + PartialEq + Eq + Hash>(
1339 &mut self,
1340 request: T,
1341 process_result: impl FnOnce(
1342 &mut Self,
1343 Result<T::Response>,
1344 &mut Context<Self>,
1345 ) -> Option<T::Response>
1346 + 'static,
1347 cx: &mut Context<Self>,
1348 ) {
1349 const {
1350 assert!(
1351 T::CACHEABLE,
1352 "Only requests marked as cacheable should invoke `fetch`"
1353 );
1354 }
1355
1356 if !self.thread_states.any_stopped_thread()
1357 && request.type_id() != TypeId::of::<ThreadsCommand>()
1358 || self.is_session_terminated
1359 {
1360 return;
1361 }
1362
1363 let request_map = self
1364 .requests
1365 .entry(std::any::TypeId::of::<T>())
1366 .or_default();
1367
1368 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1369 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1370
1371 let task = Self::request_inner::<Arc<T>>(
1372 &self.capabilities,
1373 &self.mode,
1374 command,
1375 process_result,
1376 cx,
1377 );
1378 let task = cx
1379 .background_executor()
1380 .spawn(async move {
1381 let _ = task.await?;
1382 Some(())
1383 })
1384 .shared();
1385
1386 vacant.insert(task);
1387 cx.notify();
1388 }
1389 }
1390
1391 pub async fn request2<T: DapCommand + PartialEq + Eq + Hash>(
1392 &self,
1393 request: T,
1394 ) -> Result<T::Response> {
1395 if !T::is_supported(&self.capabilities) {
1396 anyhow::bail!("DAP request {:?} is not supported", request);
1397 }
1398
1399 self.mode.request_dap(request).await
1400 }
1401
1402 fn request_inner<T: DapCommand + PartialEq + Eq + Hash>(
1403 capabilities: &Capabilities,
1404 mode: &Mode,
1405 request: T,
1406 process_result: impl FnOnce(
1407 &mut Self,
1408 Result<T::Response>,
1409 &mut Context<Self>,
1410 ) -> Option<T::Response>
1411 + 'static,
1412 cx: &mut Context<Self>,
1413 ) -> Task<Option<T::Response>> {
1414 if !T::is_supported(&capabilities) {
1415 log::warn!(
1416 "Attempted to send a DAP request that isn't supported: {:?}",
1417 request
1418 );
1419 let error = Err(anyhow::Error::msg(
1420 "Couldn't complete request because it's not supported",
1421 ));
1422 return cx.spawn(async move |this, cx| {
1423 this.update(cx, |this, cx| process_result(this, error, cx))
1424 .ok()
1425 .flatten()
1426 });
1427 }
1428
1429 let request = mode.request_dap(request);
1430 cx.spawn(async move |this, cx| {
1431 let result = request.await;
1432 this.update(cx, |this, cx| process_result(this, result, cx))
1433 .ok()
1434 .flatten()
1435 })
1436 }
1437
    /// Sends `request` using this session's capabilities and mode, passing the
    /// outcome to `process_result`. Unlike `fetch`, nothing is cached here; this
    /// simply delegates to `request_inner`.
    fn request<T: DapCommand + PartialEq + Eq + Hash>(
        &self,
        request: T,
        process_result: impl FnOnce(
            &mut Self,
            Result<T::Response>,
            &mut Context<Self>,
        ) -> Option<T::Response>
        + 'static,
        cx: &mut Context<Self>,
    ) -> Task<Option<T::Response>> {
        Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
    }
1451
1452 fn invalidate_command_type<Command: DapCommand>(&mut self) {
1453 self.requests.remove(&std::any::TypeId::of::<Command>());
1454 }
1455
    /// Drops the cached modules, loaded-sources and threads requests.
    fn invalidate_generic(&mut self) {
        self.invalidate_command_type::<ModulesCommand>();
        self.invalidate_command_type::<LoadedSourcesCommand>();
        self.invalidate_command_type::<ThreadsCommand>();
    }
1461
1462 fn invalidate_state(&mut self, key: &RequestSlot) {
1463 self.requests
1464 .entry((&*key.0 as &dyn Any).type_id())
1465 .and_modify(|request_map| {
1466 request_map.remove(&key);
1467 });
1468 }
1469
    /// Appends `event` to the output buffer, advances the output token by one
    /// and emits `SessionEvent::ConsoleOutput`.
    fn push_output(&mut self, event: OutputEvent, cx: &mut Context<Self>) {
        self.output.push_back(event);
        self.output_token.0 += 1;
        cx.emit(SessionEvent::ConsoleOutput);
    }
1475
    /// Reports whether the tracked thread states contain a stopped thread
    /// (delegates to `thread_states`).
    pub fn any_stopped_thread(&self) -> bool {
        self.thread_states.any_stopped_thread()
    }
1479
    /// Returns the tracked status of `thread_id` (delegates to `thread_states`).
    pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
        self.thread_states.thread_status(thread_id)
    }
1483
1484 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1485 self.fetch(
1486 dap_command::ThreadsCommand,
1487 |this, result, cx| {
1488 let result = result.log_err()?;
1489
1490 this.threads = result
1491 .iter()
1492 .map(|thread| (ThreadId(thread.id), Thread::from(thread.clone())))
1493 .collect();
1494
1495 this.invalidate_command_type::<StackTraceCommand>();
1496 cx.emit(SessionEvent::Threads);
1497 cx.notify();
1498
1499 Some(result)
1500 },
1501 cx,
1502 );
1503
1504 self.threads
1505 .values()
1506 .map(|thread| {
1507 (
1508 thread.dap.clone(),
1509 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1510 )
1511 })
1512 .collect()
1513 }
1514
1515 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1516 self.fetch(
1517 dap_command::ModulesCommand,
1518 |this, result, cx| {
1519 let result = result.log_err()?;
1520
1521 this.modules = result.iter().cloned().collect();
1522 cx.emit(SessionEvent::Modules);
1523 cx.notify();
1524
1525 Some(result)
1526 },
1527 cx,
1528 );
1529
1530 &self.modules
1531 }
1532
    /// Returns the session's `ignore_breakpoints` flag.
    pub fn ignore_breakpoints(&self) -> bool {
        self.ignore_breakpoints
    }
1536
1537 pub fn toggle_ignore_breakpoints(
1538 &mut self,
1539 cx: &mut App,
1540 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1541 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1542 }
1543
1544 pub(crate) fn set_ignore_breakpoints(
1545 &mut self,
1546 ignore: bool,
1547 cx: &mut App,
1548 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1549 if self.ignore_breakpoints == ignore {
1550 return Task::ready(HashMap::default());
1551 }
1552
1553 self.ignore_breakpoints = ignore;
1554
1555 if let Some(local) = self.as_running() {
1556 local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1557 } else {
1558 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1559 unimplemented!()
1560 }
1561 }
1562
    /// Iterates over the session's exception breakpoint filters, each paired
    /// with its enabled flag.
    pub fn exception_breakpoints(
        &self,
    ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
        self.exception_breakpoints.values()
    }
1568
1569 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1570 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1571 *is_enabled = !*is_enabled;
1572 self.send_exception_breakpoints(cx);
1573 }
1574 }
1575
1576 fn send_exception_breakpoints(&mut self, cx: &App) {
1577 if let Some(local) = self.as_running() {
1578 let exception_filters = self
1579 .exception_breakpoints
1580 .values()
1581 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1582 .collect();
1583
1584 let supports_exception_filters = self
1585 .capabilities
1586 .supports_exception_filter_options
1587 .unwrap_or_default();
1588 local
1589 .send_exception_breakpoints(exception_filters, supports_exception_filters)
1590 .detach_and_log_err(cx);
1591 } else {
1592 debug_assert!(false, "Not implemented");
1593 }
1594 }
1595
1596 pub fn breakpoints_enabled(&self) -> bool {
1597 self.ignore_breakpoints
1598 }
1599
1600 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
1601 self.fetch(
1602 dap_command::LoadedSourcesCommand,
1603 |this, result, cx| {
1604 let result = result.log_err()?;
1605 this.loaded_sources = result.iter().cloned().collect();
1606 cx.emit(SessionEvent::LoadedSources);
1607 cx.notify();
1608 Some(result)
1609 },
1610 cx,
1611 );
1612
1613 &self.loaded_sources
1614 }
1615
1616 fn fallback_to_manual_restart(
1617 &mut self,
1618 res: Result<()>,
1619 cx: &mut Context<Self>,
1620 ) -> Option<()> {
1621 if res.log_err().is_none() {
1622 cx.emit(SessionStateEvent::Restart);
1623 return None;
1624 }
1625 Some(())
1626 }
1627
1628 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
1629 res.log_err()?;
1630 Some(())
1631 }
1632
1633 fn on_step_response<T: DapCommand + PartialEq + Eq + Hash>(
1634 thread_id: ThreadId,
1635 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
1636 {
1637 move |this, response, cx| match response.log_err() {
1638 Some(response) => {
1639 this.breakpoint_store.update(cx, |store, cx| {
1640 store.remove_active_position(Some(this.session_id()), cx)
1641 });
1642 Some(response)
1643 }
1644 None => {
1645 this.thread_states.stop_thread(thread_id);
1646 cx.notify();
1647 None
1648 }
1649 }
1650 }
1651
1652 fn clear_active_debug_line_response(
1653 &mut self,
1654 response: Result<()>,
1655 cx: &mut Context<Session>,
1656 ) -> Option<()> {
1657 response.log_err()?;
1658 self.clear_active_debug_line(cx);
1659 Some(())
1660 }
1661
1662 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
1663 self.breakpoint_store.update(cx, |store, cx| {
1664 store.remove_active_position(Some(self.id), cx)
1665 });
1666 }
1667
1668 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1669 self.request(
1670 PauseCommand {
1671 thread_id: thread_id.0,
1672 },
1673 Self::empty_response,
1674 cx,
1675 )
1676 .detach();
1677 }
1678
1679 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
1680 self.request(
1681 RestartStackFrameCommand { stack_frame_id },
1682 Self::empty_response,
1683 cx,
1684 )
1685 .detach();
1686 }
1687
1688 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
1689 if self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated() {
1690 self.request(
1691 RestartCommand {
1692 raw: args.unwrap_or(Value::Null),
1693 },
1694 Self::fallback_to_manual_restart,
1695 cx,
1696 )
1697 .detach();
1698 } else {
1699 cx.emit(SessionStateEvent::Restart);
1700 }
1701 }
1702
1703 fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1704 let debug_adapter = self.adapter_client();
1705
1706 cx.background_spawn(async move {
1707 if let Some(client) = debug_adapter {
1708 client.shutdown().await.log_err();
1709 }
1710 })
1711 }
1712
1713 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
1714 self.is_session_terminated = true;
1715 self.thread_states.exit_all_threads();
1716 cx.notify();
1717
1718 let task = if self
1719 .capabilities
1720 .supports_terminate_request
1721 .unwrap_or_default()
1722 {
1723 self.request(
1724 TerminateCommand {
1725 restart: Some(false),
1726 },
1727 Self::clear_active_debug_line_response,
1728 cx,
1729 )
1730 } else {
1731 self.request(
1732 DisconnectCommand {
1733 restart: Some(false),
1734 terminate_debuggee: Some(true),
1735 suspend_debuggee: Some(false),
1736 },
1737 Self::clear_active_debug_line_response,
1738 cx,
1739 )
1740 };
1741
1742 cx.emit(SessionStateEvent::Shutdown);
1743
1744 let debug_client = self.adapter_client();
1745
1746 cx.background_spawn(async move {
1747 let _ = task.await;
1748
1749 if let Some(client) = debug_client {
1750 client.shutdown().await.log_err();
1751 }
1752 })
1753 }
1754
1755 pub fn completions(
1756 &mut self,
1757 query: CompletionsQuery,
1758 cx: &mut Context<Self>,
1759 ) -> Task<Result<Vec<dap::CompletionItem>>> {
1760 let task = self.request(query, |_, result, _| result.log_err(), cx);
1761
1762 cx.background_executor().spawn(async move {
1763 anyhow::Ok(
1764 task.await
1765 .map(|response| response.targets)
1766 .context("failed to fetch completions")?,
1767 )
1768 })
1769 }
1770
1771 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
1772 self.thread_states.continue_thread(thread_id);
1773 self.request(
1774 ContinueCommand {
1775 args: ContinueArguments {
1776 thread_id: thread_id.0,
1777 single_thread: Some(true),
1778 },
1779 },
1780 Self::on_step_response::<ContinueCommand>(thread_id),
1781 cx,
1782 )
1783 .detach();
1784 }
1785
1786 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
1787 match self.mode {
1788 Mode::Running(ref local) => Some(local.client.clone()),
1789 Mode::Building => None,
1790 }
1791 }
1792
1793 pub fn step_over(
1794 &mut self,
1795 thread_id: ThreadId,
1796 granularity: SteppingGranularity,
1797 cx: &mut Context<Self>,
1798 ) {
1799 let supports_single_thread_execution_requests =
1800 self.capabilities.supports_single_thread_execution_requests;
1801 let supports_stepping_granularity = self
1802 .capabilities
1803 .supports_stepping_granularity
1804 .unwrap_or_default();
1805
1806 let command = NextCommand {
1807 inner: StepCommand {
1808 thread_id: thread_id.0,
1809 granularity: supports_stepping_granularity.then(|| granularity),
1810 single_thread: supports_single_thread_execution_requests,
1811 },
1812 };
1813
1814 self.thread_states.process_step(thread_id);
1815 self.request(
1816 command,
1817 Self::on_step_response::<NextCommand>(thread_id),
1818 cx,
1819 )
1820 .detach();
1821 }
1822
1823 pub fn step_in(
1824 &mut self,
1825 thread_id: ThreadId,
1826 granularity: SteppingGranularity,
1827 cx: &mut Context<Self>,
1828 ) {
1829 let supports_single_thread_execution_requests =
1830 self.capabilities.supports_single_thread_execution_requests;
1831 let supports_stepping_granularity = self
1832 .capabilities
1833 .supports_stepping_granularity
1834 .unwrap_or_default();
1835
1836 let command = StepInCommand {
1837 inner: StepCommand {
1838 thread_id: thread_id.0,
1839 granularity: supports_stepping_granularity.then(|| granularity),
1840 single_thread: supports_single_thread_execution_requests,
1841 },
1842 };
1843
1844 self.thread_states.process_step(thread_id);
1845 self.request(
1846 command,
1847 Self::on_step_response::<StepInCommand>(thread_id),
1848 cx,
1849 )
1850 .detach();
1851 }
1852
1853 pub fn step_out(
1854 &mut self,
1855 thread_id: ThreadId,
1856 granularity: SteppingGranularity,
1857 cx: &mut Context<Self>,
1858 ) {
1859 let supports_single_thread_execution_requests =
1860 self.capabilities.supports_single_thread_execution_requests;
1861 let supports_stepping_granularity = self
1862 .capabilities
1863 .supports_stepping_granularity
1864 .unwrap_or_default();
1865
1866 let command = StepOutCommand {
1867 inner: StepCommand {
1868 thread_id: thread_id.0,
1869 granularity: supports_stepping_granularity.then(|| granularity),
1870 single_thread: supports_single_thread_execution_requests,
1871 },
1872 };
1873
1874 self.thread_states.process_step(thread_id);
1875 self.request(
1876 command,
1877 Self::on_step_response::<StepOutCommand>(thread_id),
1878 cx,
1879 )
1880 .detach();
1881 }
1882
1883 pub fn step_back(
1884 &mut self,
1885 thread_id: ThreadId,
1886 granularity: SteppingGranularity,
1887 cx: &mut Context<Self>,
1888 ) {
1889 let supports_single_thread_execution_requests =
1890 self.capabilities.supports_single_thread_execution_requests;
1891 let supports_stepping_granularity = self
1892 .capabilities
1893 .supports_stepping_granularity
1894 .unwrap_or_default();
1895
1896 let command = StepBackCommand {
1897 inner: StepCommand {
1898 thread_id: thread_id.0,
1899 granularity: supports_stepping_granularity.then(|| granularity),
1900 single_thread: supports_single_thread_execution_requests,
1901 },
1902 };
1903
1904 self.thread_states.process_step(thread_id);
1905
1906 self.request(
1907 command,
1908 Self::on_step_response::<StepBackCommand>(thread_id),
1909 cx,
1910 )
1911 .detach();
1912 }
1913
1914 pub fn stack_frames(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) -> Vec<StackFrame> {
1915 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
1916 && self.requests.contains_key(&ThreadsCommand.type_id())
1917 && self.threads.contains_key(&thread_id)
1918 // ^ todo(debugger): We need a better way to check that we're not querying stale data
1919 // We could still be using an old thread id and have sent a new thread's request
1920 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
1921 // But it very well could cause a minor bug in the future that is hard to track down
1922 {
1923 self.fetch(
1924 super::dap_command::StackTraceCommand {
1925 thread_id: thread_id.0,
1926 start_frame: None,
1927 levels: None,
1928 },
1929 move |this, stack_frames, cx| {
1930 let stack_frames = stack_frames.log_err()?;
1931
1932 let entry = this.threads.entry(thread_id).and_modify(|thread| {
1933 thread.stack_frame_ids =
1934 stack_frames.iter().map(|frame| frame.id).collect();
1935 });
1936 debug_assert!(
1937 matches!(entry, indexmap::map::Entry::Occupied(_)),
1938 "Sent request for thread_id that doesn't exist"
1939 );
1940
1941 this.stack_frames.extend(
1942 stack_frames
1943 .iter()
1944 .cloned()
1945 .map(|frame| (frame.id, StackFrame::from(frame))),
1946 );
1947
1948 this.invalidate_command_type::<ScopesCommand>();
1949 this.invalidate_command_type::<VariablesCommand>();
1950
1951 cx.emit(SessionEvent::StackTrace);
1952 cx.notify();
1953 Some(stack_frames)
1954 },
1955 cx,
1956 );
1957 }
1958
1959 self.threads
1960 .get(&thread_id)
1961 .map(|thread| {
1962 thread
1963 .stack_frame_ids
1964 .iter()
1965 .filter_map(|id| self.stack_frames.get(id))
1966 .cloned()
1967 .collect()
1968 })
1969 .unwrap_or_default()
1970 }
1971
1972 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
1973 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
1974 && self
1975 .requests
1976 .contains_key(&TypeId::of::<StackTraceCommand>())
1977 {
1978 self.fetch(
1979 ScopesCommand { stack_frame_id },
1980 move |this, scopes, cx| {
1981 let scopes = scopes.log_err()?;
1982
1983 for scope in scopes .iter(){
1984 this.variables(scope.variables_reference, cx);
1985 }
1986
1987 let entry = this
1988 .stack_frames
1989 .entry(stack_frame_id)
1990 .and_modify(|stack_frame| {
1991 stack_frame.scopes = scopes.clone();
1992 });
1993
1994 cx.emit(SessionEvent::Variables);
1995
1996 debug_assert!(
1997 matches!(entry, indexmap::map::Entry::Occupied(_)),
1998 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
1999 );
2000
2001 Some(scopes)
2002 },
2003 cx,
2004 );
2005 }
2006
2007 self.stack_frames
2008 .get(&stack_frame_id)
2009 .map(|frame| frame.scopes.as_slice())
2010 .unwrap_or_default()
2011 }
2012
2013 pub fn variables_by_stack_frame_id(&self, stack_frame_id: StackFrameId) -> Vec<dap::Variable> {
2014 let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2015 return Vec::new();
2016 };
2017
2018 stack_frame
2019 .scopes
2020 .iter()
2021 .filter_map(|scope| self.variables.get(&scope.variables_reference))
2022 .flatten()
2023 .cloned()
2024 .collect()
2025 }
2026
2027 pub fn variables(
2028 &mut self,
2029 variables_reference: VariableReference,
2030 cx: &mut Context<Self>,
2031 ) -> Vec<dap::Variable> {
2032 let command = VariablesCommand {
2033 variables_reference,
2034 filter: None,
2035 start: None,
2036 count: None,
2037 format: None,
2038 };
2039
2040 self.fetch(
2041 command,
2042 move |this, variables, cx| {
2043 let variables = variables.log_err()?;
2044 this.variables
2045 .insert(variables_reference, variables.clone());
2046
2047 cx.emit(SessionEvent::Variables);
2048 cx.emit(SessionEvent::InvalidateInlineValue);
2049 Some(variables)
2050 },
2051 cx,
2052 );
2053
2054 self.variables
2055 .get(&variables_reference)
2056 .cloned()
2057 .unwrap_or_default()
2058 }
2059
2060 pub fn set_variable_value(
2061 &mut self,
2062 variables_reference: u64,
2063 name: String,
2064 value: String,
2065 cx: &mut Context<Self>,
2066 ) {
2067 if self.capabilities.supports_set_variable.unwrap_or_default() {
2068 self.request(
2069 SetVariableValueCommand {
2070 name,
2071 value,
2072 variables_reference,
2073 },
2074 move |this, response, cx| {
2075 let response = response.log_err()?;
2076 this.invalidate_command_type::<VariablesCommand>();
2077 cx.notify();
2078 Some(response)
2079 },
2080 cx,
2081 )
2082 .detach()
2083 }
2084 }
2085
2086 pub fn evaluate(
2087 &mut self,
2088 expression: String,
2089 context: Option<EvaluateArgumentsContext>,
2090 frame_id: Option<u64>,
2091 source: Option<Source>,
2092 cx: &mut Context<Self>,
2093 ) -> Task<()> {
2094 let event = dap::OutputEvent {
2095 category: None,
2096 output: format!("> {expression}"),
2097 group: None,
2098 variables_reference: None,
2099 source: None,
2100 line: None,
2101 column: None,
2102 data: None,
2103 location_reference: None,
2104 };
2105 self.push_output(event, cx);
2106 let request = self.mode.request_dap(EvaluateCommand {
2107 expression,
2108 context,
2109 frame_id,
2110 source,
2111 });
2112 cx.spawn(async move |this, cx| {
2113 let response = request.await;
2114 this.update(cx, |this, cx| {
2115 match response {
2116 Ok(response) => {
2117 let event = dap::OutputEvent {
2118 category: None,
2119 output: format!("< {}", &response.result),
2120 group: None,
2121 variables_reference: Some(response.variables_reference),
2122 source: None,
2123 line: None,
2124 column: None,
2125 data: None,
2126 location_reference: None,
2127 };
2128 this.push_output(event, cx);
2129 }
2130 Err(e) => {
2131 let event = dap::OutputEvent {
2132 category: None,
2133 output: format!("{}", e),
2134 group: None,
2135 variables_reference: None,
2136 source: None,
2137 line: None,
2138 column: None,
2139 data: None,
2140 location_reference: None,
2141 };
2142 this.push_output(event, cx);
2143 }
2144 };
2145 this.invalidate_command_type::<ScopesCommand>();
2146 cx.notify();
2147 })
2148 .ok();
2149 })
2150 }
2151
2152 pub fn location(
2153 &mut self,
2154 reference: u64,
2155 cx: &mut Context<Self>,
2156 ) -> Option<dap::LocationsResponse> {
2157 self.fetch(
2158 LocationsCommand { reference },
2159 move |this, response, _| {
2160 let response = response.log_err()?;
2161 this.locations.insert(reference, response.clone());
2162 Some(response)
2163 },
2164 cx,
2165 );
2166 self.locations.get(&reference).cloned()
2167 }
2168
2169 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2170 let command = DisconnectCommand {
2171 restart: Some(false),
2172 terminate_debuggee: Some(true),
2173 suspend_debuggee: Some(false),
2174 };
2175
2176 self.request(command, Self::empty_response, cx).detach()
2177 }
2178
2179 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2180 if self
2181 .capabilities
2182 .supports_terminate_threads_request
2183 .unwrap_or_default()
2184 {
2185 self.request(
2186 TerminateThreadsCommand {
2187 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2188 },
2189 Self::clear_active_debug_line_response,
2190 cx,
2191 )
2192 .detach();
2193 } else {
2194 self.shutdown(cx).detach();
2195 }
2196 }
2197}