1use crate::debugger::breakpoint_store::BreakpointSessionState;
2use crate::debugger::dap_command::{DataBreakpointContext, ReadMemory};
3use crate::debugger::memory::{self, Memory, MemoryIterator, MemoryPageBuilder, PageAddress};
4
5use super::breakpoint_store::{
6 BreakpointStore, BreakpointStoreEvent, BreakpointUpdatedReason, SourceBreakpoint,
7};
8use super::dap_command::{
9 self, Attach, ConfigurationDone, ContinueCommand, DataBreakpointInfoCommand, DisconnectCommand,
10 EvaluateCommand, Initialize, Launch, LoadedSourcesCommand, LocalDapCommand, LocationsCommand,
11 ModulesCommand, NextCommand, PauseCommand, RestartCommand, RestartStackFrameCommand,
12 ScopesCommand, SetDataBreakpointsCommand, SetExceptionBreakpoints, SetVariableValueCommand,
13 StackTraceCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
14 TerminateCommand, TerminateThreadsCommand, ThreadsCommand, VariablesCommand,
15};
16use super::dap_store::DapStore;
17use anyhow::{Context as _, Result, anyhow};
18use base64::Engine;
19use collections::{HashMap, HashSet, IndexMap};
20use dap::adapters::{DebugAdapterBinary, DebugAdapterName};
21use dap::messages::Response;
22use dap::requests::{Request, RunInTerminal, StartDebugging};
23use dap::{
24 Capabilities, ContinueArguments, EvaluateArgumentsContext, Module, Source, StackFrameId,
25 SteppingGranularity, StoppedEvent, VariableReference,
26 client::{DebugAdapterClient, SessionId},
27 messages::{Events, Message},
28};
29use dap::{
30 ExceptionBreakpointsFilter, ExceptionFilterOptions, OutputEvent, OutputEventCategory,
31 RunInTerminalRequestArguments, StackFramePresentationHint, StartDebuggingRequestArguments,
32 StartDebuggingRequestArgumentsRequest, VariablePresentationHint, WriteMemoryArguments,
33};
34use futures::channel::mpsc::UnboundedSender;
35use futures::channel::{mpsc, oneshot};
36use futures::io::BufReader;
37use futures::{AsyncBufReadExt as _, SinkExt, StreamExt, TryStreamExt};
38use futures::{FutureExt, future::Shared};
39use gpui::{
40 App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
41 Task, WeakEntity,
42};
43use http_client::HttpClient;
44
45use node_runtime::NodeRuntime;
46use remote::RemoteClient;
47use rpc::ErrorExt;
48use serde::{Deserialize, Serialize};
49use serde_json::Value;
50use smol::net::TcpListener;
51use std::any::TypeId;
52use std::collections::BTreeMap;
53use std::ops::RangeInclusive;
54use std::path::PathBuf;
55use std::process::Stdio;
56use std::u64;
57use std::{
58 any::Any,
59 collections::hash_map::Entry,
60 hash::{Hash, Hasher},
61 path::Path,
62 sync::Arc,
63};
64use task::TaskContext;
65use text::{PointUtf16, ToPointUtf16};
66use util::command::new_smol_command;
67use util::{ResultExt, debug_panic, maybe};
68use worktree::Worktree;
69
/// Identifier of a debuggee thread, wrapping the raw `i64` id.
#[derive(Debug, Copy, Clone, Hash, PartialEq, PartialOrd, Ord, Eq)]
#[repr(transparent)]
pub struct ThreadId(pub i64);

impl From<i64> for ThreadId {
    /// Wraps a raw thread id.
    fn from(raw: i64) -> Self {
        ThreadId(raw)
    }
}
79
80#[derive(Clone, Debug)]
81pub struct StackFrame {
82 pub dap: dap::StackFrame,
83 pub scopes: Vec<dap::Scope>,
84}
85
86impl From<dap::StackFrame> for StackFrame {
87 fn from(stack_frame: dap::StackFrame) -> Self {
88 Self {
89 scopes: vec![],
90 dap: stack_frame,
91 }
92 }
93}
94
/// Execution status tracked for a thread; defaults to `Running`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum ThreadStatus {
    #[default]
    Running,
    Stopped,
    Stepping,
    Exited,
    Ended,
}

