1use super::{
2 breakpoint_store::BreakpointStore,
3 // Will need to uncomment this once we implement rpc message handler again
4 // dap_command::{
5 // ContinueCommand, DapCommand, DisconnectCommand, NextCommand, PauseCommand, RestartCommand,
6 // RestartStackFrameCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
7 // TerminateCommand, TerminateThreadsCommand, VariablesCommand,
8 // },
9 session::{self, Session},
10};
11use crate::{debugger, worktree_store::WorktreeStore, ProjectEnvironment};
12use anyhow::{anyhow, Result};
13use async_trait::async_trait;
14use collections::HashMap;
15use dap::{
16 adapters::{DapStatus, DebugAdapterName},
17 client::SessionId,
18 messages::Message,
19 requests::{
20 Completions, Evaluate, Request as _, RunInTerminal, SetExpression, SetVariable,
21 StartDebugging,
22 },
23 Capabilities, CompletionItem, CompletionsArguments, ErrorResponse, EvaluateArguments,
24 EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
25 SetExpressionArguments, SetVariableArguments, Source, StartDebuggingRequestArguments,
26 StartDebuggingRequestArgumentsRequest,
27};
28use fs::Fs;
29use futures::{
30 channel::{mpsc, oneshot},
31 future::{join_all, Shared},
32};
33use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
34use http_client::HttpClient;
35use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
36use lsp::LanguageServerName;
37use node_runtime::NodeRuntime;
38
39use rpc::{
40 proto::{self},
41 AnyProtoClient, TypedEnvelope,
42};
43use serde_json::Value;
44use settings::WorktreeId;
45use smol::{lock::Mutex, stream::StreamExt};
46use std::{
47 borrow::Borrow,
48 collections::{BTreeMap, HashSet},
49 ffi::OsStr,
50 path::PathBuf,
51 sync::{atomic::Ordering::SeqCst, Arc},
52};
53use std::{collections::VecDeque, sync::atomic::AtomicU32};
54use task::{AttachConfig, DebugAdapterConfig, DebugRequestType};
55use util::ResultExt as _;
56use worktree::Worktree;
57
58pub enum DapStoreEvent {
59 DebugClientStarted(SessionId),
60 DebugClientShutdown(SessionId),
61 DebugClientEvent {
62 session_id: SessionId,
63 message: Message,
64 },
65 RunInTerminal {
66 session_id: SessionId,
67 title: Option<String>,
68 cwd: PathBuf,
69 command: Option<String>,
70 args: Vec<String>,
71 envs: HashMap<String, String>,
72 sender: mpsc::Sender<Result<u32>>,
73 },
74 Notification(String),
75 RemoteHasInitialized,
76}
77
78#[allow(clippy::large_enum_variant)]
79pub enum DapStoreMode {
80 Local(LocalDapStore), // ssh host and collab host
81 Remote(RemoteDapStore), // collab guest
82}
83
84pub struct LocalDapStore {
85 fs: Arc<dyn Fs>,
86 node_runtime: NodeRuntime,
87 next_session_id: AtomicU32,
88 http_client: Arc<dyn HttpClient>,
89 worktree_store: Entity<WorktreeStore>,
90 environment: Entity<ProjectEnvironment>,
91 language_registry: Arc<LanguageRegistry>,
92 toolchain_store: Arc<dyn LanguageToolchainStore>,
93 start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
94 _start_debugging_task: Task<()>,
95}
96
97impl LocalDapStore {
98 fn next_session_id(&self) -> SessionId {
99 SessionId(self.next_session_id.fetch_add(1, SeqCst))
100 }
101}
102
103pub struct RemoteDapStore {
104 upstream_client: AnyProtoClient,
105 upstream_project_id: u64,
106 event_queue: Option<VecDeque<DapStoreEvent>>,
107}
108
109pub struct DapStore {
110 mode: DapStoreMode,
111 downstream_client: Option<(AnyProtoClient, u64)>,
112 breakpoint_store: Entity<BreakpointStore>,
113 sessions: BTreeMap<SessionId, Entity<Session>>,
114}
115
116impl EventEmitter<DapStoreEvent> for DapStore {}
117
118impl DapStore {
119 pub fn init(_client: &AnyProtoClient) {
120 // todo(debugger): Reenable these after we finish handle_dap_command refactor
121 // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
122 // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
123 // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
124 // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
125 // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
126 // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
127 // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
128 // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
129 // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
130 // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
131 // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
132 // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
133 }
134
135 #[expect(clippy::too_many_arguments)]
136 pub fn new_local(
137 http_client: Arc<dyn HttpClient>,
138 node_runtime: NodeRuntime,
139 fs: Arc<dyn Fs>,
140 language_registry: Arc<LanguageRegistry>,
141 environment: Entity<ProjectEnvironment>,
142 toolchain_store: Arc<dyn LanguageToolchainStore>,
143 breakpoint_store: Entity<BreakpointStore>,
144 worktree_store: Entity<WorktreeStore>,
145 cx: &mut Context<Self>,
146 ) -> Self {
147 cx.on_app_quit(Self::shutdown_sessions).detach();
148
149 let (start_debugging_tx, mut message_rx) =
150 futures::channel::mpsc::unbounded::<(SessionId, Message)>();
151
152 let _start_debugging_task = cx.spawn(async move |this, cx| {
153 while let Some((session_id, message)) = message_rx.next().await {
154 match message {
155 Message::Request(request) => {
156 let _ = this
157 .update(cx, |this, cx| {
158 if request.command == StartDebugging::COMMAND {
159 this.handle_start_debugging_request(session_id, request, cx)
160 .detach_and_log_err(cx);
161 } else if request.command == RunInTerminal::COMMAND {
162 this.handle_run_in_terminal_request(session_id, request, cx)
163 .detach_and_log_err(cx);
164 }
165 })
166 .log_err();
167 }
168 _ => {}
169 }
170 }
171 });
172 Self {
173 mode: DapStoreMode::Local(LocalDapStore {
174 fs,
175 environment,
176 http_client,
177 node_runtime,
178 worktree_store,
179 toolchain_store,
180 language_registry,
181 start_debugging_tx,
182 _start_debugging_task,
183 next_session_id: Default::default(),
184 }),
185 downstream_client: None,
186 breakpoint_store,
187 sessions: Default::default(),
188 }
189 }
190
191 pub fn new_remote(
192 project_id: u64,
193 upstream_client: AnyProtoClient,
194 breakpoint_store: Entity<BreakpointStore>,
195 ) -> Self {
196 Self {
197 mode: DapStoreMode::Remote(RemoteDapStore {
198 upstream_client,
199 upstream_project_id: project_id,
200 event_queue: Some(VecDeque::default()),
201 }),
202 downstream_client: None,
203 breakpoint_store,
204 sessions: Default::default(),
205 }
206 }
207
208 pub fn as_remote(&self) -> Option<&RemoteDapStore> {
209 match &self.mode {
210 DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
211 _ => None,
212 }
213 }
214
215 pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
216 if let DapStoreMode::Remote(remote) = &mut self.mode {
217 remote.event_queue.take()
218 } else {
219 None
220 }
221 }
222
223 pub fn as_local(&self) -> Option<&LocalDapStore> {
224 match &self.mode {
225 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
226 _ => None,
227 }
228 }
229
230 pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
231 match &mut self.mode {
232 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
233 _ => None,
234 }
235 }
236
237 pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
238 match &self.mode {
239 DapStoreMode::Remote(RemoteDapStore {
240 upstream_client,
241 upstream_project_id,
242 ..
243 }) => Some((upstream_client.clone(), *upstream_project_id)),
244
245 DapStoreMode::Local(_) => None,
246 }
247 }
248
249 pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
250 self.downstream_client.as_ref()
251 }
252
253 pub fn add_remote_client(
254 &mut self,
255 session_id: SessionId,
256 ignore: Option<bool>,
257 cx: &mut Context<Self>,
258 ) {
259 if let DapStoreMode::Remote(remote) = &self.mode {
260 self.sessions.insert(
261 session_id,
262 cx.new(|_| {
263 debugger::session::Session::remote(
264 session_id,
265 remote.upstream_client.clone(),
266 remote.upstream_project_id,
267 ignore.unwrap_or(false),
268 )
269 }),
270 );
271 } else {
272 debug_assert!(false);
273 }
274 }
275
276 pub fn session_by_id(
277 &self,
278 session_id: impl Borrow<SessionId>,
279 ) -> Option<Entity<session::Session>> {
280 let session_id = session_id.borrow();
281 let client = self.sessions.get(session_id).cloned();
282
283 client
284 }
285 pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
286 self.sessions.values()
287 }
288
289 pub fn capabilities_by_id(
290 &self,
291 session_id: impl Borrow<SessionId>,
292 cx: &App,
293 ) -> Option<Capabilities> {
294 let session_id = session_id.borrow();
295 self.sessions
296 .get(session_id)
297 .map(|client| client.read(cx).capabilities.clone())
298 }
299
300 pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
301 &self.breakpoint_store
302 }
303
304 #[allow(dead_code)]
305 async fn handle_ignore_breakpoint_state(
306 this: Entity<Self>,
307 envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
308 mut cx: AsyncApp,
309 ) -> Result<()> {
310 let session_id = SessionId::from_proto(envelope.payload.session_id);
311
312 this.update(&mut cx, |this, cx| {
313 if let Some(session) = this.session_by_id(&session_id) {
314 session.update(cx, |session, cx| {
315 session.set_ignore_breakpoints(envelope.payload.ignore, cx)
316 })
317 } else {
318 Task::ready(())
319 }
320 })?
321 .await;
322
323 Ok(())
324 }
325
326 pub fn new_session(
327 &mut self,
328 config: DebugAdapterConfig,
329 worktree: &Entity<Worktree>,
330 parent_session: Option<Entity<Session>>,
331 cx: &mut Context<Self>,
332 ) -> (SessionId, Task<Result<Entity<Session>>>) {
333 let Some(local_store) = self.as_local() else {
334 unimplemented!("Starting session on remote side");
335 };
336
337 let delegate = DapAdapterDelegate::new(
338 local_store.fs.clone(),
339 worktree.read(cx).id(),
340 local_store.node_runtime.clone(),
341 local_store.http_client.clone(),
342 local_store.language_registry.clone(),
343 local_store.toolchain_store.clone(),
344 local_store.environment.update(cx, |env, cx| {
345 let worktree = worktree.read(cx);
346 env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
347 }),
348 );
349 let session_id = local_store.next_session_id();
350
351 if let Some(session) = &parent_session {
352 session.update(cx, |session, _| {
353 session.add_child_session_id(session_id);
354 });
355 }
356
357 let (initialized_tx, initialized_rx) = oneshot::channel();
358
359 let start_client_task = Session::local(
360 self.breakpoint_store.clone(),
361 session_id,
362 parent_session,
363 delegate,
364 config,
365 local_store.start_debugging_tx.clone(),
366 initialized_tx,
367 cx,
368 );
369
370 let task = cx.spawn(async move |this, cx| {
371 let session = match start_client_task.await {
372 Ok(session) => session,
373 Err(error) => {
374 this.update(cx, |_, cx| {
375 cx.emit(DapStoreEvent::Notification(error.to_string()));
376 })
377 .log_err();
378
379 return Err(error);
380 }
381 };
382
383 // we have to insert the session early, so we can handle reverse requests
384 // that need the session to be available
385 this.update(cx, |store, cx| {
386 store.sessions.insert(session_id, session.clone());
387 cx.emit(DapStoreEvent::DebugClientStarted(session_id));
388 cx.notify();
389 })?;
390
391 match session
392 .update(cx, |session, cx| {
393 session.initialize_sequence(initialized_rx, cx)
394 })?
395 .await
396 {
397 Ok(_) => {}
398 Err(error) => {
399 this.update(cx, |this, cx| {
400 cx.emit(DapStoreEvent::Notification(error.to_string()));
401
402 this.shutdown_session(session_id, cx)
403 })?
404 .await
405 .log_err();
406
407 return Err(error);
408 }
409 }
410
411 Ok(session)
412 });
413 (session_id, task)
414 }
415
416 fn handle_start_debugging_request(
417 &mut self,
418 session_id: SessionId,
419 request: dap::messages::Request,
420 cx: &mut Context<Self>,
421 ) -> Task<Result<()>> {
422 let Some(local_store) = self.as_local() else {
423 unreachable!("Cannot response for non-local session");
424 };
425
426 let Some(parent_session) = self.session_by_id(session_id) else {
427 return Task::ready(Err(anyhow!("Session not found")));
428 };
429
430 let args = serde_json::from_value::<StartDebuggingRequestArguments>(
431 request.arguments.unwrap_or_default(),
432 )
433 .expect("To parse StartDebuggingRequestArguments");
434
435 let worktree = local_store
436 .worktree_store
437 .update(cx, |this, _| this.worktrees().next())
438 .expect("worktree-less project");
439
440 let Some(config) = parent_session.read(cx).configuration() else {
441 unreachable!("there must be a config for local sessions");
442 };
443
444 let (_, new_session_task) = self.new_session(
445 DebugAdapterConfig {
446 label: config.label,
447 kind: config.kind,
448 request: match &args.request {
449 StartDebuggingRequestArgumentsRequest::Launch => DebugRequestType::Launch,
450 StartDebuggingRequestArgumentsRequest::Attach => {
451 DebugRequestType::Attach(AttachConfig::default())
452 }
453 },
454 program: config.program,
455 cwd: config.cwd,
456 initialize_args: Some(args.configuration),
457 supports_attach: config.supports_attach,
458 },
459 &worktree,
460 Some(parent_session.clone()),
461 cx,
462 );
463
464 let request_seq = request.seq;
465 cx.spawn(async move |_, cx| {
466 let (success, body) = match new_session_task.await {
467 Ok(_) => (true, None),
468 Err(error) => (
469 false,
470 Some(serde_json::to_value(ErrorResponse {
471 error: Some(dap::Message {
472 id: request_seq,
473 format: error.to_string(),
474 variables: None,
475 send_telemetry: None,
476 show_user: None,
477 url: None,
478 url_label: None,
479 }),
480 })?),
481 ),
482 };
483
484 parent_session
485 .update(cx, |session, cx| {
486 session.respond_to_client(
487 request_seq,
488 success,
489 StartDebugging::COMMAND.to_string(),
490 body,
491 cx,
492 )
493 })?
494 .await
495 })
496 }
497
498 fn handle_run_in_terminal_request(
499 &mut self,
500 session_id: SessionId,
501 request: dap::messages::Request,
502 cx: &mut Context<Self>,
503 ) -> Task<Result<()>> {
504 let Some(session) = self.session_by_id(session_id) else {
505 return Task::ready(Err(anyhow!("Session not found")));
506 };
507
508 let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
509 request.arguments.unwrap_or_default(),
510 )
511 .expect("To parse StartDebuggingRequestArguments");
512
513 let seq = request.seq;
514
515 let cwd = PathBuf::from(request_args.cwd);
516 match cwd.try_exists() {
517 Ok(true) => (),
518 Ok(false) | Err(_) => {
519 return session.update(cx, |session, cx| {
520 session.respond_to_client(
521 seq,
522 false,
523 RunInTerminal::COMMAND.to_string(),
524 serde_json::to_value(dap::ErrorResponse {
525 error: Some(dap::Message {
526 id: seq,
527 format: format!("Received invalid/unknown cwd: {cwd:?}"),
528 variables: None,
529 send_telemetry: None,
530 show_user: None,
531 url: None,
532 url_label: None,
533 }),
534 })
535 .ok(),
536 cx,
537 )
538 })
539 }
540 }
541
542 let mut args = request_args.args.clone();
543
544 // Handle special case for NodeJS debug adapter
545 // If only the Node binary path is provided, we set the command to None
546 // This prevents the NodeJS REPL from appearing, which is not the desired behavior
547 // The expected usage is for users to provide their own Node command, e.g., `node test.js`
548 // This allows the NodeJS debug client to attach correctly
549 let command = if args.len() > 1 {
550 Some(args.remove(0))
551 } else {
552 None
553 };
554
555 let mut envs: HashMap<String, String> = Default::default();
556 if let Some(Value::Object(env)) = request_args.env {
557 for (key, value) in env {
558 let value_str = match (key.as_str(), value) {
559 (_, Value::String(value)) => value,
560 _ => continue,
561 };
562
563 envs.insert(key, value_str);
564 }
565 }
566
567 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
568
569 cx.emit(DapStoreEvent::RunInTerminal {
570 session_id,
571 title: request_args.title,
572 cwd,
573 command,
574 args,
575 envs,
576 sender: tx,
577 });
578 cx.notify();
579
580 let session = session.downgrade();
581 cx.spawn(async move |_, cx| {
582 let (success, body) = match rx.next().await {
583 Some(Ok(pid)) => (
584 true,
585 serde_json::to_value(dap::RunInTerminalResponse {
586 process_id: None,
587 shell_process_id: Some(pid as u64),
588 })
589 .ok(),
590 ),
591 Some(Err(error)) => (
592 false,
593 serde_json::to_value(dap::ErrorResponse {
594 error: Some(dap::Message {
595 id: seq,
596 format: error.to_string(),
597 variables: None,
598 send_telemetry: None,
599 show_user: None,
600 url: None,
601 url_label: None,
602 }),
603 })
604 .ok(),
605 ),
606 None => (
607 false,
608 serde_json::to_value(dap::ErrorResponse {
609 error: Some(dap::Message {
610 id: seq,
611 format: "failed to receive response from spawn terminal".to_string(),
612 variables: None,
613 send_telemetry: None,
614 show_user: None,
615 url: None,
616 url_label: None,
617 }),
618 })
619 .ok(),
620 ),
621 };
622
623 session
624 .update(cx, |session, cx| {
625 session.respond_to_client(
626 seq,
627 success,
628 RunInTerminal::COMMAND.to_string(),
629 body,
630 cx,
631 )
632 })?
633 .await
634 })
635 }
636
637 pub fn evaluate(
638 &self,
639 session_id: &SessionId,
640 stack_frame_id: u64,
641 expression: String,
642 context: EvaluateArgumentsContext,
643 source: Option<Source>,
644 cx: &mut Context<Self>,
645 ) -> Task<Result<EvaluateResponse>> {
646 let Some(client) = self
647 .session_by_id(session_id)
648 .and_then(|client| client.read(cx).adapter_client())
649 else {
650 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
651 };
652
653 cx.background_executor().spawn(async move {
654 client
655 .request::<Evaluate>(EvaluateArguments {
656 expression: expression.clone(),
657 frame_id: Some(stack_frame_id),
658 context: Some(context),
659 format: None,
660 line: None,
661 column: None,
662 source,
663 })
664 .await
665 })
666 }
667
668 pub fn completions(
669 &self,
670 session_id: &SessionId,
671 stack_frame_id: u64,
672 text: String,
673 completion_column: u64,
674 cx: &mut Context<Self>,
675 ) -> Task<Result<Vec<CompletionItem>>> {
676 let Some(client) = self
677 .session_by_id(session_id)
678 .and_then(|client| client.read(cx).adapter_client())
679 else {
680 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
681 };
682
683 cx.background_executor().spawn(async move {
684 Ok(client
685 .request::<Completions>(CompletionsArguments {
686 frame_id: Some(stack_frame_id),
687 line: None,
688 text,
689 column: completion_column,
690 })
691 .await?
692 .targets)
693 })
694 }
695
696 #[allow(clippy::too_many_arguments)]
697 pub fn set_variable_value(
698 &self,
699 session_id: &SessionId,
700 stack_frame_id: u64,
701 variables_reference: u64,
702 name: String,
703 value: String,
704 evaluate_name: Option<String>,
705 cx: &mut Context<Self>,
706 ) -> Task<Result<()>> {
707 let Some(client) = self
708 .session_by_id(session_id)
709 .and_then(|client| client.read(cx).adapter_client())
710 else {
711 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
712 };
713
714 let supports_set_expression = self
715 .capabilities_by_id(session_id, cx)
716 .and_then(|caps| caps.supports_set_expression)
717 .unwrap_or_default();
718
719 cx.background_executor().spawn(async move {
720 if let Some(evaluate_name) = supports_set_expression.then(|| evaluate_name).flatten() {
721 client
722 .request::<SetExpression>(SetExpressionArguments {
723 expression: evaluate_name,
724 value,
725 frame_id: Some(stack_frame_id),
726 format: None,
727 })
728 .await?;
729 } else {
730 client
731 .request::<SetVariable>(SetVariableArguments {
732 variables_reference,
733 name,
734 value,
735 format: None,
736 })
737 .await?;
738 }
739
740 Ok(())
741 })
742 }
743
// .. get the client and what not
// let _ = client.modules(); // This can fire a request to a dap adapter or be a cheap getter.
// client.wait_for_request(request::Modules); // This ensures that the request that we've fired off runs to completion
// let returned_value = client.modules(); // this is a cheap getter.
748
749 pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
750 let mut tasks = vec![];
751 for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
752 tasks.push(self.shutdown_session(session_id, cx));
753 }
754
755 cx.background_executor().spawn(async move {
756 futures::future::join_all(tasks).await;
757 })
758 }
759
760 pub fn shutdown_session(
761 &mut self,
762 session_id: SessionId,
763 cx: &mut Context<Self>,
764 ) -> Task<Result<()>> {
765 let Some(_) = self.as_local_mut() else {
766 return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
767 };
768
769 let Some(session) = self.sessions.remove(&session_id) else {
770 return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
771 };
772
773 let shutdown_children = session
774 .read(cx)
775 .child_session_ids()
776 .iter()
777 .map(|session_id| self.shutdown_session(*session_id, cx))
778 .collect::<Vec<_>>();
779
780 let shutdown_parent_task = if let Some(parent_session) = session
781 .read(cx)
782 .parent_id()
783 .and_then(|session_id| self.session_by_id(session_id))
784 {
785 let shutdown_id = parent_session.update(cx, |parent_session, _| {
786 parent_session.remove_child_session_id(session_id);
787
788 if parent_session.child_session_ids().len() == 0 {
789 Some(parent_session.session_id())
790 } else {
791 None
792 }
793 });
794
795 shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
796 } else {
797 None
798 };
799
800 let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
801
802 cx.background_spawn(async move {
803 if shutdown_children.len() > 0 {
804 let _ = join_all(shutdown_children).await;
805 }
806
807 shutdown_task.await;
808
809 if let Some(parent_task) = shutdown_parent_task {
810 parent_task.await?;
811 }
812
813 Ok(())
814 })
815 }
816
817 pub fn shared(
818 &mut self,
819 project_id: u64,
820 downstream_client: AnyProtoClient,
821 _: &mut Context<Self>,
822 ) {
823 self.downstream_client = Some((downstream_client.clone(), project_id));
824 }
825
826 pub fn unshared(&mut self, cx: &mut Context<Self>) {
827 self.downstream_client.take();
828
829 cx.notify();
830 }
831}
832
833#[derive(Clone)]
834pub struct DapAdapterDelegate {
835 fs: Arc<dyn Fs>,
836 worktree_id: WorktreeId,
837 node_runtime: NodeRuntime,
838 http_client: Arc<dyn HttpClient>,
839 language_registry: Arc<LanguageRegistry>,
840 toolchain_store: Arc<dyn LanguageToolchainStore>,
841 updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
842 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
843}
844
845impl DapAdapterDelegate {
846 pub fn new(
847 fs: Arc<dyn Fs>,
848 worktree_id: WorktreeId,
849 node_runtime: NodeRuntime,
850 http_client: Arc<dyn HttpClient>,
851 language_registry: Arc<LanguageRegistry>,
852 toolchain_store: Arc<dyn LanguageToolchainStore>,
853 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
854 ) -> Self {
855 Self {
856 fs,
857 worktree_id,
858 http_client,
859 node_runtime,
860 toolchain_store,
861 language_registry,
862 load_shell_env_task,
863 updated_adapters: Default::default(),
864 }
865 }
866}
867
868#[async_trait(?Send)]
869impl dap::adapters::DapDelegate for DapAdapterDelegate {
870 fn worktree_id(&self) -> WorktreeId {
871 self.worktree_id
872 }
873
874 fn http_client(&self) -> Arc<dyn HttpClient> {
875 self.http_client.clone()
876 }
877
878 fn node_runtime(&self) -> NodeRuntime {
879 self.node_runtime.clone()
880 }
881
882 fn fs(&self) -> Arc<dyn Fs> {
883 self.fs.clone()
884 }
885
886 fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
887 self.updated_adapters.clone()
888 }
889
890 fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
891 let name = SharedString::from(dap_name.to_string());
892 let status = match status {
893 DapStatus::None => BinaryStatus::None,
894 DapStatus::Downloading => BinaryStatus::Downloading,
895 DapStatus::Failed { error } => BinaryStatus::Failed { error },
896 DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
897 };
898
899 self.language_registry
900 .update_dap_status(LanguageServerName(name), status);
901 }
902
903 fn which(&self, command: &OsStr) -> Option<PathBuf> {
904 which::which(command).ok()
905 }
906
907 async fn shell_env(&self) -> HashMap<String, String> {
908 let task = self.load_shell_env_task.clone();
909 task.await.unwrap_or_default()
910 }
911
912 fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
913 self.toolchain_store.clone()
914 }
915}