dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    locator_store::LocatorStore,
  4    session::{self, Session, SessionStateEvent},
  5};
  6use crate::{ProjectEnvironment, debugger};
  7use anyhow::{Result, anyhow};
  8use async_trait::async_trait;
  9use collections::HashMap;
 10use dap::{
 11    Capabilities, CompletionItem, CompletionsArguments, ErrorResponse, EvaluateArguments,
 12    EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments, Source,
 13    StartDebuggingRequestArguments,
 14    adapters::{DapStatus, DebugAdapterBinary, DebugAdapterName},
 15    client::SessionId,
 16    messages::Message,
 17    requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
 18};
 19use fs::Fs;
 20use futures::{
 21    channel::{mpsc, oneshot},
 22    future::{Shared, join_all},
 23};
 24use gpui::{
 25    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
 26    Task,
 27};
 28use http_client::HttpClient;
 29use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 30use lsp::LanguageServerName;
 31use node_runtime::NodeRuntime;
 32
 33use rpc::{
 34    AnyProtoClient, TypedEnvelope,
 35    proto::{self},
 36};
 37use serde_json::Value;
 38use settings::WorktreeId;
 39use smol::{lock::Mutex, stream::StreamExt};
 40use std::{
 41    borrow::Borrow,
 42    collections::{BTreeMap, HashSet},
 43    ffi::OsStr,
 44    path::{Path, PathBuf},
 45    sync::{Arc, atomic::Ordering::SeqCst},
 46};
 47use std::{collections::VecDeque, sync::atomic::AtomicU32};
 48use task::DebugTaskDefinition;
 49use util::ResultExt as _;
 50use worktree::Worktree;
 51
/// Events emitted by [`DapStore`] to its subscribers.
pub enum DapStoreEvent {
    /// Emitted right after a freshly started session has been inserted into the
    /// store, before its initialization sequence has run.
    DebugClientStarted(SessionId),
    /// Emitted once a session's initialize request and initialization sequence
    /// have both completed successfully.
    DebugSessionInitialized(SessionId),
    // NOTE(review): not emitted anywhere in this file — presumably raised when a
    // session's client goes away; confirm against the emitters elsewhere.
    DebugClientShutdown(SessionId),
    // NOTE(review): not emitted anywhere in this file — presumably forwards a raw
    // adapter message for the given session; confirm.
    DebugClientEvent {
        session_id: SessionId,
        message: Message,
    },
    /// Asks a subscriber to spawn a terminal on behalf of the debug adapter
    /// (reverse `runInTerminal` request).
    RunInTerminal {
        session_id: SessionId,
        title: Option<String>,
        cwd: Option<Arc<Path>>,
        command: Option<String>,
        args: Vec<String>,
        envs: HashMap<String, String>,
        /// Channel on which the subscriber reports the outcome; an `Ok` value is
        /// forwarded to the adapter as the shell process id.
        sender: mpsc::Sender<Result<u32>>,
    },
    /// A human-readable message; this file emits it with the text of session
    /// start-up and initialization errors.
    Notification(String),
    // NOTE(review): not emitted anywhere in this file; meaning must be confirmed
    // at the emit site.
    RemoteHasInitialized,
}
 72
/// Selects whether the store drives debug adapters itself or talks to an
/// upstream project over RPC.
// The two variants differ a lot in size; the lint is silenced rather than boxing.
#[allow(clippy::large_enum_variant)]
pub enum DapStoreMode {
    Local(LocalDapStore),   // ssh host and collab host
    Remote(RemoteDapStore), // collab guest
}
 78
