1use super::{
2 breakpoint_store::BreakpointStore,
3 // Will need to uncomment this once we implement rpc message handler again
4 // dap_command::{
5 // ContinueCommand, DapCommand, DisconnectCommand, NextCommand, PauseCommand, RestartCommand,
6 // RestartStackFrameCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
7 // TerminateCommand, TerminateThreadsCommand, VariablesCommand,
8 // },
9 session::{self, Session},
10};
11use crate::{debugger, worktree_store::WorktreeStore, ProjectEnvironment};
12use anyhow::{anyhow, Result};
13use async_trait::async_trait;
14use collections::HashMap;
15use dap::{
16 adapters::{DapStatus, DebugAdapterName},
17 client::SessionId,
18 messages::Message,
19 requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
20 Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
21 EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
22 Source, StartDebuggingRequestArguments,
23};
24use fs::Fs;
25use futures::{
26 channel::{mpsc, oneshot},
27 future::{join_all, Shared},
28};
29use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
30use http_client::HttpClient;
31use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
32use lsp::LanguageServerName;
33use node_runtime::NodeRuntime;
34
35use rpc::{
36 proto::{self},
37 AnyProtoClient, TypedEnvelope,
38};
39use serde_json::Value;
40use settings::WorktreeId;
41use smol::{lock::Mutex, stream::StreamExt};
42use std::{
43 borrow::Borrow,
44 collections::{BTreeMap, HashSet},
45 ffi::OsStr,
46 path::PathBuf,
47 sync::{atomic::Ordering::SeqCst, Arc},
48};
49use std::{collections::VecDeque, sync::atomic::AtomicU32};
50use task::{DebugAdapterConfig, DebugRequestDisposition};
51use util::ResultExt as _;
52use worktree::Worktree;
53
/// Events emitted by [`DapStore`] (it implements `EventEmitter<DapStoreEvent>`).
pub enum DapStoreEvent {
    /// Emitted right after a newly started session has been inserted into the store.
    DebugClientStarted(SessionId),
    // NOTE(review): not emitted anywhere in this file; presumably signals that the
    // session with this id has shut down — confirm against the emitter.
    DebugClientShutdown(SessionId),
    /// A message associated with the session identified by `session_id`.
    DebugClientEvent {
        session_id: SessionId,
        message: Message,
    },
    /// Asks a listener to run a command in a terminal on behalf of a session.
    ///
    /// The listener reports the outcome through `sender`; the `u32` of a successful
    /// result is forwarded to the adapter as the shell process id.
    RunInTerminal {
        session_id: SessionId,
        title: Option<String>,
        cwd: PathBuf,
        // `None` when the adapter supplied at most one argument.
        command: Option<String>,
        args: Vec<String>,
        envs: HashMap<String, String>,
        sender: mpsc::Sender<Result<u32>>,
    },
    /// A human-readable message; this file emits it with the text of session start-up errors.
    Notification(String),
    // NOTE(review): not emitted in this file; semantics must be confirmed at the emitter.
    RemoteHasInitialized,
}
73
/// Selects whether a [`DapStore`] owns its debug sessions or proxies them to an upstream peer.
// The variants differ considerably in size; the lint is silenced rather than boxing.
#[allow(clippy::large_enum_variant)]
pub enum DapStoreMode {
    Local(LocalDapStore),   // ssh host and collab host
    Remote(RemoteDapStore), // collab guest
}
79
/// State held by a [`DapStore`] that starts and owns debug sessions itself.
pub struct LocalDapStore {
    fs: Arc<dyn Fs>,
    node_runtime: NodeRuntime,
    // Counter backing `next_session_id()`; incremented atomically on every allocation.
    next_session_id: AtomicU32,
    http_client: Arc<dyn HttpClient>,
    worktree_store: Entity<WorktreeStore>,
    // Used to resolve the environment of the worktree a session is started for.
    environment: Entity<ProjectEnvironment>,
    language_registry: Arc<LanguageRegistry>,
    debug_adapters: Arc<DapRegistry>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    // Sending half handed to every new session; the receiving half is drained by
    // `_start_debugging_task`, which dispatches reverse requests back into the store.
    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
    // Held only so the dispatching task lives as long as the store does.
    _start_debugging_task: Task<()>,
}
93
94impl LocalDapStore {
95 fn next_session_id(&self) -> SessionId {
96 SessionId(self.next_session_id.fetch_add(1, SeqCst))
97 }
98}
99
/// State held by a [`DapStore`] whose sessions live on an upstream peer.
pub struct RemoteDapStore {
    upstream_client: AnyProtoClient,
    upstream_project_id: u64,
    // Starts out as `Some(empty queue)`; `DapStore::remote_event_queue` takes it,
    // leaving `None` behind.
    event_queue: Option<VecDeque<DapStoreEvent>>,
}
105
/// Registry of debug sessions, operating either locally or as a proxy to an upstream peer.
pub struct DapStore {
    mode: DapStoreMode,
    // Client and project id recorded by `shared`, cleared again by `unshared`.
    downstream_client: Option<(AnyProtoClient, u64)>,
    breakpoint_store: Entity<BreakpointStore>,
    // All sessions currently registered with the store, keyed by their id.
    sessions: BTreeMap<SessionId, Entity<Session>>,
}
112
// Marker impl: lets `DapStore` emit `DapStoreEvent`s through `cx.emit`.
impl EventEmitter<DapStoreEvent> for DapStore {}
114
115impl DapStore {
    /// Registration hook for RPC handlers on `_client`.
    ///
    /// Currently a no-op: every handler registration below is commented out, so the
    /// client argument is unused.
    pub fn init(_client: &AnyProtoClient) {
        // todo(debugger): Reenable these after we finish handle_dap_command refactor
        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
    }
131
132 #[expect(clippy::too_many_arguments)]
133 pub fn new_local(
134 http_client: Arc<dyn HttpClient>,
135 node_runtime: NodeRuntime,
136 fs: Arc<dyn Fs>,
137 language_registry: Arc<LanguageRegistry>,
138 debug_adapters: Arc<DapRegistry>,
139 environment: Entity<ProjectEnvironment>,
140 toolchain_store: Arc<dyn LanguageToolchainStore>,
141 breakpoint_store: Entity<BreakpointStore>,
142 worktree_store: Entity<WorktreeStore>,
143 cx: &mut Context<Self>,
144 ) -> Self {
145 cx.on_app_quit(Self::shutdown_sessions).detach();
146
147 let (start_debugging_tx, mut message_rx) =
148 futures::channel::mpsc::unbounded::<(SessionId, Message)>();
149
150 let _start_debugging_task = cx.spawn(async move |this, cx| {
151 while let Some((session_id, message)) = message_rx.next().await {
152 match message {
153 Message::Request(request) => {
154 let _ = this
155 .update(cx, |this, cx| {
156 if request.command == StartDebugging::COMMAND {
157 this.handle_start_debugging_request(session_id, request, cx)
158 .detach_and_log_err(cx);
159 } else if request.command == RunInTerminal::COMMAND {
160 this.handle_run_in_terminal_request(session_id, request, cx)
161 .detach_and_log_err(cx);
162 }
163 })
164 .log_err();
165 }
166 _ => {}
167 }
168 }
169 });
170 Self {
171 mode: DapStoreMode::Local(LocalDapStore {
172 fs,
173 environment,
174 http_client,
175 node_runtime,
176 worktree_store,
177 toolchain_store,
178 language_registry,
179 debug_adapters,
180 start_debugging_tx,
181 _start_debugging_task,
182 next_session_id: Default::default(),
183 }),
184 downstream_client: None,
185 breakpoint_store,
186 sessions: Default::default(),
187 }
188 }
189
190 pub fn new_remote(
191 project_id: u64,
192 upstream_client: AnyProtoClient,
193 breakpoint_store: Entity<BreakpointStore>,
194 ) -> Self {
195 Self {
196 mode: DapStoreMode::Remote(RemoteDapStore {
197 upstream_client,
198 upstream_project_id: project_id,
199 event_queue: Some(VecDeque::default()),
200 }),
201 downstream_client: None,
202 breakpoint_store,
203 sessions: Default::default(),
204 }
205 }
206
207 pub fn as_remote(&self) -> Option<&RemoteDapStore> {
208 match &self.mode {
209 DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
210 _ => None,
211 }
212 }
213
214 pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
215 if let DapStoreMode::Remote(remote) = &mut self.mode {
216 remote.event_queue.take()
217 } else {
218 None
219 }
220 }
221
222 pub fn as_local(&self) -> Option<&LocalDapStore> {
223 match &self.mode {
224 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
225 _ => None,
226 }
227 }
228
229 pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
230 match &mut self.mode {
231 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
232 _ => None,
233 }
234 }
235
236 pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
237 match &self.mode {
238 DapStoreMode::Remote(RemoteDapStore {
239 upstream_client,
240 upstream_project_id,
241 ..
242 }) => Some((upstream_client.clone(), *upstream_project_id)),
243
244 DapStoreMode::Local(_) => None,
245 }
246 }
247
    /// Returns the downstream client and project id recorded by `shared`, if any.
    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
        self.downstream_client.as_ref()
    }
