1use super::{
2 breakpoint_store::BreakpointStore,
3 locator_store::LocatorStore,
4 session::{self, Session},
5};
6use crate::{ProjectEnvironment, debugger, worktree_store::WorktreeStore};
7use anyhow::{Result, anyhow};
8use async_trait::async_trait;
9use collections::HashMap;
10use dap::{
11 Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
12 EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
13 Source, StartDebuggingRequestArguments,
14 adapters::{DapStatus, DebugAdapterName},
15 client::SessionId,
16 messages::Message,
17 requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
18};
19use fs::Fs;
20use futures::{
21 channel::{mpsc, oneshot},
22 future::{Shared, join_all},
23};
24use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
25use http_client::HttpClient;
26use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
27use lsp::LanguageServerName;
28use node_runtime::NodeRuntime;
29
30use rpc::{
31 AnyProtoClient, TypedEnvelope,
32 proto::{self},
33};
34use serde_json::Value;
35use settings::WorktreeId;
36use smol::{lock::Mutex, stream::StreamExt};
37use std::{
38 borrow::Borrow,
39 collections::{BTreeMap, HashSet},
40 ffi::OsStr,
41 path::PathBuf,
42 sync::{Arc, atomic::Ordering::SeqCst},
43};
44use std::{collections::VecDeque, sync::atomic::AtomicU32};
45use task::{DebugAdapterConfig, DebugRequestDisposition};
46use util::ResultExt as _;
47use worktree::Worktree;
48
49pub enum DapStoreEvent {
50 DebugClientStarted(SessionId),
51 DebugClientShutdown(SessionId),
52 DebugClientEvent {
53 session_id: SessionId,
54 message: Message,
55 },
56 RunInTerminal {
57 session_id: SessionId,
58 title: Option<String>,
59 cwd: PathBuf,
60 command: Option<String>,
61 args: Vec<String>,
62 envs: HashMap<String, String>,
63 sender: mpsc::Sender<Result<u32>>,
64 },
65 Notification(String),
66 RemoteHasInitialized,
67}
68
69#[allow(clippy::large_enum_variant)]
70pub enum DapStoreMode {
71 Local(LocalDapStore), // ssh host and collab host
72 Remote(RemoteDapStore), // collab guest
73}
74
75pub struct LocalDapStore {
76 fs: Arc<dyn Fs>,
77 node_runtime: NodeRuntime,
78 next_session_id: AtomicU32,
79 http_client: Arc<dyn HttpClient>,
80 worktree_store: Entity<WorktreeStore>,
81 environment: Entity<ProjectEnvironment>,
82 language_registry: Arc<LanguageRegistry>,
83 debug_adapters: Arc<DapRegistry>,
84 toolchain_store: Arc<dyn LanguageToolchainStore>,
85 locator_store: Arc<LocatorStore>,
86 start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
87 _start_debugging_task: Task<()>,
88}
89
90impl LocalDapStore {
91 fn next_session_id(&self) -> SessionId {
92 SessionId(self.next_session_id.fetch_add(1, SeqCst))
93 }
94}
95
96pub struct RemoteDapStore {
97 upstream_client: AnyProtoClient,
98 upstream_project_id: u64,
99 event_queue: Option<VecDeque<DapStoreEvent>>,
100}
101
102pub struct DapStore {
103 mode: DapStoreMode,
104 downstream_client: Option<(AnyProtoClient, u64)>,
105 breakpoint_store: Entity<BreakpointStore>,
106 sessions: BTreeMap<SessionId, Entity<Session>>,
107}
108
109impl EventEmitter<DapStoreEvent> for DapStore {}
110
111impl DapStore {
112 pub fn init(_client: &AnyProtoClient) {
113 // todo(debugger): Reenable these after we finish handle_dap_command refactor
114 // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
115 // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
116 // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
117 // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
118 // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
119 // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
120 // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
121 // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
122 // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
123 // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
124 // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
125 // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
126 }
127
128 #[expect(clippy::too_many_arguments)]
129 pub fn new_local(
130 http_client: Arc<dyn HttpClient>,
131 node_runtime: NodeRuntime,
132 fs: Arc<dyn Fs>,
133 language_registry: Arc<LanguageRegistry>,
134 debug_adapters: Arc<DapRegistry>,
135 environment: Entity<ProjectEnvironment>,
136 toolchain_store: Arc<dyn LanguageToolchainStore>,
137 breakpoint_store: Entity<BreakpointStore>,
138 worktree_store: Entity<WorktreeStore>,
139 cx: &mut Context<Self>,
140 ) -> Self {
141 cx.on_app_quit(Self::shutdown_sessions).detach();
142
143 let (start_debugging_tx, mut message_rx) =
144 futures::channel::mpsc::unbounded::<(SessionId, Message)>();
145
146 let _start_debugging_task = cx.spawn(async move |this, cx| {
147 while let Some((session_id, message)) = message_rx.next().await {
148 match message {
149 Message::Request(request) => {
150 let _ = this
151 .update(cx, |this, cx| {
152 if request.command == StartDebugging::COMMAND {
153 this.handle_start_debugging_request(session_id, request, cx)
154 .detach_and_log_err(cx);
155 } else if request.command == RunInTerminal::COMMAND {
156 this.handle_run_in_terminal_request(session_id, request, cx)
157 .detach_and_log_err(cx);
158 }
159 })
160 .log_err();
161 }
162 _ => {}
163 }
164 }
165 });
166 Self {
167 mode: DapStoreMode::Local(LocalDapStore {
168 fs,
169 environment,
170 http_client,
171 node_runtime,
172 worktree_store,
173 toolchain_store,
174 language_registry,
175 debug_adapters,
176 start_debugging_tx,
177 _start_debugging_task,
178 locator_store: Arc::from(LocatorStore::new()),
179 next_session_id: Default::default(),
180 }),
181 downstream_client: None,
182 breakpoint_store,
183 sessions: Default::default(),
184 }
185 }
186
187 pub fn new_remote(
188 project_id: u64,
189 upstream_client: AnyProtoClient,
190 breakpoint_store: Entity<BreakpointStore>,
191 ) -> Self {
192 Self {
193 mode: DapStoreMode::Remote(RemoteDapStore {
194 upstream_client,
195 upstream_project_id: project_id,
196 event_queue: Some(VecDeque::default()),
197 }),
198 downstream_client: None,
199 breakpoint_store,
200 sessions: Default::default(),
201 }
202 }
203
204 pub fn as_remote(&self) -> Option<&RemoteDapStore> {
205 match &self.mode {
206 DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
207 _ => None,
208 }
209 }
210
211 pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
212 if let DapStoreMode::Remote(remote) = &mut self.mode {
213 remote.event_queue.take()
214 } else {
215 None
216 }
217 }
218
219 pub fn as_local(&self) -> Option<&LocalDapStore> {
220 match &self.mode {
221 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
222 _ => None,
223 }
224 }
225
226 pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
227 match &mut self.mode {
228 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
229 _ => None,
230 }
231 }
232
233 pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
234 match &self.mode {
235 DapStoreMode::Remote(RemoteDapStore {
236 upstream_client,
237 upstream_project_id,
238 ..
239 }) => Some((upstream_client.clone(), *upstream_project_id)),
240
241 DapStoreMode::Local(_) => None,
242 }
243 }
244
245 pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
246 self.downstream_client.as_ref()
247 }
248
249 pub fn add_remote_client(
250 &mut self,
251 session_id: SessionId,
252 ignore: Option<bool>,
253 cx: &mut Context<Self>,
254 ) {
255 if let DapStoreMode::Remote(remote) = &self.mode {
256 self.sessions.insert(
257 session_id,
258 cx.new(|_| {
259 debugger::session::Session::remote(
260 session_id,
261 remote.upstream_client.clone(),
262 remote.upstream_project_id,
263 ignore.unwrap_or(false),
264 )
265 }),
266 );
267 } else {
268 debug_assert!(false);
269 }
270 }
271
272 pub fn session_by_id(
273 &self,
274 session_id: impl Borrow<SessionId>,
275 ) -> Option<Entity<session::Session>> {
276 let session_id = session_id.borrow();
277 let client = self.sessions.get(session_id).cloned();
278
279 client
280 }
281 pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
282 self.sessions.values()
283 }
284
285 pub fn capabilities_by_id(
286 &self,
287 session_id: impl Borrow<SessionId>,
288 cx: &App,
289 ) -> Option<Capabilities> {
290 let session_id = session_id.borrow();
291 self.sessions
292 .get(session_id)
293 .map(|client| client.read(cx).capabilities.clone())
294 }
295
296 pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
297 &self.breakpoint_store
298 }
299
300 #[allow(dead_code)]
301 async fn handle_ignore_breakpoint_state(
302 this: Entity<Self>,
303 envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
304 mut cx: AsyncApp,
305 ) -> Result<()> {
306 let session_id = SessionId::from_proto(envelope.payload.session_id);
307
308 this.update(&mut cx, |this, cx| {
309 if let Some(session) = this.session_by_id(&session_id) {
310 session.update(cx, |session, cx| {
311 session.set_ignore_breakpoints(envelope.payload.ignore, cx)
312 })
313 } else {
314 Task::ready(())
315 }
316 })?
317 .await;
318
319 Ok(())
320 }
321
322 pub fn new_session(
323 &mut self,
324 mut config: DebugAdapterConfig,
325 worktree: &Entity<Worktree>,
326 parent_session: Option<Entity<Session>>,
327 cx: &mut Context<Self>,
328 ) -> (SessionId, Task<Result<Entity<Session>>>) {
329 let Some(local_store) = self.as_local() else {
330 unimplemented!("Starting session on remote side");
331 };
332
333 let delegate = DapAdapterDelegate::new(
334 local_store.fs.clone(),
335 worktree.read(cx).id(),
336 local_store.node_runtime.clone(),
337 local_store.http_client.clone(),
338 local_store.language_registry.clone(),
339 local_store.toolchain_store.clone(),
340 local_store.environment.update(cx, |env, cx| {
341 let worktree = worktree.read(cx);
342 env.get_environment(worktree.abs_path().into(), cx)
343 }),
344 );
345 let session_id = local_store.next_session_id();
346
347 if let Some(session) = &parent_session {
348 session.update(cx, |session, _| {
349 session.add_child_session_id(session_id);
350 });
351 }
352
353 let (initialized_tx, initialized_rx) = oneshot::channel();
354 let locator_store = local_store.locator_store.clone();
355 let debug_adapters = local_store.debug_adapters.clone();
356
357 let start_debugging_tx = local_store.start_debugging_tx.clone();
358
359 let task = cx.spawn(async move |this, cx| {
360 if config.locator.is_some() {
361 config = cx
362 .background_spawn(async move {
363 locator_store
364 .resolve_debug_config(&mut config)
365 .await
366 .map(|_| config)
367 })
368 .await?;
369 }
370
371 let start_client_task = this.update(cx, |this, cx| {
372 Session::local(
373 this.breakpoint_store.clone(),
374 session_id,
375 parent_session,
376 delegate,
377 config,
378 start_debugging_tx.clone(),
379 initialized_tx,
380 debug_adapters,
381 cx,
382 )
383 })?;
384
385 this.update(cx, |_, cx| {
386 create_new_session(session_id, initialized_rx, start_client_task, cx)
387 })?
388 .await
389 });
390
391 (session_id, task)
392 }
393
394 #[cfg(any(test, feature = "test-support"))]
395 pub fn new_fake_session(
396 &mut self,
397 config: DebugAdapterConfig,
398 worktree: &Entity<Worktree>,
399 parent_session: Option<Entity<Session>>,
400 caps: Capabilities,
401 fails: bool,
402 cx: &mut Context<Self>,
403 ) -> (SessionId, Task<Result<Entity<Session>>>) {
404 let Some(local_store) = self.as_local() else {
405 unimplemented!("Starting session on remote side");
406 };
407
408 let delegate = DapAdapterDelegate::new(
409 local_store.fs.clone(),
410 worktree.read(cx).id(),
411 local_store.node_runtime.clone(),
412 local_store.http_client.clone(),
413 local_store.language_registry.clone(),
414 local_store.toolchain_store.clone(),
415 local_store.environment.update(cx, |env, cx| {
416 let worktree = worktree.read(cx);
417 env.get_environment(Some(worktree.abs_path()), cx)
418 }),
419 );
420 let session_id = local_store.next_session_id();
421
422 if let Some(session) = &parent_session {
423 session.update(cx, |session, _| {
424 session.add_child_session_id(session_id);
425 });
426 }
427
428 let (initialized_tx, initialized_rx) = oneshot::channel();
429
430 let start_client_task = Session::fake(
431 self.breakpoint_store.clone(),
432 session_id,
433 parent_session,
434 delegate,
435 config,
436 local_store.start_debugging_tx.clone(),
437 initialized_tx,
438 caps,
439 fails,
440 cx,
441 );
442
443 let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
444 (session_id, task)
445 }
446
447 fn handle_start_debugging_request(
448 &mut self,
449 session_id: SessionId,
450 request: dap::messages::Request,
451 cx: &mut Context<Self>,
452 ) -> Task<Result<()>> {
453 let Some(local_store) = self.as_local() else {
454 unreachable!("Cannot response for non-local session");
455 };
456
457 let Some(parent_session) = self.session_by_id(session_id) else {
458 return Task::ready(Err(anyhow!("Session not found")));
459 };
460
461 let args = serde_json::from_value::<StartDebuggingRequestArguments>(
462 request.arguments.unwrap_or_default(),
463 )
464 .expect("To parse StartDebuggingRequestArguments");
465 let worktree = local_store
466 .worktree_store
467 .update(cx, |this, _| this.worktrees().next())
468 .expect("worktree-less project");
469
470 let Some(config) = parent_session.read(cx).configuration() else {
471 unreachable!("there must be a config for local sessions");
472 };
473
474 let debug_config = DebugAdapterConfig {
475 label: config.label,
476 adapter: config.adapter,
477 request: DebugRequestDisposition::ReverseRequest(args),
478 initialize_args: config.initialize_args.clone(),
479 tcp_connection: config.tcp_connection.clone(),
480 locator: None,
481 stop_on_entry: config.stop_on_entry,
482 };
483
484 #[cfg(any(test, feature = "test-support"))]
485 let new_session_task = {
486 let caps = parent_session.read(cx).capabilities.clone();
487 self.new_fake_session(
488 debug_config,
489 &worktree,
490 Some(parent_session.clone()),
491 caps,
492 false,
493 cx,
494 )
495 .1
496 };
497 #[cfg(not(any(test, feature = "test-support")))]
498 let new_session_task = self
499 .new_session(debug_config, &worktree, Some(parent_session.clone()), cx)
500 .1;
501
502 let request_seq = request.seq;
503 cx.spawn(async move |_, cx| {
504 let (success, body) = match new_session_task.await {
505 Ok(_) => (true, None),
506 Err(error) => (
507 false,
508 Some(serde_json::to_value(ErrorResponse {
509 error: Some(dap::Message {
510 id: request_seq,
511 format: error.to_string(),
512 variables: None,
513 send_telemetry: None,
514 show_user: None,
515 url: None,
516 url_label: None,
517 }),
518 })?),
519 ),
520 };
521
522 parent_session
523 .update(cx, |session, cx| {
524 session.respond_to_client(
525 request_seq,
526 success,
527 StartDebugging::COMMAND.to_string(),
528 body,
529 cx,
530 )
531 })?
532 .await
533 })
534 }
535
536 fn handle_run_in_terminal_request(
537 &mut self,
538 session_id: SessionId,
539 request: dap::messages::Request,
540 cx: &mut Context<Self>,
541 ) -> Task<Result<()>> {
542 let Some(session) = self.session_by_id(session_id) else {
543 return Task::ready(Err(anyhow!("Session not found")));
544 };
545
546 let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
547 request.arguments.unwrap_or_default(),
548 )
549 .expect("To parse StartDebuggingRequestArguments");
550
551 let seq = request.seq;
552
553 let cwd = PathBuf::from(request_args.cwd);
554 match cwd.try_exists() {
555 Ok(true) => (),
556 Ok(false) | Err(_) => {
557 return session.update(cx, |session, cx| {
558 session.respond_to_client(
559 seq,
560 false,
561 RunInTerminal::COMMAND.to_string(),
562 serde_json::to_value(dap::ErrorResponse {
563 error: Some(dap::Message {
564 id: seq,
565 format: format!("Received invalid/unknown cwd: {cwd:?}"),
566 variables: None,
567 send_telemetry: None,
568 show_user: None,
569 url: None,
570 url_label: None,
571 }),
572 })
573 .ok(),
574 cx,
575 )
576 });
577 }
578 }
579
580 let mut args = request_args.args.clone();
581
582 // Handle special case for NodeJS debug adapter
583 // If only the Node binary path is provided, we set the command to None
584 // This prevents the NodeJS REPL from appearing, which is not the desired behavior
585 // The expected usage is for users to provide their own Node command, e.g., `node test.js`
586 // This allows the NodeJS debug client to attach correctly
587 let command = if args.len() > 1 {
588 Some(args.remove(0))
589 } else {
590 None
591 };
592
593 let mut envs: HashMap<String, String> = Default::default();
594 if let Some(Value::Object(env)) = request_args.env {
595 for (key, value) in env {
596 let value_str = match (key.as_str(), value) {
597 (_, Value::String(value)) => value,
598 _ => continue,
599 };
600
601 envs.insert(key, value_str);
602 }
603 }
604
605 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
606
607 cx.emit(DapStoreEvent::RunInTerminal {
608 session_id,
609 title: request_args.title,
610 cwd,
611 command,
612 args,
613 envs,
614 sender: tx,
615 });
616 cx.notify();
617
618 let session = session.downgrade();
619 cx.spawn(async move |_, cx| {
620 let (success, body) = match rx.next().await {
621 Some(Ok(pid)) => (
622 true,
623 serde_json::to_value(dap::RunInTerminalResponse {
624 process_id: None,
625 shell_process_id: Some(pid as u64),
626 })
627 .ok(),
628 ),
629 Some(Err(error)) => (
630 false,
631 serde_json::to_value(dap::ErrorResponse {
632 error: Some(dap::Message {
633 id: seq,
634 format: error.to_string(),
635 variables: None,
636 send_telemetry: None,
637 show_user: None,
638 url: None,
639 url_label: None,
640 }),
641 })
642 .ok(),
643 ),
644 None => (
645 false,
646 serde_json::to_value(dap::ErrorResponse {
647 error: Some(dap::Message {
648 id: seq,
649 format: "failed to receive response from spawn terminal".to_string(),
650 variables: None,
651 send_telemetry: None,
652 show_user: None,
653 url: None,
654 url_label: None,
655 }),
656 })
657 .ok(),
658 ),
659 };
660
661 session
662 .update(cx, |session, cx| {
663 session.respond_to_client(
664 seq,
665 success,
666 RunInTerminal::COMMAND.to_string(),
667 body,
668 cx,
669 )
670 })?
671 .await
672 })
673 }
674
675 pub fn evaluate(
676 &self,
677 session_id: &SessionId,
678 stack_frame_id: u64,
679 expression: String,
680 context: EvaluateArgumentsContext,
681 source: Option<Source>,
682 cx: &mut Context<Self>,
683 ) -> Task<Result<EvaluateResponse>> {
684 let Some(client) = self
685 .session_by_id(session_id)
686 .and_then(|client| client.read(cx).adapter_client())
687 else {
688 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
689 };
690
691 cx.background_executor().spawn(async move {
692 client
693 .request::<Evaluate>(EvaluateArguments {
694 expression: expression.clone(),
695 frame_id: Some(stack_frame_id),
696 context: Some(context),
697 format: None,
698 line: None,
699 column: None,
700 source,
701 })
702 .await
703 })
704 }
705
706 pub fn completions(
707 &self,
708 session_id: &SessionId,
709 stack_frame_id: u64,
710 text: String,
711 completion_column: u64,
712 cx: &mut Context<Self>,
713 ) -> Task<Result<Vec<CompletionItem>>> {
714 let Some(client) = self
715 .session_by_id(session_id)
716 .and_then(|client| client.read(cx).adapter_client())
717 else {
718 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
719 };
720
721 cx.background_executor().spawn(async move {
722 Ok(client
723 .request::<Completions>(CompletionsArguments {
724 frame_id: Some(stack_frame_id),
725 line: None,
726 text,
727 column: completion_column,
728 })
729 .await?
730 .targets)
731 })
732 }
733
734 pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
735 let mut tasks = vec![];
736 for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
737 tasks.push(self.shutdown_session(session_id, cx));
738 }
739
740 cx.background_executor().spawn(async move {
741 futures::future::join_all(tasks).await;
742 })
743 }
744
745 pub fn shutdown_session(
746 &mut self,
747 session_id: SessionId,
748 cx: &mut Context<Self>,
749 ) -> Task<Result<()>> {
750 let Some(_) = self.as_local_mut() else {
751 return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
752 };
753
754 let Some(session) = self.sessions.remove(&session_id) else {
755 return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
756 };
757
758 let shutdown_children = session
759 .read(cx)
760 .child_session_ids()
761 .iter()
762 .map(|session_id| self.shutdown_session(*session_id, cx))
763 .collect::<Vec<_>>();
764
765 let shutdown_parent_task = if let Some(parent_session) = session
766 .read(cx)
767 .parent_id()
768 .and_then(|session_id| self.session_by_id(session_id))
769 {
770 let shutdown_id = parent_session.update(cx, |parent_session, _| {
771 parent_session.remove_child_session_id(session_id);
772
773 if parent_session.child_session_ids().len() == 0 {
774 Some(parent_session.session_id())
775 } else {
776 None
777 }
778 });
779
780 shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
781 } else {
782 None
783 };
784
785 let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
786
787 cx.background_spawn(async move {
788 if shutdown_children.len() > 0 {
789 let _ = join_all(shutdown_children).await;
790 }
791
792 shutdown_task.await;
793
794 if let Some(parent_task) = shutdown_parent_task {
795 parent_task.await?;
796 }
797
798 Ok(())
799 })
800 }
801
802 pub fn shared(
803 &mut self,
804 project_id: u64,
805 downstream_client: AnyProtoClient,
806 _: &mut Context<Self>,
807 ) {
808 self.downstream_client = Some((downstream_client.clone(), project_id));
809 }
810
811 pub fn unshared(&mut self, cx: &mut Context<Self>) {
812 self.downstream_client.take();
813
814 cx.notify();
815 }
816}
817
818fn create_new_session(
819 session_id: SessionId,
820 initialized_rx: oneshot::Receiver<()>,
821 start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
822 cx: &mut Context<DapStore>,
823) -> Task<Result<Entity<Session>>> {
824 let task = cx.spawn(async move |this, cx| {
825 let session = match start_client_task.await {
826 Ok(session) => session,
827 Err(error) => {
828 this.update(cx, |_, cx| {
829 cx.emit(DapStoreEvent::Notification(error.to_string()));
830 })
831 .log_err();
832
833 return Err(error);
834 }
835 };
836
837 // we have to insert the session early, so we can handle reverse requests
838 // that need the session to be available
839 this.update(cx, |store, cx| {
840 store.sessions.insert(session_id, session.clone());
841 cx.emit(DapStoreEvent::DebugClientStarted(session_id));
842 cx.notify();
843 })?;
844
845 match session
846 .update(cx, |session, cx| {
847 session.initialize_sequence(initialized_rx, cx)
848 })?
849 .await
850 {
851 Ok(_) => {}
852 Err(error) => {
853 this.update(cx, |this, cx| {
854 cx.emit(DapStoreEvent::Notification(error.to_string()));
855
856 this.shutdown_session(session_id, cx)
857 })?
858 .await
859 .log_err();
860
861 return Err(error);
862 }
863 }
864
865 Ok(session)
866 });
867 task
868}
869
870#[derive(Clone)]
871pub struct DapAdapterDelegate {
872 fs: Arc<dyn Fs>,
873 worktree_id: WorktreeId,
874 node_runtime: NodeRuntime,
875 http_client: Arc<dyn HttpClient>,
876 language_registry: Arc<LanguageRegistry>,
877 toolchain_store: Arc<dyn LanguageToolchainStore>,
878 updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
879 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
880}
881
882impl DapAdapterDelegate {
883 pub fn new(
884 fs: Arc<dyn Fs>,
885 worktree_id: WorktreeId,
886 node_runtime: NodeRuntime,
887 http_client: Arc<dyn HttpClient>,
888 language_registry: Arc<LanguageRegistry>,
889 toolchain_store: Arc<dyn LanguageToolchainStore>,
890 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
891 ) -> Self {
892 Self {
893 fs,
894 worktree_id,
895 http_client,
896 node_runtime,
897 toolchain_store,
898 language_registry,
899 load_shell_env_task,
900 updated_adapters: Default::default(),
901 }
902 }
903}
904
905#[async_trait(?Send)]
906impl dap::adapters::DapDelegate for DapAdapterDelegate {
907 fn worktree_id(&self) -> WorktreeId {
908 self.worktree_id
909 }
910
911 fn http_client(&self) -> Arc<dyn HttpClient> {
912 self.http_client.clone()
913 }
914
915 fn node_runtime(&self) -> NodeRuntime {
916 self.node_runtime.clone()
917 }
918
919 fn fs(&self) -> Arc<dyn Fs> {
920 self.fs.clone()
921 }
922
923 fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
924 self.updated_adapters.clone()
925 }
926
927 fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
928 let name = SharedString::from(dap_name.to_string());
929 let status = match status {
930 DapStatus::None => BinaryStatus::None,
931 DapStatus::Downloading => BinaryStatus::Downloading,
932 DapStatus::Failed { error } => BinaryStatus::Failed { error },
933 DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
934 };
935
936 self.language_registry
937 .update_dap_status(LanguageServerName(name), status);
938 }
939
940 fn which(&self, command: &OsStr) -> Option<PathBuf> {
941 which::which(command).ok()
942 }
943
944 async fn shell_env(&self) -> HashMap<String, String> {
945 let task = self.load_shell_env_task.clone();
946 task.await.unwrap_or_default()
947 }
948
949 fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
950 self.toolchain_store.clone()
951 }
952}