dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    locator_store::LocatorStore,
  4    session::{self, Session, SessionStateEvent},
  5};
  6use crate::{ProjectEnvironment, debugger};
  7use anyhow::{Result, anyhow};
  8use async_trait::async_trait;
  9use collections::HashMap;
 10use dap::{
 11    Capabilities, CompletionItem, CompletionsArguments, ErrorResponse, EvaluateArguments,
 12    EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments, Source,
 13    StartDebuggingRequestArguments,
 14    adapters::{DapStatus, DebugAdapterBinary, DebugAdapterName},
 15    client::SessionId,
 16    messages::Message,
 17    requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
 18};
 19use fs::Fs;
 20use futures::{
 21    channel::{mpsc, oneshot},
 22    future::{Shared, join_all},
 23};
 24use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 25use http_client::HttpClient;
 26use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 27use lsp::LanguageServerName;
 28use node_runtime::NodeRuntime;
 29
 30use rpc::{
 31    AnyProtoClient, TypedEnvelope,
 32    proto::{self},
 33};
 34use serde_json::Value;
 35use settings::WorktreeId;
 36use smol::{lock::Mutex, stream::StreamExt};
 37use std::{
 38    borrow::Borrow,
 39    collections::{BTreeMap, HashSet},
 40    ffi::OsStr,
 41    path::{Path, PathBuf},
 42    sync::{Arc, atomic::Ordering::SeqCst},
 43};
 44use std::{collections::VecDeque, sync::atomic::AtomicU32};
 45use task::DebugTaskDefinition;
 46use util::ResultExt as _;
 47use worktree::Worktree;
 48
/// Events emitted by [`DapStore`] to its subscribers.
pub enum DapStoreEvent {
    /// Emitted right after a freshly started session has been inserted into the
    /// store's session map (before its initialization sequence has run).
    DebugClientStarted(SessionId),
    /// Emitted once a session's initialize request and initialization sequence
    /// have both completed successfully.
    DebugSessionInitialized(SessionId),
    // NOTE(review): not emitted anywhere in this file — confirm where it is produced.
    DebugClientShutdown(SessionId),
    DebugClientEvent {
        session_id: SessionId,
        message: Message,
    },
    /// Emitted when a debug adapter issues a `runInTerminal` reverse request.
    /// The receiver of this event is expected to report the outcome through `sender`.
    RunInTerminal {
        session_id: SessionId,
        title: Option<String>,
        /// Working directory from the request, or the session binary's `cwd`
        /// when the request did not provide one.
        cwd: Option<Arc<Path>>,
        /// First element of the requested argument list; `None` when the list
        /// contains at most one element.
        command: Option<String>,
        args: Vec<String>,
        /// Only the string-valued entries of the request's `env` object.
        envs: HashMap<String, String>,
        /// An `Ok(pid)` is reported back to the adapter as the shell process id;
        /// an `Err` is reported as a failed response.
        sender: mpsc::Sender<Result<u32>>,
    },
    /// A message for the user, e.g. the error text of a session that failed to
    /// start or initialize.
    Notification(String),
    RemoteHasInitialized,
}
 69
/// Distinguishes a store that runs debug sessions itself from one that talks
/// to an upstream project.
// The variants differ considerably in size; the lint is silenced rather than
// boxing the larger variant.
#[allow(clippy::large_enum_variant)]
pub enum DapStoreMode {
    Local(LocalDapStore),   // ssh host and collab host
    Remote(RemoteDapStore), // collab guest
}
 75