impl ThreadStatus {
    /// Returns the human-readable name of this status.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Running => "Running",
            Self::Stopped => "Stopped",
            Self::Stepping => "Stepping",
            Self::Exited => "Exited",
            Self::Ended => "Ended",
        }
    }
}
116
117#[derive(Debug)]
118pub struct Thread {
119 dap: dap::Thread,
120 stack_frames: Vec<StackFrame>,
121 stack_frames_error: Option<anyhow::Error>,
122 _has_stopped: bool,
123}
124
125impl From<dap::Thread> for Thread {
126 fn from(dap: dap::Thread) -> Self {
127 Self {
128 dap,
129 stack_frames: Default::default(),
130 stack_frames_error: None,
131 _has_stopped: false,
132 }
133 }
134}
135
/// A watched expression together with the value recorded for it.
#[derive(Debug, Clone, PartialEq)]
pub struct Watcher {
    /// The expression text being watched.
    pub expression: SharedString,
    /// The value string stored for the expression.
    pub value: SharedString,
    /// Variables reference associated with the value.
    /// NOTE(review): presumably 0 means "no children", as in the DAP spec — confirm.
    pub variables_reference: u64,
    /// Optional presentation hint for the value.
    pub presentation_hint: Option<VariablePresentationHint>,
}
143
/// A data breakpoint, its enabled flag, and the context it is associated with.
#[derive(Debug, Clone, PartialEq)]
pub struct DataBreakpointState {
    /// The data breakpoint in its DAP representation.
    pub dap: dap::DataBreakpoint,
    /// Whether the breakpoint is currently enabled.
    pub is_enabled: bool,
    /// Shared context for this breakpoint.
    pub context: Arc<DataBreakpointContext>,
}
150
/// Lifecycle state of a debug session: still booting, or running with a connected adapter.
pub enum SessionState {
    /// Represents a session that is building/initializing
    /// even if a session doesn't have a pre build task this state
    /// is used to run all the async tasks that are required to start the session
    Booting(Option<Task<Result<()>>>),
    /// The session has a [`RunningMode`], which holds the debug adapter client.
    Running(RunningMode),
}
158
/// State of a session that has a debug adapter client.
#[derive(Clone)]
pub struct RunningMode {
    /// Client used to send requests to the debug adapter.
    client: Arc<DebugAdapterClient>,
    /// The adapter binary description this mode was created with.
    binary: DebugAdapterBinary,
    /// Extra breakpoint sent along with the store's breakpoints for its path.
    tmp_breakpoint: Option<SourceBreakpoint>,
    /// Weak handle to the worktree this session belongs to.
    worktree: WeakEntity<Worktree>,
    /// Background executor on which adapter requests are spawned.
    executor: BackgroundExecutor,
    /// Set once the launch/configuration task of the initialize sequence has finished.
    is_started: bool,
    /// Set by `SessionState::stopped`; never reset within this excerpt.
    has_ever_stopped: bool,
    /// Sender that message handlers use to forward adapter messages.
    messages_tx: UnboundedSender<Message>,
}
170
/// Per-session behavior flags; both default to `false`.
///
/// NOTE(review): the effect of each flag is defined by the code that reads it,
/// which is outside this excerpt — confirm before relying on the names alone.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct SessionQuirks {
    pub compact: bool,
    pub prefer_thread_name: bool,
}
176
177fn client_source(abs_path: &Path) -> dap::Source {
178 dap::Source {
179 name: abs_path
180 .file_name()
181 .map(|filename| filename.to_string_lossy().into_owned()),
182 path: Some(abs_path.to_string_lossy().into_owned()),
183 source_reference: None,
184 presentation_hint: None,
185 origin: None,
186 sources: None,
187 adapter_data: None,
188 checksums: None,
189 }
190}
191
192impl RunningMode {
193 async fn new(
194 session_id: SessionId,
195 parent_session: Option<Entity<Session>>,
196 worktree: WeakEntity<Worktree>,
197 binary: DebugAdapterBinary,
198 messages_tx: futures::channel::mpsc::UnboundedSender<Message>,
199 cx: &mut AsyncApp,
200 ) -> Result<Self> {
201 let message_handler = Box::new({
202 let messages_tx = messages_tx.clone();
203 move |message| {
204 messages_tx.unbounded_send(message).ok();
205 }
206 });
207
208 let client = if let Some(client) = parent_session
209 .and_then(|session| cx.update(|cx| session.read(cx).adapter_client()).ok())
210 .flatten()
211 {
212 client
213 .create_child_connection(session_id, binary.clone(), message_handler, cx)
214 .await?
215 } else {
216 DebugAdapterClient::start(session_id, binary.clone(), message_handler, cx).await?
217 };
218
219 Ok(Self {
220 client: Arc::new(client),
221 worktree,
222 tmp_breakpoint: None,
223 binary,
224 executor: cx.background_executor().clone(),
225 is_started: false,
226 has_ever_stopped: false,
227 messages_tx,
228 })
229 }
230
/// Returns the weak handle to the worktree this mode was created with.
pub(crate) fn worktree(&self) -> &WeakEntity<Worktree> {
    &self.worktree
}
234
235 fn unset_breakpoints_from_paths(&self, paths: &Vec<Arc<Path>>, cx: &mut App) -> Task<()> {
236 let tasks: Vec<_> = paths
237 .iter()
238 .map(|path| {
239 self.request(dap_command::SetBreakpoints {
240 source: client_source(path),
241 source_modified: None,
242 breakpoints: vec![],
243 })
244 })
245 .collect();
246
247 cx.background_spawn(async move {
248 futures::future::join_all(tasks)
249 .await
250 .iter()
251 .for_each(|res| match res {
252 Ok(_) => {}
253 Err(err) => {
254 log::warn!("Set breakpoints request failed: {}", err);
255 }
256 });
257 })
258 }
259
260 fn send_breakpoints_from_path(
261 &self,
262 abs_path: Arc<Path>,
263 reason: BreakpointUpdatedReason,
264 breakpoint_store: &Entity<BreakpointStore>,
265 cx: &mut App,
266 ) -> Task<()> {
267 let breakpoints =
268 breakpoint_store
269 .read(cx)
270 .source_breakpoints_from_path(&abs_path, cx)
271 .into_iter()
272 .filter(|bp| bp.state.is_enabled())
273 .chain(self.tmp_breakpoint.iter().filter_map(|breakpoint| {
274 breakpoint.path.eq(&abs_path).then(|| breakpoint.clone())
275 }))
276 .map(Into::into)
277 .collect();
278
279 let raw_breakpoints = breakpoint_store
280 .read(cx)
281 .breakpoints_from_path(&abs_path)
282 .into_iter()
283 .filter(|bp| bp.bp.state.is_enabled())
284 .collect::<Vec<_>>();
285
286 let task = self.request(dap_command::SetBreakpoints {
287 source: client_source(&abs_path),
288 source_modified: Some(matches!(reason, BreakpointUpdatedReason::FileSaved)),
289 breakpoints,
290 });
291 let session_id = self.client.id();
292 let breakpoint_store = breakpoint_store.downgrade();
293 cx.spawn(async move |cx| match cx.background_spawn(task).await {
294 Ok(breakpoints) => {
295 let breakpoints =
296 breakpoints
297 .into_iter()
298 .zip(raw_breakpoints)
299 .filter_map(|(dap_bp, zed_bp)| {
300 Some((
301 zed_bp,
302 BreakpointSessionState {
303 id: dap_bp.id?,
304 verified: dap_bp.verified,
305 },
306 ))
307 });
308 breakpoint_store
309 .update(cx, |this, _| {
310 this.mark_breakpoints_verified(session_id, &abs_path, breakpoints);
311 })
312 .ok();
313 }
314 Err(err) => log::warn!("Set breakpoints request failed for path: {}", err),
315 })
316 }
317
318 fn send_exception_breakpoints(
319 &self,
320 filters: Vec<ExceptionBreakpointsFilter>,
321 supports_filter_options: bool,
322 ) -> Task<Result<Vec<dap::Breakpoint>>> {
323 let arg = if supports_filter_options {
324 SetExceptionBreakpoints::WithOptions {
325 filters: filters
326 .into_iter()
327 .map(|filter| ExceptionFilterOptions {
328 filter_id: filter.filter,
329 condition: None,
330 mode: None,
331 })
332 .collect(),
333 }
334 } else {
335 SetExceptionBreakpoints::Plain {
336 filters: filters.into_iter().map(|filter| filter.filter).collect(),
337 }
338 };
339 self.request(arg)
340 }
341
/// Sends a `SetBreakpoints` request for every path that has source breakpoints in the store.
///
/// When `ignore_breakpoints` is true an empty breakpoint list is sent for each path.
/// After each successful response, the adapter's breakpoints are paired positionally with
/// the store's enabled breakpoints for that path and passed to `mark_breakpoints_verified`;
/// adapter breakpoints without an id are skipped.
///
/// Resolves to a map from path to the error of its failed request.
fn send_source_breakpoints(
    &self,
    ignore_breakpoints: bool,
    breakpoint_store: &Entity<BreakpointStore>,
    cx: &App,
) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
    let mut breakpoint_tasks = Vec::new();
    let breakpoints = breakpoint_store.read(cx).all_source_breakpoints(cx);
    let mut raw_breakpoints = breakpoint_store.read_with(cx, |this, _| this.all_breakpoints());
    // Both views are expected to cover the same set of paths.
    debug_assert_eq!(raw_breakpoints.len(), breakpoints.len());
    let session_id = self.client.id();
    for (path, breakpoints) in breakpoints {
        let breakpoints = if ignore_breakpoints {
            vec![]
        } else {
            breakpoints
                .into_iter()
                .filter(|bp| bp.state.is_enabled())
                .map(Into::into)
                .collect()
        };

        // Enabled store breakpoints for this path, used to pair with the response below.
        let raw_breakpoints = raw_breakpoints
            .remove(&path)
            .unwrap_or_default()
            .into_iter()
            .filter(|bp| bp.bp.state.is_enabled());
        let error_path = path.clone();
        let send_request = self
            .request(dap_command::SetBreakpoints {
                source: client_source(&path),
                source_modified: Some(false),
                breakpoints,
            })
            // Tag a failure with its path so the caller can build the error map.
            .map(|result| result.map_err(move |e| (error_path, e)));

        let task = cx.spawn({
            let breakpoint_store = breakpoint_store.downgrade();
            async move |cx| {
                let breakpoints = cx.background_spawn(send_request).await?;

                let breakpoints = breakpoints.into_iter().zip(raw_breakpoints).filter_map(
                    |(dap_bp, zed_bp)| {
                        Some((
                            zed_bp,
                            BreakpointSessionState {
                                id: dap_bp.id?,
                                verified: dap_bp.verified,
                            },
                        ))
                    },
                );
                // A dropped store is ignored here.
                breakpoint_store
                    .update(cx, |this, _| {
                        this.mark_breakpoints_verified(session_id, &path, breakpoints);
                    })
                    .ok();

                Ok(())
            }
        });
        breakpoint_tasks.push(task);
    }

    cx.background_spawn(async move {
        // Keep only the failures, keyed by path.
        futures::future::join_all(breakpoint_tasks)
            .await
            .into_iter()
            .filter_map(Result::err)
            .collect::<HashMap<_, _>>()
    })
}
414
/// Starts the debuggee and runs the configuration phase of the session.
///
/// Two things run concurrently (joined with `try_join`):
/// - the launch or attach request, chosen from the binary's request arguments;
/// - the configuration sequence: wait for `initialized_rx`, send all source breakpoints,
///   optionally send exception breakpoints, then send `ConfigurationDone` if the adapter
///   supports it.
///
/// Once the joined task finishes, `is_started` is set on the running mode regardless of
/// the outcome, and the task's result is returned.
fn initialize_sequence(
    &self,
    capabilities: &Capabilities,
    initialized_rx: oneshot::Receiver<()>,
    dap_store: WeakEntity<DapStore>,
    cx: &mut Context<Session>,
) -> Task<Result<()>> {
    let raw = self.binary.request_args.clone();

    // Of relevance: https://github.com/microsoft/vscode/issues/4902#issuecomment-368583522
    let launch = match raw.request {
        dap::StartDebuggingRequestArgumentsRequest::Launch => self.request(Launch {
            raw: raw.configuration,
        }),
        dap::StartDebuggingRequestArgumentsRequest::Attach => self.request(Attach {
            raw: raw.configuration,
        }),
    };

    let configuration_done_supported = ConfigurationDone::is_supported(capabilities);
    // From spec (on initialization sequence):
    // client sends a setExceptionBreakpoints request if one or more exceptionBreakpointFilters have been defined (or if supportsConfigurationDoneRequest is not true)
    //
    // Thus we should send setExceptionBreakpoints even if `exceptionFilters` variable is empty (as long as there were some options in the first place).
    let should_send_exception_breakpoints = capabilities
        .exception_breakpoint_filters
        .as_ref()
        .is_some_and(|filters| !filters.is_empty())
        || !configuration_done_supported;
    let supports_exception_filters = capabilities
        .supports_exception_filter_options
        .unwrap_or_default();
    let this = self.clone();
    let worktree = self.worktree().clone();
    let mut filters = capabilities
        .exception_breakpoint_filters
        .clone()
        .unwrap_or_default();
    let configuration_sequence = cx.spawn({
        async move |session, cx| {
            let adapter_name = session.read_with(cx, |this, _| this.adapter())?;
            let (breakpoint_store, adapter_defaults) =
                dap_store.read_with(cx, |dap_store, _| {
                    (
                        dap_store.breakpoint_store().clone(),
                        dap_store.adapter_options(&adapter_name),
                    )
                })?;
            // Breakpoints are only sent after the `initialized` signal has arrived.
            initialized_rx.await?;
            let errors_by_path = cx
                .update(|cx| this.send_source_breakpoints(false, &breakpoint_store, cx))?
                .await;

            dap_store.update(cx, |_, cx| {
                // Without a live worktree the failures are neither logged nor reported.
                let Some(worktree) = worktree.upgrade() else {
                    return;
                };

                for (path, error) in &errors_by_path {
                    log::error!("failed to set breakpoints for {path:?}: {error}");
                }

                // Emit a single notification naming one failed path and counting the rest.
                if let Some(failed_path) = errors_by_path.keys().next() {
                    let failed_path = failed_path
                        .strip_prefix(worktree.read(cx).abs_path())
                        .unwrap_or(failed_path)
                        .display();
                    let message = format!(
                        "Failed to set breakpoints for {failed_path}{}",
                        match errors_by_path.len() {
                            // At least one key exists in this branch.
                            0 => unreachable!(),
                            1 => "".into(),
                            2 => " and 1 other path".into(),
                            n => format!(" and {} other paths", n - 1),
                        }
                    );
                    cx.emit(super::dap_store::DapStoreEvent::Notification(message));
                }
            })?;

            if should_send_exception_breakpoints {
                _ = session.update(cx, |this, _| {
                    // Keep only enabled filters for the request, while recording every
                    // filter (with its enabled flag) on the session if not already present.
                    filters.retain(|filter| {
                        // An adapter-level setting wins over the filter's own default.
                        let is_enabled = if let Some(defaults) = adapter_defaults.as_ref() {
                            defaults
                                .exception_breakpoints
                                .get(&filter.filter)
                                .map(|options| options.enabled)
                                .unwrap_or_else(|| filter.default.unwrap_or_default())
                        } else {
                            filter.default.unwrap_or_default()
                        };
                        this.exception_breakpoints
                            .entry(filter.filter.clone())
                            .or_insert_with(|| (filter.clone(), is_enabled));
                        is_enabled
                    });
                });

                // A failed exception-breakpoint request does not abort the sequence.
                this.send_exception_breakpoints(filters, supports_exception_filters)
                    .await
                    .ok();
            }

            if configuration_done_supported {
                this.request(ConfigurationDone {})
            } else {
                Task::ready(Ok(()))
            }
            .await
        }
    });

    let task = cx.background_spawn(futures::future::try_join(launch, configuration_sequence));

    cx.spawn(async move |this, cx| {
        let result = task.await;

        // Mark the session as started whether or not the sequence succeeded.
        this.update(cx, |this, cx| {
            if let Some(this) = this.as_running_mut() {
                this.is_started = true;
                cx.notify();
            }
        })
        .ok();

        result?;
        anyhow::Ok(())
    })
}
545
546 fn reconnect_for_ssh(&self, cx: &mut AsyncApp) -> Option<Task<Result<()>>> {
547 let client = self.client.clone();
548 let messages_tx = self.messages_tx.clone();
549 let message_handler = Box::new(move |message| {
550 messages_tx.unbounded_send(message).ok();
551 });
552 if client.should_reconnect_for_ssh() {
553 Some(cx.spawn(async move |cx| {
554 client.connect(message_handler, cx).await?;
555 anyhow::Ok(())
556 }))
557 } else {
558 None
559 }
560 }
561
562 fn request<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
563 where
564 <R::DapRequest as dap::requests::Request>::Response: 'static,
565 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
566 {
567 let request = Arc::new(request);
568
569 let request_clone = request.clone();
570 let connection = self.client.clone();
571 self.executor.spawn(async move {
572 let args = request_clone.to_dap();
573 let response = connection.request::<R::DapRequest>(args).await?;
574 request.response_from_dap(response)
575 })
576 }
577}
578
579impl SessionState {
580 pub(super) fn request_dap<R: LocalDapCommand>(&self, request: R) -> Task<Result<R::Response>>
581 where
582 <R::DapRequest as dap::requests::Request>::Response: 'static,
583 <R::DapRequest as dap::requests::Request>::Arguments: 'static + Send,
584 {
585 match self {
586 SessionState::Running(debug_adapter_client) => debug_adapter_client.request(request),
587 SessionState::Booting(_) => Task::ready(Err(anyhow!(
588 "no adapter running to send request: {request:?}"
589 ))),
590 }
591 }
592
593 /// Did this debug session stop at least once?
594 pub(crate) fn has_ever_stopped(&self) -> bool {
595 match self {
596 SessionState::Booting(_) => false,
597 SessionState::Running(running_mode) => running_mode.has_ever_stopped,
598 }
599 }
600
601 fn stopped(&mut self) {
602 if let SessionState::Running(running) = self {
603 running.has_ever_stopped = true;
604 }
605 }
606}
607
608#[derive(Default)]
609struct ThreadStates {
610 global_state: Option<ThreadStatus>,
611 known_thread_states: IndexMap<ThreadId, ThreadStatus>,
612}
613
614impl ThreadStates {
615 fn stop_all_threads(&mut self) {
616 self.global_state = Some(ThreadStatus::Stopped);
617 self.known_thread_states.clear();
618 }
619
620 fn exit_all_threads(&mut self) {
621 self.global_state = Some(ThreadStatus::Exited);
622 self.known_thread_states.clear();
623 }
624
625 fn continue_all_threads(&mut self) {
626 self.global_state = Some(ThreadStatus::Running);
627 self.known_thread_states.clear();
628 }
629
630 fn stop_thread(&mut self, thread_id: ThreadId) {
631 self.known_thread_states
632 .insert(thread_id, ThreadStatus::Stopped);
633 }
634
635 fn continue_thread(&mut self, thread_id: ThreadId) {
636 self.known_thread_states
637 .insert(thread_id, ThreadStatus::Running);
638 }
639
640 fn process_step(&mut self, thread_id: ThreadId) {
641 self.known_thread_states
642 .insert(thread_id, ThreadStatus::Stepping);
643 }
644
645 fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
646 self.thread_state(thread_id)
647 .unwrap_or(ThreadStatus::Running)
648 }
649
650 fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
651 self.known_thread_states
652 .get(&thread_id)
653 .copied()
654 .or(self.global_state)
655 }
656
657 fn exit_thread(&mut self, thread_id: ThreadId) {
658 self.known_thread_states
659 .insert(thread_id, ThreadStatus::Exited);
660 }
661
662 fn any_stopped_thread(&self) -> bool {
663 self.global_state
664 .is_some_and(|state| state == ThreadStatus::Stopped)
665 || self
666 .known_thread_states
667 .values()
668 .any(|status| *status == ThreadStatus::Stopped)
669 }
670}
/// Capacity of the circular buffer that holds a session's output events.
const MAX_TRACKED_OUTPUT_EVENTS: usize = 5000;