251
252 pub fn add_remote_client(
253 &mut self,
254 session_id: SessionId,
255 ignore: Option<bool>,
256 cx: &mut Context<Self>,
257 ) {
258 if let DapStoreMode::Remote(remote) = &self.mode {
259 self.sessions.insert(
260 session_id,
261 cx.new(|_| {
262 debugger::session::Session::remote(
263 session_id,
264 remote.upstream_client.clone(),
265 remote.upstream_project_id,
266 ignore.unwrap_or(false),
267 )
268 }),
269 );
270 } else {
271 debug_assert!(false);
272 }
273 }
274
275 pub fn session_by_id(
276 &self,
277 session_id: impl Borrow<SessionId>,
278 ) -> Option<Entity<session::Session>> {
279 let session_id = session_id.borrow();
280 let client = self.sessions.get(session_id).cloned();
281
282 client
283 }
284 pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
285 self.sessions.values()
286 }
287
288 pub fn capabilities_by_id(
289 &self,
290 session_id: impl Borrow<SessionId>,
291 cx: &App,
292 ) -> Option<Capabilities> {
293 let session_id = session_id.borrow();
294 self.sessions
295 .get(session_id)
296 .map(|client| client.read(cx).capabilities.clone())
297 }
298
    /// Returns the breakpoint store this DAP store was constructed with.
    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
        &self.breakpoint_store
    }
