dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    // Will need to uncomment this once we implement rpc message handler again
  4    // dap_command::{
  5    //     ContinueCommand, DapCommand, DisconnectCommand, NextCommand, PauseCommand, RestartCommand,
  6    //     RestartStackFrameCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  7    //     TerminateCommand, TerminateThreadsCommand, VariablesCommand,
  8    // },
  9    session::{self, Session},
 10};
 11use crate::{debugger, worktree_store::WorktreeStore, ProjectEnvironment};
 12use anyhow::{anyhow, Result};
 13use async_trait::async_trait;
 14use collections::HashMap;
 15use dap::{
 16    adapters::{DapStatus, DebugAdapterName},
 17    client::SessionId,
 18    messages::Message,
 19    requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
 20    Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
 21    EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
 22    Source, StartDebuggingRequestArguments,
 23};
 24use fs::Fs;
 25use futures::{
 26    channel::{mpsc, oneshot},
 27    future::{join_all, Shared},
 28};
 29use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 30use http_client::HttpClient;
 31use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 32use lsp::LanguageServerName;
 33use node_runtime::NodeRuntime;
 34
 35use rpc::{
 36    proto::{self},
 37    AnyProtoClient, TypedEnvelope,
 38};
 39use serde_json::Value;
 40use settings::WorktreeId;
 41use smol::{lock::Mutex, stream::StreamExt};
 42use std::{
 43    borrow::Borrow,
 44    collections::{BTreeMap, HashSet},
 45    ffi::OsStr,
 46    path::PathBuf,
 47    sync::{atomic::Ordering::SeqCst, Arc},
 48};
 49use std::{collections::VecDeque, sync::atomic::AtomicU32};
 50use task::{DebugAdapterConfig, DebugRequestDisposition};
 51use util::ResultExt as _;
 52use worktree::Worktree;
 53
 54pub enum DapStoreEvent {
 55    DebugClientStarted(SessionId),
 56    DebugClientShutdown(SessionId),
 57    DebugClientEvent {
 58        session_id: SessionId,
 59        message: Message,
 60    },
 61    RunInTerminal {
 62        session_id: SessionId,
 63        title: Option<String>,
 64        cwd: PathBuf,
 65        command: Option<String>,
 66        args: Vec<String>,
 67        envs: HashMap<String, String>,
 68        sender: mpsc::Sender<Result<u32>>,
 69    },
 70    Notification(String),
 71    RemoteHasInitialized,
 72}
 73
 74#[allow(clippy::large_enum_variant)]
 75pub enum DapStoreMode {
 76    Local(LocalDapStore),   // ssh host and collab host
 77    Remote(RemoteDapStore), // collab guest
 78}
 79
 80pub struct LocalDapStore {
 81    fs: Arc<dyn Fs>,
 82    node_runtime: NodeRuntime,
 83    next_session_id: AtomicU32,
 84    http_client: Arc<dyn HttpClient>,
 85    worktree_store: Entity<WorktreeStore>,
 86    environment: Entity<ProjectEnvironment>,
 87    language_registry: Arc<LanguageRegistry>,
 88    debug_adapters: Arc<DapRegistry>,
 89    toolchain_store: Arc<dyn LanguageToolchainStore>,
 90    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 91    _start_debugging_task: Task<()>,
 92}
 93
 94impl LocalDapStore {
 95    fn next_session_id(&self) -> SessionId {
 96        SessionId(self.next_session_id.fetch_add(1, SeqCst))
 97    }
 98}
 99