/// State owned by a [`DapStore`] in [`DapStoreMode::Local`].
pub struct LocalDapStore {
    fs: Arc<dyn Fs>,
    node_runtime: NodeRuntime,
    /// Monotonic counter backing [`LocalDapStore::next_session_id`].
    next_session_id: AtomicU32,
    http_client: Arc<dyn HttpClient>,
    environment: Entity<ProjectEnvironment>,
    language_registry: Arc<LanguageRegistry>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    /// Used to resolve a debug configuration when it names a locator.
    locator_store: Arc<LocatorStore>,
    /// Sending half handed to every new local session; the receiving half is
    /// drained by `_start_debugging_task`.
    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
    /// Task that services `startDebugging` / `runInTerminal` requests arriving
    /// on the channel above. Held here so it lives as long as the store.
    _start_debugging_task: Task<()>,
}
 88
 89impl LocalDapStore {
 90    fn next_session_id(&self) -> SessionId {
 91        SessionId(self.next_session_id.fetch_add(1, SeqCst))
 92    }
 93}
 94
/// State owned by a [`DapStore`] in [`DapStoreMode::Remote`].
pub struct RemoteDapStore {
    /// Client used to reach the upstream project.
    upstream_client: AnyProtoClient,
    /// Id of the upstream project.
    upstream_project_id: u64,
    /// Starts out as an empty queue; [`DapStore::remote_event_queue`] takes it,
    /// leaving `None` behind.
    event_queue: Option<VecDeque<DapStoreEvent>>,
}
100
/// Owns the debug sessions of a project and routes adapter reverse requests.
pub struct DapStore {
    /// Whether this store runs sessions locally or mirrors an upstream project.
    mode: DapStoreMode,
    /// Client and project id recorded by [`DapStore::shared`]; cleared by
    /// [`DapStore::unshared`].
    downstream_client: Option<(AnyProtoClient, u64)>,
    breakpoint_store: Entity<BreakpointStore>,
    /// All known sessions keyed by id.
    sessions: BTreeMap<SessionId, Entity<Session>>,
}
107
// Lets `DapStore` emit `DapStoreEvent`s through its `Context`.
impl EventEmitter<DapStoreEvent> for DapStore {}
109
110impl DapStore {
    /// Intended to register the store's RPC request handlers on `_client`.
    ///
    /// Currently a no-op: every registration below is commented out until the
    /// `handle_dap_command` refactor is finished, which is why the parameter is
    /// unused.
    pub fn init(_client: &AnyProtoClient) {
        // todo(debugger): Reenable these after we finish handle_dap_command refactor
        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
    }
126
127    #[expect(clippy::too_many_arguments)]
128    pub fn new_local(
129        http_client: Arc<dyn HttpClient>,
130        node_runtime: NodeRuntime,
131        fs: Arc<dyn Fs>,
132        language_registry: Arc<LanguageRegistry>,
133        environment: Entity<ProjectEnvironment>,
134        toolchain_store: Arc<dyn LanguageToolchainStore>,
135        breakpoint_store: Entity<BreakpointStore>,
136        cx: &mut Context<Self>,
137    ) -> Self {
138        cx.on_app_quit(Self::shutdown_sessions).detach();
139
140        let (start_debugging_tx, mut message_rx) =
141            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
142
143        let _start_debugging_task = cx.spawn(async move |this, cx| {
144            while let Some((session_id, message)) = message_rx.next().await {
145                match message {
146                    Message::Request(request) => {
147                        let _ = this
148                            .update(cx, |this, cx| {
149                                if request.command == StartDebugging::COMMAND {
150                                    this.handle_start_debugging_request(session_id, request, cx)
151                                        .detach_and_log_err(cx);
152                                } else if request.command == RunInTerminal::COMMAND {
153                                    this.handle_run_in_terminal_request(session_id, request, cx)
154                                        .detach_and_log_err(cx);
155                                }
156                            })
157                            .log_err();
158                    }
159                    _ => {}
160                }
161            }
162        });
163        Self {
164            mode: DapStoreMode::Local(LocalDapStore {
165                fs,
166                environment,
167                http_client,
168                node_runtime,
169                toolchain_store,
170                language_registry,
171                start_debugging_tx,
172                _start_debugging_task,
173                locator_store: Arc::from(LocatorStore::new()),
174                next_session_id: Default::default(),
175            }),
176            downstream_client: None,
177            breakpoint_store,
178            sessions: Default::default(),
179        }
180    }
181
182    pub fn new_remote(
183        project_id: u64,
184        upstream_client: AnyProtoClient,
185        breakpoint_store: Entity<BreakpointStore>,
186    ) -> Self {
187        Self {
188            mode: DapStoreMode::Remote(RemoteDapStore {
189                upstream_client,
190                upstream_project_id: project_id,
191                event_queue: Some(VecDeque::default()),
192            }),
193            downstream_client: None,
194            breakpoint_store,
195            sessions: Default::default(),
196        }
197    }
198
199    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
200        match &self.mode {
201            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
202            _ => None,
203        }
204    }
205
206    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
207        if let DapStoreMode::Remote(remote) = &mut self.mode {
208            remote.event_queue.take()
209        } else {
210            None
211        }
212    }
213
214    pub fn as_local(&self) -> Option<&LocalDapStore> {
215        match &self.mode {
216            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
217            _ => None,
218        }
219    }
220
221    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
222        match &mut self.mode {
223            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
224            _ => None,
225        }
226    }
227
228    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
229        match &self.mode {
230            DapStoreMode::Remote(RemoteDapStore {
231                upstream_client,
232                upstream_project_id,
233                ..
234            }) => Some((upstream_client.clone(), *upstream_project_id)),
235
236            DapStoreMode::Local(_) => None,
237        }
238    }
239
    /// Returns the downstream client and project id recorded by [`Self::shared`],
    /// if any.
    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
        self.downstream_client.as_ref()
    }
