dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    locator_store::LocatorStore,
  4    session::{self, Session, SessionStateEvent},
  5};
  6use crate::{ProjectEnvironment, debugger, worktree_store::WorktreeStore};
  7use anyhow::{Result, anyhow};
  8use async_trait::async_trait;
  9use collections::HashMap;
 10use dap::{
 11    Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
 12    EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
 13    Source, StartDebuggingRequestArguments,
 14    adapters::{DapStatus, DebugAdapterName},
 15    client::SessionId,
 16    messages::Message,
 17    requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
 18};
 19use fs::Fs;
 20use futures::{
 21    channel::{mpsc, oneshot},
 22    future::{Shared, join_all},
 23};
 24use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 25use http_client::HttpClient;
 26use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 27use lsp::LanguageServerName;
 28use node_runtime::NodeRuntime;
 29
 30use rpc::{
 31    AnyProtoClient, TypedEnvelope,
 32    proto::{self},
 33};
 34use serde_json::Value;
 35use settings::WorktreeId;
 36use smol::{lock::Mutex, stream::StreamExt};
 37use std::{
 38    borrow::Borrow,
 39    collections::{BTreeMap, HashSet},
 40    ffi::OsStr,
 41    path::{Path, PathBuf},
 42    sync::{Arc, atomic::Ordering::SeqCst},
 43};
 44use std::{collections::VecDeque, sync::atomic::AtomicU32};
 45use task::{DebugAdapterConfig, DebugRequestDisposition};
 46use util::ResultExt as _;
 47use worktree::Worktree;
 48
/// Events emitted by [`DapStore`] (see its `EventEmitter` impl).
pub enum DapStoreEvent {
    /// Emitted right after a started session has been inserted into the store,
    /// before its initialization sequence runs.
    DebugClientStarted(SessionId),
    /// Emitted once a session has completed its initialization sequence.
    DebugSessionInitialized(SessionId),
    DebugClientShutdown(SessionId),
    DebugClientEvent {
        session_id: SessionId,
        message: Message,
    },
    /// Emitted when a session's adapter sends a `RunInTerminal` reverse request.
    /// The store waits on the receiving end of `sender`: an `Ok` value is reported
    /// back to the adapter as the shell process id, an `Err` as a failure.
    RunInTerminal {
        session_id: SessionId,
        title: Option<String>,
        cwd: Option<Arc<Path>>,
        command: Option<String>,
        args: Vec<String>,
        envs: HashMap<String, String>,
        sender: mpsc::Sender<Result<u32>>,
    },
    /// A message for subscribers; in this file it carries the text of a
    /// session start or initialization error.
    Notification(String),
    RemoteHasInitialized,
}
 69
/// Selects whether the store owns the debug sessions itself (`Local`) or talks to an
/// upstream project that does (`Remote`).
#[allow(clippy::large_enum_variant)]
pub enum DapStoreMode {
    Local(LocalDapStore),   // ssh host and collab host
    Remote(RemoteDapStore), // collab guest
}
 75
/// State needed to start and manage debug sessions on this machine.
pub struct LocalDapStore {
    fs: Arc<dyn Fs>,
    node_runtime: NodeRuntime,
    /// Counter from which fresh [`SessionId`]s are allocated.
    next_session_id: AtomicU32,
    http_client: Arc<dyn HttpClient>,
    worktree_store: Entity<WorktreeStore>,
    environment: Entity<ProjectEnvironment>,
    language_registry: Arc<LanguageRegistry>,
    debug_adapters: Arc<DapRegistry>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    locator_store: Arc<LocatorStore>,
    /// Sending half handed (cloned) to every local session; messages sent here are
    /// consumed by the task stored in `_start_debugging_task`.
    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
    /// Task that drains the receiving half of `start_debugging_tx`; owned by the store.
    _start_debugging_task: Task<()>,
}
 90
 91impl LocalDapStore {
 92    fn next_session_id(&self) -> SessionId {
 93        SessionId(self.next_session_id.fetch_add(1, SeqCst))
 94    }
 95}
 96
