1use super::{
2 breakpoint_store::BreakpointStore,
3 // Will need to uncomment this once we implement rpc message handler again
4 // dap_command::{
5 // ContinueCommand, DapCommand, DisconnectCommand, NextCommand, PauseCommand, RestartCommand,
6 // RestartStackFrameCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
7 // TerminateCommand, TerminateThreadsCommand, VariablesCommand,
8 // },
9 session::{self, Session},
10};
11use crate::{debugger, worktree_store::WorktreeStore, ProjectEnvironment};
12use anyhow::{anyhow, Result};
13use async_trait::async_trait;
14use collections::HashMap;
15use dap::{
16 adapters::{DapStatus, DebugAdapterName},
17 client::SessionId,
18 messages::Message,
19 requests::{
20 Completions, Evaluate, Request as _, RunInTerminal, SetExpression, SetVariable,
21 StartDebugging,
22 },
23 Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
24 EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
25 SetExpressionArguments, SetVariableArguments, Source, StartDebuggingRequestArguments,
26};
27use fs::Fs;
28use futures::{
29 channel::{mpsc, oneshot},
30 future::{join_all, Shared},
31};
32use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
33use http_client::HttpClient;
34use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
35use lsp::LanguageServerName;
36use node_runtime::NodeRuntime;
37
38use rpc::{
39 proto::{self},
40 AnyProtoClient, TypedEnvelope,
41};
42use serde_json::Value;
43use settings::WorktreeId;
44use smol::{lock::Mutex, stream::StreamExt};
45use std::{
46 borrow::Borrow,
47 collections::{BTreeMap, HashSet},
48 ffi::OsStr,
49 path::PathBuf,
50 sync::{atomic::Ordering::SeqCst, Arc},
51};
52use std::{collections::VecDeque, sync::atomic::AtomicU32};
53use task::{DebugAdapterConfig, DebugRequestDisposition};
54use util::ResultExt as _;
55use worktree::Worktree;
56
57pub enum DapStoreEvent {
58 DebugClientStarted(SessionId),
59 DebugClientShutdown(SessionId),
60 DebugClientEvent {
61 session_id: SessionId,
62 message: Message,
63 },
64 RunInTerminal {
65 session_id: SessionId,
66 title: Option<String>,
67 cwd: PathBuf,
68 command: Option<String>,
69 args: Vec<String>,
70 envs: HashMap<String, String>,
71 sender: mpsc::Sender<Result<u32>>,
72 },
73 Notification(String),
74 RemoteHasInitialized,
75}
76
77#[allow(clippy::large_enum_variant)]
78pub enum DapStoreMode {
79 Local(LocalDapStore), // ssh host and collab host
80 Remote(RemoteDapStore), // collab guest
81}
82
83pub struct LocalDapStore {
84 fs: Arc<dyn Fs>,
85 node_runtime: NodeRuntime,
86 next_session_id: AtomicU32,
87 http_client: Arc<dyn HttpClient>,
88 worktree_store: Entity<WorktreeStore>,
89 environment: Entity<ProjectEnvironment>,
90 language_registry: Arc<LanguageRegistry>,
91 debug_adapters: Arc<DapRegistry>,
92 toolchain_store: Arc<dyn LanguageToolchainStore>,
93 start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
94 _start_debugging_task: Task<()>,
95}
96
97impl LocalDapStore {
98 fn next_session_id(&self) -> SessionId {
99 SessionId(self.next_session_id.fetch_add(1, SeqCst))
100 }
101}
102
103pub struct RemoteDapStore {
104 upstream_client: AnyProtoClient,
105 upstream_project_id: u64,
106 event_queue: Option<VecDeque<DapStoreEvent>>,
107}
108
109pub struct DapStore {
110 mode: DapStoreMode,
111 downstream_client: Option<(AnyProtoClient, u64)>,
112 breakpoint_store: Entity<BreakpointStore>,
113 sessions: BTreeMap<SessionId, Entity<Session>>,
114}
115
116impl EventEmitter<DapStoreEvent> for DapStore {}
117
118impl DapStore {
119 pub fn init(_client: &AnyProtoClient) {
120 // todo(debugger): Reenable these after we finish handle_dap_command refactor
121 // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
122 // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
123 // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
124 // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
125 // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
126 // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
127 // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
128 // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
129 // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
130 // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
131 // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
132 // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
133 }
134
135 #[expect(clippy::too_many_arguments)]
136 pub fn new_local(
137 http_client: Arc<dyn HttpClient>,
138 node_runtime: NodeRuntime,
139 fs: Arc<dyn Fs>,
140 language_registry: Arc<LanguageRegistry>,
141 debug_adapters: Arc<DapRegistry>,
142 environment: Entity<ProjectEnvironment>,
143 toolchain_store: Arc<dyn LanguageToolchainStore>,
144 breakpoint_store: Entity<BreakpointStore>,
145 worktree_store: Entity<WorktreeStore>,
146 cx: &mut Context<Self>,
147 ) -> Self {
148 cx.on_app_quit(Self::shutdown_sessions).detach();
149
150 let (start_debugging_tx, mut message_rx) =
151 futures::channel::mpsc::unbounded::<(SessionId, Message)>();
152
153 let _start_debugging_task = cx.spawn(async move |this, cx| {
154 while let Some((session_id, message)) = message_rx.next().await {
155 match message {
156 Message::Request(request) => {
157 let _ = this
158 .update(cx, |this, cx| {
159 if request.command == StartDebugging::COMMAND {
160 this.handle_start_debugging_request(session_id, request, cx)
161 .detach_and_log_err(cx);
162 } else if request.command == RunInTerminal::COMMAND {
163 this.handle_run_in_terminal_request(session_id, request, cx)
164 .detach_and_log_err(cx);
165 }
166 })
167 .log_err();
168 }
169 _ => {}
170 }
171 }
172 });
173 Self {
174 mode: DapStoreMode::Local(LocalDapStore {
175 fs,
176 environment,
177 http_client,
178 node_runtime,
179 worktree_store,
180 toolchain_store,
181 language_registry,
182 debug_adapters,
183 start_debugging_tx,
184 _start_debugging_task,
185 next_session_id: Default::default(),
186 }),
187 downstream_client: None,
188 breakpoint_store,
189 sessions: Default::default(),
190 }
191 }
192
193 pub fn new_remote(
194 project_id: u64,
195 upstream_client: AnyProtoClient,
196 breakpoint_store: Entity<BreakpointStore>,
197 ) -> Self {
198 Self {
199 mode: DapStoreMode::Remote(RemoteDapStore {
200 upstream_client,
201 upstream_project_id: project_id,
202 event_queue: Some(VecDeque::default()),
203 }),
204 downstream_client: None,
205 breakpoint_store,
206 sessions: Default::default(),
207 }
208 }
209
210 pub fn as_remote(&self) -> Option<&RemoteDapStore> {
211 match &self.mode {
212 DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
213 _ => None,
214 }
215 }
216
217 pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
218 if let DapStoreMode::Remote(remote) = &mut self.mode {
219 remote.event_queue.take()
220 } else {
221 None
222 }
223 }
224
225 pub fn as_local(&self) -> Option<&LocalDapStore> {
226 match &self.mode {
227 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
228 _ => None,
229 }
230 }
231
232 pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
233 match &mut self.mode {
234 DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
235 _ => None,
236 }
237 }
238
239 pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
240 match &self.mode {
241 DapStoreMode::Remote(RemoteDapStore {
242 upstream_client,
243 upstream_project_id,
244 ..
245 }) => Some((upstream_client.clone(), *upstream_project_id)),
246
247 DapStoreMode::Local(_) => None,
248 }
249 }
250
251 pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
252 self.downstream_client.as_ref()
253 }
254
255 pub fn add_remote_client(
256 &mut self,
257 session_id: SessionId,
258 ignore: Option<bool>,
259 cx: &mut Context<Self>,
260 ) {
261 if let DapStoreMode::Remote(remote) = &self.mode {
262 self.sessions.insert(
263 session_id,
264 cx.new(|_| {
265 debugger::session::Session::remote(
266 session_id,
267 remote.upstream_client.clone(),
268 remote.upstream_project_id,
269 ignore.unwrap_or(false),
270 )
271 }),
272 );
273 } else {
274 debug_assert!(false);
275 }
276 }
277
278 pub fn session_by_id(
279 &self,
280 session_id: impl Borrow<SessionId>,
281 ) -> Option<Entity<session::Session>> {
282 let session_id = session_id.borrow();
283 let client = self.sessions.get(session_id).cloned();
284
285 client
286 }
287 pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
288 self.sessions.values()
289 }
290
291 pub fn capabilities_by_id(
292 &self,
293 session_id: impl Borrow<SessionId>,
294 cx: &App,
295 ) -> Option<Capabilities> {
296 let session_id = session_id.borrow();
297 self.sessions
298 .get(session_id)
299 .map(|client| client.read(cx).capabilities.clone())
300 }
301
302 pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
303 &self.breakpoint_store
304 }
305
306 #[allow(dead_code)]
307 async fn handle_ignore_breakpoint_state(
308 this: Entity<Self>,
309 envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
310 mut cx: AsyncApp,
311 ) -> Result<()> {
312 let session_id = SessionId::from_proto(envelope.payload.session_id);
313
314 this.update(&mut cx, |this, cx| {
315 if let Some(session) = this.session_by_id(&session_id) {
316 session.update(cx, |session, cx| {
317 session.set_ignore_breakpoints(envelope.payload.ignore, cx)
318 })
319 } else {
320 Task::ready(())
321 }
322 })?
323 .await;
324
325 Ok(())
326 }
327
328 pub fn new_session(
329 &mut self,
330 config: DebugAdapterConfig,
331 worktree: &Entity<Worktree>,
332 parent_session: Option<Entity<Session>>,
333 cx: &mut Context<Self>,
334 ) -> (SessionId, Task<Result<Entity<Session>>>) {
335 let Some(local_store) = self.as_local() else {
336 unimplemented!("Starting session on remote side");
337 };
338
339 let delegate = DapAdapterDelegate::new(
340 local_store.fs.clone(),
341 worktree.read(cx).id(),
342 local_store.node_runtime.clone(),
343 local_store.http_client.clone(),
344 local_store.language_registry.clone(),
345 local_store.toolchain_store.clone(),
346 local_store.environment.update(cx, |env, cx| {
347 let worktree = worktree.read(cx);
348 env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
349 }),
350 );
351 let session_id = local_store.next_session_id();
352
353 if let Some(session) = &parent_session {
354 session.update(cx, |session, _| {
355 session.add_child_session_id(session_id);
356 });
357 }
358
359 let (initialized_tx, initialized_rx) = oneshot::channel();
360
361 let start_client_task = Session::local(
362 self.breakpoint_store.clone(),
363 session_id,
364 parent_session,
365 delegate,
366 config,
367 local_store.start_debugging_tx.clone(),
368 initialized_tx,
369 local_store.debug_adapters.clone(),
370 cx,
371 );
372
373 let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
374 (session_id, task)
375 }
376 #[cfg(any(test, feature = "test-support"))]
377 pub fn new_fake_session(
378 &mut self,
379 config: DebugAdapterConfig,
380 worktree: &Entity<Worktree>,
381 parent_session: Option<Entity<Session>>,
382 caps: Capabilities,
383 fails: bool,
384 cx: &mut Context<Self>,
385 ) -> (SessionId, Task<Result<Entity<Session>>>) {
386 let Some(local_store) = self.as_local() else {
387 unimplemented!("Starting session on remote side");
388 };
389
390 let delegate = DapAdapterDelegate::new(
391 local_store.fs.clone(),
392 worktree.read(cx).id(),
393 local_store.node_runtime.clone(),
394 local_store.http_client.clone(),
395 local_store.language_registry.clone(),
396 local_store.toolchain_store.clone(),
397 local_store.environment.update(cx, |env, cx| {
398 let worktree = worktree.read(cx);
399 env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
400 }),
401 );
402 let session_id = local_store.next_session_id();
403
404 if let Some(session) = &parent_session {
405 session.update(cx, |session, _| {
406 session.add_child_session_id(session_id);
407 });
408 }
409
410 let (initialized_tx, initialized_rx) = oneshot::channel();
411
412 let start_client_task = Session::fake(
413 self.breakpoint_store.clone(),
414 session_id,
415 parent_session,
416 delegate,
417 config,
418 local_store.start_debugging_tx.clone(),
419 initialized_tx,
420 caps,
421 fails,
422 cx,
423 );
424
425 let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
426 (session_id, task)
427 }
428
429 fn handle_start_debugging_request(
430 &mut self,
431 session_id: SessionId,
432 request: dap::messages::Request,
433 cx: &mut Context<Self>,
434 ) -> Task<Result<()>> {
435 let Some(local_store) = self.as_local() else {
436 unreachable!("Cannot response for non-local session");
437 };
438
439 let Some(parent_session) = self.session_by_id(session_id) else {
440 return Task::ready(Err(anyhow!("Session not found")));
441 };
442
443 let args = serde_json::from_value::<StartDebuggingRequestArguments>(
444 request.arguments.unwrap_or_default(),
445 )
446 .expect("To parse StartDebuggingRequestArguments");
447 let worktree = local_store
448 .worktree_store
449 .update(cx, |this, _| this.worktrees().next())
450 .expect("worktree-less project");
451
452 let Some(config) = parent_session.read(cx).configuration() else {
453 unreachable!("there must be a config for local sessions");
454 };
455
456 let debug_config = DebugAdapterConfig {
457 label: config.label,
458 adapter: config.adapter,
459 request: DebugRequestDisposition::ReverseRequest(args),
460 initialize_args: config.initialize_args.clone(),
461 tcp_connection: config.tcp_connection.clone(),
462 };
463 #[cfg(any(test, feature = "test-support"))]
464 let new_session_task = {
465 let caps = parent_session.read(cx).capabilities.clone();
466 self.new_fake_session(
467 debug_config,
468 &worktree,
469 Some(parent_session.clone()),
470 caps,
471 false,
472 cx,
473 )
474 .1
475 };
476 #[cfg(not(any(test, feature = "test-support")))]
477 let new_session_task = self
478 .new_session(debug_config, &worktree, Some(parent_session.clone()), cx)
479 .1;
480
481 let request_seq = request.seq;
482 cx.spawn(async move |_, cx| {
483 let (success, body) = match new_session_task.await {
484 Ok(_) => (true, None),
485 Err(error) => (
486 false,
487 Some(serde_json::to_value(ErrorResponse {
488 error: Some(dap::Message {
489 id: request_seq,
490 format: error.to_string(),
491 variables: None,
492 send_telemetry: None,
493 show_user: None,
494 url: None,
495 url_label: None,
496 }),
497 })?),
498 ),
499 };
500
501 parent_session
502 .update(cx, |session, cx| {
503 session.respond_to_client(
504 request_seq,
505 success,
506 StartDebugging::COMMAND.to_string(),
507 body,
508 cx,
509 )
510 })?
511 .await
512 })
513 }
514
515 fn handle_run_in_terminal_request(
516 &mut self,
517 session_id: SessionId,
518 request: dap::messages::Request,
519 cx: &mut Context<Self>,
520 ) -> Task<Result<()>> {
521 let Some(session) = self.session_by_id(session_id) else {
522 return Task::ready(Err(anyhow!("Session not found")));
523 };
524
525 let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
526 request.arguments.unwrap_or_default(),
527 )
528 .expect("To parse StartDebuggingRequestArguments");
529
530 let seq = request.seq;
531
532 let cwd = PathBuf::from(request_args.cwd);
533 match cwd.try_exists() {
534 Ok(true) => (),
535 Ok(false) | Err(_) => {
536 return session.update(cx, |session, cx| {
537 session.respond_to_client(
538 seq,
539 false,
540 RunInTerminal::COMMAND.to_string(),
541 serde_json::to_value(dap::ErrorResponse {
542 error: Some(dap::Message {
543 id: seq,
544 format: format!("Received invalid/unknown cwd: {cwd:?}"),
545 variables: None,
546 send_telemetry: None,
547 show_user: None,
548 url: None,
549 url_label: None,
550 }),
551 })
552 .ok(),
553 cx,
554 )
555 })
556 }
557 }
558
559 let mut args = request_args.args.clone();
560
561 // Handle special case for NodeJS debug adapter
562 // If only the Node binary path is provided, we set the command to None
563 // This prevents the NodeJS REPL from appearing, which is not the desired behavior
564 // The expected usage is for users to provide their own Node command, e.g., `node test.js`
565 // This allows the NodeJS debug client to attach correctly
566 let command = if args.len() > 1 {
567 Some(args.remove(0))
568 } else {
569 None
570 };
571
572 let mut envs: HashMap<String, String> = Default::default();
573 if let Some(Value::Object(env)) = request_args.env {
574 for (key, value) in env {
575 let value_str = match (key.as_str(), value) {
576 (_, Value::String(value)) => value,
577 _ => continue,
578 };
579
580 envs.insert(key, value_str);
581 }
582 }
583
584 let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
585
586 cx.emit(DapStoreEvent::RunInTerminal {
587 session_id,
588 title: request_args.title,
589 cwd,
590 command,
591 args,
592 envs,
593 sender: tx,
594 });
595 cx.notify();
596
597 let session = session.downgrade();
598 cx.spawn(async move |_, cx| {
599 let (success, body) = match rx.next().await {
600 Some(Ok(pid)) => (
601 true,
602 serde_json::to_value(dap::RunInTerminalResponse {
603 process_id: None,
604 shell_process_id: Some(pid as u64),
605 })
606 .ok(),
607 ),
608 Some(Err(error)) => (
609 false,
610 serde_json::to_value(dap::ErrorResponse {
611 error: Some(dap::Message {
612 id: seq,
613 format: error.to_string(),
614 variables: None,
615 send_telemetry: None,
616 show_user: None,
617 url: None,
618 url_label: None,
619 }),
620 })
621 .ok(),
622 ),
623 None => (
624 false,
625 serde_json::to_value(dap::ErrorResponse {
626 error: Some(dap::Message {
627 id: seq,
628 format: "failed to receive response from spawn terminal".to_string(),
629 variables: None,
630 send_telemetry: None,
631 show_user: None,
632 url: None,
633 url_label: None,
634 }),
635 })
636 .ok(),
637 ),
638 };
639
640 session
641 .update(cx, |session, cx| {
642 session.respond_to_client(
643 seq,
644 success,
645 RunInTerminal::COMMAND.to_string(),
646 body,
647 cx,
648 )
649 })?
650 .await
651 })
652 }
653
654 pub fn evaluate(
655 &self,
656 session_id: &SessionId,
657 stack_frame_id: u64,
658 expression: String,
659 context: EvaluateArgumentsContext,
660 source: Option<Source>,
661 cx: &mut Context<Self>,
662 ) -> Task<Result<EvaluateResponse>> {
663 let Some(client) = self
664 .session_by_id(session_id)
665 .and_then(|client| client.read(cx).adapter_client())
666 else {
667 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
668 };
669
670 cx.background_executor().spawn(async move {
671 client
672 .request::<Evaluate>(EvaluateArguments {
673 expression: expression.clone(),
674 frame_id: Some(stack_frame_id),
675 context: Some(context),
676 format: None,
677 line: None,
678 column: None,
679 source,
680 })
681 .await
682 })
683 }
684
685 pub fn completions(
686 &self,
687 session_id: &SessionId,
688 stack_frame_id: u64,
689 text: String,
690 completion_column: u64,
691 cx: &mut Context<Self>,
692 ) -> Task<Result<Vec<CompletionItem>>> {
693 let Some(client) = self
694 .session_by_id(session_id)
695 .and_then(|client| client.read(cx).adapter_client())
696 else {
697 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
698 };
699
700 cx.background_executor().spawn(async move {
701 Ok(client
702 .request::<Completions>(CompletionsArguments {
703 frame_id: Some(stack_frame_id),
704 line: None,
705 text,
706 column: completion_column,
707 })
708 .await?
709 .targets)
710 })
711 }
712
713 #[allow(clippy::too_many_arguments)]
714 pub fn set_variable_value(
715 &self,
716 session_id: &SessionId,
717 stack_frame_id: u64,
718 variables_reference: u64,
719 name: String,
720 value: String,
721 evaluate_name: Option<String>,
722 cx: &mut Context<Self>,
723 ) -> Task<Result<()>> {
724 let Some(client) = self
725 .session_by_id(session_id)
726 .and_then(|client| client.read(cx).adapter_client())
727 else {
728 return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
729 };
730
731 let supports_set_expression = self
732 .capabilities_by_id(session_id, cx)
733 .and_then(|caps| caps.supports_set_expression)
734 .unwrap_or_default();
735
736 cx.background_executor().spawn(async move {
737 if let Some(evaluate_name) = supports_set_expression.then(|| evaluate_name).flatten() {
738 client
739 .request::<SetExpression>(SetExpressionArguments {
740 expression: evaluate_name,
741 value,
742 frame_id: Some(stack_frame_id),
743 format: None,
744 })
745 .await?;
746 } else {
747 client
748 .request::<SetVariable>(SetVariableArguments {
749 variables_reference,
750 name,
751 value,
752 format: None,
753 })
754 .await?;
755 }
756
757 Ok(())
758 })
759 }
760
    // Intended usage sketch (after getting hold of the client and whatnot):
    // let _ = client.modules(); // This can fire a request to a debug adapter or be a cheap getter.
    // client.wait_for_request(request::Modules); // This ensures that the request we've fired off runs to completion.
    // let returned_value = client.modules(); // This is now a cheap getter.