100pub struct RemoteDapStore {
101    upstream_client: AnyProtoClient,
102    upstream_project_id: u64,
103    event_queue: Option<VecDeque<DapStoreEvent>>,
104}
105
106pub struct DapStore {
107    mode: DapStoreMode,
108    downstream_client: Option<(AnyProtoClient, u64)>,
109    breakpoint_store: Entity<BreakpointStore>,
110    sessions: BTreeMap<SessionId, Entity<Session>>,
111}
112
113impl EventEmitter<DapStoreEvent> for DapStore {}
114
115impl DapStore {
116    pub fn init(_client: &AnyProtoClient) {
117        // todo(debugger): Reenable these after we finish handle_dap_command refactor
118        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
119        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
120        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
121        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
122        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
123        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
124        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
125        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
126        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
127        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
128        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
129        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
130    }
131
132    #[expect(clippy::too_many_arguments)]
133    pub fn new_local(
134        http_client: Arc<dyn HttpClient>,
135        node_runtime: NodeRuntime,
136        fs: Arc<dyn Fs>,
137        language_registry: Arc<LanguageRegistry>,
138        debug_adapters: Arc<DapRegistry>,
139        environment: Entity<ProjectEnvironment>,
140        toolchain_store: Arc<dyn LanguageToolchainStore>,
141        breakpoint_store: Entity<BreakpointStore>,
142        worktree_store: Entity<WorktreeStore>,
143        cx: &mut Context<Self>,
144    ) -> Self {
145        cx.on_app_quit(Self::shutdown_sessions).detach();
146
147        let (start_debugging_tx, mut message_rx) =
148            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
149
150        let _start_debugging_task = cx.spawn(async move |this, cx| {
151            while let Some((session_id, message)) = message_rx.next().await {
152                match message {
153                    Message::Request(request) => {
154                        let _ = this
155                            .update(cx, |this, cx| {
156                                if request.command == StartDebugging::COMMAND {
157                                    this.handle_start_debugging_request(session_id, request, cx)
158                                        .detach_and_log_err(cx);
159                                } else if request.command == RunInTerminal::COMMAND {
160                                    this.handle_run_in_terminal_request(session_id, request, cx)
161                                        .detach_and_log_err(cx);
162                                }
163                            })
164                            .log_err();
165                    }
166                    _ => {}
167                }
168            }
169        });
170        Self {
171            mode: DapStoreMode::Local(LocalDapStore {
172                fs,
173                environment,
174                http_client,
175                node_runtime,
176                worktree_store,
177                toolchain_store,
178                language_registry,
179                debug_adapters,
180                start_debugging_tx,
181                _start_debugging_task,
182                next_session_id: Default::default(),
183            }),
184            downstream_client: None,
185            breakpoint_store,
186            sessions: Default::default(),
187        }
188    }
189
190    pub fn new_remote(
191        project_id: u64,
192        upstream_client: AnyProtoClient,
193        breakpoint_store: Entity<BreakpointStore>,
194    ) -> Self {
195        Self {
196            mode: DapStoreMode::Remote(RemoteDapStore {
197                upstream_client,
198                upstream_project_id: project_id,
199                event_queue: Some(VecDeque::default()),
200            }),
201            downstream_client: None,
202            breakpoint_store,
203            sessions: Default::default(),
204        }
205    }
206
207    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
208        match &self.mode {
209            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
210            _ => None,
211        }
212    }
213
214    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
215        if let DapStoreMode::Remote(remote) = &mut self.mode {
216            remote.event_queue.take()
217        } else {
218            None
219        }
220    }
221
222    pub fn as_local(&self) -> Option<&LocalDapStore> {
223        match &self.mode {
224            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
225            _ => None,
226        }
227    }
228
229    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
230        match &mut self.mode {
231            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
232            _ => None,
233        }
234    }
235
236    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
237        match &self.mode {
238            DapStoreMode::Remote(RemoteDapStore {
239                upstream_client,
240                upstream_project_id,
241                ..
242            }) => Some((upstream_client.clone(), *upstream_project_id)),
243
244            DapStoreMode::Local(_) => None,
245        }
246    }
247
248    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
249        self.downstream_client.as_ref()
250    }
251
252    pub fn add_remote_client(
253        &mut self,
254        session_id: SessionId,
255        ignore: Option<bool>,
256        cx: &mut Context<Self>,
257    ) {
258        if let DapStoreMode::Remote(remote) = &self.mode {
259            self.sessions.insert(
260                session_id,
261                cx.new(|_| {
262                    debugger::session::Session::remote(
263                        session_id,
264                        remote.upstream_client.clone(),
265                        remote.upstream_project_id,
266                        ignore.unwrap_or(false),
267                    )
268                }),
269            );
270        } else {
271            debug_assert!(false);
272        }
273    }
274
275    pub fn session_by_id(
276        &self,
277        session_id: impl Borrow<SessionId>,
278    ) -> Option<Entity<session::Session>> {
279        let session_id = session_id.borrow();
280        let client = self.sessions.get(session_id).cloned();
281
282        client
283    }
284    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
285        self.sessions.values()
286    }
287
288    pub fn capabilities_by_id(
289        &self,
290        session_id: impl Borrow<SessionId>,
291        cx: &App,
292    ) -> Option<Capabilities> {
293        let session_id = session_id.borrow();
294        self.sessions
295            .get(session_id)
296            .map(|client| client.read(cx).capabilities.clone())
297    }
298
299    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
300        &self.breakpoint_store
301    }
302
303    #[allow(dead_code)]
304    async fn handle_ignore_breakpoint_state(
305        this: Entity<Self>,
306        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
307        mut cx: AsyncApp,
308    ) -> Result<()> {
309        let session_id = SessionId::from_proto(envelope.payload.session_id);
310
311        this.update(&mut cx, |this, cx| {
312            if let Some(session) = this.session_by_id(&session_id) {
313                session.update(cx, |session, cx| {
314                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
315                })
316            } else {
317                Task::ready(())
318            }
319        })?
320        .await;
321
322        Ok(())
323    }
324
325    pub fn new_session(
326        &mut self,
327        config: DebugAdapterConfig,
328        worktree: &Entity<Worktree>,
329        parent_session: Option<Entity<Session>>,
330        cx: &mut Context<Self>,
331    ) -> (SessionId, Task<Result<Entity<Session>>>) {
332        let Some(local_store) = self.as_local() else {
333            unimplemented!("Starting session on remote side");
334        };
335
336        let delegate = DapAdapterDelegate::new(
337            local_store.fs.clone(),
338            worktree.read(cx).id(),
339            local_store.node_runtime.clone(),
340            local_store.http_client.clone(),
341            local_store.language_registry.clone(),
342            local_store.toolchain_store.clone(),
343            local_store.environment.update(cx, |env, cx| {
344                let worktree = worktree.read(cx);
345                env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
346            }),
347        );
348        let session_id = local_store.next_session_id();
349
350        if let Some(session) = &parent_session {
351            session.update(cx, |session, _| {
352                session.add_child_session_id(session_id);
353            });
354        }
355
356        let (initialized_tx, initialized_rx) = oneshot::channel();
357
358        let start_client_task = Session::local(
359            self.breakpoint_store.clone(),
360            session_id,
361            parent_session,
362            delegate,
363            config,
364            local_store.start_debugging_tx.clone(),
365            initialized_tx,
366            local_store.debug_adapters.clone(),
367            cx,
368        );
369
370        let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
371        (session_id, task)
372    }
373    #[cfg(any(test, feature = "test-support"))]
374    pub fn new_fake_session(
375        &mut self,
376        config: DebugAdapterConfig,
377        worktree: &Entity<Worktree>,
378        parent_session: Option<Entity<Session>>,
379        caps: Capabilities,
380        fails: bool,
381        cx: &mut Context<Self>,
382    ) -> (SessionId, Task<Result<Entity<Session>>>) {
383        let Some(local_store) = self.as_local() else {
384            unimplemented!("Starting session on remote side");
385        };
386
387        let delegate = DapAdapterDelegate::new(
388            local_store.fs.clone(),
389            worktree.read(cx).id(),
390            local_store.node_runtime.clone(),
391            local_store.http_client.clone(),
392            local_store.language_registry.clone(),
393            local_store.toolchain_store.clone(),
394            local_store.environment.update(cx, |env, cx| {
395                let worktree = worktree.read(cx);
396                env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
397            }),
398        );
399        let session_id = local_store.next_session_id();
400
401        if let Some(session) = &parent_session {
402            session.update(cx, |session, _| {
403                session.add_child_session_id(session_id);
404            });
405        }
406
407        let (initialized_tx, initialized_rx) = oneshot::channel();
408
409        let start_client_task = Session::fake(
410            self.breakpoint_store.clone(),
411            session_id,
412            parent_session,
413            delegate,
414            config,
415            local_store.start_debugging_tx.clone(),
416            initialized_tx,
417            caps,
418            fails,
419            cx,
420        );
421
422        let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
423        (session_id, task)
424    }
425
426    fn handle_start_debugging_request(
427        &mut self,
428        session_id: SessionId,
429        request: dap::messages::Request,
430        cx: &mut Context<Self>,
431    ) -> Task<Result<()>> {
432        let Some(local_store) = self.as_local() else {
433            unreachable!("Cannot response for non-local session");
434        };
435
436        let Some(parent_session) = self.session_by_id(session_id) else {
437            return Task::ready(Err(anyhow!("Session not found")));
438        };
439
440        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
441            request.arguments.unwrap_or_default(),
442        )
443        .expect("To parse StartDebuggingRequestArguments");
444        let worktree = local_store
445            .worktree_store
446            .update(cx, |this, _| this.worktrees().next())
447            .expect("worktree-less project");
448
449        let Some(config) = parent_session.read(cx).configuration() else {
450            unreachable!("there must be a config for local sessions");
451        };
452
453        let debug_config = DebugAdapterConfig {
454            label: config.label,
455            adapter: config.adapter,
456            request: DebugRequestDisposition::ReverseRequest(args),
457            initialize_args: config.initialize_args.clone(),
458            tcp_connection: config.tcp_connection.clone(),
459        };
460        #[cfg(any(test, feature = "test-support"))]
461        let new_session_task = {
462            let caps = parent_session.read(cx).capabilities.clone();
463            self.new_fake_session(
464                debug_config,
465                &worktree,
466                Some(parent_session.clone()),
467                caps,
468                false,
469                cx,
470            )
471            .1
472        };
473        #[cfg(not(any(test, feature = "test-support")))]
474        let new_session_task = self
475            .new_session(debug_config, &worktree, Some(parent_session.clone()), cx)
476            .1;
477
478        let request_seq = request.seq;
479        cx.spawn(async move |_, cx| {
480            let (success, body) = match new_session_task.await {
481                Ok(_) => (true, None),
482                Err(error) => (
483                    false,
484                    Some(serde_json::to_value(ErrorResponse {
485                        error: Some(dap::Message {
486                            id: request_seq,
487                            format: error.to_string(),
488                            variables: None,
489                            send_telemetry: None,
490                            show_user: None,
491                            url: None,
492                            url_label: None,
493                        }),
494                    })?),
495                ),
496            };
497
498            parent_session
499                .update(cx, |session, cx| {
500                    session.respond_to_client(
501                        request_seq,
502                        success,
503                        StartDebugging::COMMAND.to_string(),
504                        body,
505                        cx,
506                    )
507                })?
508                .await
509        })
510    }
511
512    fn handle_run_in_terminal_request(
513        &mut self,
514        session_id: SessionId,
515        request: dap::messages::Request,
516        cx: &mut Context<Self>,
517    ) -> Task<Result<()>> {
518        let Some(session) = self.session_by_id(session_id) else {
519            return Task::ready(Err(anyhow!("Session not found")));
520        };
521
522        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
523            request.arguments.unwrap_or_default(),
524        )
525        .expect("To parse StartDebuggingRequestArguments");
526
527        let seq = request.seq;
528
529        let cwd = PathBuf::from(request_args.cwd);
530        match cwd.try_exists() {
531            Ok(true) => (),
532            Ok(false) | Err(_) => {
533                return session.update(cx, |session, cx| {
534                    session.respond_to_client(
535                        seq,
536                        false,
537                        RunInTerminal::COMMAND.to_string(),
538                        serde_json::to_value(dap::ErrorResponse {
539                            error: Some(dap::Message {
540                                id: seq,
541                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
542                                variables: None,
543                                send_telemetry: None,
544                                show_user: None,
545                                url: None,
546                                url_label: None,
547                            }),
548                        })
549                        .ok(),
550                        cx,
551                    )
552                })
553            }
554        }
555
556        let mut args = request_args.args.clone();
557
558        // Handle special case for NodeJS debug adapter
559        // If only the Node binary path is provided, we set the command to None
560        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
561        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
562        // This allows the NodeJS debug client to attach correctly
563        let command = if args.len() > 1 {
564            Some(args.remove(0))
565        } else {
566            None
567        };
568
569        let mut envs: HashMap<String, String> = Default::default();
570        if let Some(Value::Object(env)) = request_args.env {
571            for (key, value) in env {
572                let value_str = match (key.as_str(), value) {
573                    (_, Value::String(value)) => value,
574                    _ => continue,
575                };
576
577                envs.insert(key, value_str);
578            }
579        }
580
581        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
582
583        cx.emit(DapStoreEvent::RunInTerminal {
584            session_id,
585            title: request_args.title,
586            cwd,
587            command,
588            args,
589            envs,
590            sender: tx,
591        });
592        cx.notify();
593
594        let session = session.downgrade();
595        cx.spawn(async move |_, cx| {
596            let (success, body) = match rx.next().await {
597                Some(Ok(pid)) => (
598                    true,
599                    serde_json::to_value(dap::RunInTerminalResponse {
600                        process_id: None,
601                        shell_process_id: Some(pid as u64),
602                    })
603                    .ok(),
604                ),
605                Some(Err(error)) => (
606                    false,
607                    serde_json::to_value(dap::ErrorResponse {
608                        error: Some(dap::Message {
609                            id: seq,
610                            format: error.to_string(),
611                            variables: None,
612                            send_telemetry: None,
613                            show_user: None,
614                            url: None,
615                            url_label: None,
616                        }),
617                    })
618                    .ok(),
619                ),
620                None => (
621                    false,
622                    serde_json::to_value(dap::ErrorResponse {
623                        error: Some(dap::Message {
624                            id: seq,
625                            format: "failed to receive response from spawn terminal".to_string(),
626                            variables: None,
627                            send_telemetry: None,
628                            show_user: None,
629                            url: None,
630                            url_label: None,
631                        }),
632                    })
633                    .ok(),
634                ),
635            };
636
637            session
638                .update(cx, |session, cx| {
639                    session.respond_to_client(
640                        seq,
641                        success,
642                        RunInTerminal::COMMAND.to_string(),
643                        body,
644                        cx,
645                    )
646                })?
647                .await
648        })
649    }
650
651    pub fn evaluate(
652        &self,
653        session_id: &SessionId,
654        stack_frame_id: u64,
655        expression: String,
656        context: EvaluateArgumentsContext,
657        source: Option<Source>,
658        cx: &mut Context<Self>,
659    ) -> Task<Result<EvaluateResponse>> {
660        let Some(client) = self
661            .session_by_id(session_id)
662            .and_then(|client| client.read(cx).adapter_client())
663        else {
664            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
665        };
666
667        cx.background_executor().spawn(async move {
668            client
669                .request::<Evaluate>(EvaluateArguments {
670                    expression: expression.clone(),
671                    frame_id: Some(stack_frame_id),
672                    context: Some(context),
673                    format: None,
674                    line: None,
675                    column: None,
676                    source,
677                })
678                .await
679        })
680    }
681
682    pub fn completions(
683        &self,
684        session_id: &SessionId,
685        stack_frame_id: u64,
686        text: String,
687        completion_column: u64,
688        cx: &mut Context<Self>,
689    ) -> Task<Result<Vec<CompletionItem>>> {
690        let Some(client) = self
691            .session_by_id(session_id)
692            .and_then(|client| client.read(cx).adapter_client())
693        else {
694            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
695        };
696
697        cx.background_executor().spawn(async move {
698            Ok(client
699                .request::<Completions>(CompletionsArguments {
700                    frame_id: Some(stack_frame_id),
701                    line: None,
702                    text,
703                    column: completion_column,
704                })
705                .await?
706                .targets)
707        })
708    }
709
710    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
711        let mut tasks = vec![];
712        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
713            tasks.push(self.shutdown_session(session_id, cx));
714        }
715
716        cx.background_executor().spawn(async move {
717            futures::future::join_all(tasks).await;
718        })
719    }
720
721    pub fn shutdown_session(
722        &mut self,
723        session_id: SessionId,
724        cx: &mut Context<Self>,
725    ) -> Task<Result<()>> {
726        let Some(_) = self.as_local_mut() else {
727            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
728        };
729
730        let Some(session) = self.sessions.remove(&session_id) else {
731            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
732        };
733
734        let shutdown_children = session
735            .read(cx)
736            .child_session_ids()
737            .iter()
738            .map(|session_id| self.shutdown_session(*session_id, cx))
739            .collect::<Vec<_>>();
740
741        let shutdown_parent_task = if let Some(parent_session) = session
742            .read(cx)
743            .parent_id()
744            .and_then(|session_id| self.session_by_id(session_id))
745        {
746            let shutdown_id = parent_session.update(cx, |parent_session, _| {
747                parent_session.remove_child_session_id(session_id);
748
749                if parent_session.child_session_ids().len() == 0 {
750                    Some(parent_session.session_id())
751                } else {
752                    None
753                }
754            });
755
756            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
757        } else {
758            None
759        };
760
761        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
762
763        cx.background_spawn(async move {
764            if shutdown_children.len() > 0 {
765                let _ = join_all(shutdown_children).await;
766            }
767
768            shutdown_task.await;
769
770            if let Some(parent_task) = shutdown_parent_task {
771                parent_task.await?;
772            }
773
774            Ok(())
775        })
776    }
777
778    pub fn shared(
779        &mut self,
780        project_id: u64,
781        downstream_client: AnyProtoClient,
782        _: &mut Context<Self>,
783    ) {
784        self.downstream_client = Some((downstream_client.clone(), project_id));
785    }
786
787    pub fn unshared(&mut self, cx: &mut Context<Self>) {
788        self.downstream_client.take();
789
790        cx.notify();
791    }
792}
793
794fn create_new_session(
795    session_id: SessionId,
796    initialized_rx: oneshot::Receiver<()>,
797    start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
798    cx: &mut Context<DapStore>,
799) -> Task<Result<Entity<Session>>> {
800    let task = cx.spawn(async move |this, cx| {
801        let session = match start_client_task.await {
802            Ok(session) => session,
803            Err(error) => {
804                this.update(cx, |_, cx| {
805                    cx.emit(DapStoreEvent::Notification(error.to_string()));
806                })
807                .log_err();
808
809                return Err(error);
810            }
811        };
812
813        // we have to insert the session early, so we can handle reverse requests
814        // that need the session to be available
815        this.update(cx, |store, cx| {
816            store.sessions.insert(session_id, session.clone());
817            cx.emit(DapStoreEvent::DebugClientStarted(session_id));
818            cx.notify();
819        })?;
820
821        match session
822            .update(cx, |session, cx| {
823                session.initialize_sequence(initialized_rx, cx)
824            })?
825            .await
826        {
827            Ok(_) => {}
828            Err(error) => {
829                this.update(cx, |this, cx| {
830                    cx.emit(DapStoreEvent::Notification(error.to_string()));
831
832                    this.shutdown_session(session_id, cx)
833                })?
834                .await
835                .log_err();
836
837                return Err(error);
838            }
839        }
840
841        Ok(session)
842    });
843    task
844}
845
846#[derive(Clone)]
847pub struct DapAdapterDelegate {
848    fs: Arc<dyn Fs>,
849    worktree_id: WorktreeId,
850    node_runtime: NodeRuntime,
851    http_client: Arc<dyn HttpClient>,
852    language_registry: Arc<LanguageRegistry>,
853    toolchain_store: Arc<dyn LanguageToolchainStore>,
854    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
855    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
856}
857
858impl DapAdapterDelegate {
859    pub fn new(
860        fs: Arc<dyn Fs>,
861        worktree_id: WorktreeId,
862        node_runtime: NodeRuntime,
863        http_client: Arc<dyn HttpClient>,
864        language_registry: Arc<LanguageRegistry>,
865        toolchain_store: Arc<dyn LanguageToolchainStore>,
866        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
867    ) -> Self {
868        Self {
869            fs,
870            worktree_id,
871            http_client,
872            node_runtime,
873            toolchain_store,
874            language_registry,
875            load_shell_env_task,
876            updated_adapters: Default::default(),
877        }
878    }
879}
880
881#[async_trait(?Send)]
882impl dap::adapters::DapDelegate for DapAdapterDelegate {
883    fn worktree_id(&self) -> WorktreeId {
884        self.worktree_id
885    }
886
887    fn http_client(&self) -> Arc<dyn HttpClient> {
888        self.http_client.clone()
889    }
890
891    fn node_runtime(&self) -> NodeRuntime {
892        self.node_runtime.clone()
893    }
894
895    fn fs(&self) -> Arc<dyn Fs> {
896        self.fs.clone()
897    }
898
899    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
900        self.updated_adapters.clone()
901    }
902
903    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
904        let name = SharedString::from(dap_name.to_string());
905        let status = match status {
906            DapStatus::None => BinaryStatus::None,
907            DapStatus::Downloading => BinaryStatus::Downloading,
908            DapStatus::Failed { error } => BinaryStatus::Failed { error },
909            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
910        };
911
912        self.language_registry
913            .update_dap_status(LanguageServerName(name), status);
914    }
915
916    fn which(&self, command: &OsStr) -> Option<PathBuf> {
917        which::which(command).ok()
918    }
919
920    async fn shell_env(&self) -> HashMap<String, String> {
921        let task = self.load_shell_env_task.clone();
922        task.await.unwrap_or_default()
923    }
924
925    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
926        self.toolchain_store.clone()
927    }
928}