1use super::{
2 breakpoint_store::BreakpointStore,
3 locators::DapLocator,
4 session::{self, Session, SessionStateEvent},
5};
6use crate::{
7 ProjectEnvironment, debugger, project_settings::ProjectSettings, worktree_store::WorktreeStore,
8};
9use anyhow::{Result, anyhow};
10use async_trait::async_trait;
11use collections::HashMap;
12use dap::{
13 Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
14 EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
15 Source, StartDebuggingRequestArguments,
16 adapters::{DapStatus, DebugAdapterBinary, DebugAdapterName},
17 client::SessionId,
18 messages::Message,
19 requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
20};
21use fs::Fs;
22use futures::{
23 channel::{mpsc, oneshot},
24 future::{Shared, join_all},
25};
26use gpui::{
27 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
28};
29use http_client::HttpClient;
30use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
31use lsp::LanguageServerName;
32use node_runtime::NodeRuntime;
33
34use rpc::{
35 AnyProtoClient, TypedEnvelope,
36 proto::{self},
37};
38use serde_json::Value;
39use settings::{Settings, WorktreeId};
40use smol::{lock::Mutex, stream::StreamExt};
41use std::{
42 borrow::Borrow,
43 collections::{BTreeMap, HashSet},
44 ffi::OsStr,
45 path::{Path, PathBuf},
46 sync::Arc,
47};
48use task::{DebugTaskDefinition, DebugTaskTemplate};
49use util::ResultExt as _;
50use worktree::Worktree;
51
52pub enum DapStoreEvent {
53 DebugClientStarted(SessionId),
54 DebugSessionInitialized(SessionId),
55 DebugClientShutdown(SessionId),
56 DebugClientEvent {
57 session_id: SessionId,
58 message: Message,
59 },
60 RunInTerminal {
61 session_id: SessionId,
62 title: Option<String>,
63 cwd: Option<Arc<Path>>,
64 command: Option<String>,
65 args: Vec<String>,
66 envs: HashMap<String, String>,
67 sender: mpsc::Sender<Result<u32>>,
68 },
69 Notification(String),
70 RemoteHasInitialized,
71}
72
73#[allow(clippy::large_enum_variant)]
74enum DapStoreMode {
75 Local(LocalDapStore),
76 Ssh(SshDapStore),
77 Collab,
78}
79
80pub struct LocalDapStore {
81 fs: Arc<dyn Fs>,
82 node_runtime: NodeRuntime,
83 http_client: Arc<dyn HttpClient>,
84 environment: Entity<ProjectEnvironment>,
85 language_registry: Arc<LanguageRegistry>,
86 worktree_store: Entity<WorktreeStore>,
87 toolchain_store: Arc<dyn LanguageToolchainStore>,
88 locators: HashMap<String, Arc<dyn DapLocator>>,
89}
90
91pub struct SshDapStore {
92 upstream_client: AnyProtoClient,
93 upstream_project_id: u64,
94}
95
96pub struct DapStore {
97 mode: DapStoreMode,
98 downstream_client: Option<(AnyProtoClient, u64)>,
99 breakpoint_store: Entity<BreakpointStore>,
100 sessions: BTreeMap<SessionId, Entity<Session>>,
101 next_session_id: u32,
102 start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
103 _start_debugging_task: Task<()>,
104}
105
106impl EventEmitter<DapStoreEvent> for DapStore {}
107
108impl DapStore {
109 pub fn init(client: &AnyProtoClient) {
110 client.add_entity_request_handler(Self::handle_run_debug_locator);
111 client.add_entity_request_handler(Self::handle_get_debug_adapter_binary);
112 }
113
114 #[expect(clippy::too_many_arguments)]
115 pub fn new_local(
116 http_client: Arc<dyn HttpClient>,
117 node_runtime: NodeRuntime,
118 fs: Arc<dyn Fs>,
119 language_registry: Arc<LanguageRegistry>,
120 environment: Entity<ProjectEnvironment>,
121 toolchain_store: Arc<dyn LanguageToolchainStore>,
122 worktree_store: Entity<WorktreeStore>,
123 breakpoint_store: Entity<BreakpointStore>,
124 cx: &mut Context<Self>,
125 ) -> Self {
126 cx.on_app_quit(Self::shutdown_sessions).detach();
127
128 let locators = HashMap::from_iter([(
129 "cargo".to_string(),
130 Arc::new(super::locators::cargo::CargoLocator {}) as _,
131 )]);
132
133 let mode = DapStoreMode::Local(LocalDapStore {
134 fs,
135 environment,
136 http_client,
137 node_runtime,
138 toolchain_store,
139 worktree_store,
140 language_registry,
141 locators,
142 });
143
144 Self::new(mode, breakpoint_store, cx)
145 }
146
147 pub fn new_ssh(
148 project_id: u64,
149 upstream_client: AnyProtoClient,
150 breakpoint_store: Entity<BreakpointStore>,
151 cx: &mut Context<Self>,
152 ) -> Self {
153 let mode = DapStoreMode::Ssh(SshDapStore {
154 upstream_client,
155 upstream_project_id: project_id,
156 });
157
158 Self::new(mode, breakpoint_store, cx)
159 }
160
161 pub fn new_collab(
162 _project_id: u64,
163 _upstream_client: AnyProtoClient,
164 breakpoint_store: Entity<BreakpointStore>,
165 cx: &mut Context<Self>,
166 ) -> Self {
167 Self::new(DapStoreMode::Collab, breakpoint_store, cx)
168 }
169
170 fn new(
171 mode: DapStoreMode,
172 breakpoint_store: Entity<BreakpointStore>,
173 cx: &mut Context<Self>,
174 ) -> Self {
175 let (start_debugging_tx, mut message_rx) =
176 futures::channel::mpsc::unbounded::<(SessionId, Message)>();
177 let task = cx.spawn(async move |this, cx| {
178 while let Some((session_id, message)) = message_rx.next().await {
179 match message {
180 Message::Request(request) => {
181 let _ = this
182 .update(cx, |this, cx| {
183 if request.command == StartDebugging::COMMAND {
184 this.handle_start_debugging_request(session_id, request, cx)
185 .detach_and_log_err(cx);
186 } else if request.command == RunInTerminal::COMMAND {
187 this.handle_run_in_terminal_request(session_id, request, cx)
188 .detach_and_log_err(cx);
189 }
190 })
191 .log_err();
192 }
193 _ => {}
194 }
195 }
196 });
197
198 Self {
199 mode,
200 _start_debugging_task: task,
201 start_debugging_tx,
202 next_session_id: 0,
203 downstream_client: None,
204 breakpoint_store,
205 sessions: Default::default(),
206 }
207 }
208
209 pub fn get_debug_adapter_binary(
210 &mut self,
211 definition: DebugTaskDefinition,
212 cx: &mut Context<Self>,
213 ) -> Task<Result<DebugAdapterBinary>> {
214 match &self.mode {
215 DapStoreMode::Local(local) => {
216 let Some(worktree) = local.worktree_store.read(cx).visible_worktrees(cx).next()
217 else {
218 return Task::ready(Err(anyhow!("Failed to find a worktree")));
219 };
220 let Some(adapter) = DapRegistry::global(cx).adapter(&definition.adapter) else {
221 return Task::ready(Err(anyhow!("Failed to find a debug adapter")));
222 };
223
224 let user_installed_path = ProjectSettings::get_global(cx)
225 .dap
226 .get(&adapter.name())
227 .and_then(|s| s.binary.as_ref().map(PathBuf::from));
228
229 let delegate = self.delegate(&worktree, cx);
230 let cwd: Arc<Path> = definition
231 .cwd()
232 .unwrap_or(worktree.read(cx).abs_path().as_ref())
233 .into();
234
235 cx.spawn(async move |this, cx| {
236 let mut binary = adapter
237 .get_binary(&delegate, &definition, user_installed_path, cx)
238 .await?;
239
240 let env = this
241 .update(cx, |this, cx| {
242 this.as_local()
243 .unwrap()
244 .environment
245 .update(cx, |environment, cx| {
246 environment.get_directory_environment(cwd, cx)
247 })
248 })?
249 .await;
250
251 if let Some(mut env) = env {
252 env.extend(std::mem::take(&mut binary.envs));
253 binary.envs = env;
254 }
255
256 Ok(binary)
257 })
258 }
259 DapStoreMode::Ssh(ssh) => {
260 let request = ssh.upstream_client.request(proto::GetDebugAdapterBinary {
261 project_id: ssh.upstream_project_id,
262 task: Some(definition.to_proto()),
263 });
264
265 cx.background_spawn(async move {
266 let response = request.await?;
267 DebugAdapterBinary::from_proto(response)
268 })
269 }
270 DapStoreMode::Collab => {
271 Task::ready(Err(anyhow!("Debugging is not yet supported via collab")))
272 }
273 }
274 }
275
276 pub fn run_debug_locator(
277 &mut self,
278 template: DebugTaskTemplate,
279 cx: &mut Context<Self>,
280 ) -> Task<Result<DebugTaskDefinition>> {
281 let Some(locator_name) = template.locator else {
282 return Task::ready(Ok(template.definition));
283 };
284
285 match &self.mode {
286 DapStoreMode::Local(local) => {
287 if let Some(locator) = local.locators.get(&locator_name).cloned() {
288 cx.background_spawn(
289 async move { locator.run_locator(template.definition).await },
290 )
291 } else {
292 Task::ready(Err(anyhow!("Couldn't find locator {}", locator_name)))
293 }
294 }
295 DapStoreMode::Ssh(ssh) => {
296 let request = ssh.upstream_client.request(proto::RunDebugLocator {
297 project_id: ssh.upstream_project_id,
298 locator: locator_name,
299 task: Some(template.definition.to_proto()),
300 });
301 cx.background_spawn(async move {
302 let response = request.await?;
303 DebugTaskDefinition::from_proto(response)
304 })
305 }
306 DapStoreMode::Collab => {
307 Task::ready(Err(anyhow!("Debugging is not yet supported via collab")))
308 }
309 }
310 }
311
312 fn as_local(&self) -> Option<&LocalDapStore> {
313 match &self.mode {
314 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
315 _ => None,
316 }
317 }
318
319 pub fn add_remote_client(
320 &mut self,
321 session_id: SessionId,
322 ignore: Option<bool>,
323 cx: &mut Context<Self>,
324 ) {
325 if let DapStoreMode::Ssh(remote) = &self.mode {
326 self.sessions.insert(
327 session_id,
328 cx.new(|_| {
329 debugger::session::Session::remote(
330 session_id,
331 remote.upstream_client.clone(),
332 remote.upstream_project_id,
333 ignore.unwrap_or(false),
334 )
335 }),
336 );
337 } else {
338 debug_assert!(false);
339 }
340 }
341
342 pub fn session_by_id(
343 &self,
344 session_id: impl Borrow<SessionId>,
345 ) -> Option<Entity<session::Session>> {
346 let session_id = session_id.borrow();
347 let client = self.sessions.get(session_id).cloned();
348
349 client
350 }
351 pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
352 self.sessions.values()
353 }
354
355 pub fn capabilities_by_id(
356 &self,
357 session_id: impl Borrow<SessionId>,
358 cx: &App,
359 ) -> Option<Capabilities> {
360 let session_id = session_id.borrow();
361 self.sessions
362 .get(session_id)
363 .map(|client| client.read(cx).capabilities.clone())
364 }
365
366 pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
367 &self.breakpoint_store
368 }
369
370 #[allow(dead_code)]
371 async fn handle_ignore_breakpoint_state(
372 this: Entity<Self>,
373 envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
374 mut cx: AsyncApp,
375 ) -> Result<()> {
376 let session_id = SessionId::from_proto(envelope.payload.session_id);
377
378 this.update(&mut cx, |this, cx| {
379 if let Some(session) = this.session_by_id(&session_id) {
380 session.update(cx, |session, cx| {
381 session.set_ignore_breakpoints(envelope.payload.ignore, cx)
382 })
383 } else {
384 Task::ready(HashMap::default())
385 }
386 })?
387 .await;
388
389 Ok(())
390 }
391
392 fn delegate(&self, worktree: &Entity<Worktree>, cx: &mut App) -> DapAdapterDelegate {
393 let Some(local_store) = self.as_local() else {
394 unimplemented!("Starting session on remote side");
395 };
396
397 DapAdapterDelegate::new(
398 local_store.fs.clone(),
399 worktree.read(cx).id(),
400 local_store.node_runtime.clone(),
401 local_store.http_client.clone(),
402 local_store.language_registry.clone(),
403 local_store.toolchain_store.clone(),
404 local_store.environment.update(cx, |env, cx| {
405 env.get_worktree_environment(worktree.clone(), cx)
406 }),
407 )
408 }
409
410 pub fn new_session(
411 &mut self,
412 binary: DebugAdapterBinary,
413 config: DebugTaskDefinition,
414 worktree: WeakEntity<Worktree>,
415 parent_session: Option<Entity<Session>>,
416 cx: &mut Context<Self>,
417 ) -> (SessionId, Task<Result<Entity<Session>>>) {
418 let session_id = SessionId(util::post_inc(&mut self.next_session_id));
419
420 if let Some(session) = &parent_session {
421 session.update(cx, |session, _| {
422 session.add_child_session_id(session_id);
423 });
424 }
425
426 let (initialized_tx, initialized_rx) = oneshot::channel();
427
428 let start_debugging_tx = self.start_debugging_tx.clone();
429
430 let task = cx.spawn(async move |this, cx| {
431 let start_client_task = this.update(cx, |this, cx| {
432 Session::local(
433 this.breakpoint_store.clone(),
434 worktree.clone(),
435 session_id,
436 parent_session,
437 binary,
438 config,
439 start_debugging_tx.clone(),
440 initialized_tx,
441 cx,
442 )
443 })?;
444
445 let ret = this
446 .update(cx, |_, cx| {
447 create_new_session(session_id, initialized_rx, start_client_task, worktree, cx)
448 })?
449 .await;
450 ret
451 });
452
453 (session_id, task)
454 }
455
456 fn handle_start_debugging_request(
457 &mut self,
458 session_id: SessionId,
459 request: dap::messages::Request,
460 cx: &mut Context<Self>,
461 ) -> Task<Result<()>> {
462 let Some(parent_session) = self.session_by_id(session_id) else {
463 return Task::ready(Err(anyhow!("Session not found")));
464 };
465
466 let Some(worktree) = parent_session
467 .read(cx)
468 .as_local()
469 .map(|local| local.worktree().clone())
470 else {
471 return Task::ready(Err(anyhow!(
472 "Cannot handle start debugging request from remote end"
473 )));
474 };
475
476 let args = serde_json::from_value::<StartDebuggingRequestArguments>(
477 request.arguments.unwrap_or_default(),
478 )
479 .expect("To parse StartDebuggingRequestArguments");
480 let mut binary = parent_session.read(cx).binary().clone();
481 let config = parent_session.read(cx).configuration().unwrap().clone();
482 binary.request_args = args;
483
484 let new_session_task = self
485 .new_session(binary, config, worktree, Some(parent_session.clone()), cx)
486 .1;
487
488 let request_seq = request.seq;
489 cx.spawn(async move |_, cx| {
490 let (success, body) = match new_session_task.await {
491 Ok(_) => (true, None),
492 Err(error) => (
493 false,
494 Some(serde_json::to_value(ErrorResponse {
495 error: Some(dap::Message {
496 id: request_seq,
497 format: error.to_string(),
498 variables: None,
499 send_telemetry: None,
500 show_user: None,
501 url: None,
502 url_label: None,
503 }),
504 })?),
505 ),
506 };
507
508 parent_session
509 .update(cx, |session, cx| {
510 session.respond_to_client(
511 request_seq,
512 success,
513 StartDebugging::COMMAND.to_string(),
514 body,
515 cx,
516 )
517 })?
518 .await
519 })
520 }
521
522 fn handle_run_in_terminal_request(
523 &mut self,
524 session_id: SessionId,
525 request: dap::messages::Request,
526 cx: &mut Context<Self>,
527 ) -> Task<Result<()>> {
528 let Some(session) = self.session_by_id(session_id) else {
529 return Task::ready(Err(anyhow!("Session not found")));
530 };
531
532 let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
533 request.arguments.unwrap_or_default(),
534 )
535 .expect("To parse StartDebuggingRequestArguments");
536
537 let seq = request.seq;
538
539 let cwd = Path::new(&request_args.cwd);
540
541 match cwd.try_exists() {
542 Ok(false) | Err(_) if !request_args.cwd.is_empty() => {
543 return session.update(cx, |session, cx| {
544 session.respond_to_client(
545 seq,
546 false,
547 RunInTerminal::COMMAND.to_string(),
548 serde_json::to_value(dap::ErrorResponse {
549 error: Some(dap::Message {
550 id: seq,
551 format: format!("Received invalid/unknown cwd: {cwd:?}"),
552 variables: None,
553 send_telemetry: None,
554 show_user: None,
555 url: None,
556 url_label: None,
557 }),
558 })
559 .ok(),
560 cx,
561 )
562 });
563 }
564 _ => (),
565 }
566 let mut args = request_args.args.clone();
567
568 // Handle special case for NodeJS debug adapter
569 // If only the Node binary path is provided, we set the command to None
570 // This prevents the NodeJS REPL from appearing, which is not the desired behavior
571 // The expected usage is for users to provide their own Node command, e.g., `node test.js`
572 // This allows the NodeJS debug client to attach correctly
573 let command = if args.len() > 1 {
574 Some(args.remove(0))
575 } else {
576 None
577 };
578
579 let mut envs: HashMap<String, String> = Default::default();
580 if let Some(Value::Object(env)) = request_args.env {
581 for (key, value) in env {
582 let value_str = match (key.as_str(), value) {
583 (_, Value::String(value)) => value,
584 _ => continue,
585 };
586
587 envs.insert(key, value_str);
588 }
589 }
590
591 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
592 let cwd = Some(cwd)
593 .filter(|cwd| cwd.as_os_str().len() > 0)
594 .map(Arc::from)
595 .or_else(|| {
596 self.session_by_id(session_id)
597 .and_then(|session| session.read(cx).binary().cwd.as_deref().map(Arc::from))
598 });
599 cx.emit(DapStoreEvent::RunInTerminal {
600 session_id,
601 title: request_args.title,
602 cwd,
603 command,
604 args,
605 envs,
606 sender: tx,
607 });
608 cx.notify();
609
610 let session = session.downgrade();
611 cx.spawn(async move |_, cx| {
612 let (success, body) = match rx.next().await {
613 Some(Ok(pid)) => (
614 true,
615 serde_json::to_value(dap::RunInTerminalResponse {
616 process_id: None,
617 shell_process_id: Some(pid as u64),
618 })
619 .ok(),
620 ),
621 Some(Err(error)) => (
622 false,
623 serde_json::to_value(dap::ErrorResponse {
624 error: Some(dap::Message {
625 id: seq,
626 format: error.to_string(),
627 variables: None,
628 send_telemetry: None,
629 show_user: None,
630 url: None,
631 url_label: None,
632 }),
633 })
634 .ok(),
635 ),
636 None => (
637 false,
638 serde_json::to_value(dap::ErrorResponse {
639 error: Some(dap::Message {
640 id: seq,
641 format: "failed to receive response from spawn terminal".to_string(),
642 variables: None,
643 send_telemetry: None,
644 show_user: None,
645 url: None,
646 url_label: None,
647 }),
648 })
649 .ok(),
650 ),
651 };
652
653 session
654 .update(cx, |session, cx| {
655 session.respond_to_client(
656 seq,
657 success,
658 RunInTerminal::COMMAND.to_string(),
659 body,
660 cx,
661 )
662 })?
663 .await
664 })
665 }
666
667 pub fn evaluate(
668 &self,
669 session_id: &SessionId,
670 stack_frame_id: u64,
671 expression: String,
672 context: EvaluateArgumentsContext,
673 source: Option<Source>,
674 cx: &mut Context<Self>,
675 ) -> Task<Result<EvaluateResponse>> {
676 let Some(client) = self
677 .session_by_id(session_id)
678 .and_then(|client| client.read(cx).adapter_client())
679 else {
680 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
681 };
682
683 cx.background_executor().spawn(async move {
684 client
685 .request::<Evaluate>(EvaluateArguments {
686 expression: expression.clone(),
687 frame_id: Some(stack_frame_id),
688 context: Some(context),
689 format: None,
690 line: None,
691 column: None,
692 source,
693 })
694 .await
695 })
696 }
697
698 pub fn completions(
699 &self,
700 session_id: &SessionId,
701 stack_frame_id: u64,
702 text: String,
703 completion_column: u64,
704 cx: &mut Context<Self>,
705 ) -> Task<Result<Vec<CompletionItem>>> {
706 let Some(client) = self
707 .session_by_id(session_id)
708 .and_then(|client| client.read(cx).adapter_client())
709 else {
710 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
711 };
712
713 cx.background_executor().spawn(async move {
714 Ok(client
715 .request::<Completions>(CompletionsArguments {
716 frame_id: Some(stack_frame_id),
717 line: None,
718 text,
719 column: completion_column,
720 })
721 .await?
722 .targets)
723 })
724 }
725
726 pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
727 let mut tasks = vec![];
728 for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
729 tasks.push(self.shutdown_session(session_id, cx));
730 }
731
732 cx.background_executor().spawn(async move {
733 futures::future::join_all(tasks).await;
734 })
735 }
736
737 pub fn shutdown_session(
738 &mut self,
739 session_id: SessionId,
740 cx: &mut Context<Self>,
741 ) -> Task<Result<()>> {
742 let Some(session) = self.sessions.remove(&session_id) else {
743 return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
744 };
745
746 let shutdown_children = session
747 .read(cx)
748 .child_session_ids()
749 .iter()
750 .map(|session_id| self.shutdown_session(*session_id, cx))
751 .collect::<Vec<_>>();
752
753 let shutdown_parent_task = if let Some(parent_session) = session
754 .read(cx)
755 .parent_id()
756 .and_then(|session_id| self.session_by_id(session_id))
757 {
758 let shutdown_id = parent_session.update(cx, |parent_session, _| {
759 parent_session.remove_child_session_id(session_id);
760
761 if parent_session.child_session_ids().len() == 0 {
762 Some(parent_session.session_id())
763 } else {
764 None
765 }
766 });
767
768 shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
769 } else {
770 None
771 };
772
773 let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
774
775 cx.background_spawn(async move {
776 if shutdown_children.len() > 0 {
777 let _ = join_all(shutdown_children).await;
778 }
779
780 shutdown_task.await;
781
782 if let Some(parent_task) = shutdown_parent_task {
783 parent_task.await?;
784 }
785
786 Ok(())
787 })
788 }
789
790 pub fn shared(
791 &mut self,
792 project_id: u64,
793 downstream_client: AnyProtoClient,
794 _: &mut Context<Self>,
795 ) {
796 self.downstream_client = Some((downstream_client.clone(), project_id));
797 }
798
799 pub fn unshared(&mut self, cx: &mut Context<Self>) {
800 self.downstream_client.take();
801
802 cx.notify();
803 }
804
805 async fn handle_run_debug_locator(
806 this: Entity<Self>,
807 envelope: TypedEnvelope<proto::RunDebugLocator>,
808 mut cx: AsyncApp,
809 ) -> Result<proto::DebugTaskDefinition> {
810 let template = DebugTaskTemplate {
811 locator: Some(envelope.payload.locator),
812 definition: DebugTaskDefinition::from_proto(
813 envelope
814 .payload
815 .task
816 .ok_or_else(|| anyhow!("missing definition"))?,
817 )?,
818 };
819 let definition = this
820 .update(&mut cx, |this, cx| this.run_debug_locator(template, cx))?
821 .await?;
822 Ok(definition.to_proto())
823 }
824
825 async fn handle_get_debug_adapter_binary(
826 this: Entity<Self>,
827 envelope: TypedEnvelope<proto::GetDebugAdapterBinary>,
828 mut cx: AsyncApp,
829 ) -> Result<proto::DebugAdapterBinary> {
830 let definition = DebugTaskDefinition::from_proto(
831 envelope
832 .payload
833 .task
834 .ok_or_else(|| anyhow!("missing definition"))?,
835 )?;
836 let binary = this
837 .update(&mut cx, |this, cx| {
838 this.get_debug_adapter_binary(definition, cx)
839 })?
840 .await?;
841 Ok(binary.to_proto())
842 }
843}
844
845fn create_new_session(
846 session_id: SessionId,
847 initialized_rx: oneshot::Receiver<()>,
848 start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
849 worktree: WeakEntity<Worktree>,
850 cx: &mut Context<DapStore>,
851) -> Task<Result<Entity<Session>>> {
852 let task = cx.spawn(async move |this, cx| {
853 let session = match start_client_task.await {
854 Ok(session) => session,
855 Err(error) => {
856 this.update(cx, |_, cx| {
857 cx.emit(DapStoreEvent::Notification(error.to_string()));
858 })
859 .log_err();
860
861 return Err(error);
862 }
863 };
864
865 // we have to insert the session early, so we can handle reverse requests
866 // that need the session to be available
867 this.update(cx, |store, cx| {
868 store.sessions.insert(session_id, session.clone());
869 cx.emit(DapStoreEvent::DebugClientStarted(session_id));
870 cx.notify();
871 })?;
872 let seq_result = async || {
873 session
874 .update(cx, |session, cx| session.request_initialize(cx))?
875 .await?;
876
877 session
878 .update(cx, |session, cx| {
879 session.initialize_sequence(initialized_rx, this.clone(), cx)
880 })?
881 .await
882 };
883 match seq_result().await {
884 Ok(_) => {}
885 Err(error) => {
886 this.update(cx, |this, cx| {
887 cx.emit(DapStoreEvent::Notification(error.to_string()));
888 this.shutdown_session(session_id, cx)
889 })?
890 .await
891 .log_err();
892
893 return Err(error);
894 }
895 }
896
897 this.update(cx, |_, cx| {
898 cx.subscribe(
899 &session,
900 move |this: &mut DapStore, session, event: &SessionStateEvent, cx| match event {
901 SessionStateEvent::Shutdown => {
902 this.shutdown_session(session_id, cx).detach_and_log_err(cx);
903 }
904 SessionStateEvent::Restart => {
905 let Some((config, binary)) = session.read_with(cx, |session, _| {
906 session
907 .configuration()
908 .map(|config| (config, session.binary().clone()))
909 }) else {
910 log::error!("Failed to get debug config from session");
911 return;
912 };
913
914 let mut curr_session = session;
915 while let Some(parent_id) = curr_session.read(cx).parent_id() {
916 if let Some(parent_session) = this.sessions.get(&parent_id).cloned() {
917 curr_session = parent_session;
918 } else {
919 log::error!("Failed to get parent session from parent session id");
920 break;
921 }
922 }
923
924 let session_id = curr_session.read(cx).session_id();
925
926 let task = curr_session.update(cx, |session, cx| session.shutdown(cx));
927
928 let worktree = worktree.clone();
929 cx.spawn(async move |this, cx| {
930 task.await;
931
932 this.update(cx, |this, cx| {
933 this.sessions.remove(&session_id);
934 this.new_session(binary, config, worktree, None, cx)
935 })?
936 .1
937 .await?;
938
939 anyhow::Ok(())
940 })
941 .detach_and_log_err(cx);
942 }
943 },
944 )
945 .detach();
946 cx.emit(DapStoreEvent::DebugSessionInitialized(session_id));
947 })?;
948
949 Ok(session)
950 });
951 task
952}
953
954#[derive(Clone)]
955pub struct DapAdapterDelegate {
956 fs: Arc<dyn Fs>,
957 worktree_id: WorktreeId,
958 node_runtime: NodeRuntime,
959 http_client: Arc<dyn HttpClient>,
960 language_registry: Arc<LanguageRegistry>,
961 toolchain_store: Arc<dyn LanguageToolchainStore>,
962 updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
963 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
964}
965
966impl DapAdapterDelegate {
967 pub fn new(
968 fs: Arc<dyn Fs>,
969 worktree_id: WorktreeId,
970 node_runtime: NodeRuntime,
971 http_client: Arc<dyn HttpClient>,
972 language_registry: Arc<LanguageRegistry>,
973 toolchain_store: Arc<dyn LanguageToolchainStore>,
974 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
975 ) -> Self {
976 Self {
977 fs,
978 worktree_id,
979 http_client,
980 node_runtime,
981 toolchain_store,
982 language_registry,
983 load_shell_env_task,
984 updated_adapters: Default::default(),
985 }
986 }
987}
988
989#[async_trait(?Send)]
990impl dap::adapters::DapDelegate for DapAdapterDelegate {
991 fn worktree_id(&self) -> WorktreeId {
992 self.worktree_id
993 }
994
995 fn http_client(&self) -> Arc<dyn HttpClient> {
996 self.http_client.clone()
997 }
998
999 fn node_runtime(&self) -> NodeRuntime {
1000 self.node_runtime.clone()
1001 }
1002
1003 fn fs(&self) -> Arc<dyn Fs> {
1004 self.fs.clone()
1005 }
1006
1007 fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
1008 self.updated_adapters.clone()
1009 }
1010
1011 fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
1012 let name = SharedString::from(dap_name.to_string());
1013 let status = match status {
1014 DapStatus::None => BinaryStatus::None,
1015 DapStatus::Downloading => BinaryStatus::Downloading,
1016 DapStatus::Failed { error } => BinaryStatus::Failed { error },
1017 DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
1018 };
1019
1020 self.language_registry
1021 .update_dap_status(LanguageServerName(name), status);
1022 }
1023
1024 fn which(&self, command: &OsStr) -> Option<PathBuf> {
1025 which::which(command).ok()
1026 }
1027
1028 async fn shell_env(&self) -> HashMap<String, String> {
1029 let task = self.load_shell_env_task.clone();
1030 task.await.unwrap_or_default()
1031 }
1032
1033 fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
1034 self.toolchain_store.clone()
1035 }
1036}