dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    locator_store::LocatorStore,
  4    session::{self, Session, SessionStateEvent},
  5};
  6use crate::{ProjectEnvironment, debugger};
  7use anyhow::{Result, anyhow};
  8use async_trait::async_trait;
  9use collections::HashMap;
 10use dap::{
 11    Capabilities, CompletionItem, CompletionsArguments, ErrorResponse, EvaluateArguments,
 12    EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments, Source,
 13    StartDebuggingRequestArguments,
 14    adapters::{DapStatus, DebugAdapterBinary, DebugAdapterName},
 15    client::SessionId,
 16    messages::Message,
 17    requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
 18};
 19use fs::Fs;
 20use futures::{
 21    channel::{mpsc, oneshot},
 22    future::{Shared, join_all},
 23};
 24use gpui::{
 25    App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
 26    Task, WeakEntity,
 27};
 28use http_client::HttpClient;
 29use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 30use lsp::LanguageServerName;
 31use node_runtime::NodeRuntime;
 32
 33use rpc::{
 34    AnyProtoClient, TypedEnvelope,
 35    proto::{self},
 36};
 37use serde_json::Value;
 38use settings::WorktreeId;
 39use smol::{lock::Mutex, stream::StreamExt};
 40use std::{
 41    borrow::Borrow,
 42    collections::{BTreeMap, HashSet},
 43    ffi::OsStr,
 44    path::{Path, PathBuf},
 45    sync::{Arc, atomic::Ordering::SeqCst},
 46};
 47use std::{collections::VecDeque, sync::atomic::AtomicU32};
 48use task::DebugTaskDefinition;
 49use util::ResultExt as _;
 50use worktree::Worktree;
 51
 52pub enum DapStoreEvent {
 53    DebugClientStarted(SessionId),
 54    DebugSessionInitialized(SessionId),
 55    DebugClientShutdown(SessionId),
 56    DebugClientEvent {
 57        session_id: SessionId,
 58        message: Message,
 59    },
 60    RunInTerminal {
 61        session_id: SessionId,
 62        title: Option<String>,
 63        cwd: Option<Arc<Path>>,
 64        command: Option<String>,
 65        args: Vec<String>,
 66        envs: HashMap<String, String>,
 67        sender: mpsc::Sender<Result<u32>>,
 68    },
 69    Notification(String),
 70    RemoteHasInitialized,
 71}
 72
 73#[allow(clippy::large_enum_variant)]
 74pub enum DapStoreMode {
 75    Local(LocalDapStore),   // ssh host and collab host
 76    Remote(RemoteDapStore), // collab guest
 77}
 78
 79pub struct LocalDapStore {
 80    fs: Arc<dyn Fs>,
 81    node_runtime: NodeRuntime,
 82    next_session_id: AtomicU32,
 83    http_client: Arc<dyn HttpClient>,
 84    environment: Entity<ProjectEnvironment>,
 85    language_registry: Arc<LanguageRegistry>,
 86    toolchain_store: Arc<dyn LanguageToolchainStore>,
 87    locator_store: Arc<LocatorStore>,
 88    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 89    _start_debugging_task: Task<()>,
 90}
 91
 92impl LocalDapStore {
 93    fn next_session_id(&self) -> SessionId {
 94        SessionId(self.next_session_id.fetch_add(1, SeqCst))
 95    }
 96    pub(crate) fn locate_binary(
 97        &self,
 98        mut definition: DebugTaskDefinition,
 99        executor: BackgroundExecutor,
100    ) -> Task<DebugTaskDefinition> {
101        let locator_store = self.locator_store.clone();
102        executor.spawn(async move {
103            let _ = locator_store.resolve_debug_config(&mut definition).await;
104            definition
105        })
106    }
107}
108
109pub struct RemoteDapStore {
110    upstream_client: AnyProtoClient,
111    upstream_project_id: u64,
112    event_queue: Option<VecDeque<DapStoreEvent>>,
113}
114
115pub struct DapStore {
116    mode: DapStoreMode,
117    downstream_client: Option<(AnyProtoClient, u64)>,
118    breakpoint_store: Entity<BreakpointStore>,
119    sessions: BTreeMap<SessionId, Entity<Session>>,
120}
121
122impl EventEmitter<DapStoreEvent> for DapStore {}
123
124impl DapStore {
125    pub fn init(_client: &AnyProtoClient) {
126        // todo(debugger): Reenable these after we finish handle_dap_command refactor
127        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
128        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
129        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
130        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
131        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
132        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
133        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
134        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
135        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
136        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
137        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
138        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
139    }
140
141    #[expect(clippy::too_many_arguments)]
142    pub fn new_local(
143        http_client: Arc<dyn HttpClient>,
144        node_runtime: NodeRuntime,
145        fs: Arc<dyn Fs>,
146        language_registry: Arc<LanguageRegistry>,
147        environment: Entity<ProjectEnvironment>,
148        toolchain_store: Arc<dyn LanguageToolchainStore>,
149        breakpoint_store: Entity<BreakpointStore>,
150        cx: &mut Context<Self>,
151    ) -> Self {
152        cx.on_app_quit(Self::shutdown_sessions).detach();
153
154        let (start_debugging_tx, mut message_rx) =
155            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
156
157        let _start_debugging_task = cx.spawn(async move |this, cx| {
158            while let Some((session_id, message)) = message_rx.next().await {
159                match message {
160                    Message::Request(request) => {
161                        let _ = this
162                            .update(cx, |this, cx| {
163                                if request.command == StartDebugging::COMMAND {
164                                    this.handle_start_debugging_request(session_id, request, cx)
165                                        .detach_and_log_err(cx);
166                                } else if request.command == RunInTerminal::COMMAND {
167                                    this.handle_run_in_terminal_request(session_id, request, cx)
168                                        .detach_and_log_err(cx);
169                                }
170                            })
171                            .log_err();
172                    }
173                    _ => {}
174                }
175            }
176        });
177        Self {
178            mode: DapStoreMode::Local(LocalDapStore {
179                fs,
180                environment,
181                http_client,
182                node_runtime,
183                toolchain_store,
184                language_registry,
185                start_debugging_tx,
186                _start_debugging_task,
187                locator_store: Arc::from(LocatorStore::new()),
188                next_session_id: Default::default(),
189            }),
190            downstream_client: None,
191            breakpoint_store,
192            sessions: Default::default(),
193        }
194    }
195
196    pub fn new_remote(
197        project_id: u64,
198        upstream_client: AnyProtoClient,
199        breakpoint_store: Entity<BreakpointStore>,
200    ) -> Self {
201        Self {
202            mode: DapStoreMode::Remote(RemoteDapStore {
203                upstream_client,
204                upstream_project_id: project_id,
205                event_queue: Some(VecDeque::default()),
206            }),
207            downstream_client: None,
208            breakpoint_store,
209            sessions: Default::default(),
210        }
211    }
212
213    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
214        match &self.mode {
215            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
216            _ => None,
217        }
218    }
219
220    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
221        if let DapStoreMode::Remote(remote) = &mut self.mode {
222            remote.event_queue.take()
223        } else {
224            None
225        }
226    }
227
228    pub fn as_local(&self) -> Option<&LocalDapStore> {
229        match &self.mode {
230            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
231            _ => None,
232        }
233    }
234
235    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
236        match &mut self.mode {
237            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
238            _ => None,
239        }
240    }
241
242    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
243        match &self.mode {
244            DapStoreMode::Remote(RemoteDapStore {
245                upstream_client,
246                upstream_project_id,
247                ..
248            }) => Some((upstream_client.clone(), *upstream_project_id)),
249
250            DapStoreMode::Local(_) => None,
251        }
252    }
253
254    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
255        self.downstream_client.as_ref()
256    }
257
258    pub fn add_remote_client(
259        &mut self,
260        session_id: SessionId,
261        ignore: Option<bool>,
262        cx: &mut Context<Self>,
263    ) {
264        if let DapStoreMode::Remote(remote) = &self.mode {
265            self.sessions.insert(
266                session_id,
267                cx.new(|_| {
268                    debugger::session::Session::remote(
269                        session_id,
270                        remote.upstream_client.clone(),
271                        remote.upstream_project_id,
272                        ignore.unwrap_or(false),
273                    )
274                }),
275            );
276        } else {
277            debug_assert!(false);
278        }
279    }
280
281    pub fn session_by_id(
282        &self,
283        session_id: impl Borrow<SessionId>,
284    ) -> Option<Entity<session::Session>> {
285        let session_id = session_id.borrow();
286        let client = self.sessions.get(session_id).cloned();
287
288        client
289    }
290    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
291        self.sessions.values()
292    }
293
294    pub fn capabilities_by_id(
295        &self,
296        session_id: impl Borrow<SessionId>,
297        cx: &App,
298    ) -> Option<Capabilities> {
299        let session_id = session_id.borrow();
300        self.sessions
301            .get(session_id)
302            .map(|client| client.read(cx).capabilities.clone())
303    }
304
305    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
306        &self.breakpoint_store
307    }
308
309    #[allow(dead_code)]
310    async fn handle_ignore_breakpoint_state(
311        this: Entity<Self>,
312        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
313        mut cx: AsyncApp,
314    ) -> Result<()> {
315        let session_id = SessionId::from_proto(envelope.payload.session_id);
316
317        this.update(&mut cx, |this, cx| {
318            if let Some(session) = this.session_by_id(&session_id) {
319                session.update(cx, |session, cx| {
320                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
321                })
322            } else {
323                Task::ready(HashMap::default())
324            }
325        })?
326        .await;
327
328        Ok(())
329    }
330
331    pub fn delegate(&self, worktree: &Entity<Worktree>, cx: &mut App) -> DapAdapterDelegate {
332        let Some(local_store) = self.as_local() else {
333            unimplemented!("Starting session on remote side");
334        };
335
336        DapAdapterDelegate::new(
337            local_store.fs.clone(),
338            worktree.read(cx).id(),
339            local_store.node_runtime.clone(),
340            local_store.http_client.clone(),
341            local_store.language_registry.clone(),
342            local_store.toolchain_store.clone(),
343            local_store.environment.update(cx, |env, cx| {
344                env.get_worktree_environment(worktree.clone(), cx)
345            }),
346        )
347    }
348
349    pub fn new_session(
350        &mut self,
351        binary: DebugAdapterBinary,
352        config: DebugTaskDefinition,
353        worktree: WeakEntity<Worktree>,
354        parent_session: Option<Entity<Session>>,
355        cx: &mut Context<Self>,
356    ) -> (SessionId, Task<Result<Entity<Session>>>) {
357        let Some(local_store) = self.as_local() else {
358            unimplemented!("Starting session on remote side");
359        };
360
361        let session_id = local_store.next_session_id();
362
363        if let Some(session) = &parent_session {
364            session.update(cx, |session, _| {
365                session.add_child_session_id(session_id);
366            });
367        }
368
369        let (initialized_tx, initialized_rx) = oneshot::channel();
370
371        let start_debugging_tx = local_store.start_debugging_tx.clone();
372
373        let task = cx.spawn(async move |this, cx| {
374            let start_client_task = this.update(cx, |this, cx| {
375                Session::local(
376                    this.breakpoint_store.clone(),
377                    worktree.clone(),
378                    session_id,
379                    parent_session,
380                    binary,
381                    config,
382                    start_debugging_tx.clone(),
383                    initialized_tx,
384                    cx,
385                )
386            })?;
387
388            let ret = this
389                .update(cx, |_, cx| {
390                    create_new_session(session_id, initialized_rx, start_client_task, worktree, cx)
391                })?
392                .await;
393            ret
394        });
395
396        (session_id, task)
397    }
398
399    fn handle_start_debugging_request(
400        &mut self,
401        session_id: SessionId,
402        request: dap::messages::Request,
403        cx: &mut Context<Self>,
404    ) -> Task<Result<()>> {
405        let Some(parent_session) = self.session_by_id(session_id) else {
406            return Task::ready(Err(anyhow!("Session not found")));
407        };
408
409        let Some(worktree) = parent_session
410            .read(cx)
411            .as_local()
412            .map(|local| local.worktree().clone())
413        else {
414            return Task::ready(Err(anyhow!(
415                "Cannot handle start debugging request from remote end"
416            )));
417        };
418
419        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
420            request.arguments.unwrap_or_default(),
421        )
422        .expect("To parse StartDebuggingRequestArguments");
423        let mut binary = parent_session.read(cx).binary().clone();
424        let config = parent_session.read(cx).configuration().unwrap().clone();
425        binary.request_args = args;
426
427        let new_session_task = self
428            .new_session(binary, config, worktree, Some(parent_session.clone()), cx)
429            .1;
430
431        let request_seq = request.seq;
432        cx.spawn(async move |_, cx| {
433            let (success, body) = match new_session_task.await {
434                Ok(_) => (true, None),
435                Err(error) => (
436                    false,
437                    Some(serde_json::to_value(ErrorResponse {
438                        error: Some(dap::Message {
439                            id: request_seq,
440                            format: error.to_string(),
441                            variables: None,
442                            send_telemetry: None,
443                            show_user: None,
444                            url: None,
445                            url_label: None,
446                        }),
447                    })?),
448                ),
449            };
450
451            parent_session
452                .update(cx, |session, cx| {
453                    session.respond_to_client(
454                        request_seq,
455                        success,
456                        StartDebugging::COMMAND.to_string(),
457                        body,
458                        cx,
459                    )
460                })?
461                .await
462        })
463    }
464
465    fn handle_run_in_terminal_request(
466        &mut self,
467        session_id: SessionId,
468        request: dap::messages::Request,
469        cx: &mut Context<Self>,
470    ) -> Task<Result<()>> {
471        let Some(session) = self.session_by_id(session_id) else {
472            return Task::ready(Err(anyhow!("Session not found")));
473        };
474
475        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
476            request.arguments.unwrap_or_default(),
477        )
478        .expect("To parse StartDebuggingRequestArguments");
479
480        let seq = request.seq;
481
482        let cwd = Path::new(&request_args.cwd);
483
484        match cwd.try_exists() {
485            Ok(false) | Err(_) if !request_args.cwd.is_empty() => {
486                return session.update(cx, |session, cx| {
487                    session.respond_to_client(
488                        seq,
489                        false,
490                        RunInTerminal::COMMAND.to_string(),
491                        serde_json::to_value(dap::ErrorResponse {
492                            error: Some(dap::Message {
493                                id: seq,
494                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
495                                variables: None,
496                                send_telemetry: None,
497                                show_user: None,
498                                url: None,
499                                url_label: None,
500                            }),
501                        })
502                        .ok(),
503                        cx,
504                    )
505                });
506            }
507            _ => (),
508        }
509        let mut args = request_args.args.clone();
510
511        // Handle special case for NodeJS debug adapter
512        // If only the Node binary path is provided, we set the command to None
513        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
514        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
515        // This allows the NodeJS debug client to attach correctly
516        let command = if args.len() > 1 {
517            Some(args.remove(0))
518        } else {
519            None
520        };
521
522        let mut envs: HashMap<String, String> = Default::default();
523        if let Some(Value::Object(env)) = request_args.env {
524            for (key, value) in env {
525                let value_str = match (key.as_str(), value) {
526                    (_, Value::String(value)) => value,
527                    _ => continue,
528                };
529
530                envs.insert(key, value_str);
531            }
532        }
533
534        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
535        let cwd = Some(cwd)
536            .filter(|cwd| cwd.as_os_str().len() > 0)
537            .map(Arc::from)
538            .or_else(|| {
539                self.session_by_id(session_id)
540                    .and_then(|session| session.read(cx).binary().cwd.as_deref().map(Arc::from))
541            });
542        cx.emit(DapStoreEvent::RunInTerminal {
543            session_id,
544            title: request_args.title,
545            cwd,
546            command,
547            args,
548            envs,
549            sender: tx,
550        });
551        cx.notify();
552
553        let session = session.downgrade();
554        cx.spawn(async move |_, cx| {
555            let (success, body) = match rx.next().await {
556                Some(Ok(pid)) => (
557                    true,
558                    serde_json::to_value(dap::RunInTerminalResponse {
559                        process_id: None,
560                        shell_process_id: Some(pid as u64),
561                    })
562                    .ok(),
563                ),
564                Some(Err(error)) => (
565                    false,
566                    serde_json::to_value(dap::ErrorResponse {
567                        error: Some(dap::Message {
568                            id: seq,
569                            format: error.to_string(),
570                            variables: None,
571                            send_telemetry: None,
572                            show_user: None,
573                            url: None,
574                            url_label: None,
575                        }),
576                    })
577                    .ok(),
578                ),
579                None => (
580                    false,
581                    serde_json::to_value(dap::ErrorResponse {
582                        error: Some(dap::Message {
583                            id: seq,
584                            format: "failed to receive response from spawn terminal".to_string(),
585                            variables: None,
586                            send_telemetry: None,
587                            show_user: None,
588                            url: None,
589                            url_label: None,
590                        }),
591                    })
592                    .ok(),
593                ),
594            };
595
596            session
597                .update(cx, |session, cx| {
598                    session.respond_to_client(
599                        seq,
600                        success,
601                        RunInTerminal::COMMAND.to_string(),
602                        body,
603                        cx,
604                    )
605                })?
606                .await
607        })
608    }
609
610    pub fn evaluate(
611        &self,
612        session_id: &SessionId,
613        stack_frame_id: u64,
614        expression: String,
615        context: EvaluateArgumentsContext,
616        source: Option<Source>,
617        cx: &mut Context<Self>,
618    ) -> Task<Result<EvaluateResponse>> {
619        let Some(client) = self
620            .session_by_id(session_id)
621            .and_then(|client| client.read(cx).adapter_client())
622        else {
623            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
624        };
625
626        cx.background_executor().spawn(async move {
627            client
628                .request::<Evaluate>(EvaluateArguments {
629                    expression: expression.clone(),
630                    frame_id: Some(stack_frame_id),
631                    context: Some(context),
632                    format: None,
633                    line: None,
634                    column: None,
635                    source,
636                })
637                .await
638        })
639    }
640
641    pub fn completions(
642        &self,
643        session_id: &SessionId,
644        stack_frame_id: u64,
645        text: String,
646        completion_column: u64,
647        cx: &mut Context<Self>,
648    ) -> Task<Result<Vec<CompletionItem>>> {
649        let Some(client) = self
650            .session_by_id(session_id)
651            .and_then(|client| client.read(cx).adapter_client())
652        else {
653            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
654        };
655
656        cx.background_executor().spawn(async move {
657            Ok(client
658                .request::<Completions>(CompletionsArguments {
659                    frame_id: Some(stack_frame_id),
660                    line: None,
661                    text,
662                    column: completion_column,
663                })
664                .await?
665                .targets)
666        })
667    }
668
669    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
670        let mut tasks = vec![];
671        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
672            tasks.push(self.shutdown_session(session_id, cx));
673        }
674
675        cx.background_executor().spawn(async move {
676            futures::future::join_all(tasks).await;
677        })
678    }
679
680    pub fn shutdown_session(
681        &mut self,
682        session_id: SessionId,
683        cx: &mut Context<Self>,
684    ) -> Task<Result<()>> {
685        let Some(_) = self.as_local_mut() else {
686            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
687        };
688
689        let Some(session) = self.sessions.remove(&session_id) else {
690            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
691        };
692
693        let shutdown_children = session
694            .read(cx)
695            .child_session_ids()
696            .iter()
697            .map(|session_id| self.shutdown_session(*session_id, cx))
698            .collect::<Vec<_>>();
699
700        let shutdown_parent_task = if let Some(parent_session) = session
701            .read(cx)
702            .parent_id()
703            .and_then(|session_id| self.session_by_id(session_id))
704        {
705            let shutdown_id = parent_session.update(cx, |parent_session, _| {
706                parent_session.remove_child_session_id(session_id);
707
708                if parent_session.child_session_ids().len() == 0 {
709                    Some(parent_session.session_id())
710                } else {
711                    None
712                }
713            });
714
715            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
716        } else {
717            None
718        };
719
720        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
721
722        cx.background_spawn(async move {
723            if shutdown_children.len() > 0 {
724                let _ = join_all(shutdown_children).await;
725            }
726
727            shutdown_task.await;
728
729            if let Some(parent_task) = shutdown_parent_task {
730                parent_task.await?;
731            }
732
733            Ok(())
734        })
735    }
736
737    pub fn shared(
738        &mut self,
739        project_id: u64,
740        downstream_client: AnyProtoClient,
741        _: &mut Context<Self>,
742    ) {
743        self.downstream_client = Some((downstream_client.clone(), project_id));
744    }
745
746    pub fn unshared(&mut self, cx: &mut Context<Self>) {
747        self.downstream_client.take();
748
749        cx.notify();
750    }
751}
752
753fn create_new_session(
754    session_id: SessionId,
755    initialized_rx: oneshot::Receiver<()>,
756    start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
757    worktree: WeakEntity<Worktree>,
758    cx: &mut Context<DapStore>,
759) -> Task<Result<Entity<Session>>> {
760    let task = cx.spawn(async move |this, cx| {
761        let session = match start_client_task.await {
762            Ok(session) => session,
763            Err(error) => {
764                this.update(cx, |_, cx| {
765                    cx.emit(DapStoreEvent::Notification(error.to_string()));
766                })
767                .log_err();
768
769                return Err(error);
770            }
771        };
772
773        // we have to insert the session early, so we can handle reverse requests
774        // that need the session to be available
775        this.update(cx, |store, cx| {
776            store.sessions.insert(session_id, session.clone());
777            cx.emit(DapStoreEvent::DebugClientStarted(session_id));
778            cx.notify();
779        })?;
780        let seq_result = async || {
781            session
782                .update(cx, |session, cx| session.request_initialize(cx))?
783                .await?;
784
785            session
786                .update(cx, |session, cx| {
787                    session.initialize_sequence(initialized_rx, this.clone(), cx)
788                })?
789                .await
790        };
791        match seq_result().await {
792            Ok(_) => {}
793            Err(error) => {
794                this.update(cx, |this, cx| {
795                    cx.emit(DapStoreEvent::Notification(error.to_string()));
796                    this.shutdown_session(session_id, cx)
797                })?
798                .await
799                .log_err();
800
801                return Err(error);
802            }
803        }
804
805        this.update(cx, |_, cx| {
806            cx.subscribe(
807                &session,
808                move |this: &mut DapStore, session, event: &SessionStateEvent, cx| match event {
809                    SessionStateEvent::Shutdown => {
810                        this.shutdown_session(session_id, cx).detach_and_log_err(cx);
811                    }
812                    SessionStateEvent::Restart => {
813                        let Some((config, binary)) = session.read_with(cx, |session, _| {
814                            session
815                                .configuration()
816                                .map(|config| (config, session.binary().clone()))
817                        }) else {
818                            log::error!("Failed to get debug config from session");
819                            return;
820                        };
821
822                        let mut curr_session = session;
823                        while let Some(parent_id) = curr_session.read(cx).parent_id() {
824                            if let Some(parent_session) = this.sessions.get(&parent_id).cloned() {
825                                curr_session = parent_session;
826                            } else {
827                                log::error!("Failed to get parent session from parent session id");
828                                break;
829                            }
830                        }
831
832                        let session_id = curr_session.read(cx).session_id();
833
834                        let task = curr_session.update(cx, |session, cx| session.shutdown(cx));
835
836                        let worktree = worktree.clone();
837                        cx.spawn(async move |this, cx| {
838                            task.await;
839
840                            this.update(cx, |this, cx| {
841                                this.sessions.remove(&session_id);
842                                this.new_session(binary, config, worktree, None, cx)
843                            })?
844                            .1
845                            .await?;
846
847                            anyhow::Ok(())
848                        })
849                        .detach_and_log_err(cx);
850                    }
851                },
852            )
853            .detach();
854            cx.emit(DapStoreEvent::DebugSessionInitialized(session_id));
855        })?;
856
857        Ok(session)
858    });
859    task
860}
861
862#[derive(Clone)]
863pub struct DapAdapterDelegate {
864    fs: Arc<dyn Fs>,
865    worktree_id: WorktreeId,
866    node_runtime: NodeRuntime,
867    http_client: Arc<dyn HttpClient>,
868    language_registry: Arc<LanguageRegistry>,
869    toolchain_store: Arc<dyn LanguageToolchainStore>,
870    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
871    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
872}
873
874impl DapAdapterDelegate {
875    pub fn new(
876        fs: Arc<dyn Fs>,
877        worktree_id: WorktreeId,
878        node_runtime: NodeRuntime,
879        http_client: Arc<dyn HttpClient>,
880        language_registry: Arc<LanguageRegistry>,
881        toolchain_store: Arc<dyn LanguageToolchainStore>,
882        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
883    ) -> Self {
884        Self {
885            fs,
886            worktree_id,
887            http_client,
888            node_runtime,
889            toolchain_store,
890            language_registry,
891            load_shell_env_task,
892            updated_adapters: Default::default(),
893        }
894    }
895}
896
897#[async_trait(?Send)]
898impl dap::adapters::DapDelegate for DapAdapterDelegate {
899    fn worktree_id(&self) -> WorktreeId {
900        self.worktree_id
901    }
902
903    fn http_client(&self) -> Arc<dyn HttpClient> {
904        self.http_client.clone()
905    }
906
907    fn node_runtime(&self) -> NodeRuntime {
908        self.node_runtime.clone()
909    }
910
911    fn fs(&self) -> Arc<dyn Fs> {
912        self.fs.clone()
913    }
914
915    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
916        self.updated_adapters.clone()
917    }
918
919    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
920        let name = SharedString::from(dap_name.to_string());
921        let status = match status {
922            DapStatus::None => BinaryStatus::None,
923            DapStatus::Downloading => BinaryStatus::Downloading,
924            DapStatus::Failed { error } => BinaryStatus::Failed { error },
925            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
926        };
927
928        self.language_registry
929            .update_dap_status(LanguageServerName(name), status);
930    }
931
932    fn which(&self, command: &OsStr) -> Option<PathBuf> {
933        which::which(command).ok()
934    }
935
936    async fn shell_env(&self) -> HashMap<String, String> {
937        let task = self.load_shell_env_task.clone();
938        task.await.unwrap_or_default()
939    }
940
941    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
942        self.toolchain_store.clone()
943    }
944}