/// State held by a [`DapStore`] in local mode.
///
/// Most fields are handles that get cloned into a [`DapAdapterDelegate`] or
/// into newly started sessions.
pub struct LocalDapStore {
    fs: Arc<dyn Fs>,
    node_runtime: NodeRuntime,
    /// Monotonic counter backing [`LocalDapStore::next_session_id`].
    next_session_id: AtomicU32,
    http_client: Arc<dyn HttpClient>,
    environment: Entity<ProjectEnvironment>,
    language_registry: Arc<LanguageRegistry>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    locator_store: Arc<LocatorStore>,
    /// Sender cloned into every local session; messages sent on it are drained
    /// by `_start_debugging_task`.
    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
    /// Background loop that dispatches reverse requests (`startDebugging`,
    /// `runInTerminal`) received on the channel above. Held only to keep the
    /// task alive.
    _start_debugging_task: Task<()>,
}
 91
 92impl LocalDapStore {
 93    fn next_session_id(&self) -> SessionId {
 94        SessionId(self.next_session_id.fetch_add(1, SeqCst))
 95    }
 96    pub(crate) fn locate_binary(
 97        &self,
 98        mut definition: DebugTaskDefinition,
 99        executor: BackgroundExecutor,
100    ) -> Task<DebugTaskDefinition> {
101        let locator_store = self.locator_store.clone();
102        executor.spawn(async move {
103            let _ = locator_store.resolve_debug_config(&mut definition).await;
104            definition
105        })
106    }
107}
108
/// State held by a [`DapStore`] in remote mode.
pub struct RemoteDapStore {
    /// RPC client for the upstream project.
    upstream_client: AnyProtoClient,
    /// Id of the upstream project, handed to remote sessions together with the client.
    upstream_project_id: u64,
    /// Queue of pending events; starts as `Some(empty)` and becomes `None` once
    /// it has been taken via [`DapStore::remote_event_queue`].
    event_queue: Option<VecDeque<DapStoreEvent>>,
}
114
/// Registry of debug sessions for a project, operating either locally or as a
/// remote participant (see [`DapStoreMode`]).
pub struct DapStore {
    mode: DapStoreMode,
    /// Client and project id recorded by [`DapStore::shared`]; `None` while unshared.
    downstream_client: Option<(AnyProtoClient, u64)>,
    breakpoint_store: Entity<BreakpointStore>,
    /// All sessions currently tracked by the store, keyed by session id.
    sessions: BTreeMap<SessionId, Entity<Session>>,
}

// Allows `cx.emit(DapStoreEvent::...)` from a `Context<DapStore>`.
impl EventEmitter<DapStoreEvent> for DapStore {}
123
124impl DapStore {
    /// Hook for registering RPC request handlers on `_client`.
    ///
    /// Currently a no-op: every handler registration below is disabled.
    pub fn init(_client: &AnyProtoClient) {
        // todo(debugger): Re-enable these after we finish the handle_dap_command refactor
        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
    }
140
141    #[expect(clippy::too_many_arguments)]
142    pub fn new_local(
143        http_client: Arc<dyn HttpClient>,
144        node_runtime: NodeRuntime,
145        fs: Arc<dyn Fs>,
146        language_registry: Arc<LanguageRegistry>,
147        environment: Entity<ProjectEnvironment>,
148        toolchain_store: Arc<dyn LanguageToolchainStore>,
149        breakpoint_store: Entity<BreakpointStore>,
150        cx: &mut Context<Self>,
151    ) -> Self {
152        cx.on_app_quit(Self::shutdown_sessions).detach();
153
154        let (start_debugging_tx, mut message_rx) =
155            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
156
157        let _start_debugging_task = cx.spawn(async move |this, cx| {
158            while let Some((session_id, message)) = message_rx.next().await {
159                match message {
160                    Message::Request(request) => {
161                        let _ = this
162                            .update(cx, |this, cx| {
163                                if request.command == StartDebugging::COMMAND {
164                                    this.handle_start_debugging_request(session_id, request, cx)
165                                        .detach_and_log_err(cx);
166                                } else if request.command == RunInTerminal::COMMAND {
167                                    this.handle_run_in_terminal_request(session_id, request, cx)
168                                        .detach_and_log_err(cx);
169                                }
170                            })
171                            .log_err();
172                    }
173                    _ => {}
174                }
175            }
176        });
177        Self {
178            mode: DapStoreMode::Local(LocalDapStore {
179                fs,
180                environment,
181                http_client,
182                node_runtime,
183                toolchain_store,
184                language_registry,
185                start_debugging_tx,
186                _start_debugging_task,
187                locator_store: Arc::from(LocatorStore::new()),
188                next_session_id: Default::default(),
189            }),
190            downstream_client: None,
191            breakpoint_store,
192            sessions: Default::default(),
193        }
194    }
195
196    pub fn new_remote(
197        project_id: u64,
198        upstream_client: AnyProtoClient,
199        breakpoint_store: Entity<BreakpointStore>,
200    ) -> Self {
201        Self {
202            mode: DapStoreMode::Remote(RemoteDapStore {
203                upstream_client,
204                upstream_project_id: project_id,
205                event_queue: Some(VecDeque::default()),
206            }),
207            downstream_client: None,
208            breakpoint_store,
209            sessions: Default::default(),
210        }
211    }
212
213    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
214        match &self.mode {
215            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
216            _ => None,
217        }
218    }
219
220    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
221        if let DapStoreMode::Remote(remote) = &mut self.mode {
222            remote.event_queue.take()
223        } else {
224            None
225        }
226    }
227
228    pub fn as_local(&self) -> Option<&LocalDapStore> {
229        match &self.mode {
230            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
231            _ => None,
232        }
233    }
234
235    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
236        match &mut self.mode {
237            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
238            _ => None,
239        }
240    }
241
242    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
243        match &self.mode {
244            DapStoreMode::Remote(RemoteDapStore {
245                upstream_client,
246                upstream_project_id,
247                ..
248            }) => Some((upstream_client.clone(), *upstream_project_id)),
249
250            DapStoreMode::Local(_) => None,
251        }
252    }
253
    /// Returns the downstream client and project id recorded by
    /// [`DapStore::shared`], if any.
    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
        self.downstream_client.as_ref()
    }
