1use super::{
2 breakpoint_store::BreakpointStore,
3 locator_store::LocatorStore,
4 session::{self, Session, SessionStateEvent},
5};
6use crate::{ProjectEnvironment, debugger, worktree_store::WorktreeStore};
7use anyhow::{Result, anyhow};
8use async_trait::async_trait;
9use collections::HashMap;
10use dap::{
11 Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
12 EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
13 Source, StartDebuggingRequestArguments,
14 adapters::{DapStatus, DebugAdapterName},
15 client::SessionId,
16 messages::Message,
17 requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
18};
19use fs::Fs;
20use futures::{
21 channel::{mpsc, oneshot},
22 future::{Shared, join_all},
23};
24use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
25use http_client::HttpClient;
26use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
27use lsp::LanguageServerName;
28use node_runtime::NodeRuntime;
29
30use rpc::{
31 AnyProtoClient, TypedEnvelope,
32 proto::{self},
33};
34use serde_json::Value;
35use settings::WorktreeId;
36use smol::{lock::Mutex, stream::StreamExt};
37use std::{
38 borrow::Borrow,
39 collections::{BTreeMap, HashSet},
40 ffi::OsStr,
41 path::{Path, PathBuf},
42 sync::{Arc, atomic::Ordering::SeqCst},
43};
44use std::{collections::VecDeque, sync::atomic::AtomicU32};
45use task::{DebugAdapterConfig, DebugRequestDisposition};
46use util::ResultExt as _;
47use worktree::Worktree;
48
49pub enum DapStoreEvent {
50 DebugClientStarted(SessionId),
51 DebugSessionInitialized(SessionId),
52 DebugClientShutdown(SessionId),
53 DebugClientEvent {
54 session_id: SessionId,
55 message: Message,
56 },
57 RunInTerminal {
58 session_id: SessionId,
59 title: Option<String>,
60 cwd: Option<Arc<Path>>,
61 command: Option<String>,
62 args: Vec<String>,
63 envs: HashMap<String, String>,
64 sender: mpsc::Sender<Result<u32>>,
65 },
66 Notification(String),
67 RemoteHasInitialized,
68}
69
70#[allow(clippy::large_enum_variant)]
71pub enum DapStoreMode {
72 Local(LocalDapStore), // ssh host and collab host
73 Remote(RemoteDapStore), // collab guest
74}
75
76pub struct LocalDapStore {
77 fs: Arc<dyn Fs>,
78 node_runtime: NodeRuntime,
79 next_session_id: AtomicU32,
80 http_client: Arc<dyn HttpClient>,
81 worktree_store: Entity<WorktreeStore>,
82 environment: Entity<ProjectEnvironment>,
83 language_registry: Arc<LanguageRegistry>,
84 debug_adapters: Arc<DapRegistry>,
85 toolchain_store: Arc<dyn LanguageToolchainStore>,
86 locator_store: Arc<LocatorStore>,
87 start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
88 _start_debugging_task: Task<()>,
89}
90
91impl LocalDapStore {
92 fn next_session_id(&self) -> SessionId {
93 SessionId(self.next_session_id.fetch_add(1, SeqCst))
94 }
95}
96
97pub struct RemoteDapStore {
98 upstream_client: AnyProtoClient,
99 upstream_project_id: u64,
100 event_queue: Option<VecDeque<DapStoreEvent>>,
101}
102
103pub struct DapStore {
104 mode: DapStoreMode,
105 downstream_client: Option<(AnyProtoClient, u64)>,
106 breakpoint_store: Entity<BreakpointStore>,
107 sessions: BTreeMap<SessionId, Entity<Session>>,
108}
109
110impl EventEmitter<DapStoreEvent> for DapStore {}
111
112impl DapStore {
113 pub fn init(_client: &AnyProtoClient) {
114 // todo(debugger): Reenable these after we finish handle_dap_command refactor
115 // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
116 // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
117 // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
118 // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
119 // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
120 // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
121 // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
122 // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
123 // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
124 // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
125 // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
126 // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
127 }
128
129 #[expect(clippy::too_many_arguments)]
130 pub fn new_local(
131 http_client: Arc<dyn HttpClient>,
132 node_runtime: NodeRuntime,
133 fs: Arc<dyn Fs>,
134 language_registry: Arc<LanguageRegistry>,
135 debug_adapters: Arc<DapRegistry>,
136 environment: Entity<ProjectEnvironment>,
137 toolchain_store: Arc<dyn LanguageToolchainStore>,
138 breakpoint_store: Entity<BreakpointStore>,
139 worktree_store: Entity<WorktreeStore>,
140 cx: &mut Context<Self>,
141 ) -> Self {
142 cx.on_app_quit(Self::shutdown_sessions).detach();
143
144 let (start_debugging_tx, mut message_rx) =
145 futures::channel::mpsc::unbounded::<(SessionId, Message)>();
146
147 let _start_debugging_task = cx.spawn(async move |this, cx| {
148 while let Some((session_id, message)) = message_rx.next().await {
149 match message {
150 Message::Request(request) => {
151 let _ = this
152 .update(cx, |this, cx| {
153 if request.command == StartDebugging::COMMAND {
154 this.handle_start_debugging_request(session_id, request, cx)
155 .detach_and_log_err(cx);
156 } else if request.command == RunInTerminal::COMMAND {
157 this.handle_run_in_terminal_request(session_id, request, cx)
158 .detach_and_log_err(cx);
159 }
160 })
161 .log_err();
162 }
163 _ => {}
164 }
165 }
166 });
167 Self {
168 mode: DapStoreMode::Local(LocalDapStore {
169 fs,
170 environment,
171 http_client,
172 node_runtime,
173 worktree_store,
174 toolchain_store,
175 language_registry,
176 debug_adapters,
177 start_debugging_tx,
178 _start_debugging_task,
179 locator_store: Arc::from(LocatorStore::new()),
180 next_session_id: Default::default(),
181 }),
182 downstream_client: None,
183 breakpoint_store,
184 sessions: Default::default(),
185 }
186 }
187
188 pub fn new_remote(
189 project_id: u64,
190 upstream_client: AnyProtoClient,
191 breakpoint_store: Entity<BreakpointStore>,
192 ) -> Self {
193 Self {
194 mode: DapStoreMode::Remote(RemoteDapStore {
195 upstream_client,
196 upstream_project_id: project_id,
197 event_queue: Some(VecDeque::default()),
198 }),
199 downstream_client: None,
200 breakpoint_store,
201 sessions: Default::default(),
202 }
203 }
204
205 pub fn as_remote(&self) -> Option<&RemoteDapStore> {
206 match &self.mode {
207 DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
208 _ => None,
209 }
210 }
211
212 pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
213 if let DapStoreMode::Remote(remote) = &mut self.mode {
214 remote.event_queue.take()
215 } else {
216 None
217 }
218 }
219
220 pub fn as_local(&self) -> Option<&LocalDapStore> {
221 match &self.mode {
222 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
223 _ => None,
224 }
225 }
226
227 pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
228 match &mut self.mode {
229 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
230 _ => None,
231 }
232 }
233
234 pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
235 match &self.mode {
236 DapStoreMode::Remote(RemoteDapStore {
237 upstream_client,
238 upstream_project_id,
239 ..
240 }) => Some((upstream_client.clone(), *upstream_project_id)),
241
242 DapStoreMode::Local(_) => None,
243 }
244 }
245
246 pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
247 self.downstream_client.as_ref()
248 }
249
250 pub fn add_remote_client(
251 &mut self,
252 session_id: SessionId,
253 ignore: Option<bool>,
254 cx: &mut Context<Self>,
255 ) {
256 if let DapStoreMode::Remote(remote) = &self.mode {
257 self.sessions.insert(
258 session_id,
259 cx.new(|_| {
260 debugger::session::Session::remote(
261 session_id,
262 remote.upstream_client.clone(),
263 remote.upstream_project_id,
264 ignore.unwrap_or(false),
265 )
266 }),
267 );
268 } else {
269 debug_assert!(false);
270 }
271 }
272
273 pub fn session_by_id(
274 &self,
275 session_id: impl Borrow<SessionId>,
276 ) -> Option<Entity<session::Session>> {
277 let session_id = session_id.borrow();
278 let client = self.sessions.get(session_id).cloned();
279
280 client
281 }
282 pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
283 self.sessions.values()
284 }
285
286 pub fn capabilities_by_id(
287 &self,
288 session_id: impl Borrow<SessionId>,
289 cx: &App,
290 ) -> Option<Capabilities> {
291 let session_id = session_id.borrow();
292 self.sessions
293 .get(session_id)
294 .map(|client| client.read(cx).capabilities.clone())
295 }
296
297 pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
298 &self.breakpoint_store
299 }
300
301 #[allow(dead_code)]
302 async fn handle_ignore_breakpoint_state(
303 this: Entity<Self>,
304 envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
305 mut cx: AsyncApp,
306 ) -> Result<()> {
307 let session_id = SessionId::from_proto(envelope.payload.session_id);
308
309 this.update(&mut cx, |this, cx| {
310 if let Some(session) = this.session_by_id(&session_id) {
311 session.update(cx, |session, cx| {
312 session.set_ignore_breakpoints(envelope.payload.ignore, cx)
313 })
314 } else {
315 Task::ready(())
316 }
317 })?
318 .await;
319
320 Ok(())
321 }
322
323 pub fn new_session(
324 &mut self,
325 mut config: DebugAdapterConfig,
326 worktree: &Entity<Worktree>,
327 parent_session: Option<Entity<Session>>,
328 cx: &mut Context<Self>,
329 ) -> (SessionId, Task<Result<Entity<Session>>>) {
330 let Some(local_store) = self.as_local() else {
331 unimplemented!("Starting session on remote side");
332 };
333
334 let delegate = DapAdapterDelegate::new(
335 local_store.fs.clone(),
336 worktree.read(cx).id(),
337 local_store.node_runtime.clone(),
338 local_store.http_client.clone(),
339 local_store.language_registry.clone(),
340 local_store.toolchain_store.clone(),
341 local_store.environment.update(cx, |env, cx| {
342 env.get_worktree_environment(worktree.clone(), cx)
343 }),
344 );
345 let session_id = local_store.next_session_id();
346
347 if let Some(session) = &parent_session {
348 session.update(cx, |session, _| {
349 session.add_child_session_id(session_id);
350 });
351 }
352
353 let (initialized_tx, initialized_rx) = oneshot::channel();
354 let locator_store = local_store.locator_store.clone();
355 let debug_adapters = local_store.debug_adapters.clone();
356
357 let start_debugging_tx = local_store.start_debugging_tx.clone();
358
359 let task = cx.spawn(async move |this, cx| {
360 if config.locator.is_some() {
361 config = cx
362 .background_spawn(async move {
363 locator_store
364 .resolve_debug_config(&mut config)
365 .await
366 .map(|_| config)
367 })
368 .await?;
369 }
370
371 let start_client_task = this.update(cx, |this, cx| {
372 Session::local(
373 this.breakpoint_store.clone(),
374 session_id,
375 parent_session,
376 delegate,
377 config,
378 start_debugging_tx.clone(),
379 initialized_tx,
380 debug_adapters,
381 cx,
382 )
383 })?;
384
385 this.update(cx, |_, cx| {
386 create_new_session(session_id, initialized_rx, start_client_task, cx)
387 })?
388 .await
389 });
390
391 (session_id, task)
392 }
393
394 #[cfg(any(test, feature = "test-support"))]
395 pub fn new_fake_session(
396 &mut self,
397 config: DebugAdapterConfig,
398 worktree: &Entity<Worktree>,
399 parent_session: Option<Entity<Session>>,
400 caps: Capabilities,
401 fails: bool,
402 cx: &mut Context<Self>,
403 ) -> (SessionId, Task<Result<Entity<Session>>>) {
404 let Some(local_store) = self.as_local() else {
405 unimplemented!("Starting session on remote side");
406 };
407
408 let delegate = DapAdapterDelegate::new(
409 local_store.fs.clone(),
410 worktree.read(cx).id(),
411 local_store.node_runtime.clone(),
412 local_store.http_client.clone(),
413 local_store.language_registry.clone(),
414 local_store.toolchain_store.clone(),
415 local_store.environment.update(cx, |env, cx| {
416 env.get_worktree_environment(worktree.clone(), cx)
417 }),
418 );
419 let session_id = local_store.next_session_id();
420
421 if let Some(session) = &parent_session {
422 session.update(cx, |session, _| {
423 session.add_child_session_id(session_id);
424 });
425 }
426
427 let (initialized_tx, initialized_rx) = oneshot::channel();
428
429 let start_client_task = Session::fake(
430 self.breakpoint_store.clone(),
431 session_id,
432 parent_session,
433 delegate,
434 config,
435 local_store.start_debugging_tx.clone(),
436 initialized_tx,
437 caps,
438 fails,
439 cx,
440 );
441
442 let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
443 (session_id, task)
444 }
445
446 fn handle_start_debugging_request(
447 &mut self,
448 session_id: SessionId,
449 request: dap::messages::Request,
450 cx: &mut Context<Self>,
451 ) -> Task<Result<()>> {
452 let Some(local_store) = self.as_local() else {
453 unreachable!("Cannot response for non-local session");
454 };
455
456 let Some(parent_session) = self.session_by_id(session_id) else {
457 return Task::ready(Err(anyhow!("Session not found")));
458 };
459
460 let args = serde_json::from_value::<StartDebuggingRequestArguments>(
461 request.arguments.unwrap_or_default(),
462 )
463 .expect("To parse StartDebuggingRequestArguments");
464 let worktree = local_store
465 .worktree_store
466 .update(cx, |this, _| this.worktrees().next())
467 .expect("worktree-less project");
468
469 let Some(config) = parent_session.read(cx).configuration() else {
470 unreachable!("there must be a config for local sessions");
471 };
472
473 let debug_config = DebugAdapterConfig {
474 label: config.label,
475 adapter: config.adapter,
476 request: DebugRequestDisposition::ReverseRequest(args),
477 initialize_args: config.initialize_args.clone(),
478 tcp_connection: config.tcp_connection.clone(),
479 locator: None,
480 stop_on_entry: config.stop_on_entry,
481 };
482
483 #[cfg(any(test, feature = "test-support"))]
484 let new_session_task = {
485 let caps = parent_session.read(cx).capabilities.clone();
486 self.new_fake_session(
487 debug_config,
488 &worktree,
489 Some(parent_session.clone()),
490 caps,
491 false,
492 cx,
493 )
494 .1
495 };
496 #[cfg(not(any(test, feature = "test-support")))]
497 let new_session_task = self
498 .new_session(debug_config, &worktree, Some(parent_session.clone()), cx)
499 .1;
500
501 let request_seq = request.seq;
502 cx.spawn(async move |_, cx| {
503 let (success, body) = match new_session_task.await {
504 Ok(_) => (true, None),
505 Err(error) => (
506 false,
507 Some(serde_json::to_value(ErrorResponse {
508 error: Some(dap::Message {
509 id: request_seq,
510 format: error.to_string(),
511 variables: None,
512 send_telemetry: None,
513 show_user: None,
514 url: None,
515 url_label: None,
516 }),
517 })?),
518 ),
519 };
520
521 parent_session
522 .update(cx, |session, cx| {
523 session.respond_to_client(
524 request_seq,
525 success,
526 StartDebugging::COMMAND.to_string(),
527 body,
528 cx,
529 )
530 })?
531 .await
532 })
533 }
534
535 fn handle_run_in_terminal_request(
536 &mut self,
537 session_id: SessionId,
538 request: dap::messages::Request,
539 cx: &mut Context<Self>,
540 ) -> Task<Result<()>> {
541 let Some(session) = self.session_by_id(session_id) else {
542 return Task::ready(Err(anyhow!("Session not found")));
543 };
544
545 let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
546 request.arguments.unwrap_or_default(),
547 )
548 .expect("To parse StartDebuggingRequestArguments");
549
550 let seq = request.seq;
551
552 let cwd = Path::new(&request_args.cwd);
553
554 match cwd.try_exists() {
555 Ok(false) | Err(_) if !request_args.cwd.is_empty() => {
556 return session.update(cx, |session, cx| {
557 session.respond_to_client(
558 seq,
559 false,
560 RunInTerminal::COMMAND.to_string(),
561 serde_json::to_value(dap::ErrorResponse {
562 error: Some(dap::Message {
563 id: seq,
564 format: format!("Received invalid/unknown cwd: {cwd:?}"),
565 variables: None,
566 send_telemetry: None,
567 show_user: None,
568 url: None,
569 url_label: None,
570 }),
571 })
572 .ok(),
573 cx,
574 )
575 });
576 }
577 _ => (),
578 }
579 let mut args = request_args.args.clone();
580
581 // Handle special case for NodeJS debug adapter
582 // If only the Node binary path is provided, we set the command to None
583 // This prevents the NodeJS REPL from appearing, which is not the desired behavior
584 // The expected usage is for users to provide their own Node command, e.g., `node test.js`
585 // This allows the NodeJS debug client to attach correctly
586 let command = if args.len() > 1 {
587 Some(args.remove(0))
588 } else {
589 None
590 };
591
592 let mut envs: HashMap<String, String> = Default::default();
593 if let Some(Value::Object(env)) = request_args.env {
594 for (key, value) in env {
595 let value_str = match (key.as_str(), value) {
596 (_, Value::String(value)) => value,
597 _ => continue,
598 };
599
600 envs.insert(key, value_str);
601 }
602 }
603
604 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
605 let cwd = Some(cwd)
606 .filter(|cwd| cwd.as_os_str().len() > 0)
607 .map(Arc::from)
608 .or_else(|| {
609 self.session_by_id(session_id)
610 .and_then(|session| {
611 session
612 .read(cx)
613 .configuration()
614 .and_then(|config| config.request.cwd())
615 })
616 .map(Arc::from)
617 });
618 cx.emit(DapStoreEvent::RunInTerminal {
619 session_id,
620 title: request_args.title,
621 cwd,
622 command,
623 args,
624 envs,
625 sender: tx,
626 });
627 cx.notify();
628
629 let session = session.downgrade();
630 cx.spawn(async move |_, cx| {
631 let (success, body) = match rx.next().await {
632 Some(Ok(pid)) => (
633 true,
634 serde_json::to_value(dap::RunInTerminalResponse {
635 process_id: None,
636 shell_process_id: Some(pid as u64),
637 })
638 .ok(),
639 ),
640 Some(Err(error)) => (
641 false,
642 serde_json::to_value(dap::ErrorResponse {
643 error: Some(dap::Message {
644 id: seq,
645 format: error.to_string(),
646 variables: None,
647 send_telemetry: None,
648 show_user: None,
649 url: None,
650 url_label: None,
651 }),
652 })
653 .ok(),
654 ),
655 None => (
656 false,
657 serde_json::to_value(dap::ErrorResponse {
658 error: Some(dap::Message {
659 id: seq,
660 format: "failed to receive response from spawn terminal".to_string(),
661 variables: None,
662 send_telemetry: None,
663 show_user: None,
664 url: None,
665 url_label: None,
666 }),
667 })
668 .ok(),
669 ),
670 };
671
672 session
673 .update(cx, |session, cx| {
674 session.respond_to_client(
675 seq,
676 success,
677 RunInTerminal::COMMAND.to_string(),
678 body,
679 cx,
680 )
681 })?
682 .await
683 })
684 }
685
686 pub fn evaluate(
687 &self,
688 session_id: &SessionId,
689 stack_frame_id: u64,
690 expression: String,
691 context: EvaluateArgumentsContext,
692 source: Option<Source>,
693 cx: &mut Context<Self>,
694 ) -> Task<Result<EvaluateResponse>> {
695 let Some(client) = self
696 .session_by_id(session_id)
697 .and_then(|client| client.read(cx).adapter_client())
698 else {
699 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
700 };
701
702 cx.background_executor().spawn(async move {
703 client
704 .request::<Evaluate>(EvaluateArguments {
705 expression: expression.clone(),
706 frame_id: Some(stack_frame_id),
707 context: Some(context),
708 format: None,
709 line: None,
710 column: None,
711 source,
712 })
713 .await
714 })
715 }
716
717 pub fn completions(
718 &self,
719 session_id: &SessionId,
720 stack_frame_id: u64,
721 text: String,
722 completion_column: u64,
723 cx: &mut Context<Self>,
724 ) -> Task<Result<Vec<CompletionItem>>> {
725 let Some(client) = self
726 .session_by_id(session_id)
727 .and_then(|client| client.read(cx).adapter_client())
728 else {
729 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
730 };
731
732 cx.background_executor().spawn(async move {
733 Ok(client
734 .request::<Completions>(CompletionsArguments {
735 frame_id: Some(stack_frame_id),
736 line: None,
737 text,
738 column: completion_column,
739 })
740 .await?
741 .targets)
742 })
743 }
744
745 pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
746 let mut tasks = vec![];
747 for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
748 tasks.push(self.shutdown_session(session_id, cx));
749 }
750
751 cx.background_executor().spawn(async move {
752 futures::future::join_all(tasks).await;
753 })
754 }
755
756 pub fn shutdown_session(
757 &mut self,
758 session_id: SessionId,
759 cx: &mut Context<Self>,
760 ) -> Task<Result<()>> {
761 let Some(_) = self.as_local_mut() else {
762 return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
763 };
764
765 let Some(session) = self.sessions.remove(&session_id) else {
766 return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
767 };
768
769 let shutdown_children = session
770 .read(cx)
771 .child_session_ids()
772 .iter()
773 .map(|session_id| self.shutdown_session(*session_id, cx))
774 .collect::<Vec<_>>();
775
776 let shutdown_parent_task = if let Some(parent_session) = session
777 .read(cx)
778 .parent_id()
779 .and_then(|session_id| self.session_by_id(session_id))
780 {
781 let shutdown_id = parent_session.update(cx, |parent_session, _| {
782 parent_session.remove_child_session_id(session_id);
783
784 if parent_session.child_session_ids().len() == 0 {
785 Some(parent_session.session_id())
786 } else {
787 None
788 }
789 });
790
791 shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
792 } else {
793 None
794 };
795
796 let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
797
798 cx.background_spawn(async move {
799 if shutdown_children.len() > 0 {
800 let _ = join_all(shutdown_children).await;
801 }
802
803 shutdown_task.await;
804
805 if let Some(parent_task) = shutdown_parent_task {
806 parent_task.await?;
807 }
808
809 Ok(())
810 })
811 }
812
813 pub fn shared(
814 &mut self,
815 project_id: u64,
816 downstream_client: AnyProtoClient,
817 _: &mut Context<Self>,
818 ) {
819 self.downstream_client = Some((downstream_client.clone(), project_id));
820 }
821
822 pub fn unshared(&mut self, cx: &mut Context<Self>) {
823 self.downstream_client.take();
824
825 cx.notify();
826 }
827}
828
829fn create_new_session(
830 session_id: SessionId,
831 initialized_rx: oneshot::Receiver<()>,
832 start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
833 cx: &mut Context<DapStore>,
834) -> Task<Result<Entity<Session>>> {
835 let task = cx.spawn(async move |this, cx| {
836 let session = match start_client_task.await {
837 Ok(session) => session,
838 Err(error) => {
839 this.update(cx, |_, cx| {
840 cx.emit(DapStoreEvent::Notification(error.to_string()));
841 })
842 .log_err();
843
844 return Err(error);
845 }
846 };
847
848 // we have to insert the session early, so we can handle reverse requests
849 // that need the session to be available
850 this.update(cx, |store, cx| {
851 store.sessions.insert(session_id, session.clone());
852 cx.emit(DapStoreEvent::DebugClientStarted(session_id));
853 cx.notify();
854 })?;
855 let seq_result = {
856 session
857 .update(cx, |session, cx| session.request_initialize(cx))?
858 .await?;
859
860 session
861 .update(cx, |session, cx| {
862 session.initialize_sequence(initialized_rx, cx)
863 })?
864 .await
865 };
866 match seq_result {
867 Ok(_) => {}
868 Err(error) => {
869 this.update(cx, |this, cx| {
870 cx.emit(DapStoreEvent::Notification(error.to_string()));
871
872 this.shutdown_session(session_id, cx)
873 })?
874 .await
875 .log_err();
876
877 return Err(error);
878 }
879 }
880
881 this.update(cx, |_, cx| {
882 cx.subscribe(
883 &session,
884 move |this: &mut DapStore, _, event: &SessionStateEvent, cx| match event {
885 SessionStateEvent::Shutdown => {
886 this.shutdown_session(session_id, cx).detach_and_log_err(cx);
887 }
888 },
889 )
890 .detach();
891 cx.emit(DapStoreEvent::DebugSessionInitialized(session_id));
892 })?;
893
894 Ok(session)
895 });
896 task
897}
898
899#[derive(Clone)]
900pub struct DapAdapterDelegate {
901 fs: Arc<dyn Fs>,
902 worktree_id: WorktreeId,
903 node_runtime: NodeRuntime,
904 http_client: Arc<dyn HttpClient>,
905 language_registry: Arc<LanguageRegistry>,
906 toolchain_store: Arc<dyn LanguageToolchainStore>,
907 updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
908 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
909}
910
911impl DapAdapterDelegate {
912 pub fn new(
913 fs: Arc<dyn Fs>,
914 worktree_id: WorktreeId,
915 node_runtime: NodeRuntime,
916 http_client: Arc<dyn HttpClient>,
917 language_registry: Arc<LanguageRegistry>,
918 toolchain_store: Arc<dyn LanguageToolchainStore>,
919 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
920 ) -> Self {
921 Self {
922 fs,
923 worktree_id,
924 http_client,
925 node_runtime,
926 toolchain_store,
927 language_registry,
928 load_shell_env_task,
929 updated_adapters: Default::default(),
930 }
931 }
932}
933
934#[async_trait(?Send)]
935impl dap::adapters::DapDelegate for DapAdapterDelegate {
936 fn worktree_id(&self) -> WorktreeId {
937 self.worktree_id
938 }
939
940 fn http_client(&self) -> Arc<dyn HttpClient> {
941 self.http_client.clone()
942 }
943
944 fn node_runtime(&self) -> NodeRuntime {
945 self.node_runtime.clone()
946 }
947
948 fn fs(&self) -> Arc<dyn Fs> {
949 self.fs.clone()
950 }
951
952 fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
953 self.updated_adapters.clone()
954 }
955
956 fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
957 let name = SharedString::from(dap_name.to_string());
958 let status = match status {
959 DapStatus::None => BinaryStatus::None,
960 DapStatus::Downloading => BinaryStatus::Downloading,
961 DapStatus::Failed { error } => BinaryStatus::Failed { error },
962 DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
963 };
964
965 self.language_registry
966 .update_dap_status(LanguageServerName(name), status);
967 }
968
969 fn which(&self, command: &OsStr) -> Option<PathBuf> {
970 which::which(command).ok()
971 }
972
973 async fn shell_env(&self) -> HashMap<String, String> {
974 let task = self.load_shell_env_task.clone();
975 task.await.unwrap_or_default()
976 }
977
978 fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
979 self.toolchain_store.clone()
980 }
981}