765
766 pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
767 let mut tasks = vec![];
768 for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
769 tasks.push(self.shutdown_session(session_id, cx));
770 }
771
772 cx.background_executor().spawn(async move {
773 futures::future::join_all(tasks).await;
774 })
775 }
776
777 pub fn shutdown_session(
778 &mut self,
779 session_id: SessionId,
780 cx: &mut Context<Self>,
781 ) -> Task<Result<()>> {
782 let Some(_) = self.as_local_mut() else {
783 return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
784 };
785
786 let Some(session) = self.sessions.remove(&session_id) else {
787 return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
788 };
789
790 let shutdown_children = session
791 .read(cx)
792 .child_session_ids()
793 .iter()
794 .map(|session_id| self.shutdown_session(*session_id, cx))
795 .collect::<Vec<_>>();
796
797 let shutdown_parent_task = if let Some(parent_session) = session
798 .read(cx)
799 .parent_id()
800 .and_then(|session_id| self.session_by_id(session_id))
801 {
802 let shutdown_id = parent_session.update(cx, |parent_session, _| {
803 parent_session.remove_child_session_id(session_id);
804
805 if parent_session.child_session_ids().len() == 0 {
806 Some(parent_session.session_id())
807 } else {
808 None
809 }
810 });
811
812 shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
813 } else {
814 None
815 };
816
817 let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
818
819 cx.background_spawn(async move {
820 if shutdown_children.len() > 0 {
821 let _ = join_all(shutdown_children).await;
822 }
823
824 shutdown_task.await;
825
826 if let Some(parent_task) = shutdown_parent_task {
827 parent_task.await?;
828 }
829
830 Ok(())
831 })
832 }
833
834 pub fn shared(
835 &mut self,
836 project_id: u64,
837 downstream_client: AnyProtoClient,
838 _: &mut Context<Self>,
839 ) {
840 self.downstream_client = Some((downstream_client.clone(), project_id));
841 }
842
843 pub fn unshared(&mut self, cx: &mut Context<Self>) {
844 self.downstream_client.take();
845
846 cx.notify();
847 }
848}
849
850fn create_new_session(
851 session_id: SessionId,
852 initialized_rx: oneshot::Receiver<()>,
853 start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
854 cx: &mut Context<DapStore>,
855) -> Task<Result<Entity<Session>>> {
856 let task = cx.spawn(async move |this, cx| {
857 let session = match start_client_task.await {
858 Ok(session) => session,
859 Err(error) => {
860 this.update(cx, |_, cx| {
861 cx.emit(DapStoreEvent::Notification(error.to_string()));
862 })
863 .log_err();
864
865 return Err(error);
866 }
867 };
868
869 // we have to insert the session early, so we can handle reverse requests
870 // that need the session to be available
871 this.update(cx, |store, cx| {
872 store.sessions.insert(session_id, session.clone());
873 cx.emit(DapStoreEvent::DebugClientStarted(session_id));
874 cx.notify();
875 })?;
876
877 match session
878 .update(cx, |session, cx| {
879 session.initialize_sequence(initialized_rx, cx)
880 })?
881 .await
882 {
883 Ok(_) => {}
884 Err(error) => {
885 this.update(cx, |this, cx| {
886 cx.emit(DapStoreEvent::Notification(error.to_string()));
887
888 this.shutdown_session(session_id, cx)
889 })?
890 .await
891 .log_err();
892
893 return Err(error);
894 }
895 }
896
897 Ok(session)
898 });
899 task
900}
901
902#[derive(Clone)]
903pub struct DapAdapterDelegate {
904 fs: Arc<dyn Fs>,
905 worktree_id: WorktreeId,
906 node_runtime: NodeRuntime,
907 http_client: Arc<dyn HttpClient>,
908 language_registry: Arc<LanguageRegistry>,
909 toolchain_store: Arc<dyn LanguageToolchainStore>,
910 updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
911 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
912}
913
914impl DapAdapterDelegate {
915 pub fn new(
916 fs: Arc<dyn Fs>,
917 worktree_id: WorktreeId,
918 node_runtime: NodeRuntime,
919 http_client: Arc<dyn HttpClient>,
920 language_registry: Arc<LanguageRegistry>,
921 toolchain_store: Arc<dyn LanguageToolchainStore>,
922 load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
923 ) -> Self {
924 Self {
925 fs,
926 worktree_id,
927 http_client,
928 node_runtime,
929 toolchain_store,
930 language_registry,
931 load_shell_env_task,
932 updated_adapters: Default::default(),
933 }
934 }
935}
936
937#[async_trait(?Send)]
938impl dap::adapters::DapDelegate for DapAdapterDelegate {
939 fn worktree_id(&self) -> WorktreeId {
940 self.worktree_id
941 }
942
943 fn http_client(&self) -> Arc<dyn HttpClient> {
944 self.http_client.clone()
945 }
946
947 fn node_runtime(&self) -> NodeRuntime {
948 self.node_runtime.clone()
949 }
950
951 fn fs(&self) -> Arc<dyn Fs> {
952 self.fs.clone()
953 }
954
955 fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
956 self.updated_adapters.clone()
957 }
958
959 fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
960 let name = SharedString::from(dap_name.to_string());
961 let status = match status {
962 DapStatus::None => BinaryStatus::None,
963 DapStatus::Downloading => BinaryStatus::Downloading,
964 DapStatus::Failed { error } => BinaryStatus::Failed { error },
965 DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
966 };
967
968 self.language_registry
969 .update_dap_status(LanguageServerName(name), status);
970 }
971
972 fn which(&self, command: &OsStr) -> Option<PathBuf> {
973 which::which(command).ok()
974 }
975
976 async fn shell_env(&self) -> HashMap<String, String> {
977 let task = self.load_shell_env_task.clone();
978 task.await.unwrap_or_default()
979 }
980
981 fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
982 self.toolchain_store.clone()
983 }
984}