/// State of a store whose sessions live in an upstream project.
pub struct RemoteDapStore {
    upstream_client: AnyProtoClient,
    upstream_project_id: u64,
    /// Pending events; handed out once (and left as `None`) by
    /// `DapStore::remote_event_queue`.
    event_queue: Option<VecDeque<DapStoreEvent>>,
}
102
/// Registry of debug sessions for a project, in either local or remote mode.
pub struct DapStore {
    mode: DapStoreMode,
    /// Client and project id set by `shared` and cleared by `unshared`.
    downstream_client: Option<(AnyProtoClient, u64)>,
    breakpoint_store: Entity<BreakpointStore>,
    /// All sessions currently known to the store, keyed by their id.
    sessions: BTreeMap<SessionId, Entity<Session>>,
}

// Lets `DapStore` emit `DapStoreEvent`s via `cx.emit`.
impl EventEmitter<DapStoreEvent> for DapStore {}
111
112impl DapStore {
    /// Hook for registering RPC request handlers on `_client`.
    ///
    /// Currently a no-op: every registration below is commented out (see the todo).
    pub fn init(_client: &AnyProtoClient) {
        // todo(debugger): Reenable these after we finish handle_dap_command refactor
        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
    }
128
129    #[expect(clippy::too_many_arguments)]
130    pub fn new_local(
131        http_client: Arc<dyn HttpClient>,
132        node_runtime: NodeRuntime,
133        fs: Arc<dyn Fs>,
134        language_registry: Arc<LanguageRegistry>,
135        debug_adapters: Arc<DapRegistry>,
136        environment: Entity<ProjectEnvironment>,
137        toolchain_store: Arc<dyn LanguageToolchainStore>,
138        breakpoint_store: Entity<BreakpointStore>,
139        worktree_store: Entity<WorktreeStore>,
140        cx: &mut Context<Self>,
141    ) -> Self {
142        cx.on_app_quit(Self::shutdown_sessions).detach();
143
144        let (start_debugging_tx, mut message_rx) =
145            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
146
147        let _start_debugging_task = cx.spawn(async move |this, cx| {
148            while let Some((session_id, message)) = message_rx.next().await {
149                match message {
150                    Message::Request(request) => {
151                        let _ = this
152                            .update(cx, |this, cx| {
153                                if request.command == StartDebugging::COMMAND {
154                                    this.handle_start_debugging_request(session_id, request, cx)
155                                        .detach_and_log_err(cx);
156                                } else if request.command == RunInTerminal::COMMAND {
157                                    this.handle_run_in_terminal_request(session_id, request, cx)
158                                        .detach_and_log_err(cx);
159                                }
160                            })
161                            .log_err();
162                    }
163                    _ => {}
164                }
165            }
166        });
167        Self {
168            mode: DapStoreMode::Local(LocalDapStore {
169                fs,
170                environment,
171                http_client,
172                node_runtime,
173                worktree_store,
174                toolchain_store,
175                language_registry,
176                debug_adapters,
177                start_debugging_tx,
178                _start_debugging_task,
179                locator_store: Arc::from(LocatorStore::new()),
180                next_session_id: Default::default(),
181            }),
182            downstream_client: None,
183            breakpoint_store,
184            sessions: Default::default(),
185        }
186    }
187
188    pub fn new_remote(
189        project_id: u64,
190        upstream_client: AnyProtoClient,
191        breakpoint_store: Entity<BreakpointStore>,
192    ) -> Self {
193        Self {
194            mode: DapStoreMode::Remote(RemoteDapStore {
195                upstream_client,
196                upstream_project_id: project_id,
197                event_queue: Some(VecDeque::default()),
198            }),
199            downstream_client: None,
200            breakpoint_store,
201            sessions: Default::default(),
202        }
203    }
204
205    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
206        match &self.mode {
207            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
208            _ => None,
209        }
210    }
211
212    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
213        if let DapStoreMode::Remote(remote) = &mut self.mode {
214            remote.event_queue.take()
215        } else {
216            None
217        }
218    }
219
220    pub fn as_local(&self) -> Option<&LocalDapStore> {
221        match &self.mode {
222            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
223            _ => None,
224        }
225    }
226
227    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
228        match &mut self.mode {
229            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
230            _ => None,
231        }
232    }
233
234    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
235        match &self.mode {
236            DapStoreMode::Remote(RemoteDapStore {
237                upstream_client,
238                upstream_project_id,
239                ..
240            }) => Some((upstream_client.clone(), *upstream_project_id)),
241
242            DapStoreMode::Local(_) => None,
243        }
244    }
245
    /// Returns the downstream client and project id, if one was set via `shared`.
    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
        self.downstream_client.as_ref()
    }