243
244    pub fn add_remote_client(
245        &mut self,
246        session_id: SessionId,
247        ignore: Option<bool>,
248        cx: &mut Context<Self>,
249    ) {
250        if let DapStoreMode::Remote(remote) = &self.mode {
251            self.sessions.insert(
252                session_id,
253                cx.new(|_| {
254                    debugger::session::Session::remote(
255                        session_id,
256                        remote.upstream_client.clone(),
257                        remote.upstream_project_id,
258                        ignore.unwrap_or(false),
259                    )
260                }),
261            );
262        } else {
263            debug_assert!(false);
264        }
265    }
266
267    pub fn session_by_id(
268        &self,
269        session_id: impl Borrow<SessionId>,
270    ) -> Option<Entity<session::Session>> {
271        let session_id = session_id.borrow();
272        let client = self.sessions.get(session_id).cloned();
273
274        client
275    }
    /// Iterates over every session held by the store (the map is a `BTreeMap`,
    /// so iteration follows session-id order).
    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
        self.sessions.values()
    }
279
280    pub fn capabilities_by_id(
281        &self,
282        session_id: impl Borrow<SessionId>,
283        cx: &App,
284    ) -> Option<Capabilities> {
285        let session_id = session_id.borrow();
286        self.sessions
287            .get(session_id)
288            .map(|client| client.read(cx).capabilities.clone())
289    }
290
    /// Returns the breakpoint store this `DapStore` was constructed with.
    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
        &self.breakpoint_store
    }
