dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    // Will need to uncomment this once we implement rpc message handler again
  4    // dap_command::{
  5    //     ContinueCommand, DapCommand, DisconnectCommand, NextCommand, PauseCommand, RestartCommand,
  6    //     RestartStackFrameCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  7    //     TerminateCommand, TerminateThreadsCommand, VariablesCommand,
  8    // },
  9    session::{self, Session},
 10};
 11use crate::{debugger, worktree_store::WorktreeStore, ProjectEnvironment};
 12use anyhow::{anyhow, Result};
 13use async_trait::async_trait;
 14use collections::HashMap;
 15use dap::{
 16    adapters::{DapStatus, DebugAdapterName},
 17    client::SessionId,
 18    messages::Message,
 19    requests::{
 20        Completions, Evaluate, Request as _, RunInTerminal, SetExpression, SetVariable,
 21        StartDebugging,
 22    },
 23    Capabilities, CompletionItem, CompletionsArguments, ErrorResponse, EvaluateArguments,
 24    EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
 25    SetExpressionArguments, SetVariableArguments, Source, StartDebuggingRequestArguments,
 26    StartDebuggingRequestArgumentsRequest,
 27};
 28use fs::Fs;
 29use futures::{
 30    channel::{mpsc, oneshot},
 31    future::Shared,
 32};
 33use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 34use http_client::HttpClient;
 35use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 36use lsp::LanguageServerName;
 37use node_runtime::NodeRuntime;
 38
 39use rpc::{
 40    proto::{self},
 41    AnyProtoClient, TypedEnvelope,
 42};
 43use serde_json::Value;
 44use settings::WorktreeId;
 45use smol::{lock::Mutex, stream::StreamExt};
 46use std::{
 47    borrow::Borrow,
 48    collections::{BTreeMap, HashSet},
 49    ffi::OsStr,
 50    path::PathBuf,
 51    sync::{atomic::Ordering::SeqCst, Arc},
 52};
 53use std::{collections::VecDeque, sync::atomic::AtomicU32};
 54use task::{AttachConfig, DebugAdapterConfig, DebugRequestType};
 55use util::ResultExt as _;
 56use worktree::Worktree;
 57
 58pub enum DapStoreEvent {
 59    DebugClientStarted(SessionId),
 60    DebugClientShutdown(SessionId),
 61    DebugClientEvent {
 62        session_id: SessionId,
 63        message: Message,
 64    },
 65    RunInTerminal {
 66        session_id: SessionId,
 67        title: Option<String>,
 68        cwd: PathBuf,
 69        command: Option<String>,
 70        args: Vec<String>,
 71        envs: HashMap<String, String>,
 72        sender: mpsc::Sender<Result<u32>>,
 73    },
 74    Notification(String),
 75    RemoteHasInitialized,
 76}
 77
 78#[allow(clippy::large_enum_variant)]
 79pub enum DapStoreMode {
 80    Local(LocalDapStore),   // ssh host and collab host
 81    Remote(RemoteDapStore), // collab guest
 82}
 83
 84pub struct LocalDapStore {
 85    fs: Arc<dyn Fs>,
 86    node_runtime: NodeRuntime,
 87    next_session_id: AtomicU32,
 88    http_client: Arc<dyn HttpClient>,
 89    worktree_store: Entity<WorktreeStore>,
 90    environment: Entity<ProjectEnvironment>,
 91    language_registry: Arc<LanguageRegistry>,
 92    toolchain_store: Arc<dyn LanguageToolchainStore>,
 93    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 94    _start_debugging_task: Task<()>,
 95}
 96
 97impl LocalDapStore {
 98    fn next_session_id(&self) -> SessionId {
 99        SessionId(self.next_session_id.fetch_add(1, SeqCst))
100    }
101}
102
103pub struct RemoteDapStore {
104    upstream_client: AnyProtoClient,
105    upstream_project_id: u64,
106    event_queue: Option<VecDeque<DapStoreEvent>>,
107}
108
109pub struct DapStore {
110    mode: DapStoreMode,
111    downstream_client: Option<(AnyProtoClient, u64)>,
112    breakpoint_store: Entity<BreakpointStore>,
113    sessions: BTreeMap<SessionId, Entity<Session>>,
114}
115
116impl EventEmitter<DapStoreEvent> for DapStore {}
117
118impl DapStore {
119    pub fn init(_client: &AnyProtoClient) {
120        // todo(debugger): Reenable these after we finish handle_dap_command refactor
121        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
122        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
123        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
124        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
125        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
126        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
127        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
128        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
129        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
130        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
131        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
132        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
133    }
134
135    #[expect(clippy::too_many_arguments)]
136    pub fn new_local(
137        http_client: Arc<dyn HttpClient>,
138        node_runtime: NodeRuntime,
139        fs: Arc<dyn Fs>,
140        language_registry: Arc<LanguageRegistry>,
141        environment: Entity<ProjectEnvironment>,
142        toolchain_store: Arc<dyn LanguageToolchainStore>,
143        breakpoint_store: Entity<BreakpointStore>,
144        worktree_store: Entity<WorktreeStore>,
145        cx: &mut Context<Self>,
146    ) -> Self {
147        cx.on_app_quit(Self::shutdown_sessions).detach();
148
149        let (start_debugging_tx, mut message_rx) =
150            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
151
152        let _start_debugging_task = cx.spawn(move |this, mut cx| async move {
153            while let Some((session_id, message)) = message_rx.next().await {
154                match message {
155                    Message::Request(request) => {
156                        let _ = this
157                            .update(&mut cx, |this, cx| {
158                                if request.command == StartDebugging::COMMAND {
159                                    this.handle_start_debugging_request(session_id, request, cx)
160                                        .detach_and_log_err(cx);
161                                } else if request.command == RunInTerminal::COMMAND {
162                                    this.handle_run_in_terminal_request(session_id, request, cx)
163                                        .detach_and_log_err(cx);
164                                }
165                            })
166                            .log_err();
167                    }
168                    _ => {}
169                }
170            }
171        });
172        Self {
173            mode: DapStoreMode::Local(LocalDapStore {
174                fs,
175                environment,
176                http_client,
177                node_runtime,
178                worktree_store,
179                toolchain_store,
180                language_registry,
181                start_debugging_tx,
182                _start_debugging_task,
183                next_session_id: Default::default(),
184            }),
185            downstream_client: None,
186            breakpoint_store,
187            sessions: Default::default(),
188        }
189    }
190
191    pub fn new_remote(
192        project_id: u64,
193        upstream_client: AnyProtoClient,
194        breakpoint_store: Entity<BreakpointStore>,
195    ) -> Self {
196        Self {
197            mode: DapStoreMode::Remote(RemoteDapStore {
198                upstream_client,
199                upstream_project_id: project_id,
200                event_queue: Some(VecDeque::default()),
201            }),
202            downstream_client: None,
203            breakpoint_store,
204            sessions: Default::default(),
205        }
206    }
207
208    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
209        match &self.mode {
210            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
211            _ => None,
212        }
213    }
214
215    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
216        if let DapStoreMode::Remote(remote) = &mut self.mode {
217            remote.event_queue.take()
218        } else {
219            None
220        }
221    }
222
223    pub fn as_local(&self) -> Option<&LocalDapStore> {
224        match &self.mode {
225            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
226            _ => None,
227        }
228    }
229
230    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
231        match &mut self.mode {
232            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
233            _ => None,
234        }
235    }
236
237    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
238        match &self.mode {
239            DapStoreMode::Remote(RemoteDapStore {
240                upstream_client,
241                upstream_project_id,
242                ..
243            }) => Some((upstream_client.clone(), *upstream_project_id)),
244
245            DapStoreMode::Local(_) => None,
246        }
247    }
248
249    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
250        self.downstream_client.as_ref()
251    }
252
253    pub fn add_remote_client(
254        &mut self,
255        session_id: SessionId,
256        ignore: Option<bool>,
257        cx: &mut Context<Self>,
258    ) {
259        if let DapStoreMode::Remote(remote) = &self.mode {
260            self.sessions.insert(
261                session_id,
262                cx.new(|_| {
263                    debugger::session::Session::remote(
264                        session_id,
265                        remote.upstream_client.clone(),
266                        remote.upstream_project_id,
267                        ignore.unwrap_or(false),
268                    )
269                }),
270            );
271        } else {
272            debug_assert!(false);
273        }
274    }
275
276    pub fn session_by_id(
277        &self,
278        session_id: impl Borrow<SessionId>,
279    ) -> Option<Entity<session::Session>> {
280        let session_id = session_id.borrow();
281        let client = self.sessions.get(session_id).cloned();
282
283        client
284    }
285    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
286        self.sessions.values()
287    }
288
289    pub fn capabilities_by_id(
290        &self,
291        session_id: impl Borrow<SessionId>,
292        cx: &App,
293    ) -> Option<Capabilities> {
294        let session_id = session_id.borrow();
295        self.sessions
296            .get(session_id)
297            .map(|client| client.read(cx).capabilities.clone())
298    }
299
300    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
301        &self.breakpoint_store
302    }
303
304    #[allow(dead_code)]
305    async fn handle_ignore_breakpoint_state(
306        this: Entity<Self>,
307        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
308        mut cx: AsyncApp,
309    ) -> Result<()> {
310        let session_id = SessionId::from_proto(envelope.payload.session_id);
311
312        this.update(&mut cx, |this, cx| {
313            if let Some(session) = this.session_by_id(&session_id) {
314                session.update(cx, |session, cx| {
315                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
316                })
317            } else {
318                Task::ready(())
319            }
320        })?
321        .await;
322
323        Ok(())
324    }
325
326    pub fn new_session(
327        &mut self,
328        config: DebugAdapterConfig,
329        worktree: &Entity<Worktree>,
330        parent_session: Option<Entity<Session>>,
331        cx: &mut Context<Self>,
332    ) -> (SessionId, Task<Result<Entity<Session>>>) {
333        let Some(local_store) = self.as_local() else {
334            unimplemented!("Starting session on remote side");
335        };
336
337        let delegate = DapAdapterDelegate::new(
338            local_store.fs.clone(),
339            worktree.read(cx).id(),
340            local_store.node_runtime.clone(),
341            local_store.http_client.clone(),
342            local_store.language_registry.clone(),
343            local_store.toolchain_store.clone(),
344            local_store.environment.update(cx, |env, cx| {
345                let worktree = worktree.read(cx);
346                env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
347            }),
348        );
349        let session_id = local_store.next_session_id();
350
351        let (initialized_tx, initialized_rx) = oneshot::channel();
352
353        let start_client_task = Session::local(
354            self.breakpoint_store.clone(),
355            session_id,
356            parent_session,
357            delegate,
358            config,
359            local_store.start_debugging_tx.clone(),
360            initialized_tx,
361            cx,
362        );
363
364        let task = cx.spawn(|this, mut cx| async move {
365            let session = match start_client_task.await {
366                Ok(session) => session,
367                Err(error) => {
368                    this.update(&mut cx, |_, cx| {
369                        cx.emit(DapStoreEvent::Notification(error.to_string()));
370                    })
371                    .log_err();
372
373                    return Err(error);
374                }
375            };
376
377            // we have to insert the session early, so we can handle reverse requests
378            // that need the session to be available
379            this.update(&mut cx, |store, cx| {
380                store.sessions.insert(session_id, session.clone());
381                cx.emit(DapStoreEvent::DebugClientStarted(session_id));
382                cx.notify();
383            })?;
384
385            match session
386                .update(&mut cx, |session, cx| {
387                    session.initialize_sequence(initialized_rx, cx)
388                })?
389                .await
390            {
391                Ok(_) => {}
392                Err(error) => {
393                    this.update(&mut cx, |this, cx| {
394                        cx.emit(DapStoreEvent::Notification(error.to_string()));
395
396                        this.shutdown_session(session_id, cx)
397                    })?
398                    .await
399                    .log_err();
400
401                    return Err(error);
402                }
403            }
404
405            Ok(session)
406        });
407        (session_id, task)
408    }
409
410    fn handle_start_debugging_request(
411        &mut self,
412        session_id: SessionId,
413        request: dap::messages::Request,
414        cx: &mut Context<Self>,
415    ) -> Task<Result<()>> {
416        let Some(local_store) = self.as_local() else {
417            unreachable!("Cannot response for non-local session");
418        };
419
420        let Some(parent_session) = self.session_by_id(session_id) else {
421            return Task::ready(Err(anyhow!("Session not found")));
422        };
423
424        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
425            request.arguments.unwrap_or_default(),
426        )
427        .expect("To parse StartDebuggingRequestArguments");
428
429        let worktree = local_store
430            .worktree_store
431            .update(cx, |this, _| this.worktrees().next())
432            .expect("worktree-less project");
433
434        let Some(config) = parent_session.read(cx).configuration() else {
435            unreachable!("there must be a config for local sessions");
436        };
437
438        let (_, new_session_task) = self.new_session(
439            DebugAdapterConfig {
440                label: config.label,
441                kind: config.kind,
442                request: match &args.request {
443                    StartDebuggingRequestArgumentsRequest::Launch => DebugRequestType::Launch,
444                    StartDebuggingRequestArgumentsRequest::Attach => {
445                        DebugRequestType::Attach(AttachConfig::default())
446                    }
447                },
448                program: config.program,
449                cwd: config.cwd,
450                initialize_args: Some(args.configuration),
451                supports_attach: config.supports_attach,
452            },
453            &worktree,
454            Some(parent_session.clone()),
455            cx,
456        );
457
458        let request_seq = request.seq;
459        cx.spawn(|_, mut cx| async move {
460            let (success, body) = match new_session_task.await {
461                Ok(_) => (true, None),
462                Err(error) => (
463                    false,
464                    Some(serde_json::to_value(ErrorResponse {
465                        error: Some(dap::Message {
466                            id: request_seq,
467                            format: error.to_string(),
468                            variables: None,
469                            send_telemetry: None,
470                            show_user: None,
471                            url: None,
472                            url_label: None,
473                        }),
474                    })?),
475                ),
476            };
477
478            parent_session
479                .update(&mut cx, |session, cx| {
480                    session.respond_to_client(
481                        request_seq,
482                        success,
483                        StartDebugging::COMMAND.to_string(),
484                        body,
485                        cx,
486                    )
487                })?
488                .await
489        })
490    }
491
492    fn handle_run_in_terminal_request(
493        &mut self,
494        session_id: SessionId,
495        request: dap::messages::Request,
496        cx: &mut Context<Self>,
497    ) -> Task<Result<()>> {
498        let Some(session) = self.session_by_id(session_id) else {
499            return Task::ready(Err(anyhow!("Session not found")));
500        };
501
502        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
503            request.arguments.unwrap_or_default(),
504        )
505        .expect("To parse StartDebuggingRequestArguments");
506
507        let seq = request.seq;
508
509        let cwd = PathBuf::from(request_args.cwd);
510        match cwd.try_exists() {
511            Ok(true) => (),
512            Ok(false) | Err(_) => {
513                return session.update(cx, |session, cx| {
514                    session.respond_to_client(
515                        seq,
516                        false,
517                        RunInTerminal::COMMAND.to_string(),
518                        serde_json::to_value(dap::ErrorResponse {
519                            error: Some(dap::Message {
520                                id: seq,
521                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
522                                variables: None,
523                                send_telemetry: None,
524                                show_user: None,
525                                url: None,
526                                url_label: None,
527                            }),
528                        })
529                        .ok(),
530                        cx,
531                    )
532                })
533            }
534        }
535
536        let mut args = request_args.args.clone();
537
538        // Handle special case for NodeJS debug adapter
539        // If only the Node binary path is provided, we set the command to None
540        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
541        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
542        // This allows the NodeJS debug client to attach correctly
543        let command = if args.len() > 1 {
544            Some(args.remove(0))
545        } else {
546            None
547        };
548
549        let mut envs: HashMap<String, String> = Default::default();
550        if let Some(Value::Object(env)) = request_args.env {
551            for (key, value) in env {
552                let value_str = match (key.as_str(), value) {
553                    (_, Value::String(value)) => value,
554                    _ => continue,
555                };
556
557                envs.insert(key, value_str);
558            }
559        }
560
561        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
562
563        cx.emit(DapStoreEvent::RunInTerminal {
564            session_id,
565            title: request_args.title,
566            cwd,
567            command,
568            args,
569            envs,
570            sender: tx,
571        });
572        cx.notify();
573
574        let session = session.downgrade();
575        cx.spawn(|_, mut cx| async move {
576            let (success, body) = match rx.next().await {
577                Some(Ok(pid)) => (
578                    true,
579                    serde_json::to_value(dap::RunInTerminalResponse {
580                        process_id: None,
581                        shell_process_id: Some(pid as u64),
582                    })
583                    .ok(),
584                ),
585                Some(Err(error)) => (
586                    false,
587                    serde_json::to_value(dap::ErrorResponse {
588                        error: Some(dap::Message {
589                            id: seq,
590                            format: error.to_string(),
591                            variables: None,
592                            send_telemetry: None,
593                            show_user: None,
594                            url: None,
595                            url_label: None,
596                        }),
597                    })
598                    .ok(),
599                ),
600                None => (
601                    false,
602                    serde_json::to_value(dap::ErrorResponse {
603                        error: Some(dap::Message {
604                            id: seq,
605                            format: "failed to receive response from spawn terminal".to_string(),
606                            variables: None,
607                            send_telemetry: None,
608                            show_user: None,
609                            url: None,
610                            url_label: None,
611                        }),
612                    })
613                    .ok(),
614                ),
615            };
616
617            session
618                .update(&mut cx, |session, cx| {
619                    session.respond_to_client(
620                        seq,
621                        success,
622                        RunInTerminal::COMMAND.to_string(),
623                        body,
624                        cx,
625                    )
626                })?
627                .await
628        })
629    }
630
631    pub fn evaluate(
632        &self,
633        session_id: &SessionId,
634        stack_frame_id: u64,
635        expression: String,
636        context: EvaluateArgumentsContext,
637        source: Option<Source>,
638        cx: &mut Context<Self>,
639    ) -> Task<Result<EvaluateResponse>> {
640        let Some(client) = self
641            .session_by_id(session_id)
642            .and_then(|client| client.read(cx).adapter_client())
643        else {
644            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
645        };
646
647        cx.background_executor().spawn(async move {
648            client
649                .request::<Evaluate>(EvaluateArguments {
650                    expression: expression.clone(),
651                    frame_id: Some(stack_frame_id),
652                    context: Some(context),
653                    format: None,
654                    line: None,
655                    column: None,
656                    source,
657                })
658                .await
659        })
660    }
661
662    pub fn completions(
663        &self,
664        session_id: &SessionId,
665        stack_frame_id: u64,
666        text: String,
667        completion_column: u64,
668        cx: &mut Context<Self>,
669    ) -> Task<Result<Vec<CompletionItem>>> {
670        let Some(client) = self
671            .session_by_id(session_id)
672            .and_then(|client| client.read(cx).adapter_client())
673        else {
674            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
675        };
676
677        cx.background_executor().spawn(async move {
678            Ok(client
679                .request::<Completions>(CompletionsArguments {
680                    frame_id: Some(stack_frame_id),
681                    line: None,
682                    text,
683                    column: completion_column,
684                })
685                .await?
686                .targets)
687        })
688    }
689
690    #[allow(clippy::too_many_arguments)]
691    pub fn set_variable_value(
692        &self,
693        session_id: &SessionId,
694        stack_frame_id: u64,
695        variables_reference: u64,
696        name: String,
697        value: String,
698        evaluate_name: Option<String>,
699        cx: &mut Context<Self>,
700    ) -> Task<Result<()>> {
701        let Some(client) = self
702            .session_by_id(session_id)
703            .and_then(|client| client.read(cx).adapter_client())
704        else {
705            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
706        };
707
708        let supports_set_expression = self
709            .capabilities_by_id(session_id, cx)
710            .and_then(|caps| caps.supports_set_expression)
711            .unwrap_or_default();
712
713        cx.background_executor().spawn(async move {
714            if let Some(evaluate_name) = supports_set_expression.then(|| evaluate_name).flatten() {
715                client
716                    .request::<SetExpression>(SetExpressionArguments {
717                        expression: evaluate_name,
718                        value,
719                        frame_id: Some(stack_frame_id),
720                        format: None,
721                    })
722                    .await?;
723            } else {
724                client
725                    .request::<SetVariable>(SetVariableArguments {
726                        variables_reference,
727                        name,
728                        value,
729                        format: None,
730                    })
731                    .await?;
732            }
733
734            Ok(())
735        })
736    }
737
738    // .. get the client and what not
739    // let _ = client.modules(); // This can fire a request to a dap adapter or be a cheap getter.
740    // client.wait_for_request(request::Modules); // This ensures that the request that we've fired off runs to completions
741    // let returned_value = client.modules(); // this is a cheap getter.
742
743    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
744        let mut tasks = vec![];
745        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
746            tasks.push(self.shutdown_session(session_id, cx));
747        }
748
749        cx.background_executor().spawn(async move {
750            futures::future::join_all(tasks).await;
751        })
752    }
753
754    pub fn shutdown_session(
755        &mut self,
756        session_id: SessionId,
757        cx: &mut Context<Self>,
758    ) -> Task<Result<()>> {
759        let Some(_) = self.as_local_mut() else {
760            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
761        };
762
763        let Some(session) = self.sessions.remove(&session_id) else {
764            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
765        };
766
767        let shutdown_parent_task = session
768            .read(cx)
769            .parent_id()
770            .map(|parent_id| self.shutdown_session(parent_id, cx));
771        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
772
773        cx.background_spawn(async move {
774            shutdown_task.await;
775
776            if let Some(parent_task) = shutdown_parent_task {
777                parent_task.await?;
778            }
779
780            Ok(())
781        })
782    }
783
784    pub fn shared(
785        &mut self,
786        project_id: u64,
787        downstream_client: AnyProtoClient,
788        _: &mut Context<Self>,
789    ) {
790        self.downstream_client = Some((downstream_client.clone(), project_id));
791    }
792
793    pub fn unshared(&mut self, cx: &mut Context<Self>) {
794        self.downstream_client.take();
795
796        cx.notify();
797    }
798}
799
800#[derive(Clone)]
801pub struct DapAdapterDelegate {
802    fs: Arc<dyn Fs>,
803    worktree_id: WorktreeId,
804    node_runtime: NodeRuntime,
805    http_client: Arc<dyn HttpClient>,
806    language_registry: Arc<LanguageRegistry>,
807    toolchain_store: Arc<dyn LanguageToolchainStore>,
808    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
809    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
810}
811
812impl DapAdapterDelegate {
813    pub fn new(
814        fs: Arc<dyn Fs>,
815        worktree_id: WorktreeId,
816        node_runtime: NodeRuntime,
817        http_client: Arc<dyn HttpClient>,
818        language_registry: Arc<LanguageRegistry>,
819        toolchain_store: Arc<dyn LanguageToolchainStore>,
820        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
821    ) -> Self {
822        Self {
823            fs,
824            worktree_id,
825            http_client,
826            node_runtime,
827            toolchain_store,
828            language_registry,
829            load_shell_env_task,
830            updated_adapters: Default::default(),
831        }
832    }
833}
834
835#[async_trait(?Send)]
836impl dap::adapters::DapDelegate for DapAdapterDelegate {
837    fn worktree_id(&self) -> WorktreeId {
838        self.worktree_id
839    }
840
841    fn http_client(&self) -> Arc<dyn HttpClient> {
842        self.http_client.clone()
843    }
844
845    fn node_runtime(&self) -> NodeRuntime {
846        self.node_runtime.clone()
847    }
848
849    fn fs(&self) -> Arc<dyn Fs> {
850        self.fs.clone()
851    }
852
853    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
854        self.updated_adapters.clone()
855    }
856
857    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
858        let name = SharedString::from(dap_name.to_string());
859        let status = match status {
860            DapStatus::None => BinaryStatus::None,
861            DapStatus::Downloading => BinaryStatus::Downloading,
862            DapStatus::Failed { error } => BinaryStatus::Failed { error },
863            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
864        };
865
866        self.language_registry
867            .update_dap_status(LanguageServerName(name), status);
868    }
869
870    fn which(&self, command: &OsStr) -> Option<PathBuf> {
871        which::which(command).ok()
872    }
873
874    async fn shell_env(&self) -> HashMap<String, String> {
875        let task = self.load_shell_env_task.clone();
876        task.await.unwrap_or_default()
877    }
878
879    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
880        self.toolchain_store.clone()
881    }
882}