302
303 #[allow(dead_code)]
304 async fn handle_ignore_breakpoint_state(
305 this: Entity<Self>,
306 envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
307 mut cx: AsyncApp,
308 ) -> Result<()> {
309 let session_id = SessionId::from_proto(envelope.payload.session_id);
310
311 this.update(&mut cx, |this, cx| {
312 if let Some(session) = this.session_by_id(&session_id) {
313 session.update(cx, |session, cx| {
314 session.set_ignore_breakpoints(envelope.payload.ignore, cx)
315 })
316 } else {
317 Task::ready(())
318 }
319 })?
320 .await;
321
322 Ok(())
323 }
324
325 pub fn new_session(
326 &mut self,
327 config: DebugAdapterConfig,
328 worktree: &Entity<Worktree>,
329 parent_session: Option<Entity<Session>>,
330 cx: &mut Context<Self>,
331 ) -> (SessionId, Task<Result<Entity<Session>>>) {
332 let Some(local_store) = self.as_local() else {
333 unimplemented!("Starting session on remote side");
334 };
335
336 let delegate = DapAdapterDelegate::new(
337 local_store.fs.clone(),
338 worktree.read(cx).id(),
339 local_store.node_runtime.clone(),
340 local_store.http_client.clone(),
341 local_store.language_registry.clone(),
342 local_store.toolchain_store.clone(),
343 local_store.environment.update(cx, |env, cx| {
344 let worktree = worktree.read(cx);
345 env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
346 }),
347 );
348 let session_id = local_store.next_session_id();
349
350 if let Some(session) = &parent_session {
351 session.update(cx, |session, _| {
352 session.add_child_session_id(session_id);
353 });
354 }
355
356 let (initialized_tx, initialized_rx) = oneshot::channel();
357
358 let start_client_task = Session::local(
359 self.breakpoint_store.clone(),
360 session_id,
361 parent_session,
362 delegate,
363 config,
364 local_store.start_debugging_tx.clone(),
365 initialized_tx,
366 local_store.debug_adapters.clone(),
367 cx,
368 );
369
370 let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
371 (session_id, task)
372 }
373 #[cfg(any(test, feature = "test-support"))]
374 pub fn new_fake_session(
375 &mut self,
376 config: DebugAdapterConfig,
377 worktree: &Entity<Worktree>,
378 parent_session: Option<Entity<Session>>,
379 caps: Capabilities,
380 fails: bool,
381 cx: &mut Context<Self>,
382 ) -> (SessionId, Task<Result<Entity<Session>>>) {
383 let Some(local_store) = self.as_local() else {
384 unimplemented!("Starting session on remote side");
385 };
386
387 let delegate = DapAdapterDelegate::new(
388 local_store.fs.clone(),
389 worktree.read(cx).id(),
390 local_store.node_runtime.clone(),
391 local_store.http_client.clone(),
392 local_store.language_registry.clone(),
393 local_store.toolchain_store.clone(),
394 local_store.environment.update(cx, |env, cx| {
395 let worktree = worktree.read(cx);
396 env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
397 }),
398 );
399 let session_id = local_store.next_session_id();
400
401 if let Some(session) = &parent_session {
402 session.update(cx, |session, _| {
403 session.add_child_session_id(session_id);
404 });
405 }
406
407 let (initialized_tx, initialized_rx) = oneshot::channel();
408
409 let start_client_task = Session::fake(
410 self.breakpoint_store.clone(),
411 session_id,
412 parent_session,
413 delegate,
414 config,
415 local_store.start_debugging_tx.clone(),
416 initialized_tx,
417 caps,
418 fails,
419 cx,
420 );
421
422 let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
423 (session_id, task)
424 }
425
426 fn handle_start_debugging_request(
427 &mut self,
428 session_id: SessionId,
429 request: dap::messages::Request,
430 cx: &mut Context<Self>,
431 ) -> Task<Result<()>> {
432 let Some(local_store) = self.as_local() else {
433 unreachable!("Cannot response for non-local session");
434 };
435
436 let Some(parent_session) = self.session_by_id(session_id) else {
437 return Task::ready(Err(anyhow!("Session not found")));
438 };
439
440 let args = serde_json::from_value::<StartDebuggingRequestArguments>(
441 request.arguments.unwrap_or_default(),
442 )
443 .expect("To parse StartDebuggingRequestArguments");
444 let worktree = local_store
445 .worktree_store
446 .update(cx, |this, _| this.worktrees().next())
447 .expect("worktree-less project");
448
449 let Some(config) = parent_session.read(cx).configuration() else {
450 unreachable!("there must be a config for local sessions");
451 };
452
453 let debug_config = DebugAdapterConfig {
454 label: config.label,
455 adapter: config.adapter,
456 request: DebugRequestDisposition::ReverseRequest(args),
457 initialize_args: config.initialize_args.clone(),
458 tcp_connection: config.tcp_connection.clone(),
459 };
460 #[cfg(any(test, feature = "test-support"))]
461 let new_session_task = {
462 let caps = parent_session.read(cx).capabilities.clone();
463 self.new_fake_session(
464 debug_config,
465 &worktree,
466 Some(parent_session.clone()),
467 caps,
468 false,
469 cx,
470 )
471 .1
472 };
473 #[cfg(not(any(test, feature = "test-support")))]
474 let new_session_task = self
475 .new_session(debug_config, &worktree, Some(parent_session.clone()), cx)
476 .1;
477
478 let request_seq = request.seq;
479 cx.spawn(async move |_, cx| {
480 let (success, body) = match new_session_task.await {
481 Ok(_) => (true, None),
482 Err(error) => (
483 false,
484 Some(serde_json::to_value(ErrorResponse {
485 error: Some(dap::Message {
486 id: request_seq,
487 format: error.to_string(),
488 variables: None,
489 send_telemetry: None,
490 show_user: None,
491 url: None,
492 url_label: None,
493 }),
494 })?),
495 ),
496 };
497
498 parent_session
499 .update(cx, |session, cx| {
500 session.respond_to_client(
501 request_seq,
502 success,
503 StartDebugging::COMMAND.to_string(),
504 body,
505 cx,
506 )
507 })?
508 .await
509 })
510 }
511
512 fn handle_run_in_terminal_request(
513 &mut self,
514 session_id: SessionId,
515 request: dap::messages::Request,
516 cx: &mut Context<Self>,
517 ) -> Task<Result<()>> {
518 let Some(session) = self.session_by_id(session_id) else {
519 return Task::ready(Err(anyhow!("Session not found")));
520 };
521
522 let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
523 request.arguments.unwrap_or_default(),
524 )
525 .expect("To parse StartDebuggingRequestArguments");
526
527 let seq = request.seq;
528
529 let cwd = PathBuf::from(request_args.cwd);
530 match cwd.try_exists() {
531 Ok(true) => (),
532 Ok(false) | Err(_) => {
533 return session.update(cx, |session, cx| {
534 session.respond_to_client(
535 seq,
536 false,
537 RunInTerminal::COMMAND.to_string(),
538 serde_json::to_value(dap::ErrorResponse {
539 error: Some(dap::Message {
540 id: seq,
541 format: format!("Received invalid/unknown cwd: {cwd:?}"),
542 variables: None,
543 send_telemetry: None,
544 show_user: None,
545 url: None,
546 url_label: None,
547 }),
548 })
549 .ok(),
550 cx,
551 )
552 })
553 }
554 }
555
556 let mut args = request_args.args.clone();
557
558 // Handle special case for NodeJS debug adapter
559 // If only the Node binary path is provided, we set the command to None
560 // This prevents the NodeJS REPL from appearing, which is not the desired behavior
561 // The expected usage is for users to provide their own Node command, e.g., `node test.js`
562 // This allows the NodeJS debug client to attach correctly
563 let command = if args.len() > 1 {
564 Some(args.remove(0))
565 } else {
566 None
567 };
568
569 let mut envs: HashMap<String, String> = Default::default();
570 if let Some(Value::Object(env)) = request_args.env {
571 for (key, value) in env {
572 let value_str = match (key.as_str(), value) {
573 (_, Value::String(value)) => value,
574 _ => continue,
575 };
576
577 envs.insert(key, value_str);
578 }
579 }
580
581 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
582
583 cx.emit(DapStoreEvent::RunInTerminal {
584 session_id,
585 title: request_args.title,
586 cwd,
587 command,
588 args,
589 envs,
590 sender: tx,
591 });
592 cx.notify();
593
594 let session = session.downgrade();
595 cx.spawn(async move |_, cx| {
596 let (success, body) = match rx.next().await {
597 Some(Ok(pid)) => (
598 true,
599 serde_json::to_value(dap::RunInTerminalResponse {
600 process_id: None,
601 shell_process_id: Some(pid as u64),
602 })
603 .ok(),
604 ),
605 Some(Err(error)) => (
606 false,
607 serde_json::to_value(dap::ErrorResponse {
608 error: Some(dap::Message {
609 id: seq,
610 format: error.to_string(),
611 variables: None,
612 send_telemetry: None,
613 show_user: None,
614 url: None,
615 url_label: None,
616 }),
617 })
618 .ok(),
619 ),
620 None => (
621 false,
622 serde_json::to_value(dap::ErrorResponse {
623 error: Some(dap::Message {
624 id: seq,
625 format: "failed to receive response from spawn terminal".to_string(),
626 variables: None,
627 send_telemetry: None,
628 show_user: None,
629 url: None,
630 url_label: None,
631 }),
632 })
633 .ok(),
634 ),
635 };
636
637 session
638 .update(cx, |session, cx| {
639 session.respond_to_client(
640 seq,
641 success,
642 RunInTerminal::COMMAND.to_string(),
643 body,
644 cx,
645 )
646 })?
647 .await
648 })
649 }
650
651 pub fn evaluate(
652 &self,
653 session_id: &SessionId,
654 stack_frame_id: u64,
655 expression: String,
656 context: EvaluateArgumentsContext,
657 source: Option<Source>,
658 cx: &mut Context<Self>,
659 ) -> Task<Result<EvaluateResponse>> {
660 let Some(client) = self
661 .session_by_id(session_id)
662 .and_then(|client| client.read(cx).adapter_client())
663 else {
664 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
665 };
666
667 cx.background_executor().spawn(async move {
668 client
669 .request::<Evaluate>(EvaluateArguments {
670 expression: expression.clone(),
671 frame_id: Some(stack_frame_id),
672 context: Some(context),
673 format: None,
674 line: None,
675 column: None,
676 source,
677 })
678 .await
679 })
680 }
681
682 pub fn completions(
683 &self,
684 session_id: &SessionId,
685 stack_frame_id: u64,
686 text: String,
687 completion_column: u64,
688 cx: &mut Context<Self>,
689 ) -> Task<Result<Vec<CompletionItem>>> {
690 let Some(client) = self
691 .session_by_id(session_id)
692 .and_then(|client| client.read(cx).adapter_client())
693 else {
694 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
695 };
696
697 cx.background_executor().spawn(async move {
698 Ok(client
699 .request::<Completions>(CompletionsArguments {
700 frame_id: Some(stack_frame_id),
701 line: None,
702 text,
703 column: completion_column,
704 })
705 .await?
706 .targets)
707 })
708 }
709
710 pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
711 let mut tasks = vec![];
712 for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
713 tasks.push(self.shutdown_session(session_id, cx));
714 }
715
716 cx.background_executor().spawn(async move {
717 futures::future::join_all(tasks).await;
718 })
719 }
720
721 pub fn shutdown_session(
722 &mut self,
723 session_id: SessionId,
724 cx: &mut Context<Self>,
725 ) -> Task<Result<()>> {
726 let Some(_) = self.as_local_mut() else {
727 return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
728 };
729
730 let Some(session) = self.sessions.remove(&session_id) else {
731 return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
732 };
733
734 let shutdown_children = session
735 .read(cx)
736 .child_session_ids()
737 .iter()
738 .map(|session_id| self.shutdown_session(*session_id, cx))
739 .collect::<Vec<_>>();
740
741 let shutdown_parent_task = if let Some(parent_session) = session
742 .read(cx)
743 .parent_id()
744 .and_then(|session_id| self.session_by_id(session_id))
745 {
746 let shutdown_id = parent_session.update(cx, |parent_session, _| {
747 parent_session.remove_child_session_id(session_id);
748
749 if parent_session.child_session_ids().len() == 0 {
750 Some(parent_session.session_id())
751 } else {
752 None
753 }
754 });
755
756 shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
757 } else {
758 None
759 };
760
761 let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
762
763 cx.background_spawn(async move {
764 if shutdown_children.len() > 0 {
765 let _ = join_all(shutdown_children).await;
766 }
767
768 shutdown_task.await;
769
770 if let Some(parent_task) = shutdown_parent_task {
771 parent_task.await?;
772 }
773
774 Ok(())
775 })
776 }
777
778 pub fn shared(
779 &mut self,
780 project_id: u64,
781 downstream_client: AnyProtoClient,
782 _: &mut Context<Self>,
783 ) {
784 self.downstream_client = Some((downstream_client.clone(), project_id));
785 }
786
787 pub fn unshared(&mut self, cx: &mut Context<Self>) {
788 self.downstream_client.take();
789
790 cx.notify();
791 }
792}
793
794fn create_new_session(
795 session_id: SessionId,
796 initialized_rx: oneshot::Receiver<()>,
797 start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
798 cx: &mut Context<DapStore>,
799) -> Task<Result<Entity<Session>>> {
800 let task = cx.spawn(async move |this, cx| {
801 let session = match start_client_task.await {
802 Ok(session) => session,
803 Err(error) => {
804 this.update(cx, |_, cx| {
805 cx.emit(DapStoreEvent::Notification(error.to_string()));
806 })
807 .log_err();
808
809 return Err(error);
810 }
811 };
812
813 // we have to insert the session early, so we can handle reverse requests
814 // that need the session to be available
815 this.update(cx, |store, cx| {
816 store.sessions.insert(session_id, session.clone());
817 cx.emit(DapStoreEvent::DebugClientStarted(session_id));
818 cx.notify();
819 })?;
820
821 match session
822 .update(cx, |session, cx| {
823 session.initialize_sequence(initialized_rx, cx)
824 })?
825 .await
826 {
827 Ok(_) => {}
828 Err(error) => {
829 this.update(cx, |this, cx| {
830 cx.emit(DapStoreEvent::Notification(error.to_string()));
831
832 this.shutdown_session(session_id, cx)
833 })?
834 .await
835 .log_err();
836
837 return Err(error);
838 }
839 }
840
841 Ok(session)
842 });
843 task
844}
845
/// Bundle of services handed to a debug adapter; implements `dap::adapters::DapDelegate`.
#[derive(Clone)]
pub struct DapAdapterDelegate {
    fs: Arc<dyn Fs>,
    worktree_id: WorktreeId,
    node_runtime: NodeRuntime,
    http_client: Arc<dyn HttpClient>,
    // Receives adapter status updates (see `update_status`).
    language_registry: Arc<LanguageRegistry>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    // Starts out empty; shared between clones of the delegate via the `Arc`.
    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
    // Resolves to the environment returned by `shell_env`; `None` yields an empty map.
    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
}
857
858impl DapAdapterDelegate {
859 pub fn new(
860 fs: Arc<dyn Fs>,
861 worktree_id: WorktreeId,
862 node_runtime: NodeRuntime,
863 http_client: Arc<dyn HttpClient>,
864 language_registry: Arc<LanguageRegistry>,
865 toolchain_store: Arc<dyn LanguageToolchainStore>,
866 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
867 ) -> Self {
868 Self {
869 fs,
870 worktree_id,
871 http_client,
872 node_runtime,
873 toolchain_store,
874 language_registry,
875 load_shell_env_task,
876 updated_adapters: Default::default(),
877 }
878 }
879}
880
881#[async_trait(?Send)]
882impl dap::adapters::DapDelegate for DapAdapterDelegate {
883 fn worktree_id(&self) -> WorktreeId {
884 self.worktree_id
885 }
886
887 fn http_client(&self) -> Arc<dyn HttpClient> {
888 self.http_client.clone()
889 }
890
891 fn node_runtime(&self) -> NodeRuntime {
892 self.node_runtime.clone()
893 }
894
895 fn fs(&self) -> Arc<dyn Fs> {
896 self.fs.clone()
897 }
898
899 fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
900 self.updated_adapters.clone()
901 }
902
903 fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
904 let name = SharedString::from(dap_name.to_string());
905 let status = match status {
906 DapStatus::None => BinaryStatus::None,
907 DapStatus::Downloading => BinaryStatus::Downloading,
908 DapStatus::Failed { error } => BinaryStatus::Failed { error },
909 DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
910 };
911
912 self.language_registry
913 .update_dap_status(LanguageServerName(name), status);
914 }
915
916 fn which(&self, command: &OsStr) -> Option<PathBuf> {
917 which::which(command).ok()
918 }
919
920 async fn shell_env(&self) -> HashMap<String, String> {
921 let task = self.load_shell_env_task.clone();
922 task.await.unwrap_or_default()
923 }
924
925 fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
926 self.toolchain_store.clone()
927 }
928}