294
295    #[allow(dead_code)]
296    async fn handle_ignore_breakpoint_state(
297        this: Entity<Self>,
298        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
299        mut cx: AsyncApp,
300    ) -> Result<()> {
301        let session_id = SessionId::from_proto(envelope.payload.session_id);
302
303        this.update(&mut cx, |this, cx| {
304            if let Some(session) = this.session_by_id(&session_id) {
305                session.update(cx, |session, cx| {
306                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
307                })
308            } else {
309                Task::ready(())
310            }
311        })?
312        .await;
313
314        Ok(())
315    }
316
317    pub fn delegate(&self, worktree: &Entity<Worktree>, cx: &mut App) -> DapAdapterDelegate {
318        let Some(local_store) = self.as_local() else {
319            unimplemented!("Starting session on remote side");
320        };
321
322        DapAdapterDelegate::new(
323            local_store.fs.clone(),
324            worktree.read(cx).id(),
325            local_store.node_runtime.clone(),
326            local_store.http_client.clone(),
327            local_store.language_registry.clone(),
328            local_store.toolchain_store.clone(),
329            local_store.environment.update(cx, |env, cx| {
330                env.get_worktree_environment(worktree.clone(), cx)
331            }),
332        )
333    }
334
335    pub fn new_session(
336        &mut self,
337        binary: DebugAdapterBinary,
338        mut config: DebugTaskDefinition,
339        parent_session: Option<Entity<Session>>,
340        cx: &mut Context<Self>,
341    ) -> (SessionId, Task<Result<Entity<Session>>>) {
342        let Some(local_store) = self.as_local() else {
343            unimplemented!("Starting session on remote side");
344        };
345
346        let session_id = local_store.next_session_id();
347
348        if let Some(session) = &parent_session {
349            session.update(cx, |session, _| {
350                session.add_child_session_id(session_id);
351            });
352        }
353
354        let (initialized_tx, initialized_rx) = oneshot::channel();
355        let locator_store = local_store.locator_store.clone();
356
357        let start_debugging_tx = local_store.start_debugging_tx.clone();
358
359        let task = cx.spawn(async move |this, cx| {
360            if config.locator.is_some() {
361                config = cx
362                    .background_spawn(async move {
363                        locator_store
364                            .resolve_debug_config(&mut config)
365                            .await
366                            .map(|_| config)
367                    })
368                    .await?;
369            }
370
371            let start_client_task = this.update(cx, |this, cx| {
372                Session::local(
373                    this.breakpoint_store.clone(),
374                    session_id,
375                    parent_session,
376                    binary,
377                    config,
378                    start_debugging_tx.clone(),
379                    initialized_tx,
380                    cx,
381                )
382            })?;
383
384            let ret = this
385                .update(cx, |_, cx| {
386                    create_new_session(session_id, initialized_rx, start_client_task, cx)
387                })?
388                .await;
389            ret
390        });
391
392        (session_id, task)
393    }
394
395    fn handle_start_debugging_request(
396        &mut self,
397        session_id: SessionId,
398        request: dap::messages::Request,
399        cx: &mut Context<Self>,
400    ) -> Task<Result<()>> {
401        let Some(parent_session) = self.session_by_id(session_id) else {
402            return Task::ready(Err(anyhow!("Session not found")));
403        };
404
405        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
406            request.arguments.unwrap_or_default(),
407        )
408        .expect("To parse StartDebuggingRequestArguments");
409        let mut binary = parent_session.read(cx).binary().clone();
410        let config = parent_session.read(cx).configuration().unwrap().clone();
411        binary.request_args = args;
412
413        let new_session_task = self
414            .new_session(binary, config, Some(parent_session.clone()), cx)
415            .1;
416
417        let request_seq = request.seq;
418        cx.spawn(async move |_, cx| {
419            let (success, body) = match new_session_task.await {
420                Ok(_) => (true, None),
421                Err(error) => (
422                    false,
423                    Some(serde_json::to_value(ErrorResponse {
424                        error: Some(dap::Message {
425                            id: request_seq,
426                            format: error.to_string(),
427                            variables: None,
428                            send_telemetry: None,
429                            show_user: None,
430                            url: None,
431                            url_label: None,
432                        }),
433                    })?),
434                ),
435            };
436
437            parent_session
438                .update(cx, |session, cx| {
439                    session.respond_to_client(
440                        request_seq,
441                        success,
442                        StartDebugging::COMMAND.to_string(),
443                        body,
444                        cx,
445                    )
446                })?
447                .await
448        })
449    }
450
451    fn handle_run_in_terminal_request(
452        &mut self,
453        session_id: SessionId,
454        request: dap::messages::Request,
455        cx: &mut Context<Self>,
456    ) -> Task<Result<()>> {
457        let Some(session) = self.session_by_id(session_id) else {
458            return Task::ready(Err(anyhow!("Session not found")));
459        };
460
461        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
462            request.arguments.unwrap_or_default(),
463        )
464        .expect("To parse StartDebuggingRequestArguments");
465
466        let seq = request.seq;
467
468        let cwd = Path::new(&request_args.cwd);
469
470        match cwd.try_exists() {
471            Ok(false) | Err(_) if !request_args.cwd.is_empty() => {
472                return session.update(cx, |session, cx| {
473                    session.respond_to_client(
474                        seq,
475                        false,
476                        RunInTerminal::COMMAND.to_string(),
477                        serde_json::to_value(dap::ErrorResponse {
478                            error: Some(dap::Message {
479                                id: seq,
480                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
481                                variables: None,
482                                send_telemetry: None,
483                                show_user: None,
484                                url: None,
485                                url_label: None,
486                            }),
487                        })
488                        .ok(),
489                        cx,
490                    )
491                });
492            }
493            _ => (),
494        }
495        let mut args = request_args.args.clone();
496
497        // Handle special case for NodeJS debug adapter
498        // If only the Node binary path is provided, we set the command to None
499        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
500        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
501        // This allows the NodeJS debug client to attach correctly
502        let command = if args.len() > 1 {
503            Some(args.remove(0))
504        } else {
505            None
506        };
507
508        let mut envs: HashMap<String, String> = Default::default();
509        if let Some(Value::Object(env)) = request_args.env {
510            for (key, value) in env {
511                let value_str = match (key.as_str(), value) {
512                    (_, Value::String(value)) => value,
513                    _ => continue,
514                };
515
516                envs.insert(key, value_str);
517            }
518        }
519
520        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
521        let cwd = Some(cwd)
522            .filter(|cwd| cwd.as_os_str().len() > 0)
523            .map(Arc::from)
524            .or_else(|| {
525                self.session_by_id(session_id)
526                    .and_then(|session| session.read(cx).binary().cwd.as_deref().map(Arc::from))
527            });
528        cx.emit(DapStoreEvent::RunInTerminal {
529            session_id,
530            title: request_args.title,
531            cwd,
532            command,
533            args,
534            envs,
535            sender: tx,
536        });
537        cx.notify();
538
539        let session = session.downgrade();
540        cx.spawn(async move |_, cx| {
541            let (success, body) = match rx.next().await {
542                Some(Ok(pid)) => (
543                    true,
544                    serde_json::to_value(dap::RunInTerminalResponse {
545                        process_id: None,
546                        shell_process_id: Some(pid as u64),
547                    })
548                    .ok(),
549                ),
550                Some(Err(error)) => (
551                    false,
552                    serde_json::to_value(dap::ErrorResponse {
553                        error: Some(dap::Message {
554                            id: seq,
555                            format: error.to_string(),
556                            variables: None,
557                            send_telemetry: None,
558                            show_user: None,
559                            url: None,
560                            url_label: None,
561                        }),
562                    })
563                    .ok(),
564                ),
565                None => (
566                    false,
567                    serde_json::to_value(dap::ErrorResponse {
568                        error: Some(dap::Message {
569                            id: seq,
570                            format: "failed to receive response from spawn terminal".to_string(),
571                            variables: None,
572                            send_telemetry: None,
573                            show_user: None,
574                            url: None,
575                            url_label: None,
576                        }),
577                    })
578                    .ok(),
579                ),
580            };
581
582            session
583                .update(cx, |session, cx| {
584                    session.respond_to_client(
585                        seq,
586                        success,
587                        RunInTerminal::COMMAND.to_string(),
588                        body,
589                        cx,
590                    )
591                })?
592                .await
593        })
594    }
595
596    pub fn evaluate(
597        &self,
598        session_id: &SessionId,
599        stack_frame_id: u64,
600        expression: String,
601        context: EvaluateArgumentsContext,
602        source: Option<Source>,
603        cx: &mut Context<Self>,
604    ) -> Task<Result<EvaluateResponse>> {
605        let Some(client) = self
606            .session_by_id(session_id)
607            .and_then(|client| client.read(cx).adapter_client())
608        else {
609            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
610        };
611
612        cx.background_executor().spawn(async move {
613            client
614                .request::<Evaluate>(EvaluateArguments {
615                    expression: expression.clone(),
616                    frame_id: Some(stack_frame_id),
617                    context: Some(context),
618                    format: None,
619                    line: None,
620                    column: None,
621                    source,
622                })
623                .await
624        })
625    }
626
627    pub fn completions(
628        &self,
629        session_id: &SessionId,
630        stack_frame_id: u64,
631        text: String,
632        completion_column: u64,
633        cx: &mut Context<Self>,
634    ) -> Task<Result<Vec<CompletionItem>>> {
635        let Some(client) = self
636            .session_by_id(session_id)
637            .and_then(|client| client.read(cx).adapter_client())
638        else {
639            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
640        };
641
642        cx.background_executor().spawn(async move {
643            Ok(client
644                .request::<Completions>(CompletionsArguments {
645                    frame_id: Some(stack_frame_id),
646                    line: None,
647                    text,
648                    column: completion_column,
649                })
650                .await?
651                .targets)
652        })
653    }
654
655    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
656        let mut tasks = vec![];
657        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
658            tasks.push(self.shutdown_session(session_id, cx));
659        }
660
661        cx.background_executor().spawn(async move {
662            futures::future::join_all(tasks).await;
663        })
664    }
665
666    pub fn shutdown_session(
667        &mut self,
668        session_id: SessionId,
669        cx: &mut Context<Self>,
670    ) -> Task<Result<()>> {
671        let Some(_) = self.as_local_mut() else {
672            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
673        };
674
675        let Some(session) = self.sessions.remove(&session_id) else {
676            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
677        };
678
679        let shutdown_children = session
680            .read(cx)
681            .child_session_ids()
682            .iter()
683            .map(|session_id| self.shutdown_session(*session_id, cx))
684            .collect::<Vec<_>>();
685
686        let shutdown_parent_task = if let Some(parent_session) = session
687            .read(cx)
688            .parent_id()
689            .and_then(|session_id| self.session_by_id(session_id))
690        {
691            let shutdown_id = parent_session.update(cx, |parent_session, _| {
692                parent_session.remove_child_session_id(session_id);
693
694                if parent_session.child_session_ids().len() == 0 {
695                    Some(parent_session.session_id())
696                } else {
697                    None
698                }
699            });
700
701            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
702        } else {
703            None
704        };
705
706        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
707
708        cx.background_spawn(async move {
709            if shutdown_children.len() > 0 {
710                let _ = join_all(shutdown_children).await;
711            }
712
713            shutdown_task.await;
714
715            if let Some(parent_task) = shutdown_parent_task {
716                parent_task.await?;
717            }
718
719            Ok(())
720        })
721    }
722
723    pub fn shared(
724        &mut self,
725        project_id: u64,
726        downstream_client: AnyProtoClient,
727        _: &mut Context<Self>,
728    ) {
729        self.downstream_client = Some((downstream_client.clone(), project_id));
730    }
731
732    pub fn unshared(&mut self, cx: &mut Context<Self>) {
733        self.downstream_client.take();
734
735        cx.notify();
736    }
737}
738
739fn create_new_session(
740    session_id: SessionId,
741    initialized_rx: oneshot::Receiver<()>,
742    start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
743    cx: &mut Context<DapStore>,
744) -> Task<Result<Entity<Session>>> {
745    let task = cx.spawn(async move |this, cx| {
746        let session = match start_client_task.await {
747            Ok(session) => session,
748            Err(error) => {
749                this.update(cx, |_, cx| {
750                    cx.emit(DapStoreEvent::Notification(error.to_string()));
751                })
752                .log_err();
753
754                return Err(error);
755            }
756        };
757
758        // we have to insert the session early, so we can handle reverse requests
759        // that need the session to be available
760        this.update(cx, |store, cx| {
761            store.sessions.insert(session_id, session.clone());
762            cx.emit(DapStoreEvent::DebugClientStarted(session_id));
763            cx.notify();
764        })?;
765        let seq_result = async || {
766            session
767                .update(cx, |session, cx| session.request_initialize(cx))?
768                .await?;
769
770            session
771                .update(cx, |session, cx| {
772                    session.initialize_sequence(initialized_rx, cx)
773                })?
774                .await
775        };
776        match seq_result().await {
777            Ok(_) => {}
778            Err(error) => {
779                this.update(cx, |this, cx| {
780                    cx.emit(DapStoreEvent::Notification(error.to_string()));
781                    this.shutdown_session(session_id, cx)
782                })?
783                .await
784                .log_err();
785
786                return Err(error);
787            }
788        }
789
790        this.update(cx, |_, cx| {
791            cx.subscribe(
792                &session,
793                move |this: &mut DapStore, _, event: &SessionStateEvent, cx| match event {
794                    SessionStateEvent::Shutdown => {
795                        this.shutdown_session(session_id, cx).detach_and_log_err(cx);
796                    }
797                },
798            )
799            .detach();
800            cx.emit(DapStoreEvent::DebugSessionInitialized(session_id));
801        })?;
802
803        Ok(session)
804    });
805    task
806}
807
/// Implementation of `dap::adapters::DapDelegate` handed to debug adapters,
/// bundling the services of one worktree.
#[derive(Clone)]
pub struct DapAdapterDelegate {
    fs: Arc<dyn Fs>,
    worktree_id: WorktreeId,
    node_runtime: NodeRuntime,
    http_client: Arc<dyn HttpClient>,
    /// Receives adapter status updates (see `update_status`).
    language_registry: Arc<LanguageRegistry>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    /// Starts out empty; shared between clones of the delegate.
    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
    /// Shared task yielding the shell environment; `None` is treated as an
    /// empty environment by `shell_env`.
    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
}
819
820impl DapAdapterDelegate {
821    pub fn new(
822        fs: Arc<dyn Fs>,
823        worktree_id: WorktreeId,
824        node_runtime: NodeRuntime,
825        http_client: Arc<dyn HttpClient>,
826        language_registry: Arc<LanguageRegistry>,
827        toolchain_store: Arc<dyn LanguageToolchainStore>,
828        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
829    ) -> Self {
830        Self {
831            fs,
832            worktree_id,
833            http_client,
834            node_runtime,
835            toolchain_store,
836            language_registry,
837            load_shell_env_task,
838            updated_adapters: Default::default(),
839        }
840    }
841}
842
843#[async_trait(?Send)]
844impl dap::adapters::DapDelegate for DapAdapterDelegate {
845    fn worktree_id(&self) -> WorktreeId {
846        self.worktree_id
847    }
848
849    fn http_client(&self) -> Arc<dyn HttpClient> {
850        self.http_client.clone()
851    }
852
853    fn node_runtime(&self) -> NodeRuntime {
854        self.node_runtime.clone()
855    }
856
857    fn fs(&self) -> Arc<dyn Fs> {
858        self.fs.clone()
859    }
860
861    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
862        self.updated_adapters.clone()
863    }
864
865    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
866        let name = SharedString::from(dap_name.to_string());
867        let status = match status {
868            DapStatus::None => BinaryStatus::None,
869            DapStatus::Downloading => BinaryStatus::Downloading,
870            DapStatus::Failed { error } => BinaryStatus::Failed { error },
871            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
872        };
873
874        self.language_registry
875            .update_dap_status(LanguageServerName(name), status);
876    }
877
878    fn which(&self, command: &OsStr) -> Option<PathBuf> {
879        which::which(command).ok()
880    }
881
882    async fn shell_env(&self) -> HashMap<String, String> {
883        let task = self.load_shell_env_task.clone();
884        task.await.unwrap_or_default()
885    }
886
887    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
888        self.toolchain_store.clone()
889    }
890}