/// Readability alias for the enabled flag stored next to an exception breakpoint filter.
type IsEnabled = bool;
674
/// Ordered token associated with a session's output; a new session starts at `OutputToken(0)`.
/// NOTE(review): presumably a position in the output stream — confirm against its users.
#[derive(Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct OutputToken(pub usize);
/// Represents a current state of a single debug adapter and provides ways to mutate it.
pub struct Session {
    /// Whether the session is still booting or has a running adapter client.
    pub mode: SessionState,
    id: SessionId,
    label: Option<SharedString>,
    adapter: DebugAdapterName,
    pub(super) capabilities: Capabilities,
    child_session_ids: HashSet<SessionId>,
    parent_session: Option<Entity<Session>>,
    modules: Vec<dap::Module>,
    loaded_sources: Vec<dap::Source>,
    output_token: OutputToken,
    /// Fixed-capacity circular buffer of output events (`MAX_TRACKED_OUTPUT_EVENTS` entries).
    output: Box<circular_buffer::CircularBuffer<MAX_TRACKED_OUTPUT_EVENTS, dap::OutputEvent>>,
    threads: IndexMap<ThreadId, Thread>,
    thread_states: ThreadStates,
    watchers: HashMap<SharedString, Watcher>,
    variables: HashMap<VariableReference, Vec<dap::Variable>>,
    stack_frames: IndexMap<StackFrameId, StackFrame>,
    locations: HashMap<u64, dap::LocationsResponse>,
    is_session_terminated: bool,
    /// Request tasks keyed first by `TypeId`, then by the request itself.
    /// NOTE(review): presumably the `TypeId` of the command type — confirm at the insert site.
    requests: HashMap<TypeId, HashMap<RequestSlot, Shared<Task<Option<()>>>>>,
    pub(crate) breakpoint_store: Entity<BreakpointStore>,
    /// While set, breakpoint store updates are not forwarded to the adapter.
    ignore_breakpoints: bool,
    /// Exception breakpoint filters keyed by filter id, each with its enabled flag.
    exception_breakpoints: BTreeMap<String, (ExceptionBreakpointsFilter, IsEnabled)>,
    data_breakpoints: BTreeMap<String, DataBreakpointState>,
    /// Holds the task that processes incoming adapter messages (installed by `boot`).
    background_tasks: Vec<Task<()>>,
    restart_task: Option<Task<()>>,
    task_context: TaskContext,
    memory: memory::Memory,
    quirks: SessionQuirks,
    remote_client: Option<Entity<RemoteClient>>,
    node_runtime: Option<NodeRuntime>,
    http_client: Option<Arc<dyn HttpClient>>,
    companion_port: Option<u16>,
}
712
/// Dynamically dispatched equality and hashing for commands, so that commands of
/// different concrete types can sit behind one `dyn` type (see `RequestSlot`).
trait CacheableCommand: Any + Send + Sync {
    /// Returns true when `rhs` has the same concrete type as `self` and compares equal.
    fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool;
    /// Feeds this command into `hasher`.
    fn dyn_hash(&self, hasher: &mut dyn Hasher);
    /// Converts the shared pointer into an `Arc<dyn Any>`.
    fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
}
718
719impl<T> CacheableCommand for T
720where
721 T: LocalDapCommand + PartialEq + Eq + Hash,
722{
723 fn dyn_eq(&self, rhs: &dyn CacheableCommand) -> bool {
724 (rhs as &dyn Any).downcast_ref::<Self>() == Some(self)
725 }
726
727 fn dyn_hash(&self, mut hasher: &mut dyn Hasher) {
728 T::hash(self, &mut hasher);
729 }
730
731 fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
732 self
733 }
734}
735
736pub(crate) struct RequestSlot(Arc<dyn CacheableCommand>);
737
738impl<T: LocalDapCommand + PartialEq + Eq + Hash> From<T> for RequestSlot {
739 fn from(request: T) -> Self {
740 Self(Arc::new(request))
741 }
742}
743
744impl PartialEq for RequestSlot {
745 fn eq(&self, other: &Self) -> bool {
746 self.0.dyn_eq(other.0.as_ref())
747 }
748}
749
750impl Eq for RequestSlot {}
751
752impl Hash for RequestSlot {
753 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
754 self.0.dyn_hash(state);
755 (&*self.0 as &dyn Any).type_id().hash(state)
756 }
757}
758
759#[derive(Debug, Clone, Hash, PartialEq, Eq)]
760pub struct CompletionsQuery {
761 pub query: String,
762 pub column: u64,
763 pub line: Option<u64>,
764 pub frame_id: Option<u64>,
765}
766
767impl CompletionsQuery {
768 pub fn new(
769 buffer: &language::Buffer,
770 cursor_position: language::Anchor,
771 frame_id: Option<u64>,
772 ) -> Self {
773 let PointUtf16 { row, column } = cursor_position.to_point_utf16(&buffer.snapshot());
774 Self {
775 query: buffer.text(),
776 column: column as u64,
777 frame_id,
778 line: Some(row as u64),
779 }
780 }
781}
782
/// Events emitted by a `Session` (it implements `EventEmitter<SessionEvent>`).
#[derive(Debug)]
pub enum SessionEvent {
    Modules,
    LoadedSources,
    /// Carries an optional thread id.
    /// NOTE(review): presumably the thread that stopped — confirm at the emit site.
    Stopped(Option<ThreadId>),
    StackTrace,
    Variables,
    Watchers,
    Threads,
    InvalidateInlineValue,
    CapabilitiesLoaded,
    /// Carries run-in-terminal arguments and a sender for a `Result<u32>` reply.
    /// NOTE(review): the `u32` is presumably a process id — confirm at the receiver.
    RunInTerminal {
        request: RunInTerminalRequestArguments,
        sender: mpsc::Sender<Result<u32>>,
    },
    DataBreakpointInfo,
    ConsoleOutput,
}
801
/// Lifecycle events emitted by a `Session` (it implements `EventEmitter<SessionStateEvent>`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SessionStateEvent {
    /// Emitted by `boot` right after the session switches to `SessionState::Running`.
    Running,
    Shutdown,
    Restart,
    /// Emitted when a start-debugging request from the adapter is parsed successfully.
    SpawnChildSession {
        request: StartDebuggingRequestArguments,
    },
}
811
// `Session` emits two kinds of events: `SessionEvent` and `SessionStateEvent`.
impl EventEmitter<SessionEvent> for Session {}
impl EventEmitter<SessionStateEvent> for Session {}
814
815// local session will send breakpoint updates to DAP for all new breakpoints
816// remote side will only send breakpoint updates when it is a breakpoint created by that peer
817// BreakpointStore notifies session on breakpoint changes
818impl Session {
819 pub(crate) fn new(
820 breakpoint_store: Entity<BreakpointStore>,
821 session_id: SessionId,
822 parent_session: Option<Entity<Session>>,
823 label: Option<SharedString>,
824 adapter: DebugAdapterName,
825 task_context: TaskContext,
826 quirks: SessionQuirks,
827 remote_client: Option<Entity<RemoteClient>>,
828 node_runtime: Option<NodeRuntime>,
829 http_client: Option<Arc<dyn HttpClient>>,
830 cx: &mut App,
831 ) -> Entity<Self> {
832 cx.new::<Self>(|cx| {
833 cx.subscribe(&breakpoint_store, |this, store, event, cx| match event {
834 BreakpointStoreEvent::BreakpointsUpdated(path, reason) => {
835 if let Some(local) = (!this.ignore_breakpoints)
836 .then(|| this.as_running_mut())
837 .flatten()
838 {
839 local
840 .send_breakpoints_from_path(path.clone(), *reason, &store, cx)
841 .detach();
842 };
843 }
844 BreakpointStoreEvent::BreakpointsCleared(paths) => {
845 if let Some(local) = (!this.ignore_breakpoints)
846 .then(|| this.as_running_mut())
847 .flatten()
848 {
849 local.unset_breakpoints_from_paths(paths, cx).detach();
850 }
851 }
852 BreakpointStoreEvent::SetDebugLine | BreakpointStoreEvent::ClearDebugLines => {}
853 })
854 .detach();
855
856 Self {
857 mode: SessionState::Booting(None),
858 id: session_id,
859 child_session_ids: HashSet::default(),
860 parent_session,
861 capabilities: Capabilities::default(),
862 watchers: HashMap::default(),
863 variables: Default::default(),
864 stack_frames: Default::default(),
865 thread_states: ThreadStates::default(),
866 output_token: OutputToken(0),
867 output: circular_buffer::CircularBuffer::boxed(),
868 requests: HashMap::default(),
869 modules: Vec::default(),
870 loaded_sources: Vec::default(),
871 threads: IndexMap::default(),
872 background_tasks: Vec::default(),
873 restart_task: None,
874 locations: Default::default(),
875 is_session_terminated: false,
876 ignore_breakpoints: false,
877 breakpoint_store,
878 data_breakpoints: Default::default(),
879 exception_breakpoints: Default::default(),
880 label,
881 adapter,
882 task_context,
883 memory: memory::Memory::new(),
884 quirks,
885 remote_client,
886 node_runtime,
887 http_client,
888 companion_port: None,
889 }
890 })
891 }
892
/// Returns the task context this session was created with.
pub fn task_context(&self) -> &TaskContext {
    &self.task_context
}
896
897 pub fn worktree(&self) -> Option<Entity<Worktree>> {
898 match &self.mode {
899 SessionState::Booting(_) => None,
900 SessionState::Running(local_mode) => local_mode.worktree.upgrade(),
901 }
902 }
903
904 pub fn boot(
905 &mut self,
906 binary: DebugAdapterBinary,
907 worktree: Entity<Worktree>,
908 dap_store: WeakEntity<DapStore>,
909 cx: &mut Context<Self>,
910 ) -> Task<Result<()>> {
911 let (message_tx, mut message_rx) = futures::channel::mpsc::unbounded();
912 let (initialized_tx, initialized_rx) = futures::channel::oneshot::channel();
913
914 let background_tasks = vec![cx.spawn(async move |this: WeakEntity<Session>, cx| {
915 let mut initialized_tx = Some(initialized_tx);
916 while let Some(message) = message_rx.next().await {
917 if let Message::Event(event) = message {
918 if let Events::Initialized(_) = *event {
919 if let Some(tx) = initialized_tx.take() {
920 tx.send(()).ok();
921 }
922 } else {
923 let Ok(_) = this.update(cx, |session, cx| {
924 session.handle_dap_event(event, cx);
925 }) else {
926 break;
927 };
928 }
929 } else if let Message::Request(request) = message {
930 let Ok(_) = this.update(cx, |this, cx| {
931 if request.command == StartDebugging::COMMAND {
932 this.handle_start_debugging_request(request, cx)
933 .detach_and_log_err(cx);
934 } else if request.command == RunInTerminal::COMMAND {
935 this.handle_run_in_terminal_request(request, cx)
936 .detach_and_log_err(cx);
937 }
938 }) else {
939 break;
940 };
941 }
942 }
943 })];
944 self.background_tasks = background_tasks;
945 let id = self.id;
946 let parent_session = self.parent_session.clone();
947
948 cx.spawn(async move |this, cx| {
949 let mode = RunningMode::new(
950 id,
951 parent_session,
952 worktree.downgrade(),
953 binary.clone(),
954 message_tx,
955 cx,
956 )
957 .await?;
958 this.update(cx, |this, cx| {
959 match &mut this.mode {
960 SessionState::Booting(task) if task.is_some() => {
961 task.take().unwrap().detach_and_log_err(cx);
962 }
963 SessionState::Booting(_) => {}
964 SessionState::Running(_) => {
965 debug_panic!("Attempting to boot a session that is already running");
966 }
967 };
968 this.mode = SessionState::Running(mode);
969 cx.emit(SessionStateEvent::Running);
970 })?;
971
972 this.update(cx, |session, cx| session.request_initialize(cx))?
973 .await?;
974
975 let result = this
976 .update(cx, |session, cx| {
977 session.initialize_sequence(initialized_rx, dap_store.clone(), cx)
978 })?
979 .await;
980
981 if result.is_err() {
982 let mut console = this.update(cx, |session, cx| session.console_output(cx))?;
983
984 console
985 .send(format!(
986 "Tried to launch debugger with: {}",
987 serde_json::to_string_pretty(&binary.request_args.configuration)
988 .unwrap_or_default(),
989 ))
990 .await
991 .ok();
992 }
993
994 result
995 })
996 }
997
/// Returns this session's id.
pub fn session_id(&self) -> SessionId {
    self.id
}
1001
/// Returns a copy of the set of child session ids.
pub fn child_session_ids(&self) -> HashSet<SessionId> {
    self.child_session_ids.clone()
}
1005
/// Registers `session_id` as a child of this session.
pub fn add_child_session_id(&mut self, session_id: SessionId) {
    self.child_session_ids.insert(session_id);
}
1009
/// Removes `session_id` from this session's children, if present.
pub fn remove_child_session_id(&mut self, session_id: SessionId) {
    self.child_session_ids.remove(&session_id);
}
1013
1014 pub fn parent_id(&self, cx: &App) -> Option<SessionId> {
1015 self.parent_session
1016 .as_ref()
1017 .map(|session| session.read(cx).id)
1018 }
1019
/// Returns the parent session entity, if this session has one.
pub fn parent_session(&self) -> Option<&Entity<Self>> {
    self.parent_session.as_ref()
}
1023
1024 pub fn on_app_quit(&mut self, cx: &mut Context<Self>) -> Task<()> {
1025 let Some(client) = self.adapter_client() else {
1026 return Task::ready(());
1027 };
1028
1029 let supports_terminate = self
1030 .capabilities
1031 .support_terminate_debuggee
1032 .unwrap_or(false);
1033
1034 cx.background_spawn(async move {
1035 if supports_terminate {
1036 client
1037 .request::<dap::requests::Terminate>(dap::TerminateArguments {
1038 restart: Some(false),
1039 })
1040 .await
1041 .ok();
1042 } else {
1043 client
1044 .request::<dap::requests::Disconnect>(dap::DisconnectArguments {
1045 restart: Some(false),
1046 terminate_debuggee: Some(true),
1047 suspend_debuggee: Some(false),
1048 })
1049 .await
1050 .ok();
1051 }
1052 })
1053 }
1054
/// Returns the capabilities stored for this session (default-initialized at creation).
pub fn capabilities(&self) -> &Capabilities {
    &self.capabilities
}
1058
1059 pub fn binary(&self) -> Option<&DebugAdapterBinary> {
1060 match &self.mode {
1061 SessionState::Booting(_) => None,
1062 SessionState::Running(running_mode) => Some(&running_mode.binary),
1063 }
1064 }
1065
/// Returns a clone of the debug adapter's name.
pub fn adapter(&self) -> DebugAdapterName {
    self.adapter.clone()
}
1069
/// Returns a clone of the session's label, if it has one.
pub fn label(&self) -> Option<SharedString> {
    self.label.clone()
}
1073
/// Returns the session's terminated flag.
pub fn is_terminated(&self) -> bool {
    self.is_session_terminated
}
1077
1078 pub fn console_output(&mut self, cx: &mut Context<Self>) -> mpsc::UnboundedSender<String> {
1079 let (tx, mut rx) = mpsc::unbounded();
1080
1081 cx.spawn(async move |this, cx| {
1082 while let Some(output) = rx.next().await {
1083 this.update(cx, |this, _| {
1084 let event = dap::OutputEvent {
1085 category: None,
1086 output,
1087 group: None,
1088 variables_reference: None,
1089 source: None,
1090 line: None,
1091 column: None,
1092 data: None,
1093 location_reference: None,
1094 };
1095 this.push_output(event);
1096 })?;
1097 }
1098 anyhow::Ok(())
1099 })
1100 .detach();
1101
1102 tx
1103 }
1104
1105 pub fn is_started(&self) -> bool {
1106 match &self.mode {
1107 SessionState::Booting(_) => false,
1108 SessionState::Running(running) => running.is_started,
1109 }
1110 }
1111
/// True while the session is still in the `Booting` state.
pub fn is_building(&self) -> bool {
    matches!(self.mode, SessionState::Booting(_))
}
1115
1116 pub fn as_running_mut(&mut self) -> Option<&mut RunningMode> {
1117 match &mut self.mode {
1118 SessionState::Running(local_mode) => Some(local_mode),
1119 SessionState::Booting(_) => None,
1120 }
1121 }
1122
1123 pub fn as_running(&self) -> Option<&RunningMode> {
1124 match &self.mode {
1125 SessionState::Running(local_mode) => Some(local_mode),
1126 SessionState::Booting(_) => None,
1127 }
1128 }
1129
1130 fn handle_start_debugging_request(
1131 &mut self,
1132 request: dap::messages::Request,
1133 cx: &mut Context<Self>,
1134 ) -> Task<Result<()>> {
1135 let request_seq = request.seq;
1136
1137 let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
1138 .arguments
1139 .as_ref()
1140 .map(|value| serde_json::from_value(value.clone()));
1141
1142 let mut success = true;
1143 if let Some(Ok(request)) = launch_request {
1144 cx.emit(SessionStateEvent::SpawnChildSession { request });
1145 } else {
1146 log::error!(
1147 "Failed to parse launch request arguments: {:?}",
1148 request.arguments
1149 );
1150 success = false;
1151 }
1152
1153 cx.spawn(async move |this, cx| {
1154 this.update(cx, |this, cx| {
1155 this.respond_to_client(
1156 request_seq,
1157 success,
1158 StartDebugging::COMMAND.to_string(),
1159 None,
1160 cx,
1161 )
1162 })?
1163 .await
1164 })
1165 }
1166
1167 fn handle_run_in_terminal_request(
1168 &mut self,
1169 request: dap::messages::Request,
1170 cx: &mut Context<Self>,
1171 ) -> Task<Result<()>> {
1172 let request_args = match serde_json::from_value::<RunInTerminalRequestArguments>(
1173 request.arguments.unwrap_or_default(),
1174 ) {
1175 Ok(args) => args,
1176 Err(error) => {
1177 return cx.spawn(async move |session, cx| {
1178 let error = serde_json::to_value(dap::ErrorResponse {
1179 error: Some(dap::Message {
1180 id: request.seq,
1181 format: error.to_string(),
1182 variables: None,
1183 send_telemetry: None,
1184 show_user: None,
1185 url: None,
1186 url_label: None,
1187 }),
1188 })
1189 .ok();
1190
1191 session
1192 .update(cx, |this, cx| {
1193 this.respond_to_client(
1194 request.seq,
1195 false,
1196 StartDebugging::COMMAND.to_string(),
1197 error,
1198 cx,
1199 )
1200 })?
1201 .await?;
1202
1203 Err(anyhow!("Failed to parse RunInTerminalRequestArguments"))
1204 });
1205 }
1206 };
1207
1208 let seq = request.seq;
1209
1210 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
1211 cx.emit(SessionEvent::RunInTerminal {
1212 request: request_args,
1213 sender: tx,
1214 });
1215 cx.notify();
1216
1217 cx.spawn(async move |session, cx| {
1218 let result = util::maybe!(async move {
1219 rx.next().await.ok_or_else(|| {
1220 anyhow!("failed to receive response from spawn terminal".to_string())
1221 })?
1222 })
1223 .await;
1224 let (success, body) = match result {
1225 Ok(pid) => (
1226 true,
1227 serde_json::to_value(dap::RunInTerminalResponse {
1228 process_id: None,
1229 shell_process_id: Some(pid as u64),
1230 })
1231 .ok(),
1232 ),
1233 Err(error) => (
1234 false,
1235 serde_json::to_value(dap::ErrorResponse {
1236 error: Some(dap::Message {
1237 id: seq,
1238 format: error.to_string(),
1239 variables: None,
1240 send_telemetry: None,
1241 show_user: None,
1242 url: None,
1243 url_label: None,
1244 }),
1245 })
1246 .ok(),
1247 ),
1248 };
1249
1250 session
1251 .update(cx, |session, cx| {
1252 session.respond_to_client(
1253 seq,
1254 success,
1255 RunInTerminal::COMMAND.to_string(),
1256 body,
1257 cx,
1258 )
1259 })?
1260 .await
1261 })
1262 }
1263
1264 pub(super) fn request_initialize(&mut self, cx: &mut Context<Self>) -> Task<Result<()>> {
1265 let adapter_id = self.adapter().to_string();
1266 let request = Initialize { adapter_id };
1267
1268 let SessionState::Running(running) = &self.mode else {
1269 return Task::ready(Err(anyhow!(
1270 "Cannot send initialize request, task still building"
1271 )));
1272 };
1273 let mut response = running.request(request.clone());
1274
1275 cx.spawn(async move |this, cx| {
1276 loop {
1277 let capabilities = response.await;
1278 match capabilities {
1279 Err(e) => {
1280 let Ok(Some(reconnect)) = this.update(cx, |this, cx| {
1281 this.as_running()
1282 .and_then(|running| running.reconnect_for_ssh(&mut cx.to_async()))
1283 }) else {
1284 return Err(e);
1285 };
1286 log::info!("Failed to connect to debug adapter: {}, retrying...", e);
1287 reconnect.await?;
1288
1289 let Ok(Some(r)) = this.update(cx, |this, _| {
1290 this.as_running()
1291 .map(|running| running.request(request.clone()))
1292 }) else {
1293 return Err(e);
1294 };
1295 response = r
1296 }
1297 Ok(capabilities) => {
1298 this.update(cx, |session, cx| {
1299 session.capabilities = capabilities;
1300
1301 cx.emit(SessionEvent::CapabilitiesLoaded);
1302 })?;
1303 return Ok(());
1304 }
1305 }
1306 }
1307 })
1308 }
1309
1310 pub(super) fn initialize_sequence(
1311 &mut self,
1312 initialize_rx: oneshot::Receiver<()>,
1313 dap_store: WeakEntity<DapStore>,
1314 cx: &mut Context<Self>,
1315 ) -> Task<Result<()>> {
1316 match &self.mode {
1317 SessionState::Running(local_mode) => {
1318 local_mode.initialize_sequence(&self.capabilities, initialize_rx, dap_store, cx)
1319 }
1320 SessionState::Booting(_) => {
1321 Task::ready(Err(anyhow!("cannot initialize, still building")))
1322 }
1323 }
1324 }
1325
1326 pub fn run_to_position(
1327 &mut self,
1328 breakpoint: SourceBreakpoint,
1329 active_thread_id: ThreadId,
1330 cx: &mut Context<Self>,
1331 ) {
1332 match &mut self.mode {
1333 SessionState::Running(local_mode) => {
1334 if !matches!(
1335 self.thread_states.thread_state(active_thread_id),
1336 Some(ThreadStatus::Stopped)
1337 ) {
1338 return;
1339 };
1340 let path = breakpoint.path.clone();
1341 local_mode.tmp_breakpoint = Some(breakpoint);
1342 let task = local_mode.send_breakpoints_from_path(
1343 path,
1344 BreakpointUpdatedReason::Toggled,
1345 &self.breakpoint_store,
1346 cx,
1347 );
1348
1349 cx.spawn(async move |this, cx| {
1350 task.await;
1351 this.update(cx, |this, cx| {
1352 this.continue_thread(active_thread_id, cx);
1353 })
1354 })
1355 .detach();
1356 }
1357 SessionState::Booting(_) => {}
1358 }
1359 }
1360
1361 pub fn has_new_output(&self, last_update: OutputToken) -> bool {
1362 self.output_token.0.checked_sub(last_update.0).unwrap_or(0) != 0
1363 }
1364
1365 pub fn output(
1366 &self,
1367 since: OutputToken,
1368 ) -> (impl Iterator<Item = &dap::OutputEvent>, OutputToken) {
1369 if self.output_token.0 == 0 {
1370 return (self.output.range(0..0), OutputToken(0));
1371 };
1372
1373 let events_since = self.output_token.0.checked_sub(since.0).unwrap_or(0);
1374
1375 let clamped_events_since = events_since.clamp(0, self.output.len());
1376 (
1377 self.output
1378 .range(self.output.len() - clamped_events_since..),
1379 self.output_token,
1380 )
1381 }
1382
1383 pub fn respond_to_client(
1384 &self,
1385 request_seq: u64,
1386 success: bool,
1387 command: String,
1388 body: Option<serde_json::Value>,
1389 cx: &mut Context<Self>,
1390 ) -> Task<Result<()>> {
1391 let Some(local_session) = self.as_running() else {
1392 unreachable!("Cannot respond to remote client");
1393 };
1394 let client = local_session.client.clone();
1395
1396 cx.background_spawn(async move {
1397 client
1398 .send_message(Message::Response(Response {
1399 body,
1400 success,
1401 command,
1402 seq: request_seq + 1,
1403 request_seq,
1404 message: None,
1405 }))
1406 .await
1407 })
1408 }
1409
1410 fn handle_stopped_event(&mut self, event: StoppedEvent, cx: &mut Context<Self>) {
1411 self.mode.stopped();
1412 // todo(debugger): Find a clean way to get around the clone
1413 let breakpoint_store = self.breakpoint_store.clone();
1414 if let Some((local, path)) = self.as_running_mut().and_then(|local| {
1415 let breakpoint = local.tmp_breakpoint.take()?;
1416 let path = breakpoint.path;
1417 Some((local, path))
1418 }) {
1419 local
1420 .send_breakpoints_from_path(
1421 path,
1422 BreakpointUpdatedReason::Toggled,
1423 &breakpoint_store,
1424 cx,
1425 )
1426 .detach();
1427 };
1428
1429 if event.all_threads_stopped.unwrap_or_default() || event.thread_id.is_none() {
1430 self.thread_states.stop_all_threads();
1431 self.invalidate_command_type::<StackTraceCommand>();
1432 }
1433
1434 // Event if we stopped all threads we still need to insert the thread_id
1435 // to our own data
1436 if let Some(thread_id) = event.thread_id {
1437 self.thread_states.stop_thread(ThreadId(thread_id));
1438
1439 self.invalidate_state(
1440 &StackTraceCommand {
1441 thread_id,
1442 start_frame: None,
1443 levels: None,
1444 }
1445 .into(),
1446 );
1447 }
1448
1449 self.invalidate_generic();
1450 self.threads.clear();
1451 self.variables.clear();
1452 cx.emit(SessionEvent::Stopped(
1453 event
1454 .thread_id
1455 .map(Into::into)
1456 .filter(|_| !event.preserve_focus_hint.unwrap_or(false)),
1457 ));
1458 cx.emit(SessionEvent::InvalidateInlineValue);
1459 cx.notify();
1460 }
1461
1462 pub(crate) fn handle_dap_event(&mut self, event: Box<Events>, cx: &mut Context<Self>) {
1463 match *event {
1464 Events::Initialized(_) => {
1465 debug_assert!(
1466 false,
1467 "Initialized event should have been handled in LocalMode"
1468 );
1469 }
1470 Events::Stopped(event) => self.handle_stopped_event(event, cx),
1471 Events::Continued(event) => {
1472 if event.all_threads_continued.unwrap_or_default() {
1473 self.thread_states.continue_all_threads();
1474 self.breakpoint_store.update(cx, |store, cx| {
1475 store.remove_active_position(Some(self.session_id()), cx)
1476 });
1477 } else {
1478 self.thread_states
1479 .continue_thread(ThreadId(event.thread_id));
1480 }
1481 // todo(debugger): We should be able to get away with only invalidating generic if all threads were continued
1482 self.invalidate_generic();
1483 }
1484 Events::Exited(_event) => {
1485 self.clear_active_debug_line(cx);
1486 }
1487 Events::Terminated(_) => {
1488 self.shutdown(cx).detach();
1489 }
1490 Events::Thread(event) => {
1491 let thread_id = ThreadId(event.thread_id);
1492
1493 match event.reason {
1494 dap::ThreadEventReason::Started => {
1495 self.thread_states.continue_thread(thread_id);
1496 }
1497 dap::ThreadEventReason::Exited => {
1498 self.thread_states.exit_thread(thread_id);
1499 }
1500 reason => {
1501 log::error!("Unhandled thread event reason {:?}", reason);
1502 }
1503 }
1504 self.invalidate_state(&ThreadsCommand.into());
1505 cx.notify();
1506 }
1507 Events::Output(event) => {
1508 if event
1509 .category
1510 .as_ref()
1511 .is_some_and(|category| *category == OutputEventCategory::Telemetry)
1512 {
1513 return;
1514 }
1515
1516 self.push_output(event);
1517 cx.notify();
1518 }
1519 Events::Breakpoint(event) => self.breakpoint_store.update(cx, |store, _| {
1520 store.update_session_breakpoint(self.session_id(), event.reason, event.breakpoint);
1521 }),
1522 Events::Module(event) => {
1523 match event.reason {
1524 dap::ModuleEventReason::New => {
1525 self.modules.push(event.module);
1526 }
1527 dap::ModuleEventReason::Changed => {
1528 if let Some(module) = self
1529 .modules
1530 .iter_mut()
1531 .find(|other| event.module.id == other.id)
1532 {
1533 *module = event.module;
1534 }
1535 }
1536 dap::ModuleEventReason::Removed => {
1537 self.modules.retain(|other| event.module.id != other.id);
1538 }
1539 }
1540
1541 // todo(debugger): We should only send the invalidate command to downstream clients.
1542 // self.invalidate_state(&ModulesCommand.into());
1543 }
1544 Events::LoadedSource(_) => {
1545 self.invalidate_state(&LoadedSourcesCommand.into());
1546 }
1547 Events::Capabilities(event) => {
1548 self.capabilities = self.capabilities.merge(event.capabilities);
1549
1550 // The adapter might've enabled new exception breakpoints (or disabled existing ones).
1551 let recent_filters = self
1552 .capabilities
1553 .exception_breakpoint_filters
1554 .iter()
1555 .flatten()
1556 .map(|filter| (filter.filter.clone(), filter.clone()))
1557 .collect::<BTreeMap<_, _>>();
1558 for filter in recent_filters.values() {
1559 let default = filter.default.unwrap_or_default();
1560 self.exception_breakpoints
1561 .entry(filter.filter.clone())
1562 .or_insert_with(|| (filter.clone(), default));
1563 }
1564 self.exception_breakpoints
1565 .retain(|k, _| recent_filters.contains_key(k));
1566 if self.is_started() {
1567 self.send_exception_breakpoints(cx);
1568 }
1569
1570 // Remove the ones that no longer exist.
1571 cx.notify();
1572 }
1573 Events::Memory(_) => {}
1574 Events::Process(_) => {}
1575 Events::ProgressEnd(_) => {}
1576 Events::ProgressStart(_) => {}
1577 Events::ProgressUpdate(_) => {}
1578 Events::Invalidated(_) => {}
1579 Events::Other(event) => {
1580 if event.event == "launchBrowserInCompanion" {
1581 let Some(request) = serde_json::from_value(event.body).ok() else {
1582 log::error!("failed to deserialize launchBrowserInCompanion event");
1583 return;
1584 };
1585 self.launch_browser_for_remote_server(request, cx);
1586 } else if event.event == "killCompanionBrowser" {
1587 let Some(request) = serde_json::from_value(event.body).ok() else {
1588 log::error!("failed to deserialize killCompanionBrowser event");
1589 return;
1590 };
1591 self.kill_browser(request, cx);
1592 }
1593 }
1594 }
1595 }
1596
1597 /// Ensure that there's a request in flight for the given command, and if not, send it. Use this to run requests that are idempotent.
1598 fn fetch<T: LocalDapCommand + PartialEq + Eq + Hash>(
1599 &mut self,
1600 request: T,
1601 process_result: impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) + 'static,
1602 cx: &mut Context<Self>,
1603 ) {
1604 const {
1605 assert!(
1606 T::CACHEABLE,
1607 "Only requests marked as cacheable should invoke `fetch`"
1608 );
1609 }
1610
1611 if !self.thread_states.any_stopped_thread()
1612 && request.type_id() != TypeId::of::<ThreadsCommand>()
1613 || self.is_session_terminated
1614 {
1615 return;
1616 }
1617
1618 let request_map = self
1619 .requests
1620 .entry(std::any::TypeId::of::<T>())
1621 .or_default();
1622
1623 if let Entry::Vacant(vacant) = request_map.entry(request.into()) {
1624 let command = vacant.key().0.clone().as_any_arc().downcast::<T>().unwrap();
1625
1626 let task = Self::request_inner::<Arc<T>>(
1627 &self.capabilities,
1628 &self.mode,
1629 command,
1630 |this, result, cx| {
1631 process_result(this, result, cx);
1632 None
1633 },
1634 cx,
1635 );
1636 let task = cx
1637 .background_executor()
1638 .spawn(async move {
1639 let _ = task.await?;
1640 Some(())
1641 })
1642 .shared();
1643
1644 vacant.insert(task);
1645 cx.notify();
1646 }
1647 }
1648
1649 fn request_inner<T: LocalDapCommand + PartialEq + Eq + Hash>(
1650 capabilities: &Capabilities,
1651 mode: &SessionState,
1652 request: T,
1653 process_result: impl FnOnce(
1654 &mut Self,
1655 Result<T::Response>,
1656 &mut Context<Self>,
1657 ) -> Option<T::Response>
1658 + 'static,
1659 cx: &mut Context<Self>,
1660 ) -> Task<Option<T::Response>> {
1661 if !T::is_supported(capabilities) {
1662 log::warn!(
1663 "Attempted to send a DAP request that isn't supported: {:?}",
1664 request
1665 );
1666 let error = Err(anyhow::Error::msg(
1667 "Couldn't complete request because it's not supported",
1668 ));
1669 return cx.spawn(async move |this, cx| {
1670 this.update(cx, |this, cx| process_result(this, error, cx))
1671 .ok()
1672 .flatten()
1673 });
1674 }
1675
1676 let request = mode.request_dap(request);
1677 cx.spawn(async move |this, cx| {
1678 let result = request.await;
1679 this.update(cx, |this, cx| process_result(this, result, cx))
1680 .ok()
1681 .flatten()
1682 })
1683 }
1684
1685 fn request<T: LocalDapCommand + PartialEq + Eq + Hash>(
1686 &self,
1687 request: T,
1688 process_result: impl FnOnce(
1689 &mut Self,
1690 Result<T::Response>,
1691 &mut Context<Self>,
1692 ) -> Option<T::Response>
1693 + 'static,
1694 cx: &mut Context<Self>,
1695 ) -> Task<Option<T::Response>> {
1696 Self::request_inner(&self.capabilities, &self.mode, request, process_result, cx)
1697 }
1698
1699 fn invalidate_command_type<Command: LocalDapCommand>(&mut self) {
1700 self.requests.remove(&std::any::TypeId::of::<Command>());
1701 }
1702
1703 fn invalidate_generic(&mut self) {
1704 self.invalidate_command_type::<ModulesCommand>();
1705 self.invalidate_command_type::<LoadedSourcesCommand>();
1706 self.invalidate_command_type::<ThreadsCommand>();
1707 self.invalidate_command_type::<DataBreakpointInfoCommand>();
1708 self.invalidate_command_type::<ReadMemory>();
1709 let executor = self.as_running().map(|running| running.executor.clone());
1710 if let Some(executor) = executor {
1711 self.memory.clear(&executor);
1712 }
1713 }
1714
1715 fn invalidate_state(&mut self, key: &RequestSlot) {
1716 self.requests
1717 .entry((&*key.0 as &dyn Any).type_id())
1718 .and_modify(|request_map| {
1719 request_map.remove(key);
1720 });
1721 }
1722
1723 fn push_output(&mut self, event: OutputEvent) {
1724 self.output.push_back(event);
1725 self.output_token.0 += 1;
1726 }
1727
1728 pub fn any_stopped_thread(&self) -> bool {
1729 self.thread_states.any_stopped_thread()
1730 }
1731
1732 pub fn thread_status(&self, thread_id: ThreadId) -> ThreadStatus {
1733 self.thread_states.thread_status(thread_id)
1734 }
1735
1736 pub fn threads(&mut self, cx: &mut Context<Self>) -> Vec<(dap::Thread, ThreadStatus)> {
1737 self.fetch(
1738 dap_command::ThreadsCommand,
1739 |this, result, cx| {
1740 let Some(result) = result.log_err() else {
1741 return;
1742 };
1743
1744 this.threads = result
1745 .into_iter()
1746 .map(|thread| (ThreadId(thread.id), Thread::from(thread)))
1747 .collect();
1748
1749 this.invalidate_command_type::<StackTraceCommand>();
1750 cx.emit(SessionEvent::Threads);
1751 cx.notify();
1752 },
1753 cx,
1754 );
1755
1756 self.threads
1757 .values()
1758 .map(|thread| {
1759 (
1760 thread.dap.clone(),
1761 self.thread_states.thread_status(ThreadId(thread.dap.id)),
1762 )
1763 })
1764 .collect()
1765 }
1766
1767 pub fn modules(&mut self, cx: &mut Context<Self>) -> &[Module] {
1768 self.fetch(
1769 dap_command::ModulesCommand,
1770 |this, result, cx| {
1771 let Some(result) = result.log_err() else {
1772 return;
1773 };
1774
1775 this.modules = result;
1776 cx.emit(SessionEvent::Modules);
1777 cx.notify();
1778 },
1779 cx,
1780 );
1781
1782 &self.modules
1783 }
1784
1785 // CodeLLDB returns the size of a pointed-to-memory, which we can use to make the experience of go-to-memory better.
1786 pub fn data_access_size(
1787 &mut self,
1788 frame_id: Option<u64>,
1789 evaluate_name: &str,
1790 cx: &mut Context<Self>,
1791 ) -> Task<Option<u64>> {
1792 let request = self.request(
1793 EvaluateCommand {
1794 expression: format!("?${{sizeof({evaluate_name})}}"),
1795 frame_id,
1796
1797 context: Some(EvaluateArgumentsContext::Repl),
1798 source: None,
1799 },
1800 |_, response, _| response.ok(),
1801 cx,
1802 );
1803 cx.background_spawn(async move {
1804 let result = request.await?;
1805 result.result.parse().ok()
1806 })
1807 }
1808
1809 pub fn memory_reference_of_expr(
1810 &mut self,
1811 frame_id: Option<u64>,
1812 expression: String,
1813 cx: &mut Context<Self>,
1814 ) -> Task<Option<(String, Option<String>)>> {
1815 let request = self.request(
1816 EvaluateCommand {
1817 expression,
1818 frame_id,
1819
1820 context: Some(EvaluateArgumentsContext::Repl),
1821 source: None,
1822 },
1823 |_, response, _| response.ok(),
1824 cx,
1825 );
1826 cx.background_spawn(async move {
1827 let result = request.await?;
1828 result
1829 .memory_reference
1830 .map(|reference| (reference, result.type_))
1831 })
1832 }
1833
1834 pub fn write_memory(&mut self, address: u64, data: &[u8], cx: &mut Context<Self>) {
1835 let data = base64::engine::general_purpose::STANDARD.encode(data);
1836 self.request(
1837 WriteMemoryArguments {
1838 memory_reference: address.to_string(),
1839 data,
1840 allow_partial: None,
1841 offset: None,
1842 },
1843 |this, response, cx| {
1844 this.memory.clear(cx.background_executor());
1845 this.invalidate_command_type::<ReadMemory>();
1846 this.invalidate_command_type::<VariablesCommand>();
1847 cx.emit(SessionEvent::Variables);
1848 response.ok()
1849 },
1850 cx,
1851 )
1852 .detach();
1853 }
1854 pub fn read_memory(
1855 &mut self,
1856 range: RangeInclusive<u64>,
1857 cx: &mut Context<Self>,
1858 ) -> MemoryIterator {
1859 // This function is a bit more involved when it comes to fetching data.
1860 // Since we attempt to read memory in pages, we need to account for some parts
1861 // of memory being unreadable. Therefore, we start off by fetching a page per request.
1862 // In case that fails, we try to re-fetch smaller regions until we have the full range.
1863 let page_range = Memory::memory_range_to_page_range(range.clone());
1864 for page_address in PageAddress::iter_range(page_range) {
1865 self.read_single_page_memory(page_address, cx);
1866 }
1867 self.memory.memory_range(range)
1868 }
1869
1870 fn read_single_page_memory(&mut self, page_start: PageAddress, cx: &mut Context<Self>) {
1871 _ = maybe!({
1872 let builder = self.memory.build_page(page_start)?;
1873
1874 self.memory_read_fetch_page_recursive(builder, cx);
1875 Some(())
1876 });
1877 }
1878 fn memory_read_fetch_page_recursive(
1879 &mut self,
1880 mut builder: MemoryPageBuilder,
1881 cx: &mut Context<Self>,
1882 ) {
1883 let Some(next_request) = builder.next_request() else {
1884 // We're done fetching. Let's grab the page and insert it into our memory store.
1885 let (address, contents) = builder.build();
1886 self.memory.insert_page(address, contents);
1887
1888 return;
1889 };
1890 let size = next_request.size;
1891 self.fetch(
1892 ReadMemory {
1893 memory_reference: format!("0x{:X}", next_request.address),
1894 offset: Some(0),
1895 count: next_request.size,
1896 },
1897 move |this, memory, cx| {
1898 if let Ok(memory) = memory {
1899 builder.known(memory.content);
1900 if let Some(unknown) = memory.unreadable_bytes {
1901 builder.unknown(unknown);
1902 }
1903 // This is the recursive bit: if we're not yet done with
1904 // the whole page, we'll kick off a new request with smaller range.
1905 // Note that this function is recursive only conceptually;
1906 // since it kicks off a new request with callback, we don't need to worry about stack overflow.
1907 this.memory_read_fetch_page_recursive(builder, cx);
1908 } else {
1909 builder.unknown(size);
1910 }
1911 },
1912 cx,
1913 );
1914 }
1915
1916 pub fn ignore_breakpoints(&self) -> bool {
1917 self.ignore_breakpoints
1918 }
1919
1920 pub fn toggle_ignore_breakpoints(
1921 &mut self,
1922 cx: &mut App,
1923 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1924 self.set_ignore_breakpoints(!self.ignore_breakpoints, cx)
1925 }
1926
1927 pub(crate) fn set_ignore_breakpoints(
1928 &mut self,
1929 ignore: bool,
1930 cx: &mut App,
1931 ) -> Task<HashMap<Arc<Path>, anyhow::Error>> {
1932 if self.ignore_breakpoints == ignore {
1933 return Task::ready(HashMap::default());
1934 }
1935
1936 self.ignore_breakpoints = ignore;
1937
1938 if let Some(local) = self.as_running() {
1939 local.send_source_breakpoints(ignore, &self.breakpoint_store, cx)
1940 } else {
1941 // todo(debugger): We need to propagate this change to downstream sessions and send a message to upstream sessions
1942 unimplemented!()
1943 }
1944 }
1945
1946 pub fn data_breakpoints(&self) -> impl Iterator<Item = &DataBreakpointState> {
1947 self.data_breakpoints.values()
1948 }
1949
1950 pub fn exception_breakpoints(
1951 &self,
1952 ) -> impl Iterator<Item = &(ExceptionBreakpointsFilter, IsEnabled)> {
1953 self.exception_breakpoints.values()
1954 }
1955
1956 pub fn toggle_exception_breakpoint(&mut self, id: &str, cx: &App) {
1957 if let Some((_, is_enabled)) = self.exception_breakpoints.get_mut(id) {
1958 *is_enabled = !*is_enabled;
1959 self.send_exception_breakpoints(cx);
1960 }
1961 }
1962
1963 fn send_exception_breakpoints(&mut self, cx: &App) {
1964 if let Some(local) = self.as_running() {
1965 let exception_filters = self
1966 .exception_breakpoints
1967 .values()
1968 .filter_map(|(filter, is_enabled)| is_enabled.then(|| filter.clone()))
1969 .collect();
1970
1971 let supports_exception_filters = self
1972 .capabilities
1973 .supports_exception_filter_options
1974 .unwrap_or_default();
1975 local
1976 .send_exception_breakpoints(exception_filters, supports_exception_filters)
1977 .detach_and_log_err(cx);
1978 } else {
1979 debug_assert!(false, "Not implemented");
1980 }
1981 }
1982
1983 pub fn toggle_data_breakpoint(&mut self, id: &str, cx: &mut Context<'_, Session>) {
1984 if let Some(state) = self.data_breakpoints.get_mut(id) {
1985 state.is_enabled = !state.is_enabled;
1986 self.send_exception_breakpoints(cx);
1987 }
1988 }
1989
1990 fn send_data_breakpoints(&mut self, cx: &mut Context<Self>) {
1991 if let Some(mode) = self.as_running() {
1992 let breakpoints = self
1993 .data_breakpoints
1994 .values()
1995 .filter_map(|state| state.is_enabled.then(|| state.dap.clone()))
1996 .collect();
1997 let command = SetDataBreakpointsCommand { breakpoints };
1998 mode.request(command).detach_and_log_err(cx);
1999 }
2000 }
2001
2002 pub fn create_data_breakpoint(
2003 &mut self,
2004 context: Arc<DataBreakpointContext>,
2005 data_id: String,
2006 dap: dap::DataBreakpoint,
2007 cx: &mut Context<Self>,
2008 ) {
2009 if self.data_breakpoints.remove(&data_id).is_none() {
2010 self.data_breakpoints.insert(
2011 data_id,
2012 DataBreakpointState {
2013 dap,
2014 is_enabled: true,
2015 context,
2016 },
2017 );
2018 }
2019 self.send_data_breakpoints(cx);
2020 }
2021
2022 pub fn breakpoints_enabled(&self) -> bool {
2023 self.ignore_breakpoints
2024 }
2025
2026 pub fn loaded_sources(&mut self, cx: &mut Context<Self>) -> &[Source] {
2027 self.fetch(
2028 dap_command::LoadedSourcesCommand,
2029 |this, result, cx| {
2030 let Some(result) = result.log_err() else {
2031 return;
2032 };
2033 this.loaded_sources = result;
2034 cx.emit(SessionEvent::LoadedSources);
2035 cx.notify();
2036 },
2037 cx,
2038 );
2039
2040 &self.loaded_sources
2041 }
2042
2043 fn fallback_to_manual_restart(
2044 &mut self,
2045 res: Result<()>,
2046 cx: &mut Context<Self>,
2047 ) -> Option<()> {
2048 if res.log_err().is_none() {
2049 cx.emit(SessionStateEvent::Restart);
2050 return None;
2051 }
2052 Some(())
2053 }
2054
2055 fn empty_response(&mut self, res: Result<()>, _cx: &mut Context<Self>) -> Option<()> {
2056 res.log_err()?;
2057 Some(())
2058 }
2059
2060 fn on_step_response<T: LocalDapCommand + PartialEq + Eq + Hash>(
2061 thread_id: ThreadId,
2062 ) -> impl FnOnce(&mut Self, Result<T::Response>, &mut Context<Self>) -> Option<T::Response> + 'static
2063 {
2064 move |this, response, cx| match response.log_err() {
2065 Some(response) => {
2066 this.breakpoint_store.update(cx, |store, cx| {
2067 store.remove_active_position(Some(this.session_id()), cx)
2068 });
2069 Some(response)
2070 }
2071 None => {
2072 this.thread_states.stop_thread(thread_id);
2073 cx.notify();
2074 None
2075 }
2076 }
2077 }
2078
2079 fn clear_active_debug_line_response(
2080 &mut self,
2081 response: Result<()>,
2082 cx: &mut Context<Session>,
2083 ) -> Option<()> {
2084 response.log_err()?;
2085 self.clear_active_debug_line(cx);
2086 Some(())
2087 }
2088
2089 fn clear_active_debug_line(&mut self, cx: &mut Context<Session>) {
2090 self.breakpoint_store.update(cx, |store, cx| {
2091 store.remove_active_position(Some(self.id), cx)
2092 });
2093 }
2094
2095 pub fn pause_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2096 self.request(
2097 PauseCommand {
2098 thread_id: thread_id.0,
2099 },
2100 Self::empty_response,
2101 cx,
2102 )
2103 .detach();
2104 }
2105
2106 pub fn restart_stack_frame(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) {
2107 self.request(
2108 RestartStackFrameCommand { stack_frame_id },
2109 Self::empty_response,
2110 cx,
2111 )
2112 .detach();
2113 }
2114
2115 pub fn restart(&mut self, args: Option<Value>, cx: &mut Context<Self>) {
2116 if self.restart_task.is_some() || self.as_running().is_none() {
2117 return;
2118 }
2119
2120 let supports_dap_restart =
2121 self.capabilities.supports_restart_request.unwrap_or(false) && !self.is_terminated();
2122
2123 self.restart_task = Some(cx.spawn(async move |this, cx| {
2124 let _ = this.update(cx, |session, cx| {
2125 if supports_dap_restart {
2126 session
2127 .request(
2128 RestartCommand {
2129 raw: args.unwrap_or(Value::Null),
2130 },
2131 Self::fallback_to_manual_restart,
2132 cx,
2133 )
2134 .detach();
2135 } else {
2136 cx.emit(SessionStateEvent::Restart);
2137 }
2138 });
2139 }));
2140 }
2141
2142 pub fn shutdown(&mut self, cx: &mut Context<Self>) -> Task<()> {
2143 if self.is_session_terminated {
2144 return Task::ready(());
2145 }
2146
2147 self.is_session_terminated = true;
2148 self.thread_states.exit_all_threads();
2149 cx.notify();
2150
2151 let task = match &mut self.mode {
2152 SessionState::Running(_) => {
2153 if self
2154 .capabilities
2155 .supports_terminate_request
2156 .unwrap_or_default()
2157 {
2158 self.request(
2159 TerminateCommand {
2160 restart: Some(false),
2161 },
2162 Self::clear_active_debug_line_response,
2163 cx,
2164 )
2165 } else {
2166 self.request(
2167 DisconnectCommand {
2168 restart: Some(false),
2169 terminate_debuggee: Some(true),
2170 suspend_debuggee: Some(false),
2171 },
2172 Self::clear_active_debug_line_response,
2173 cx,
2174 )
2175 }
2176 }
2177 SessionState::Booting(build_task) => {
2178 build_task.take();
2179 Task::ready(Some(()))
2180 }
2181 };
2182
2183 cx.emit(SessionStateEvent::Shutdown);
2184
2185 cx.spawn(async move |this, cx| {
2186 task.await;
2187 let _ = this.update(cx, |this, _| {
2188 if let Some(adapter_client) = this.adapter_client() {
2189 adapter_client.kill();
2190 }
2191 });
2192 })
2193 }
2194
2195 pub fn completions(
2196 &mut self,
2197 query: CompletionsQuery,
2198 cx: &mut Context<Self>,
2199 ) -> Task<Result<Vec<dap::CompletionItem>>> {
2200 let task = self.request(query, |_, result, _| result.log_err(), cx);
2201
2202 cx.background_executor().spawn(async move {
2203 anyhow::Ok(
2204 task.await
2205 .map(|response| response.targets)
2206 .context("failed to fetch completions")?,
2207 )
2208 })
2209 }
2210
2211 pub fn continue_thread(&mut self, thread_id: ThreadId, cx: &mut Context<Self>) {
2212 let supports_single_thread_execution_requests =
2213 self.capabilities.supports_single_thread_execution_requests;
2214 self.thread_states.continue_thread(thread_id);
2215 self.request(
2216 ContinueCommand {
2217 args: ContinueArguments {
2218 thread_id: thread_id.0,
2219 single_thread: supports_single_thread_execution_requests,
2220 },
2221 },
2222 Self::on_step_response::<ContinueCommand>(thread_id),
2223 cx,
2224 )
2225 .detach();
2226 }
2227
2228 pub fn adapter_client(&self) -> Option<Arc<DebugAdapterClient>> {
2229 match self.mode {
2230 SessionState::Running(ref local) => Some(local.client.clone()),
2231 SessionState::Booting(_) => None,
2232 }
2233 }
2234
2235 pub fn has_ever_stopped(&self) -> bool {
2236 self.mode.has_ever_stopped()
2237 }
2238 pub fn step_over(
2239 &mut self,
2240 thread_id: ThreadId,
2241 granularity: SteppingGranularity,
2242 cx: &mut Context<Self>,
2243 ) {
2244 let supports_single_thread_execution_requests =
2245 self.capabilities.supports_single_thread_execution_requests;
2246 let supports_stepping_granularity = self
2247 .capabilities
2248 .supports_stepping_granularity
2249 .unwrap_or_default();
2250
2251 let command = NextCommand {
2252 inner: StepCommand {
2253 thread_id: thread_id.0,
2254 granularity: supports_stepping_granularity.then(|| granularity),
2255 single_thread: supports_single_thread_execution_requests,
2256 },
2257 };
2258
2259 self.thread_states.process_step(thread_id);
2260 self.request(
2261 command,
2262 Self::on_step_response::<NextCommand>(thread_id),
2263 cx,
2264 )
2265 .detach();
2266 }
2267
2268 pub fn step_in(
2269 &mut self,
2270 thread_id: ThreadId,
2271 granularity: SteppingGranularity,
2272 cx: &mut Context<Self>,
2273 ) {
2274 let supports_single_thread_execution_requests =
2275 self.capabilities.supports_single_thread_execution_requests;
2276 let supports_stepping_granularity = self
2277 .capabilities
2278 .supports_stepping_granularity
2279 .unwrap_or_default();
2280
2281 let command = StepInCommand {
2282 inner: StepCommand {
2283 thread_id: thread_id.0,
2284 granularity: supports_stepping_granularity.then(|| granularity),
2285 single_thread: supports_single_thread_execution_requests,
2286 },
2287 };
2288
2289 self.thread_states.process_step(thread_id);
2290 self.request(
2291 command,
2292 Self::on_step_response::<StepInCommand>(thread_id),
2293 cx,
2294 )
2295 .detach();
2296 }
2297
2298 pub fn step_out(
2299 &mut self,
2300 thread_id: ThreadId,
2301 granularity: SteppingGranularity,
2302 cx: &mut Context<Self>,
2303 ) {
2304 let supports_single_thread_execution_requests =
2305 self.capabilities.supports_single_thread_execution_requests;
2306 let supports_stepping_granularity = self
2307 .capabilities
2308 .supports_stepping_granularity
2309 .unwrap_or_default();
2310
2311 let command = StepOutCommand {
2312 inner: StepCommand {
2313 thread_id: thread_id.0,
2314 granularity: supports_stepping_granularity.then(|| granularity),
2315 single_thread: supports_single_thread_execution_requests,
2316 },
2317 };
2318
2319 self.thread_states.process_step(thread_id);
2320 self.request(
2321 command,
2322 Self::on_step_response::<StepOutCommand>(thread_id),
2323 cx,
2324 )
2325 .detach();
2326 }
2327
2328 pub fn step_back(
2329 &mut self,
2330 thread_id: ThreadId,
2331 granularity: SteppingGranularity,
2332 cx: &mut Context<Self>,
2333 ) {
2334 let supports_single_thread_execution_requests =
2335 self.capabilities.supports_single_thread_execution_requests;
2336 let supports_stepping_granularity = self
2337 .capabilities
2338 .supports_stepping_granularity
2339 .unwrap_or_default();
2340
2341 let command = StepBackCommand {
2342 inner: StepCommand {
2343 thread_id: thread_id.0,
2344 granularity: supports_stepping_granularity.then(|| granularity),
2345 single_thread: supports_single_thread_execution_requests,
2346 },
2347 };
2348
2349 self.thread_states.process_step(thread_id);
2350
2351 self.request(
2352 command,
2353 Self::on_step_response::<StepBackCommand>(thread_id),
2354 cx,
2355 )
2356 .detach();
2357 }
2358
2359 pub fn stack_frames(
2360 &mut self,
2361 thread_id: ThreadId,
2362 cx: &mut Context<Self>,
2363 ) -> Result<Vec<StackFrame>> {
2364 if self.thread_states.thread_status(thread_id) == ThreadStatus::Stopped
2365 && self.requests.contains_key(&ThreadsCommand.type_id())
2366 && self.threads.contains_key(&thread_id)
2367 // ^ todo(debugger): We need a better way to check that we're not querying stale data
2368 // We could still be using an old thread id and have sent a new thread's request
2369 // This isn't the biggest concern right now because it hasn't caused any issues outside of tests
2370 // But it very well could cause a minor bug in the future that is hard to track down
2371 {
2372 self.fetch(
2373 super::dap_command::StackTraceCommand {
2374 thread_id: thread_id.0,
2375 start_frame: None,
2376 levels: None,
2377 },
2378 move |this, stack_frames, cx| {
2379 let entry =
2380 this.threads
2381 .entry(thread_id)
2382 .and_modify(|thread| match &stack_frames {
2383 Ok(stack_frames) => {
2384 thread.stack_frames = stack_frames
2385 .iter()
2386 .cloned()
2387 .map(StackFrame::from)
2388 .collect();
2389 thread.stack_frames_error = None;
2390 }
2391 Err(error) => {
2392 thread.stack_frames.clear();
2393 thread.stack_frames_error = Some(error.cloned());
2394 }
2395 });
2396 debug_assert!(
2397 matches!(entry, indexmap::map::Entry::Occupied(_)),
2398 "Sent request for thread_id that doesn't exist"
2399 );
2400 if let Ok(stack_frames) = stack_frames {
2401 this.stack_frames.extend(
2402 stack_frames
2403 .into_iter()
2404 .filter(|frame| {
2405 // Workaround for JavaScript debug adapter sending out "fake" stack frames for delineating await points. This is fine,
2406 // except that they always use an id of 0 for it, which collides with other (valid) stack frames.
2407 !(frame.id == 0
2408 && frame.line == 0
2409 && frame.column == 0
2410 && frame.presentation_hint
2411 == Some(StackFramePresentationHint::Label))
2412 })
2413 .map(|frame| (frame.id, StackFrame::from(frame))),
2414 );
2415 }
2416
2417 this.invalidate_command_type::<ScopesCommand>();
2418 this.invalidate_command_type::<VariablesCommand>();
2419
2420 cx.emit(SessionEvent::StackTrace);
2421 },
2422 cx,
2423 );
2424 }
2425
2426 match self.threads.get(&thread_id) {
2427 Some(thread) => {
2428 if let Some(error) = &thread.stack_frames_error {
2429 Err(error.cloned())
2430 } else {
2431 Ok(thread.stack_frames.clone())
2432 }
2433 }
2434 None => Ok(Vec::new()),
2435 }
2436 }
2437
2438 pub fn scopes(&mut self, stack_frame_id: u64, cx: &mut Context<Self>) -> &[dap::Scope] {
2439 if self.requests.contains_key(&TypeId::of::<ThreadsCommand>())
2440 && self
2441 .requests
2442 .contains_key(&TypeId::of::<StackTraceCommand>())
2443 {
2444 self.fetch(
2445 ScopesCommand { stack_frame_id },
2446 move |this, scopes, cx| {
2447 let Some(scopes) = scopes.log_err() else {
2448 return
2449 };
2450
2451 for scope in scopes.iter() {
2452 this.variables(scope.variables_reference, cx);
2453 }
2454
2455 let entry = this
2456 .stack_frames
2457 .entry(stack_frame_id)
2458 .and_modify(|stack_frame| {
2459 stack_frame.scopes = scopes;
2460 });
2461
2462 cx.emit(SessionEvent::Variables);
2463
2464 debug_assert!(
2465 matches!(entry, indexmap::map::Entry::Occupied(_)),
2466 "Sent scopes request for stack_frame_id that doesn't exist or hasn't been fetched"
2467 );
2468 },
2469 cx,
2470 );
2471 }
2472
2473 self.stack_frames
2474 .get(&stack_frame_id)
2475 .map(|frame| frame.scopes.as_slice())
2476 .unwrap_or_default()
2477 }
2478
2479 pub fn variables_by_stack_frame_id(
2480 &self,
2481 stack_frame_id: StackFrameId,
2482 globals: bool,
2483 locals: bool,
2484 ) -> Vec<dap::Variable> {
2485 let Some(stack_frame) = self.stack_frames.get(&stack_frame_id) else {
2486 return Vec::new();
2487 };
2488
2489 stack_frame
2490 .scopes
2491 .iter()
2492 .filter(|scope| {
2493 (scope.name.to_lowercase().contains("local") && locals)
2494 || (scope.name.to_lowercase().contains("global") && globals)
2495 })
2496 .filter_map(|scope| self.variables.get(&scope.variables_reference))
2497 .flatten()
2498 .cloned()
2499 .collect()
2500 }
2501
2502 pub fn watchers(&self) -> &HashMap<SharedString, Watcher> {
2503 &self.watchers
2504 }
2505
2506 pub fn add_watcher(
2507 &mut self,
2508 expression: SharedString,
2509 frame_id: u64,
2510 cx: &mut Context<Self>,
2511 ) -> Task<Result<()>> {
2512 let request = self.mode.request_dap(EvaluateCommand {
2513 expression: expression.to_string(),
2514 context: Some(EvaluateArgumentsContext::Watch),
2515 frame_id: Some(frame_id),
2516 source: None,
2517 });
2518
2519 cx.spawn(async move |this, cx| {
2520 let response = request.await?;
2521
2522 this.update(cx, |session, cx| {
2523 session.watchers.insert(
2524 expression.clone(),
2525 Watcher {
2526 expression,
2527 value: response.result.into(),
2528 variables_reference: response.variables_reference,
2529 presentation_hint: response.presentation_hint,
2530 },
2531 );
2532 cx.emit(SessionEvent::Watchers);
2533 })
2534 })
2535 }
2536
2537 pub fn refresh_watchers(&mut self, frame_id: u64, cx: &mut Context<Self>) {
2538 let watches = self.watchers.clone();
2539 for (_, watch) in watches.into_iter() {
2540 self.add_watcher(watch.expression.clone(), frame_id, cx)
2541 .detach();
2542 }
2543 }
2544
2545 pub fn remove_watcher(&mut self, expression: SharedString) {
2546 self.watchers.remove(&expression);
2547 }
2548
2549 pub fn variables(
2550 &mut self,
2551 variables_reference: VariableReference,
2552 cx: &mut Context<Self>,
2553 ) -> Vec<dap::Variable> {
2554 let command = VariablesCommand {
2555 variables_reference,
2556 filter: None,
2557 start: None,
2558 count: None,
2559 format: None,
2560 };
2561
2562 self.fetch(
2563 command,
2564 move |this, variables, cx| {
2565 let Some(variables) = variables.log_err() else {
2566 return;
2567 };
2568
2569 this.variables.insert(variables_reference, variables);
2570
2571 cx.emit(SessionEvent::Variables);
2572 cx.emit(SessionEvent::InvalidateInlineValue);
2573 },
2574 cx,
2575 );
2576
2577 self.variables
2578 .get(&variables_reference)
2579 .cloned()
2580 .unwrap_or_default()
2581 }
2582
2583 pub fn data_breakpoint_info(
2584 &mut self,
2585 context: Arc<DataBreakpointContext>,
2586 mode: Option<String>,
2587 cx: &mut Context<Self>,
2588 ) -> Task<Option<dap::DataBreakpointInfoResponse>> {
2589 let command = DataBreakpointInfoCommand { context, mode };
2590
2591 self.request(command, |_, response, _| response.ok(), cx)
2592 }
2593
2594 pub fn set_variable_value(
2595 &mut self,
2596 stack_frame_id: u64,
2597 variables_reference: u64,
2598 name: String,
2599 value: String,
2600 cx: &mut Context<Self>,
2601 ) {
2602 if self.capabilities.supports_set_variable.unwrap_or_default() {
2603 self.request(
2604 SetVariableValueCommand {
2605 name,
2606 value,
2607 variables_reference,
2608 },
2609 move |this, response, cx| {
2610 let response = response.log_err()?;
2611 this.invalidate_command_type::<VariablesCommand>();
2612 this.invalidate_command_type::<ReadMemory>();
2613 this.memory.clear(cx.background_executor());
2614 this.refresh_watchers(stack_frame_id, cx);
2615 cx.emit(SessionEvent::Variables);
2616 Some(response)
2617 },
2618 cx,
2619 )
2620 .detach();
2621 }
2622 }
2623
2624 pub fn evaluate(
2625 &mut self,
2626 expression: String,
2627 context: Option<EvaluateArgumentsContext>,
2628 frame_id: Option<u64>,
2629 source: Option<Source>,
2630 cx: &mut Context<Self>,
2631 ) -> Task<()> {
2632 let event = dap::OutputEvent {
2633 category: None,
2634 output: format!("> {expression}"),
2635 group: None,
2636 variables_reference: None,
2637 source: None,
2638 line: None,
2639 column: None,
2640 data: None,
2641 location_reference: None,
2642 };
2643 self.push_output(event);
2644 let request = self.mode.request_dap(EvaluateCommand {
2645 expression,
2646 context,
2647 frame_id,
2648 source,
2649 });
2650 cx.spawn(async move |this, cx| {
2651 let response = request.await;
2652 this.update(cx, |this, cx| {
2653 this.memory.clear(cx.background_executor());
2654 this.invalidate_command_type::<ReadMemory>();
2655 match response {
2656 Ok(response) => {
2657 let event = dap::OutputEvent {
2658 category: None,
2659 output: format!("< {}", &response.result),
2660 group: None,
2661 variables_reference: Some(response.variables_reference),
2662 source: None,
2663 line: None,
2664 column: None,
2665 data: None,
2666 location_reference: None,
2667 };
2668 this.push_output(event);
2669 }
2670 Err(e) => {
2671 let event = dap::OutputEvent {
2672 category: None,
2673 output: format!("{}", e),
2674 group: None,
2675 variables_reference: None,
2676 source: None,
2677 line: None,
2678 column: None,
2679 data: None,
2680 location_reference: None,
2681 };
2682 this.push_output(event);
2683 }
2684 };
2685 cx.notify();
2686 })
2687 .ok();
2688 })
2689 }
2690
2691 pub fn location(
2692 &mut self,
2693 reference: u64,
2694 cx: &mut Context<Self>,
2695 ) -> Option<dap::LocationsResponse> {
2696 self.fetch(
2697 LocationsCommand { reference },
2698 move |this, response, _| {
2699 let Some(response) = response.log_err() else {
2700 return;
2701 };
2702 this.locations.insert(reference, response);
2703 },
2704 cx,
2705 );
2706 self.locations.get(&reference).cloned()
2707 }
2708
2709 pub fn is_attached(&self) -> bool {
2710 let SessionState::Running(local_mode) = &self.mode else {
2711 return false;
2712 };
2713 local_mode.binary.request_args.request == StartDebuggingRequestArgumentsRequest::Attach
2714 }
2715
2716 pub fn disconnect_client(&mut self, cx: &mut Context<Self>) {
2717 let command = DisconnectCommand {
2718 restart: Some(false),
2719 terminate_debuggee: Some(false),
2720 suspend_debuggee: Some(false),
2721 };
2722
2723 self.request(command, Self::empty_response, cx).detach()
2724 }
2725
2726 pub fn terminate_threads(&mut self, thread_ids: Option<Vec<ThreadId>>, cx: &mut Context<Self>) {
2727 if self
2728 .capabilities
2729 .supports_terminate_threads_request
2730 .unwrap_or_default()
2731 {
2732 self.request(
2733 TerminateThreadsCommand {
2734 thread_ids: thread_ids.map(|ids| ids.into_iter().map(|id| id.0).collect()),
2735 },
2736 Self::clear_active_debug_line_response,
2737 cx,
2738 )
2739 .detach();
2740 } else {
2741 self.shutdown(cx).detach();
2742 }
2743 }
2744
2745 pub fn thread_state(&self, thread_id: ThreadId) -> Option<ThreadStatus> {
2746 self.thread_states.thread_state(thread_id)
2747 }
2748
2749 pub fn quirks(&self) -> SessionQuirks {
2750 self.quirks
2751 }
2752
    /// Handles a "launch browser in companion" request for a remote project.
    ///
    /// The work runs in a spawned task that:
    /// 1. determines a local port through which the DAP server port is
    ///    reachable (directly, or via a port-forwarding child process),
    /// 2. reuses the known companion port or spawns the companion process,
    ///    streaming its stderr and exit status to the console,
    /// 3. adds a `proxyUri` entry to `request` and POSTs it to the companion's
    ///    `/launch-and-attach` endpoint.
    ///
    /// Returns early (logging only in the non-remote case) when the remote
    /// client, HTTP client, or node runtime is unavailable. Errors from the
    /// task are logged by a background task stored on the session.
    fn launch_browser_for_remote_server(
        &mut self,
        mut request: LaunchBrowserInCompanionParams,
        cx: &mut Context<Self>,
    ) {
        let Some(remote_client) = self.remote_client.clone() else {
            log::error!("can't launch browser in companion for non-remote project");
            return;
        };
        // NOTE(review): the two early returns below are silent; presumably these
        // are always set for remote sessions — confirm.
        let Some(http_client) = self.http_client.clone() else {
            return;
        };
        let Some(node_runtime) = self.node_runtime.clone() else {
            return;
        };

        let mut console_output = self.console_output(cx);
        let task = cx.spawn(async move |this, cx| {
            // `_child` keeps the port-forwarding process handle (if any) alive
            // for the remainder of this task.
            let (dap_port, _child) =
                if remote_client.read_with(cx, |client, _| client.shares_network_interface())? {
                    // The remote shares our network interface, so the server
                    // port can be used as-is.
                    (request.server_port, None)
                } else {
                    // Bind to port 0 to obtain a free local port. The listener
                    // is dropped at the end of this block, so only the port
                    // number is kept.
                    let port = {
                        let listener = TcpListener::bind("127.0.0.1:0")
                            .await
                            .context("getting port for DAP")?;
                        listener.local_addr()?.port()
                    };
                    // Forward the chosen local port to the server port on the
                    // remote's "localhost".
                    let child = remote_client.update(cx, |client, _| {
                        let command = client.build_forward_port_command(
                            port,
                            "localhost".into(),
                            request.server_port,
                        )?;
                        let child = new_smol_command(command.program)
                            .args(command.args)
                            .envs(command.env)
                            .spawn()
                            .context("spawning port forwarding process")?;
                        anyhow::Ok(child)
                    })??;
                    (port, Some(child))
                };

            // Only set when this call had to spawn the companion itself.
            let mut companion_process = None;
            let companion_port =
                if let Some(companion_port) = this.read_with(cx, |this, _| this.companion_port)? {
                    companion_port
                } else {
                    let task = cx.spawn(async move |cx| spawn_companion(node_runtime, cx).await);
                    match task.await {
                        Ok((port, child)) => {
                            companion_process = Some(child);
                            port
                        }
                        Err(e) => {
                            console_output
                                .send(format!("Failed to launch browser companion process: {e}"))
                                .await
                                .ok();
                            return Err(e);
                        }
                    }
                };
            this.update(cx, |this, cx| {
                // Remember the port so later requests reuse the companion.
                this.companion_port = Some(companion_port);
                let Some(mut child) = companion_process else {
                    return;
                };
                // Forward the companion's stderr to the console line by line.
                if let Some(stderr) = child.stderr.take() {
                    let mut console_output = console_output.clone();
                    this.background_tasks.push(cx.spawn(async move |_, _| {
                        let mut stderr = BufReader::new(stderr);
                        let mut line = String::new();
                        while let Ok(n) = stderr.read_line(&mut line).await
                            && n > 0
                        {
                            console_output
                                .send(format!("companion stderr: {line}"))
                                .await
                                .ok();
                            // `read_line` appends, so reset the buffer between lines.
                            line.clear();
                        }
                    }));
                }
                // Report how the companion process exits.
                this.background_tasks.push(cx.spawn({
                    let mut console_output = console_output.clone();
                    async move |_, _| match child.status().await {
                        Ok(status) => {
                            if status.success() {
                                console_output
                                    .send("Companion process exited normally".into())
                                    .await
                                    .ok();
                            } else {
                                console_output
                                    .send(format!(
                                        "Companion process exited abnormally with {status:?}"
                                    ))
                                    .await
                                    .ok();
                            }
                        }
                        Err(e) => {
                            console_output
                                .send(format!("Failed to join companion process: {e}"))
                                .await
                                .ok();
                        }
                    }
                }))
            })?;

            // Tell the companion where it can reach the DAP server locally.
            request
                .other
                .insert("proxyUri".into(), format!("127.0.0.1:{dap_port}").into());
            // TODO pass wslInfo as needed

            let response = http_client
                .post_json(
                    &format!("http://127.0.0.1:{companion_port}/launch-and-attach"),
                    serde_json::to_string(&request)
                        .context("serializing request")?
                        .into(),
                )
                .await;
            match response {
                Ok(response) => {
                    if !response.status().is_success() {
                        console_output
                            .send("Launch request to companion failed".into())
                            .await
                            .ok();
                        return Err(anyhow!("launch request failed"));
                    }
                }
                Err(e) => {
                    console_output
                        .send("Failed to read response from companion".into())
                        .await
                        .ok();
                    return Err(e);
                }
            }

            anyhow::Ok(())
        });
        // Keep the task on the session and log its error, if any.
        self.background_tasks.push(cx.spawn(async move |_, _| {
            task.await.log_err();
        }));
    }