249
250    pub fn add_remote_client(
251        &mut self,
252        session_id: SessionId,
253        ignore: Option<bool>,
254        cx: &mut Context<Self>,
255    ) {
256        if let DapStoreMode::Remote(remote) = &self.mode {
257            self.sessions.insert(
258                session_id,
259                cx.new(|_| {
260                    debugger::session::Session::remote(
261                        session_id,
262                        remote.upstream_client.clone(),
263                        remote.upstream_project_id,
264                        ignore.unwrap_or(false),
265                    )
266                }),
267            );
268        } else {
269            debug_assert!(false);
270        }
271    }
272
273    pub fn session_by_id(
274        &self,
275        session_id: impl Borrow<SessionId>,
276    ) -> Option<Entity<session::Session>> {
277        let session_id = session_id.borrow();
278        let client = self.sessions.get(session_id).cloned();
279
280        client
281    }
    /// Iterates over every session currently held by the store.
    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
        self.sessions.values()
    }
285
286    pub fn capabilities_by_id(
287        &self,
288        session_id: impl Borrow<SessionId>,
289        cx: &App,
290    ) -> Option<Capabilities> {
291        let session_id = session_id.borrow();
292        self.sessions
293            .get(session_id)
294            .map(|client| client.read(cx).capabilities.clone())
295    }
296
    /// Returns the breakpoint store this DAP store was created with.
    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
        &self.breakpoint_store
    }