257
258    pub fn add_remote_client(
259        &mut self,
260        session_id: SessionId,
261        ignore: Option<bool>,
262        cx: &mut Context<Self>,
263    ) {
264        if let DapStoreMode::Remote(remote) = &self.mode {
265            self.sessions.insert(
266                session_id,
267                cx.new(|_| {
268                    debugger::session::Session::remote(
269                        session_id,
270                        remote.upstream_client.clone(),
271                        remote.upstream_project_id,
272                        ignore.unwrap_or(false),
273                    )
274                }),
275            );
276        } else {
277            debug_assert!(false);
278        }
279    }
280
281    pub fn session_by_id(
282        &self,
283        session_id: impl Borrow<SessionId>,
284    ) -> Option<Entity<session::Session>> {
285        let session_id = session_id.borrow();
286        let client = self.sessions.get(session_id).cloned();
287
288        client
289    }
    /// Iterates over every session tracked by the store, in session-id order
    /// (the backing map is a `BTreeMap`).
    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
        self.sessions.values()
    }
293
294    pub fn capabilities_by_id(
295        &self,
296        session_id: impl Borrow<SessionId>,
297        cx: &App,
298    ) -> Option<Capabilities> {
299        let session_id = session_id.borrow();
300        self.sessions
301            .get(session_id)
302            .map(|client| client.read(cx).capabilities.clone())
303    }
304
    /// Returns the breakpoint store this DAP store was created with.
    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
        &self.breakpoint_store
    }