2904
2905 fn kill_browser(&self, request: KillCompanionBrowserParams, cx: &mut App) {
2906 let Some(companion_port) = self.companion_port else {
2907 log::error!("received killCompanionBrowser but js-debug-companion is not running");
2908 return;
2909 };
2910 let Some(http_client) = self.http_client.clone() else {
2911 return;
2912 };
2913
2914 cx.spawn(async move |_| {
2915 http_client
2916 .post_json(
2917 &format!("http://127.0.0.1:{companion_port}/kill"),
2918 serde_json::to_string(&request)
2919 .context("serializing request")?
2920 .into(),
2921 )
2922 .await?;
2923 anyhow::Ok(())
2924 })
2925 .detach_and_log_err(cx)
2926 }
2927}
2928
/// Parameters of a "launch browser in companion" request.
///
/// Field names are (de)serialized in camelCase, so `server_port` appears as
/// `serverPort` on the wire.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct LaunchBrowserInCompanionParams {
    /// Port of the DAP server the launched browser should be attached to.
    server_port: u16,
    /// Every other field of the request, kept verbatim so it is passed
    /// through when the request is re-serialized.
    #[serde(flatten)]
    other: HashMap<String, serde_json::Value>,
}
2936
/// Parameters of a "kill companion browser" request.
///
/// Field names are (de)serialized in camelCase, so `launch_id` appears as
/// `launchId` on the wire.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct KillCompanionBrowserParams {
    /// Identifier of the browser launch to kill — presumably the id assigned
    /// when the browser was launched; confirm against the companion protocol.
    launch_id: u64,
}
2942
2943async fn spawn_companion(
2944 node_runtime: NodeRuntime,
2945 cx: &mut AsyncApp,
2946) -> Result<(u16, smol::process::Child)> {
2947 let binary_path = node_runtime
2948 .binary_path()
2949 .await
2950 .context("getting node path")?;
2951 let path = cx
2952 .spawn(async move |cx| get_or_install_companion(node_runtime, cx).await)
2953 .await?;
2954 log::info!("will launch js-debug-companion version {path:?}");
2955
2956 let port = {
2957 let listener = TcpListener::bind("127.0.0.1:0")
2958 .await
2959 .context("getting port for companion")?;
2960 listener.local_addr()?.port()
2961 };
2962
2963 let dir = paths::data_dir()
2964 .join("js_debug_companion_state")
2965 .to_string_lossy()
2966 .to_string();
2967
2968 let child = new_smol_command(binary_path)
2969 .arg(path)
2970 .args([
2971 format!("--listen=127.0.0.1:{port}"),
2972 format!("--state={dir}"),
2973 ])
2974 .stdin(Stdio::piped())
2975 .stdout(Stdio::piped())
2976 .stderr(Stdio::piped())
2977 .spawn()
2978 .context("spawning companion child process")?;
2979
2980 Ok((port, child))
2981}
2982
2983async fn get_or_install_companion(node: NodeRuntime, cx: &mut AsyncApp) -> Result<PathBuf> {
2984 const PACKAGE_NAME: &str = "@zed-industries/js-debug-companion-cli";
2985
2986 async fn install_latest_version(dir: PathBuf, node: NodeRuntime) -> Result<PathBuf> {
2987 let temp_dir = tempfile::tempdir().context("creating temporary directory")?;
2988 node.npm_install_packages(temp_dir.path(), &[(PACKAGE_NAME, "latest")])
2989 .await
2990 .context("installing latest companion package")?;
2991 let version = node
2992 .npm_package_installed_version(temp_dir.path(), PACKAGE_NAME)
2993 .await
2994 .context("getting installed companion version")?
2995 .context("companion was not installed")?;
2996 smol::fs::rename(temp_dir.path(), dir.join(&version))
2997 .await
2998 .context("moving companion package into place")?;
2999 Ok(dir.join(version))
3000 }
3001
3002 let dir = paths::debug_adapters_dir().join("js-debug-companion");
3003 let (latest_installed_version, latest_version) = cx
3004 .background_spawn({
3005 let dir = dir.clone();
3006 let node = node.clone();
3007 async move {
3008 smol::fs::create_dir_all(&dir)
3009 .await
3010 .context("creating companion installation directory")?;
3011
3012 let mut children = smol::fs::read_dir(&dir)
3013 .await
3014 .context("reading companion installation directory")?
3015 .try_collect::<Vec<_>>()
3016 .await
3017 .context("reading companion installation directory entries")?;
3018 children
3019 .sort_by_key(|child| semver::Version::parse(child.file_name().to_str()?).ok());
3020
3021 let latest_installed_version = children.last().and_then(|child| {
3022 let version = child.file_name().into_string().ok()?;
3023 Some((child.path(), version))
3024 });
3025 let latest_version = node
3026 .npm_package_latest_version(PACKAGE_NAME)
3027 .await
3028 .log_err();
3029 anyhow::Ok((latest_installed_version, latest_version))
3030 }
3031 })
3032 .await?;
3033
3034 let path = if let Some((installed_path, installed_version)) = latest_installed_version {
3035 if let Some(latest_version) = latest_version
3036 && latest_version != installed_version
3037 {
3038 cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3039 .detach();
3040 }
3041 Ok(installed_path)
3042 } else {
3043 cx.background_spawn(install_latest_version(dir.clone(), node.clone()))
3044 .await
3045 };
3046
3047 Ok(path?
3048 .join("node_modules")
3049 .join(PACKAGE_NAME)
3050 .join("out")
3051 .join("cli.js"))
3052}