300
301    #[allow(dead_code)]
302    async fn handle_ignore_breakpoint_state(
303        this: Entity<Self>,
304        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
305        mut cx: AsyncApp,
306    ) -> Result<()> {
307        let session_id = SessionId::from_proto(envelope.payload.session_id);
308
309        this.update(&mut cx, |this, cx| {
310            if let Some(session) = this.session_by_id(&session_id) {
311                session.update(cx, |session, cx| {
312                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
313                })
314            } else {
315                Task::ready(())
316            }
317        })?
318        .await;
319
320        Ok(())
321    }
322
323    pub fn new_session(
324        &mut self,
325        mut config: DebugAdapterConfig,
326        worktree: &Entity<Worktree>,
327        parent_session: Option<Entity<Session>>,
328        cx: &mut Context<Self>,
329    ) -> (SessionId, Task<Result<Entity<Session>>>) {
330        let Some(local_store) = self.as_local() else {
331            unimplemented!("Starting session on remote side");
332        };
333
334        let delegate = DapAdapterDelegate::new(
335            local_store.fs.clone(),
336            worktree.read(cx).id(),
337            local_store.node_runtime.clone(),
338            local_store.http_client.clone(),
339            local_store.language_registry.clone(),
340            local_store.toolchain_store.clone(),
341            local_store.environment.update(cx, |env, cx| {
342                env.get_worktree_environment(worktree.clone(), cx)
343            }),
344        );
345        let session_id = local_store.next_session_id();
346
347        if let Some(session) = &parent_session {
348            session.update(cx, |session, _| {
349                session.add_child_session_id(session_id);
350            });
351        }
352
353        let (initialized_tx, initialized_rx) = oneshot::channel();
354        let locator_store = local_store.locator_store.clone();
355        let debug_adapters = local_store.debug_adapters.clone();
356
357        let start_debugging_tx = local_store.start_debugging_tx.clone();
358
359        let task = cx.spawn(async move |this, cx| {
360            if config.locator.is_some() {
361                config = cx
362                    .background_spawn(async move {
363                        locator_store
364                            .resolve_debug_config(&mut config)
365                            .await
366                            .map(|_| config)
367                    })
368                    .await?;
369            }
370
371            let start_client_task = this.update(cx, |this, cx| {
372                Session::local(
373                    this.breakpoint_store.clone(),
374                    session_id,
375                    parent_session,
376                    delegate,
377                    config,
378                    start_debugging_tx.clone(),
379                    initialized_tx,
380                    debug_adapters,
381                    cx,
382                )
383            })?;
384
385            this.update(cx, |_, cx| {
386                create_new_session(session_id, initialized_rx, start_client_task, cx)
387            })?
388            .await
389        });
390
391        (session_id, task)
392    }
393
394    #[cfg(any(test, feature = "test-support"))]
395    pub fn new_fake_session(
396        &mut self,
397        config: DebugAdapterConfig,
398        worktree: &Entity<Worktree>,
399        parent_session: Option<Entity<Session>>,
400        caps: Capabilities,
401        fails: bool,
402        cx: &mut Context<Self>,
403    ) -> (SessionId, Task<Result<Entity<Session>>>) {
404        let Some(local_store) = self.as_local() else {
405            unimplemented!("Starting session on remote side");
406        };
407
408        let delegate = DapAdapterDelegate::new(
409            local_store.fs.clone(),
410            worktree.read(cx).id(),
411            local_store.node_runtime.clone(),
412            local_store.http_client.clone(),
413            local_store.language_registry.clone(),
414            local_store.toolchain_store.clone(),
415            local_store.environment.update(cx, |env, cx| {
416                env.get_worktree_environment(worktree.clone(), cx)
417            }),
418        );
419        let session_id = local_store.next_session_id();
420
421        if let Some(session) = &parent_session {
422            session.update(cx, |session, _| {
423                session.add_child_session_id(session_id);
424            });
425        }
426
427        let (initialized_tx, initialized_rx) = oneshot::channel();
428
429        let start_client_task = Session::fake(
430            self.breakpoint_store.clone(),
431            session_id,
432            parent_session,
433            delegate,
434            config,
435            local_store.start_debugging_tx.clone(),
436            initialized_tx,
437            caps,
438            fails,
439            cx,
440        );
441
442        let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
443        (session_id, task)
444    }
445
446    fn handle_start_debugging_request(
447        &mut self,
448        session_id: SessionId,
449        request: dap::messages::Request,
450        cx: &mut Context<Self>,
451    ) -> Task<Result<()>> {
452        let Some(local_store) = self.as_local() else {
453            unreachable!("Cannot response for non-local session");
454        };
455
456        let Some(parent_session) = self.session_by_id(session_id) else {
457            return Task::ready(Err(anyhow!("Session not found")));
458        };
459
460        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
461            request.arguments.unwrap_or_default(),
462        )
463        .expect("To parse StartDebuggingRequestArguments");
464        let worktree = local_store
465            .worktree_store
466            .update(cx, |this, _| this.worktrees().next())
467            .expect("worktree-less project");
468
469        let Some(config) = parent_session.read(cx).configuration() else {
470            unreachable!("there must be a config for local sessions");
471        };
472
473        let debug_config = DebugAdapterConfig {
474            label: config.label,
475            adapter: config.adapter,
476            request: DebugRequestDisposition::ReverseRequest(args),
477            initialize_args: config.initialize_args.clone(),
478            tcp_connection: config.tcp_connection.clone(),
479            locator: None,
480            stop_on_entry: config.stop_on_entry,
481        };
482
483        #[cfg(any(test, feature = "test-support"))]
484        let new_session_task = {
485            let caps = parent_session.read(cx).capabilities.clone();
486            self.new_fake_session(
487                debug_config,
488                &worktree,
489                Some(parent_session.clone()),
490                caps,
491                false,
492                cx,
493            )
494            .1
495        };
496        #[cfg(not(any(test, feature = "test-support")))]
497        let new_session_task = self
498            .new_session(debug_config, &worktree, Some(parent_session.clone()), cx)
499            .1;
500
501        let request_seq = request.seq;
502        cx.spawn(async move |_, cx| {
503            let (success, body) = match new_session_task.await {
504                Ok(_) => (true, None),
505                Err(error) => (
506                    false,
507                    Some(serde_json::to_value(ErrorResponse {
508                        error: Some(dap::Message {
509                            id: request_seq,
510                            format: error.to_string(),
511                            variables: None,
512                            send_telemetry: None,
513                            show_user: None,
514                            url: None,
515                            url_label: None,
516                        }),
517                    })?),
518                ),
519            };
520
521            parent_session
522                .update(cx, |session, cx| {
523                    session.respond_to_client(
524                        request_seq,
525                        success,
526                        StartDebugging::COMMAND.to_string(),
527                        body,
528                        cx,
529                    )
530                })?
531                .await
532        })
533    }
534
535    fn handle_run_in_terminal_request(
536        &mut self,
537        session_id: SessionId,
538        request: dap::messages::Request,
539        cx: &mut Context<Self>,
540    ) -> Task<Result<()>> {
541        let Some(session) = self.session_by_id(session_id) else {
542            return Task::ready(Err(anyhow!("Session not found")));
543        };
544
545        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
546            request.arguments.unwrap_or_default(),
547        )
548        .expect("To parse StartDebuggingRequestArguments");
549
550        let seq = request.seq;
551
552        let cwd = Path::new(&request_args.cwd);
553
554        match cwd.try_exists() {
555            Ok(false) | Err(_) if !request_args.cwd.is_empty() => {
556                return session.update(cx, |session, cx| {
557                    session.respond_to_client(
558                        seq,
559                        false,
560                        RunInTerminal::COMMAND.to_string(),
561                        serde_json::to_value(dap::ErrorResponse {
562                            error: Some(dap::Message {
563                                id: seq,
564                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
565                                variables: None,
566                                send_telemetry: None,
567                                show_user: None,
568                                url: None,
569                                url_label: None,
570                            }),
571                        })
572                        .ok(),
573                        cx,
574                    )
575                });
576            }
577            _ => (),
578        }
579        let mut args = request_args.args.clone();
580
581        // Handle special case for NodeJS debug adapter
582        // If only the Node binary path is provided, we set the command to None
583        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
584        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
585        // This allows the NodeJS debug client to attach correctly
586        let command = if args.len() > 1 {
587            Some(args.remove(0))
588        } else {
589            None
590        };
591
592        let mut envs: HashMap<String, String> = Default::default();
593        if let Some(Value::Object(env)) = request_args.env {
594            for (key, value) in env {
595                let value_str = match (key.as_str(), value) {
596                    (_, Value::String(value)) => value,
597                    _ => continue,
598                };
599
600                envs.insert(key, value_str);
601            }
602        }
603
604        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
605        let cwd = Some(cwd)
606            .filter(|cwd| cwd.as_os_str().len() > 0)
607            .map(Arc::from)
608            .or_else(|| {
609                self.session_by_id(session_id)
610                    .and_then(|session| {
611                        session
612                            .read(cx)
613                            .configuration()
614                            .and_then(|config| config.request.cwd())
615                    })
616                    .map(Arc::from)
617            });
618        cx.emit(DapStoreEvent::RunInTerminal {
619            session_id,
620            title: request_args.title,
621            cwd,
622            command,
623            args,
624            envs,
625            sender: tx,
626        });
627        cx.notify();
628
629        let session = session.downgrade();
630        cx.spawn(async move |_, cx| {
631            let (success, body) = match rx.next().await {
632                Some(Ok(pid)) => (
633                    true,
634                    serde_json::to_value(dap::RunInTerminalResponse {
635                        process_id: None,
636                        shell_process_id: Some(pid as u64),
637                    })
638                    .ok(),
639                ),
640                Some(Err(error)) => (
641                    false,
642                    serde_json::to_value(dap::ErrorResponse {
643                        error: Some(dap::Message {
644                            id: seq,
645                            format: error.to_string(),
646                            variables: None,
647                            send_telemetry: None,
648                            show_user: None,
649                            url: None,
650                            url_label: None,
651                        }),
652                    })
653                    .ok(),
654                ),
655                None => (
656                    false,
657                    serde_json::to_value(dap::ErrorResponse {
658                        error: Some(dap::Message {
659                            id: seq,
660                            format: "failed to receive response from spawn terminal".to_string(),
661                            variables: None,
662                            send_telemetry: None,
663                            show_user: None,
664                            url: None,
665                            url_label: None,
666                        }),
667                    })
668                    .ok(),
669                ),
670            };
671
672            session
673                .update(cx, |session, cx| {
674                    session.respond_to_client(
675                        seq,
676                        success,
677                        RunInTerminal::COMMAND.to_string(),
678                        body,
679                        cx,
680                    )
681                })?
682                .await
683        })
684    }
685
686    pub fn evaluate(
687        &self,
688        session_id: &SessionId,
689        stack_frame_id: u64,
690        expression: String,
691        context: EvaluateArgumentsContext,
692        source: Option<Source>,
693        cx: &mut Context<Self>,
694    ) -> Task<Result<EvaluateResponse>> {
695        let Some(client) = self
696            .session_by_id(session_id)
697            .and_then(|client| client.read(cx).adapter_client())
698        else {
699            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
700        };
701
702        cx.background_executor().spawn(async move {
703            client
704                .request::<Evaluate>(EvaluateArguments {
705                    expression: expression.clone(),
706                    frame_id: Some(stack_frame_id),
707                    context: Some(context),
708                    format: None,
709                    line: None,
710                    column: None,
711                    source,
712                })
713                .await
714        })
715    }
716
717    pub fn completions(
718        &self,
719        session_id: &SessionId,
720        stack_frame_id: u64,
721        text: String,
722        completion_column: u64,
723        cx: &mut Context<Self>,
724    ) -> Task<Result<Vec<CompletionItem>>> {
725        let Some(client) = self
726            .session_by_id(session_id)
727            .and_then(|client| client.read(cx).adapter_client())
728        else {
729            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
730        };
731
732        cx.background_executor().spawn(async move {
733            Ok(client
734                .request::<Completions>(CompletionsArguments {
735                    frame_id: Some(stack_frame_id),
736                    line: None,
737                    text,
738                    column: completion_column,
739                })
740                .await?
741                .targets)
742        })
743    }
744
745    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
746        let mut tasks = vec![];
747        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
748            tasks.push(self.shutdown_session(session_id, cx));
749        }
750
751        cx.background_executor().spawn(async move {
752            futures::future::join_all(tasks).await;
753        })
754    }
755
756    pub fn shutdown_session(
757        &mut self,
758        session_id: SessionId,
759        cx: &mut Context<Self>,
760    ) -> Task<Result<()>> {
761        let Some(_) = self.as_local_mut() else {
762            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
763        };
764
765        let Some(session) = self.sessions.remove(&session_id) else {
766            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
767        };
768
769        let shutdown_children = session
770            .read(cx)
771            .child_session_ids()
772            .iter()
773            .map(|session_id| self.shutdown_session(*session_id, cx))
774            .collect::<Vec<_>>();
775
776        let shutdown_parent_task = if let Some(parent_session) = session
777            .read(cx)
778            .parent_id()
779            .and_then(|session_id| self.session_by_id(session_id))
780        {
781            let shutdown_id = parent_session.update(cx, |parent_session, _| {
782                parent_session.remove_child_session_id(session_id);
783
784                if parent_session.child_session_ids().len() == 0 {
785                    Some(parent_session.session_id())
786                } else {
787                    None
788                }
789            });
790
791            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
792        } else {
793            None
794        };
795
796        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
797
798        cx.background_spawn(async move {
799            if shutdown_children.len() > 0 {
800                let _ = join_all(shutdown_children).await;
801            }
802
803            shutdown_task.await;
804
805            if let Some(parent_task) = shutdown_parent_task {
806                parent_task.await?;
807            }
808
809            Ok(())
810        })
811    }
812
813    pub fn shared(
814        &mut self,
815        project_id: u64,
816        downstream_client: AnyProtoClient,
817        _: &mut Context<Self>,
818    ) {
819        self.downstream_client = Some((downstream_client.clone(), project_id));
820    }
821
822    pub fn unshared(&mut self, cx: &mut Context<Self>) {
823        self.downstream_client.take();
824
825        cx.notify();
826    }
827}
828
829fn create_new_session(
830    session_id: SessionId,
831    initialized_rx: oneshot::Receiver<()>,
832    start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
833    cx: &mut Context<DapStore>,
834) -> Task<Result<Entity<Session>>> {
835    let task = cx.spawn(async move |this, cx| {
836        let session = match start_client_task.await {
837            Ok(session) => session,
838            Err(error) => {
839                this.update(cx, |_, cx| {
840                    cx.emit(DapStoreEvent::Notification(error.to_string()));
841                })
842                .log_err();
843
844                return Err(error);
845            }
846        };
847
848        // we have to insert the session early, so we can handle reverse requests
849        // that need the session to be available
850        this.update(cx, |store, cx| {
851            store.sessions.insert(session_id, session.clone());
852            cx.emit(DapStoreEvent::DebugClientStarted(session_id));
853            cx.notify();
854        })?;
855
856        match {
857            session
858                .update(cx, |session, cx| session.request_initialize(cx))?
859                .await?;
860
861            session
862                .update(cx, |session, cx| {
863                    session.initialize_sequence(initialized_rx, cx)
864                })?
865                .await
866        } {
867            Ok(_) => {}
868            Err(error) => {
869                this.update(cx, |this, cx| {
870                    cx.emit(DapStoreEvent::Notification(error.to_string()));
871
872                    this.shutdown_session(session_id, cx)
873                })?
874                .await
875                .log_err();
876
877                return Err(error);
878            }
879        }
880
881        this.update(cx, |_, cx| {
882            cx.subscribe(
883                &session,
884                move |this: &mut DapStore, _, event: &SessionStateEvent, cx| match event {
885                    SessionStateEvent::Shutdown => {
886                        this.shutdown_session(session_id, cx).detach_and_log_err(cx);
887                    }
888                },
889            )
890            .detach();
891            cx.emit(DapStoreEvent::DebugSessionInitialized(session_id));
892        })?;
893
894        Ok(session)
895    });
896    task
897}
898
/// Bundle of services handed to debug adapters; implements `dap::adapters::DapDelegate`.
#[derive(Clone)]
pub struct DapAdapterDelegate {
    fs: Arc<dyn Fs>,
    worktree_id: WorktreeId,
    node_runtime: NodeRuntime,
    http_client: Arc<dyn HttpClient>,
    /// Receives adapter status updates (see `update_status`).
    language_registry: Arc<LanguageRegistry>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    /// Shared set of adapter names, exposed through `updated_adapters()`; starts empty.
    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
    /// Shared task resolving to the shell environment, if one could be loaded.
    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
}
910
911impl DapAdapterDelegate {
912    pub fn new(
913        fs: Arc<dyn Fs>,
914        worktree_id: WorktreeId,
915        node_runtime: NodeRuntime,
916        http_client: Arc<dyn HttpClient>,
917        language_registry: Arc<LanguageRegistry>,
918        toolchain_store: Arc<dyn LanguageToolchainStore>,
919        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
920    ) -> Self {
921        Self {
922            fs,
923            worktree_id,
924            http_client,
925            node_runtime,
926            toolchain_store,
927            language_registry,
928            load_shell_env_task,
929            updated_adapters: Default::default(),
930        }
931    }
932}
933
934#[async_trait(?Send)]
935impl dap::adapters::DapDelegate for DapAdapterDelegate {
936    fn worktree_id(&self) -> WorktreeId {
937        self.worktree_id
938    }
939
940    fn http_client(&self) -> Arc<dyn HttpClient> {
941        self.http_client.clone()
942    }
943
944    fn node_runtime(&self) -> NodeRuntime {
945        self.node_runtime.clone()
946    }
947
948    fn fs(&self) -> Arc<dyn Fs> {
949        self.fs.clone()
950    }
951
952    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
953        self.updated_adapters.clone()
954    }
955
956    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
957        let name = SharedString::from(dap_name.to_string());
958        let status = match status {
959            DapStatus::None => BinaryStatus::None,
960            DapStatus::Downloading => BinaryStatus::Downloading,
961            DapStatus::Failed { error } => BinaryStatus::Failed { error },
962            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
963        };
964
965        self.language_registry
966            .update_dap_status(LanguageServerName(name), status);
967    }
968
969    fn which(&self, command: &OsStr) -> Option<PathBuf> {
970        which::which(command).ok()
971    }
972
973    async fn shell_env(&self) -> HashMap<String, String> {
974        let task = self.load_shell_env_task.clone();
975        task.await.unwrap_or_default()
976    }
977
978    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
979        self.toolchain_store.clone()
980    }
981}