308
309    #[allow(dead_code)]
310    async fn handle_ignore_breakpoint_state(
311        this: Entity<Self>,
312        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
313        mut cx: AsyncApp,
314    ) -> Result<()> {
315        let session_id = SessionId::from_proto(envelope.payload.session_id);
316
317        this.update(&mut cx, |this, cx| {
318            if let Some(session) = this.session_by_id(&session_id) {
319                session.update(cx, |session, cx| {
320                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
321                })
322            } else {
323                Task::ready(())
324            }
325        })?
326        .await;
327
328        Ok(())
329    }
330
331    pub fn delegate(&self, worktree: &Entity<Worktree>, cx: &mut App) -> DapAdapterDelegate {
332        let Some(local_store) = self.as_local() else {
333            unimplemented!("Starting session on remote side");
334        };
335
336        DapAdapterDelegate::new(
337            local_store.fs.clone(),
338            worktree.read(cx).id(),
339            local_store.node_runtime.clone(),
340            local_store.http_client.clone(),
341            local_store.language_registry.clone(),
342            local_store.toolchain_store.clone(),
343            local_store.environment.update(cx, |env, cx| {
344                env.get_worktree_environment(worktree.clone(), cx)
345            }),
346        )
347    }
348
349    pub fn new_session(
350        &mut self,
351        binary: DebugAdapterBinary,
352        config: DebugTaskDefinition,
353        parent_session: Option<Entity<Session>>,
354        cx: &mut Context<Self>,
355    ) -> (SessionId, Task<Result<Entity<Session>>>) {
356        let Some(local_store) = self.as_local() else {
357            unimplemented!("Starting session on remote side");
358        };
359
360        let session_id = local_store.next_session_id();
361
362        if let Some(session) = &parent_session {
363            session.update(cx, |session, _| {
364                session.add_child_session_id(session_id);
365            });
366        }
367
368        let (initialized_tx, initialized_rx) = oneshot::channel();
369
370        let start_debugging_tx = local_store.start_debugging_tx.clone();
371
372        let task = cx.spawn(async move |this, cx| {
373            let start_client_task = this.update(cx, |this, cx| {
374                Session::local(
375                    this.breakpoint_store.clone(),
376                    session_id,
377                    parent_session,
378                    binary,
379                    config,
380                    start_debugging_tx.clone(),
381                    initialized_tx,
382                    cx,
383                )
384            })?;
385
386            let ret = this
387                .update(cx, |_, cx| {
388                    create_new_session(session_id, initialized_rx, start_client_task, cx)
389                })?
390                .await;
391            ret
392        });
393
394        (session_id, task)
395    }
396
397    fn handle_start_debugging_request(
398        &mut self,
399        session_id: SessionId,
400        request: dap::messages::Request,
401        cx: &mut Context<Self>,
402    ) -> Task<Result<()>> {
403        let Some(parent_session) = self.session_by_id(session_id) else {
404            return Task::ready(Err(anyhow!("Session not found")));
405        };
406
407        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
408            request.arguments.unwrap_or_default(),
409        )
410        .expect("To parse StartDebuggingRequestArguments");
411        let mut binary = parent_session.read(cx).binary().clone();
412        let config = parent_session.read(cx).configuration().unwrap().clone();
413        binary.request_args = args;
414
415        let new_session_task = self
416            .new_session(binary, config, Some(parent_session.clone()), cx)
417            .1;
418
419        let request_seq = request.seq;
420        cx.spawn(async move |_, cx| {
421            let (success, body) = match new_session_task.await {
422                Ok(_) => (true, None),
423                Err(error) => (
424                    false,
425                    Some(serde_json::to_value(ErrorResponse {
426                        error: Some(dap::Message {
427                            id: request_seq,
428                            format: error.to_string(),
429                            variables: None,
430                            send_telemetry: None,
431                            show_user: None,
432                            url: None,
433                            url_label: None,
434                        }),
435                    })?),
436                ),
437            };
438
439            parent_session
440                .update(cx, |session, cx| {
441                    session.respond_to_client(
442                        request_seq,
443                        success,
444                        StartDebugging::COMMAND.to_string(),
445                        body,
446                        cx,
447                    )
448                })?
449                .await
450        })
451    }
452
453    fn handle_run_in_terminal_request(
454        &mut self,
455        session_id: SessionId,
456        request: dap::messages::Request,
457        cx: &mut Context<Self>,
458    ) -> Task<Result<()>> {
459        let Some(session) = self.session_by_id(session_id) else {
460            return Task::ready(Err(anyhow!("Session not found")));
461        };
462
463        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
464            request.arguments.unwrap_or_default(),
465        )
466        .expect("To parse StartDebuggingRequestArguments");
467
468        let seq = request.seq;
469
470        let cwd = Path::new(&request_args.cwd);
471
472        match cwd.try_exists() {
473            Ok(false) | Err(_) if !request_args.cwd.is_empty() => {
474                return session.update(cx, |session, cx| {
475                    session.respond_to_client(
476                        seq,
477                        false,
478                        RunInTerminal::COMMAND.to_string(),
479                        serde_json::to_value(dap::ErrorResponse {
480                            error: Some(dap::Message {
481                                id: seq,
482                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
483                                variables: None,
484                                send_telemetry: None,
485                                show_user: None,
486                                url: None,
487                                url_label: None,
488                            }),
489                        })
490                        .ok(),
491                        cx,
492                    )
493                });
494            }
495            _ => (),
496        }
497        let mut args = request_args.args.clone();
498
499        // Handle special case for NodeJS debug adapter
500        // If only the Node binary path is provided, we set the command to None
501        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
502        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
503        // This allows the NodeJS debug client to attach correctly
504        let command = if args.len() > 1 {
505            Some(args.remove(0))
506        } else {
507            None
508        };
509
510        let mut envs: HashMap<String, String> = Default::default();
511        if let Some(Value::Object(env)) = request_args.env {
512            for (key, value) in env {
513                let value_str = match (key.as_str(), value) {
514                    (_, Value::String(value)) => value,
515                    _ => continue,
516                };
517
518                envs.insert(key, value_str);
519            }
520        }
521
522        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
523        let cwd = Some(cwd)
524            .filter(|cwd| cwd.as_os_str().len() > 0)
525            .map(Arc::from)
526            .or_else(|| {
527                self.session_by_id(session_id)
528                    .and_then(|session| session.read(cx).binary().cwd.as_deref().map(Arc::from))
529            });
530        cx.emit(DapStoreEvent::RunInTerminal {
531            session_id,
532            title: request_args.title,
533            cwd,
534            command,
535            args,
536            envs,
537            sender: tx,
538        });
539        cx.notify();
540
541        let session = session.downgrade();
542        cx.spawn(async move |_, cx| {
543            let (success, body) = match rx.next().await {
544                Some(Ok(pid)) => (
545                    true,
546                    serde_json::to_value(dap::RunInTerminalResponse {
547                        process_id: None,
548                        shell_process_id: Some(pid as u64),
549                    })
550                    .ok(),
551                ),
552                Some(Err(error)) => (
553                    false,
554                    serde_json::to_value(dap::ErrorResponse {
555                        error: Some(dap::Message {
556                            id: seq,
557                            format: error.to_string(),
558                            variables: None,
559                            send_telemetry: None,
560                            show_user: None,
561                            url: None,
562                            url_label: None,
563                        }),
564                    })
565                    .ok(),
566                ),
567                None => (
568                    false,
569                    serde_json::to_value(dap::ErrorResponse {
570                        error: Some(dap::Message {
571                            id: seq,
572                            format: "failed to receive response from spawn terminal".to_string(),
573                            variables: None,
574                            send_telemetry: None,
575                            show_user: None,
576                            url: None,
577                            url_label: None,
578                        }),
579                    })
580                    .ok(),
581                ),
582            };
583
584            session
585                .update(cx, |session, cx| {
586                    session.respond_to_client(
587                        seq,
588                        success,
589                        RunInTerminal::COMMAND.to_string(),
590                        body,
591                        cx,
592                    )
593                })?
594                .await
595        })
596    }
597
598    pub fn evaluate(
599        &self,
600        session_id: &SessionId,
601        stack_frame_id: u64,
602        expression: String,
603        context: EvaluateArgumentsContext,
604        source: Option<Source>,
605        cx: &mut Context<Self>,
606    ) -> Task<Result<EvaluateResponse>> {
607        let Some(client) = self
608            .session_by_id(session_id)
609            .and_then(|client| client.read(cx).adapter_client())
610        else {
611            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
612        };
613
614        cx.background_executor().spawn(async move {
615            client
616                .request::<Evaluate>(EvaluateArguments {
617                    expression: expression.clone(),
618                    frame_id: Some(stack_frame_id),
619                    context: Some(context),
620                    format: None,
621                    line: None,
622                    column: None,
623                    source,
624                })
625                .await
626        })
627    }
628
629    pub fn completions(
630        &self,
631        session_id: &SessionId,
632        stack_frame_id: u64,
633        text: String,
634        completion_column: u64,
635        cx: &mut Context<Self>,
636    ) -> Task<Result<Vec<CompletionItem>>> {
637        let Some(client) = self
638            .session_by_id(session_id)
639            .and_then(|client| client.read(cx).adapter_client())
640        else {
641            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
642        };
643
644        cx.background_executor().spawn(async move {
645            Ok(client
646                .request::<Completions>(CompletionsArguments {
647                    frame_id: Some(stack_frame_id),
648                    line: None,
649                    text,
650                    column: completion_column,
651                })
652                .await?
653                .targets)
654        })
655    }
656
657    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
658        let mut tasks = vec![];
659        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
660            tasks.push(self.shutdown_session(session_id, cx));
661        }
662
663        cx.background_executor().spawn(async move {
664            futures::future::join_all(tasks).await;
665        })
666    }
667
668    pub fn shutdown_session(
669        &mut self,
670        session_id: SessionId,
671        cx: &mut Context<Self>,
672    ) -> Task<Result<()>> {
673        let Some(_) = self.as_local_mut() else {
674            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
675        };
676
677        let Some(session) = self.sessions.remove(&session_id) else {
678            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
679        };
680
681        let shutdown_children = session
682            .read(cx)
683            .child_session_ids()
684            .iter()
685            .map(|session_id| self.shutdown_session(*session_id, cx))
686            .collect::<Vec<_>>();
687
688        let shutdown_parent_task = if let Some(parent_session) = session
689            .read(cx)
690            .parent_id()
691            .and_then(|session_id| self.session_by_id(session_id))
692        {
693            let shutdown_id = parent_session.update(cx, |parent_session, _| {
694                parent_session.remove_child_session_id(session_id);
695
696                if parent_session.child_session_ids().len() == 0 {
697                    Some(parent_session.session_id())
698                } else {
699                    None
700                }
701            });
702
703            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
704        } else {
705            None
706        };
707
708        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
709
710        cx.background_spawn(async move {
711            if shutdown_children.len() > 0 {
712                let _ = join_all(shutdown_children).await;
713            }
714
715            shutdown_task.await;
716
717            if let Some(parent_task) = shutdown_parent_task {
718                parent_task.await?;
719            }
720
721            Ok(())
722        })
723    }
724
725    pub fn shared(
726        &mut self,
727        project_id: u64,
728        downstream_client: AnyProtoClient,
729        _: &mut Context<Self>,
730    ) {
731        self.downstream_client = Some((downstream_client.clone(), project_id));
732    }
733
734    pub fn unshared(&mut self, cx: &mut Context<Self>) {
735        self.downstream_client.take();
736
737        cx.notify();
738    }
739}
740
741fn create_new_session(
742    session_id: SessionId,
743    initialized_rx: oneshot::Receiver<()>,
744    start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
745    cx: &mut Context<DapStore>,
746) -> Task<Result<Entity<Session>>> {
747    let task = cx.spawn(async move |this, cx| {
748        let session = match start_client_task.await {
749            Ok(session) => session,
750            Err(error) => {
751                this.update(cx, |_, cx| {
752                    cx.emit(DapStoreEvent::Notification(error.to_string()));
753                })
754                .log_err();
755
756                return Err(error);
757            }
758        };
759
760        // we have to insert the session early, so we can handle reverse requests
761        // that need the session to be available
762        this.update(cx, |store, cx| {
763            store.sessions.insert(session_id, session.clone());
764            cx.emit(DapStoreEvent::DebugClientStarted(session_id));
765            cx.notify();
766        })?;
767        let seq_result = async || {
768            session
769                .update(cx, |session, cx| session.request_initialize(cx))?
770                .await?;
771
772            session
773                .update(cx, |session, cx| {
774                    session.initialize_sequence(initialized_rx, cx)
775                })?
776                .await
777        };
778        match seq_result().await {
779            Ok(_) => {}
780            Err(error) => {
781                this.update(cx, |this, cx| {
782                    cx.emit(DapStoreEvent::Notification(error.to_string()));
783                    this.shutdown_session(session_id, cx)
784                })?
785                .await
786                .log_err();
787
788                return Err(error);
789            }
790        }
791
792        this.update(cx, |_, cx| {
793            cx.subscribe(
794                &session,
795                move |this: &mut DapStore, _, event: &SessionStateEvent, cx| match event {
796                    SessionStateEvent::Shutdown => {
797                        this.shutdown_session(session_id, cx).detach_and_log_err(cx);
798                    }
799                },
800            )
801            .detach();
802            cx.emit(DapStoreEvent::DebugSessionInitialized(session_id));
803        })?;
804
805        Ok(session)
806    });
807    task
808}
809
/// Bundle of handles exposed to debug adapters through
/// [`dap::adapters::DapDelegate`]. Cloning copies the handles, not the
/// underlying resources.
#[derive(Clone)]
pub struct DapAdapterDelegate {
    fs: Arc<dyn Fs>,
    /// Id of the worktree this delegate was created for.
    worktree_id: WorktreeId,
    node_runtime: NodeRuntime,
    http_client: Arc<dyn HttpClient>,
    /// Receives adapter status updates (see `update_status`).
    language_registry: Arc<LanguageRegistry>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    /// Set of adapter names shared between clones of this delegate; starts empty.
    // NOTE(review): presumably records adapters already checked for updates — confirm with callers.
    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
    /// Shared task yielding the environment for the worktree, or `None` when
    /// unavailable.
    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
}
821
822impl DapAdapterDelegate {
823    pub fn new(
824        fs: Arc<dyn Fs>,
825        worktree_id: WorktreeId,
826        node_runtime: NodeRuntime,
827        http_client: Arc<dyn HttpClient>,
828        language_registry: Arc<LanguageRegistry>,
829        toolchain_store: Arc<dyn LanguageToolchainStore>,
830        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
831    ) -> Self {
832        Self {
833            fs,
834            worktree_id,
835            http_client,
836            node_runtime,
837            toolchain_store,
838            language_registry,
839            load_shell_env_task,
840            updated_adapters: Default::default(),
841        }
842    }
843}
844
845#[async_trait(?Send)]
846impl dap::adapters::DapDelegate for DapAdapterDelegate {
847    fn worktree_id(&self) -> WorktreeId {
848        self.worktree_id
849    }
850
851    fn http_client(&self) -> Arc<dyn HttpClient> {
852        self.http_client.clone()
853    }
854
855    fn node_runtime(&self) -> NodeRuntime {
856        self.node_runtime.clone()
857    }
858
859    fn fs(&self) -> Arc<dyn Fs> {
860        self.fs.clone()
861    }
862
863    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
864        self.updated_adapters.clone()
865    }
866
867    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
868        let name = SharedString::from(dap_name.to_string());
869        let status = match status {
870            DapStatus::None => BinaryStatus::None,
871            DapStatus::Downloading => BinaryStatus::Downloading,
872            DapStatus::Failed { error } => BinaryStatus::Failed { error },
873            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
874        };
875
876        self.language_registry
877            .update_dap_status(LanguageServerName(name), status);
878    }
879
880    fn which(&self, command: &OsStr) -> Option<PathBuf> {
881        which::which(command).ok()
882    }
883
884    async fn shell_env(&self) -> HashMap<String, String> {
885        let task = self.load_shell_env_task.clone();
886        task.await.unwrap_or_default()
887    }
888
889    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
890        self.toolchain_store.clone()
891    }
892}