dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    locator_store::LocatorStore,
  4    session::{self, Session},
  5};
  6use crate::{ProjectEnvironment, debugger, worktree_store::WorktreeStore};
  7use anyhow::{Result, anyhow};
  8use async_trait::async_trait;
  9use collections::HashMap;
 10use dap::{
 11    Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
 12    EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
 13    Source, StartDebuggingRequestArguments,
 14    adapters::{DapStatus, DebugAdapterName},
 15    client::SessionId,
 16    messages::Message,
 17    requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
 18};
 19use fs::Fs;
 20use futures::{
 21    channel::{mpsc, oneshot},
 22    future::{Shared, join_all},
 23};
 24use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 25use http_client::HttpClient;
 26use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 27use lsp::LanguageServerName;
 28use node_runtime::NodeRuntime;
 29
 30use rpc::{
 31    AnyProtoClient, TypedEnvelope,
 32    proto::{self},
 33};
 34use serde_json::Value;
 35use settings::WorktreeId;
 36use smol::{lock::Mutex, stream::StreamExt};
 37use std::{
 38    borrow::Borrow,
 39    collections::{BTreeMap, HashSet},
 40    ffi::OsStr,
 41    path::PathBuf,
 42    sync::{Arc, atomic::Ordering::SeqCst},
 43};
 44use std::{collections::VecDeque, sync::atomic::AtomicU32};
 45use task::{DebugAdapterConfig, DebugRequestDisposition};
 46use util::ResultExt as _;
 47use worktree::Worktree;
 48
 49pub enum DapStoreEvent {
 50    DebugClientStarted(SessionId),
 51    DebugClientShutdown(SessionId),
 52    DebugClientEvent {
 53        session_id: SessionId,
 54        message: Message,
 55    },
 56    RunInTerminal {
 57        session_id: SessionId,
 58        title: Option<String>,
 59        cwd: PathBuf,
 60        command: Option<String>,
 61        args: Vec<String>,
 62        envs: HashMap<String, String>,
 63        sender: mpsc::Sender<Result<u32>>,
 64    },
 65    Notification(String),
 66    RemoteHasInitialized,
 67}
 68
 69#[allow(clippy::large_enum_variant)]
 70pub enum DapStoreMode {
 71    Local(LocalDapStore),   // ssh host and collab host
 72    Remote(RemoteDapStore), // collab guest
 73}
 74
 75pub struct LocalDapStore {
 76    fs: Arc<dyn Fs>,
 77    node_runtime: NodeRuntime,
 78    next_session_id: AtomicU32,
 79    http_client: Arc<dyn HttpClient>,
 80    worktree_store: Entity<WorktreeStore>,
 81    environment: Entity<ProjectEnvironment>,
 82    language_registry: Arc<LanguageRegistry>,
 83    debug_adapters: Arc<DapRegistry>,
 84    toolchain_store: Arc<dyn LanguageToolchainStore>,
 85    locator_store: Arc<LocatorStore>,
 86    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 87    _start_debugging_task: Task<()>,
 88}
 89
 90impl LocalDapStore {
 91    fn next_session_id(&self) -> SessionId {
 92        SessionId(self.next_session_id.fetch_add(1, SeqCst))
 93    }
 94}
 95
 96pub struct RemoteDapStore {
 97    upstream_client: AnyProtoClient,
 98    upstream_project_id: u64,
 99    event_queue: Option<VecDeque<DapStoreEvent>>,
100}
101
102pub struct DapStore {
103    mode: DapStoreMode,
104    downstream_client: Option<(AnyProtoClient, u64)>,
105    breakpoint_store: Entity<BreakpointStore>,
106    sessions: BTreeMap<SessionId, Entity<Session>>,
107}
108
109impl EventEmitter<DapStoreEvent> for DapStore {}
110
111impl DapStore {
112    pub fn init(_client: &AnyProtoClient) {
113        // todo(debugger): Reenable these after we finish handle_dap_command refactor
114        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
115        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
116        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
117        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
118        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
119        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
120        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
121        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
122        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
123        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
124        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
125        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
126    }
127
128    #[expect(clippy::too_many_arguments)]
129    pub fn new_local(
130        http_client: Arc<dyn HttpClient>,
131        node_runtime: NodeRuntime,
132        fs: Arc<dyn Fs>,
133        language_registry: Arc<LanguageRegistry>,
134        debug_adapters: Arc<DapRegistry>,
135        environment: Entity<ProjectEnvironment>,
136        toolchain_store: Arc<dyn LanguageToolchainStore>,
137        breakpoint_store: Entity<BreakpointStore>,
138        worktree_store: Entity<WorktreeStore>,
139        cx: &mut Context<Self>,
140    ) -> Self {
141        cx.on_app_quit(Self::shutdown_sessions).detach();
142
143        let (start_debugging_tx, mut message_rx) =
144            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
145
146        let _start_debugging_task = cx.spawn(async move |this, cx| {
147            while let Some((session_id, message)) = message_rx.next().await {
148                match message {
149                    Message::Request(request) => {
150                        let _ = this
151                            .update(cx, |this, cx| {
152                                if request.command == StartDebugging::COMMAND {
153                                    this.handle_start_debugging_request(session_id, request, cx)
154                                        .detach_and_log_err(cx);
155                                } else if request.command == RunInTerminal::COMMAND {
156                                    this.handle_run_in_terminal_request(session_id, request, cx)
157                                        .detach_and_log_err(cx);
158                                }
159                            })
160                            .log_err();
161                    }
162                    _ => {}
163                }
164            }
165        });
166        Self {
167            mode: DapStoreMode::Local(LocalDapStore {
168                fs,
169                environment,
170                http_client,
171                node_runtime,
172                worktree_store,
173                toolchain_store,
174                language_registry,
175                debug_adapters,
176                start_debugging_tx,
177                _start_debugging_task,
178                locator_store: Arc::from(LocatorStore::new()),
179                next_session_id: Default::default(),
180            }),
181            downstream_client: None,
182            breakpoint_store,
183            sessions: Default::default(),
184        }
185    }
186
187    pub fn new_remote(
188        project_id: u64,
189        upstream_client: AnyProtoClient,
190        breakpoint_store: Entity<BreakpointStore>,
191    ) -> Self {
192        Self {
193            mode: DapStoreMode::Remote(RemoteDapStore {
194                upstream_client,
195                upstream_project_id: project_id,
196                event_queue: Some(VecDeque::default()),
197            }),
198            downstream_client: None,
199            breakpoint_store,
200            sessions: Default::default(),
201        }
202    }
203
204    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
205        match &self.mode {
206            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
207            _ => None,
208        }
209    }
210
211    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
212        if let DapStoreMode::Remote(remote) = &mut self.mode {
213            remote.event_queue.take()
214        } else {
215            None
216        }
217    }
218
219    pub fn as_local(&self) -> Option<&LocalDapStore> {
220        match &self.mode {
221            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
222            _ => None,
223        }
224    }
225
226    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
227        match &mut self.mode {
228            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
229            _ => None,
230        }
231    }
232
233    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
234        match &self.mode {
235            DapStoreMode::Remote(RemoteDapStore {
236                upstream_client,
237                upstream_project_id,
238                ..
239            }) => Some((upstream_client.clone(), *upstream_project_id)),
240
241            DapStoreMode::Local(_) => None,
242        }
243    }
244
245    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
246        self.downstream_client.as_ref()
247    }
248
249    pub fn add_remote_client(
250        &mut self,
251        session_id: SessionId,
252        ignore: Option<bool>,
253        cx: &mut Context<Self>,
254    ) {
255        if let DapStoreMode::Remote(remote) = &self.mode {
256            self.sessions.insert(
257                session_id,
258                cx.new(|_| {
259                    debugger::session::Session::remote(
260                        session_id,
261                        remote.upstream_client.clone(),
262                        remote.upstream_project_id,
263                        ignore.unwrap_or(false),
264                    )
265                }),
266            );
267        } else {
268            debug_assert!(false);
269        }
270    }
271
272    pub fn session_by_id(
273        &self,
274        session_id: impl Borrow<SessionId>,
275    ) -> Option<Entity<session::Session>> {
276        let session_id = session_id.borrow();
277        let client = self.sessions.get(session_id).cloned();
278
279        client
280    }
281    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
282        self.sessions.values()
283    }
284
285    pub fn capabilities_by_id(
286        &self,
287        session_id: impl Borrow<SessionId>,
288        cx: &App,
289    ) -> Option<Capabilities> {
290        let session_id = session_id.borrow();
291        self.sessions
292            .get(session_id)
293            .map(|client| client.read(cx).capabilities.clone())
294    }
295
296    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
297        &self.breakpoint_store
298    }
299
300    #[allow(dead_code)]
301    async fn handle_ignore_breakpoint_state(
302        this: Entity<Self>,
303        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
304        mut cx: AsyncApp,
305    ) -> Result<()> {
306        let session_id = SessionId::from_proto(envelope.payload.session_id);
307
308        this.update(&mut cx, |this, cx| {
309            if let Some(session) = this.session_by_id(&session_id) {
310                session.update(cx, |session, cx| {
311                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
312                })
313            } else {
314                Task::ready(())
315            }
316        })?
317        .await;
318
319        Ok(())
320    }
321
322    pub fn new_session(
323        &mut self,
324        mut config: DebugAdapterConfig,
325        worktree: &Entity<Worktree>,
326        parent_session: Option<Entity<Session>>,
327        cx: &mut Context<Self>,
328    ) -> (SessionId, Task<Result<Entity<Session>>>) {
329        let Some(local_store) = self.as_local() else {
330            unimplemented!("Starting session on remote side");
331        };
332
333        let delegate = DapAdapterDelegate::new(
334            local_store.fs.clone(),
335            worktree.read(cx).id(),
336            local_store.node_runtime.clone(),
337            local_store.http_client.clone(),
338            local_store.language_registry.clone(),
339            local_store.toolchain_store.clone(),
340            local_store.environment.update(cx, |env, cx| {
341                let worktree = worktree.read(cx);
342                env.get_environment(worktree.abs_path().into(), cx)
343            }),
344        );
345        let session_id = local_store.next_session_id();
346
347        if let Some(session) = &parent_session {
348            session.update(cx, |session, _| {
349                session.add_child_session_id(session_id);
350            });
351        }
352
353        let (initialized_tx, initialized_rx) = oneshot::channel();
354        let locator_store = local_store.locator_store.clone();
355        let debug_adapters = local_store.debug_adapters.clone();
356
357        let start_debugging_tx = local_store.start_debugging_tx.clone();
358
359        let task = cx.spawn(async move |this, cx| {
360            if config.locator.is_some() {
361                config = cx
362                    .background_spawn(async move {
363                        locator_store
364                            .resolve_debug_config(&mut config)
365                            .await
366                            .map(|_| config)
367                    })
368                    .await?;
369            }
370
371            let start_client_task = this.update(cx, |this, cx| {
372                Session::local(
373                    this.breakpoint_store.clone(),
374                    session_id,
375                    parent_session,
376                    delegate,
377                    config,
378                    start_debugging_tx.clone(),
379                    initialized_tx,
380                    debug_adapters,
381                    cx,
382                )
383            })?;
384
385            this.update(cx, |_, cx| {
386                create_new_session(session_id, initialized_rx, start_client_task, cx)
387            })?
388            .await
389        });
390
391        (session_id, task)
392    }
393
394    #[cfg(any(test, feature = "test-support"))]
395    pub fn new_fake_session(
396        &mut self,
397        config: DebugAdapterConfig,
398        worktree: &Entity<Worktree>,
399        parent_session: Option<Entity<Session>>,
400        caps: Capabilities,
401        fails: bool,
402        cx: &mut Context<Self>,
403    ) -> (SessionId, Task<Result<Entity<Session>>>) {
404        let Some(local_store) = self.as_local() else {
405            unimplemented!("Starting session on remote side");
406        };
407
408        let delegate = DapAdapterDelegate::new(
409            local_store.fs.clone(),
410            worktree.read(cx).id(),
411            local_store.node_runtime.clone(),
412            local_store.http_client.clone(),
413            local_store.language_registry.clone(),
414            local_store.toolchain_store.clone(),
415            local_store.environment.update(cx, |env, cx| {
416                let worktree = worktree.read(cx);
417                env.get_environment(Some(worktree.abs_path()), cx)
418            }),
419        );
420        let session_id = local_store.next_session_id();
421
422        if let Some(session) = &parent_session {
423            session.update(cx, |session, _| {
424                session.add_child_session_id(session_id);
425            });
426        }
427
428        let (initialized_tx, initialized_rx) = oneshot::channel();
429
430        let start_client_task = Session::fake(
431            self.breakpoint_store.clone(),
432            session_id,
433            parent_session,
434            delegate,
435            config,
436            local_store.start_debugging_tx.clone(),
437            initialized_tx,
438            caps,
439            fails,
440            cx,
441        );
442
443        let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
444        (session_id, task)
445    }
446
447    fn handle_start_debugging_request(
448        &mut self,
449        session_id: SessionId,
450        request: dap::messages::Request,
451        cx: &mut Context<Self>,
452    ) -> Task<Result<()>> {
453        let Some(local_store) = self.as_local() else {
454            unreachable!("Cannot response for non-local session");
455        };
456
457        let Some(parent_session) = self.session_by_id(session_id) else {
458            return Task::ready(Err(anyhow!("Session not found")));
459        };
460
461        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
462            request.arguments.unwrap_or_default(),
463        )
464        .expect("To parse StartDebuggingRequestArguments");
465        let worktree = local_store
466            .worktree_store
467            .update(cx, |this, _| this.worktrees().next())
468            .expect("worktree-less project");
469
470        let Some(config) = parent_session.read(cx).configuration() else {
471            unreachable!("there must be a config for local sessions");
472        };
473
474        let debug_config = DebugAdapterConfig {
475            label: config.label,
476            adapter: config.adapter,
477            request: DebugRequestDisposition::ReverseRequest(args),
478            initialize_args: config.initialize_args.clone(),
479            tcp_connection: config.tcp_connection.clone(),
480            locator: None,
481            stop_on_entry: config.stop_on_entry,
482        };
483
484        #[cfg(any(test, feature = "test-support"))]
485        let new_session_task = {
486            let caps = parent_session.read(cx).capabilities.clone();
487            self.new_fake_session(
488                debug_config,
489                &worktree,
490                Some(parent_session.clone()),
491                caps,
492                false,
493                cx,
494            )
495            .1
496        };
497        #[cfg(not(any(test, feature = "test-support")))]
498        let new_session_task = self
499            .new_session(debug_config, &worktree, Some(parent_session.clone()), cx)
500            .1;
501
502        let request_seq = request.seq;
503        cx.spawn(async move |_, cx| {
504            let (success, body) = match new_session_task.await {
505                Ok(_) => (true, None),
506                Err(error) => (
507                    false,
508                    Some(serde_json::to_value(ErrorResponse {
509                        error: Some(dap::Message {
510                            id: request_seq,
511                            format: error.to_string(),
512                            variables: None,
513                            send_telemetry: None,
514                            show_user: None,
515                            url: None,
516                            url_label: None,
517                        }),
518                    })?),
519                ),
520            };
521
522            parent_session
523                .update(cx, |session, cx| {
524                    session.respond_to_client(
525                        request_seq,
526                        success,
527                        StartDebugging::COMMAND.to_string(),
528                        body,
529                        cx,
530                    )
531                })?
532                .await
533        })
534    }
535
536    fn handle_run_in_terminal_request(
537        &mut self,
538        session_id: SessionId,
539        request: dap::messages::Request,
540        cx: &mut Context<Self>,
541    ) -> Task<Result<()>> {
542        let Some(session) = self.session_by_id(session_id) else {
543            return Task::ready(Err(anyhow!("Session not found")));
544        };
545
546        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
547            request.arguments.unwrap_or_default(),
548        )
549        .expect("To parse StartDebuggingRequestArguments");
550
551        let seq = request.seq;
552
553        let cwd = PathBuf::from(request_args.cwd);
554        match cwd.try_exists() {
555            Ok(true) => (),
556            Ok(false) | Err(_) => {
557                return session.update(cx, |session, cx| {
558                    session.respond_to_client(
559                        seq,
560                        false,
561                        RunInTerminal::COMMAND.to_string(),
562                        serde_json::to_value(dap::ErrorResponse {
563                            error: Some(dap::Message {
564                                id: seq,
565                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
566                                variables: None,
567                                send_telemetry: None,
568                                show_user: None,
569                                url: None,
570                                url_label: None,
571                            }),
572                        })
573                        .ok(),
574                        cx,
575                    )
576                });
577            }
578        }
579
580        let mut args = request_args.args.clone();
581
582        // Handle special case for NodeJS debug adapter
583        // If only the Node binary path is provided, we set the command to None
584        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
585        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
586        // This allows the NodeJS debug client to attach correctly
587        let command = if args.len() > 1 {
588            Some(args.remove(0))
589        } else {
590            None
591        };
592
593        let mut envs: HashMap<String, String> = Default::default();
594        if let Some(Value::Object(env)) = request_args.env {
595            for (key, value) in env {
596                let value_str = match (key.as_str(), value) {
597                    (_, Value::String(value)) => value,
598                    _ => continue,
599                };
600
601                envs.insert(key, value_str);
602            }
603        }
604
605        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
606
607        cx.emit(DapStoreEvent::RunInTerminal {
608            session_id,
609            title: request_args.title,
610            cwd,
611            command,
612            args,
613            envs,
614            sender: tx,
615        });
616        cx.notify();
617
618        let session = session.downgrade();
619        cx.spawn(async move |_, cx| {
620            let (success, body) = match rx.next().await {
621                Some(Ok(pid)) => (
622                    true,
623                    serde_json::to_value(dap::RunInTerminalResponse {
624                        process_id: None,
625                        shell_process_id: Some(pid as u64),
626                    })
627                    .ok(),
628                ),
629                Some(Err(error)) => (
630                    false,
631                    serde_json::to_value(dap::ErrorResponse {
632                        error: Some(dap::Message {
633                            id: seq,
634                            format: error.to_string(),
635                            variables: None,
636                            send_telemetry: None,
637                            show_user: None,
638                            url: None,
639                            url_label: None,
640                        }),
641                    })
642                    .ok(),
643                ),
644                None => (
645                    false,
646                    serde_json::to_value(dap::ErrorResponse {
647                        error: Some(dap::Message {
648                            id: seq,
649                            format: "failed to receive response from spawn terminal".to_string(),
650                            variables: None,
651                            send_telemetry: None,
652                            show_user: None,
653                            url: None,
654                            url_label: None,
655                        }),
656                    })
657                    .ok(),
658                ),
659            };
660
661            session
662                .update(cx, |session, cx| {
663                    session.respond_to_client(
664                        seq,
665                        success,
666                        RunInTerminal::COMMAND.to_string(),
667                        body,
668                        cx,
669                    )
670                })?
671                .await
672        })
673    }
674
675    pub fn evaluate(
676        &self,
677        session_id: &SessionId,
678        stack_frame_id: u64,
679        expression: String,
680        context: EvaluateArgumentsContext,
681        source: Option<Source>,
682        cx: &mut Context<Self>,
683    ) -> Task<Result<EvaluateResponse>> {
684        let Some(client) = self
685            .session_by_id(session_id)
686            .and_then(|client| client.read(cx).adapter_client())
687        else {
688            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
689        };
690
691        cx.background_executor().spawn(async move {
692            client
693                .request::<Evaluate>(EvaluateArguments {
694                    expression: expression.clone(),
695                    frame_id: Some(stack_frame_id),
696                    context: Some(context),
697                    format: None,
698                    line: None,
699                    column: None,
700                    source,
701                })
702                .await
703        })
704    }
705
706    pub fn completions(
707        &self,
708        session_id: &SessionId,
709        stack_frame_id: u64,
710        text: String,
711        completion_column: u64,
712        cx: &mut Context<Self>,
713    ) -> Task<Result<Vec<CompletionItem>>> {
714        let Some(client) = self
715            .session_by_id(session_id)
716            .and_then(|client| client.read(cx).adapter_client())
717        else {
718            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
719        };
720
721        cx.background_executor().spawn(async move {
722            Ok(client
723                .request::<Completions>(CompletionsArguments {
724                    frame_id: Some(stack_frame_id),
725                    line: None,
726                    text,
727                    column: completion_column,
728                })
729                .await?
730                .targets)
731        })
732    }
733
734    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
735        let mut tasks = vec![];
736        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
737            tasks.push(self.shutdown_session(session_id, cx));
738        }
739
740        cx.background_executor().spawn(async move {
741            futures::future::join_all(tasks).await;
742        })
743    }
744
745    pub fn shutdown_session(
746        &mut self,
747        session_id: SessionId,
748        cx: &mut Context<Self>,
749    ) -> Task<Result<()>> {
750        let Some(_) = self.as_local_mut() else {
751            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
752        };
753
754        let Some(session) = self.sessions.remove(&session_id) else {
755            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
756        };
757
758        let shutdown_children = session
759            .read(cx)
760            .child_session_ids()
761            .iter()
762            .map(|session_id| self.shutdown_session(*session_id, cx))
763            .collect::<Vec<_>>();
764
765        let shutdown_parent_task = if let Some(parent_session) = session
766            .read(cx)
767            .parent_id()
768            .and_then(|session_id| self.session_by_id(session_id))
769        {
770            let shutdown_id = parent_session.update(cx, |parent_session, _| {
771                parent_session.remove_child_session_id(session_id);
772
773                if parent_session.child_session_ids().len() == 0 {
774                    Some(parent_session.session_id())
775                } else {
776                    None
777                }
778            });
779
780            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
781        } else {
782            None
783        };
784
785        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
786
787        cx.background_spawn(async move {
788            if shutdown_children.len() > 0 {
789                let _ = join_all(shutdown_children).await;
790            }
791
792            shutdown_task.await;
793
794            if let Some(parent_task) = shutdown_parent_task {
795                parent_task.await?;
796            }
797
798            Ok(())
799        })
800    }
801
802    pub fn shared(
803        &mut self,
804        project_id: u64,
805        downstream_client: AnyProtoClient,
806        _: &mut Context<Self>,
807    ) {
808        self.downstream_client = Some((downstream_client.clone(), project_id));
809    }
810
811    pub fn unshared(&mut self, cx: &mut Context<Self>) {
812        self.downstream_client.take();
813
814        cx.notify();
815    }
816}
817
818fn create_new_session(
819    session_id: SessionId,
820    initialized_rx: oneshot::Receiver<()>,
821    start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
822    cx: &mut Context<DapStore>,
823) -> Task<Result<Entity<Session>>> {
824    let task = cx.spawn(async move |this, cx| {
825        let session = match start_client_task.await {
826            Ok(session) => session,
827            Err(error) => {
828                this.update(cx, |_, cx| {
829                    cx.emit(DapStoreEvent::Notification(error.to_string()));
830                })
831                .log_err();
832
833                return Err(error);
834            }
835        };
836
837        // we have to insert the session early, so we can handle reverse requests
838        // that need the session to be available
839        this.update(cx, |store, cx| {
840            store.sessions.insert(session_id, session.clone());
841            cx.emit(DapStoreEvent::DebugClientStarted(session_id));
842            cx.notify();
843        })?;
844
845        match session
846            .update(cx, |session, cx| {
847                session.initialize_sequence(initialized_rx, cx)
848            })?
849            .await
850        {
851            Ok(_) => {}
852            Err(error) => {
853                this.update(cx, |this, cx| {
854                    cx.emit(DapStoreEvent::Notification(error.to_string()));
855
856                    this.shutdown_session(session_id, cx)
857                })?
858                .await
859                .log_err();
860
861                return Err(error);
862            }
863        }
864
865        Ok(session)
866    });
867    task
868}
869
870#[derive(Clone)]
871pub struct DapAdapterDelegate {
872    fs: Arc<dyn Fs>,
873    worktree_id: WorktreeId,
874    node_runtime: NodeRuntime,
875    http_client: Arc<dyn HttpClient>,
876    language_registry: Arc<LanguageRegistry>,
877    toolchain_store: Arc<dyn LanguageToolchainStore>,
878    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
879    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
880}
881
882impl DapAdapterDelegate {
883    pub fn new(
884        fs: Arc<dyn Fs>,
885        worktree_id: WorktreeId,
886        node_runtime: NodeRuntime,
887        http_client: Arc<dyn HttpClient>,
888        language_registry: Arc<LanguageRegistry>,
889        toolchain_store: Arc<dyn LanguageToolchainStore>,
890        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
891    ) -> Self {
892        Self {
893            fs,
894            worktree_id,
895            http_client,
896            node_runtime,
897            toolchain_store,
898            language_registry,
899            load_shell_env_task,
900            updated_adapters: Default::default(),
901        }
902    }
903}
904
905#[async_trait(?Send)]
906impl dap::adapters::DapDelegate for DapAdapterDelegate {
907    fn worktree_id(&self) -> WorktreeId {
908        self.worktree_id
909    }
910
911    fn http_client(&self) -> Arc<dyn HttpClient> {
912        self.http_client.clone()
913    }
914
915    fn node_runtime(&self) -> NodeRuntime {
916        self.node_runtime.clone()
917    }
918
919    fn fs(&self) -> Arc<dyn Fs> {
920        self.fs.clone()
921    }
922
923    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
924        self.updated_adapters.clone()
925    }
926
927    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
928        let name = SharedString::from(dap_name.to_string());
929        let status = match status {
930            DapStatus::None => BinaryStatus::None,
931            DapStatus::Downloading => BinaryStatus::Downloading,
932            DapStatus::Failed { error } => BinaryStatus::Failed { error },
933            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
934        };
935
936        self.language_registry
937            .update_dap_status(LanguageServerName(name), status);
938    }
939
940    fn which(&self, command: &OsStr) -> Option<PathBuf> {
941        which::which(command).ok()
942    }
943
944    async fn shell_env(&self) -> HashMap<String, String> {
945        let task = self.load_shell_env_task.clone();
946        task.await.unwrap_or_default()
947    }
948
949    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
950        self.toolchain_store.clone()
951    }
952}