dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    locator_store::LocatorStore,
  4    session::{self, Session},
  5};
  6use crate::{ProjectEnvironment, debugger, worktree_store::WorktreeStore};
  7use anyhow::{Result, anyhow};
  8use async_trait::async_trait;
  9use collections::HashMap;
 10use dap::{
 11    Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
 12    EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
 13    Source, StartDebuggingRequestArguments,
 14    adapters::{DapStatus, DebugAdapterName},
 15    client::SessionId,
 16    messages::Message,
 17    requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
 18};
 19use fs::Fs;
 20use futures::{
 21    channel::{mpsc, oneshot},
 22    future::{Shared, join_all},
 23};
 24use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 25use http_client::HttpClient;
 26use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 27use lsp::LanguageServerName;
 28use node_runtime::NodeRuntime;
 29
 30use rpc::{
 31    AnyProtoClient, TypedEnvelope,
 32    proto::{self},
 33};
 34use serde_json::Value;
 35use settings::WorktreeId;
 36use smol::{lock::Mutex, stream::StreamExt};
 37use std::{
 38    borrow::Borrow,
 39    collections::{BTreeMap, HashSet},
 40    ffi::OsStr,
 41    path::PathBuf,
 42    sync::{Arc, atomic::Ordering::SeqCst},
 43};
 44use std::{collections::VecDeque, sync::atomic::AtomicU32};
 45use task::{DebugAdapterConfig, DebugRequestDisposition};
 46use util::ResultExt as _;
 47use worktree::Worktree;
 48
/// Events emitted by [`DapStore`] through its `EventEmitter` implementation.
pub enum DapStoreEvent {
    /// Emitted by `create_new_session` right after the session has been
    /// inserted into the store.
    DebugClientStarted(SessionId),
    /// Emitted by `create_new_session` once the initialize sequence completed
    /// without error.
    DebugSessionInitialized(SessionId),
    // NOTE(review): not emitted anywhere in this file; presumably signals that
    // a session's client was shut down — confirm against the emitters.
    DebugClientShutdown(SessionId),
    // NOTE(review): not emitted anywhere in this file; presumably forwards a
    // raw adapter message for the given session — confirm against the emitters.
    DebugClientEvent {
        session_id: SessionId,
        message: Message,
    },
    /// Emitted while handling a `RunInTerminal` reverse request.
    ///
    /// `sender` is the reply channel: the handler waits for its first item and
    /// reports `Ok(pid)` back to the adapter as the shell process id, or the
    /// error text on `Err`.
    RunInTerminal {
        session_id: SessionId,
        title: Option<String>,
        cwd: PathBuf,
        command: Option<String>,
        args: Vec<String>,
        envs: HashMap<String, String>,
        sender: mpsc::Sender<Result<u32>>,
    },
    /// A message string; within this file it carries the text of errors that
    /// occurred while starting or initializing a session.
    Notification(String),
    // NOTE(review): not emitted anywhere in this file — meaning to be
    // confirmed against the emitters.
    RemoteHasInitialized,
}
 69
/// Selects whether a [`DapStore`] runs debug sessions itself (`Local`) or
/// talks to an upstream project (`Remote`).
// NOTE(review): the lint is silenced rather than boxing a variant; presumably
// because only one `DapStoreMode` exists per store — confirm.
#[allow(clippy::large_enum_variant)]
pub enum DapStoreMode {
    Local(LocalDapStore),   // ssh host and collab host
    Remote(RemoteDapStore), // collab guest
}
 75
/// State held by a [`DapStore`] in local mode.
pub struct LocalDapStore {
    // Cloned into every `DapAdapterDelegate` built when a session is started.
    fs: Arc<dyn Fs>,
    // Cloned into every `DapAdapterDelegate` built when a session is started.
    node_runtime: NodeRuntime,
    // Counter backing `next_session_id()`.
    next_session_id: AtomicU32,
    // Cloned into every `DapAdapterDelegate` built when a session is started.
    http_client: Arc<dyn HttpClient>,
    // Used by `handle_start_debugging_request` to pick the first worktree.
    worktree_store: Entity<WorktreeStore>,
    // Queried with a worktree's absolute path to obtain the environment task
    // handed to the `DapAdapterDelegate`.
    environment: Entity<ProjectEnvironment>,
    // Cloned into every `DapAdapterDelegate`; receives adapter status updates.
    language_registry: Arc<LanguageRegistry>,
    // Passed to `Session::local` when a session is started.
    debug_adapters: Arc<DapRegistry>,
    // Cloned into every `DapAdapterDelegate` built when a session is started.
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    // Resolves the debug config when `config.locator` is set.
    locator_store: Arc<LocatorStore>,
    // Sender half of the channel drained by `_start_debugging_task`; a clone is
    // handed to every session.
    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
    // Task that drains the channel above and dispatches `StartDebugging` and
    // `RunInTerminal` requests. Presumably stored so the task is kept alive for
    // the lifetime of the store — TODO confirm.
    _start_debugging_task: Task<()>,
}
 90
 91impl LocalDapStore {
 92    fn next_session_id(&self) -> SessionId {
 93        SessionId(self.next_session_id.fetch_add(1, SeqCst))
 94    }
 95}
 96
/// State held by a [`DapStore`] in remote mode.
pub struct RemoteDapStore {
    // Handed to every `Session::remote` created by `add_remote_client`.
    upstream_client: AnyProtoClient,
    // Project id paired with `upstream_client`.
    upstream_project_id: u64,
    // Starts as an empty queue in `new_remote`; `remote_event_queue` takes it,
    // leaving `None` behind.
    event_queue: Option<VecDeque<DapStoreEvent>>,
}
102
/// Keeps track of debug sessions, either running them locally or mirroring an
/// upstream project, depending on `mode`.
pub struct DapStore {
    // Local or remote state; see `DapStoreMode`.
    mode: DapStoreMode,
    // Set by `shared`, cleared by `unshared`: client and project id.
    downstream_client: Option<(AnyProtoClient, u64)>,
    // Handed to every session that is started from this store.
    breakpoint_store: Entity<BreakpointStore>,
    // All sessions currently registered, keyed by their id.
    sessions: BTreeMap<SessionId, Entity<Session>>,
}
109
// Lets `DapStore` emit `DapStoreEvent`s via `cx.emit`.
impl EventEmitter<DapStoreEvent> for DapStore {}
111
112impl DapStore {
    /// Hook for registering this store's request handlers on `_client`.
    ///
    /// Currently a no-op: every registration below is commented out, so the
    /// client argument is unused.
    pub fn init(_client: &AnyProtoClient) {
        // todo(debugger): Reenable these after we finish handle_dap_command refactor
        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
    }
128
129    #[expect(clippy::too_many_arguments)]
130    pub fn new_local(
131        http_client: Arc<dyn HttpClient>,
132        node_runtime: NodeRuntime,
133        fs: Arc<dyn Fs>,
134        language_registry: Arc<LanguageRegistry>,
135        debug_adapters: Arc<DapRegistry>,
136        environment: Entity<ProjectEnvironment>,
137        toolchain_store: Arc<dyn LanguageToolchainStore>,
138        breakpoint_store: Entity<BreakpointStore>,
139        worktree_store: Entity<WorktreeStore>,
140        cx: &mut Context<Self>,
141    ) -> Self {
142        cx.on_app_quit(Self::shutdown_sessions).detach();
143
144        let (start_debugging_tx, mut message_rx) =
145            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
146
147        let _start_debugging_task = cx.spawn(async move |this, cx| {
148            while let Some((session_id, message)) = message_rx.next().await {
149                match message {
150                    Message::Request(request) => {
151                        let _ = this
152                            .update(cx, |this, cx| {
153                                if request.command == StartDebugging::COMMAND {
154                                    this.handle_start_debugging_request(session_id, request, cx)
155                                        .detach_and_log_err(cx);
156                                } else if request.command == RunInTerminal::COMMAND {
157                                    this.handle_run_in_terminal_request(session_id, request, cx)
158                                        .detach_and_log_err(cx);
159                                }
160                            })
161                            .log_err();
162                    }
163                    _ => {}
164                }
165            }
166        });
167        Self {
168            mode: DapStoreMode::Local(LocalDapStore {
169                fs,
170                environment,
171                http_client,
172                node_runtime,
173                worktree_store,
174                toolchain_store,
175                language_registry,
176                debug_adapters,
177                start_debugging_tx,
178                _start_debugging_task,
179                locator_store: Arc::from(LocatorStore::new()),
180                next_session_id: Default::default(),
181            }),
182            downstream_client: None,
183            breakpoint_store,
184            sessions: Default::default(),
185        }
186    }
187
188    pub fn new_remote(
189        project_id: u64,
190        upstream_client: AnyProtoClient,
191        breakpoint_store: Entity<BreakpointStore>,
192    ) -> Self {
193        Self {
194            mode: DapStoreMode::Remote(RemoteDapStore {
195                upstream_client,
196                upstream_project_id: project_id,
197                event_queue: Some(VecDeque::default()),
198            }),
199            downstream_client: None,
200            breakpoint_store,
201            sessions: Default::default(),
202        }
203    }
204
205    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
206        match &self.mode {
207            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
208            _ => None,
209        }
210    }
211
212    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
213        if let DapStoreMode::Remote(remote) = &mut self.mode {
214            remote.event_queue.take()
215        } else {
216            None
217        }
218    }
219
220    pub fn as_local(&self) -> Option<&LocalDapStore> {
221        match &self.mode {
222            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
223            _ => None,
224        }
225    }
226
227    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
228        match &mut self.mode {
229            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
230            _ => None,
231        }
232    }
233
234    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
235        match &self.mode {
236            DapStoreMode::Remote(RemoteDapStore {
237                upstream_client,
238                upstream_project_id,
239                ..
240            }) => Some((upstream_client.clone(), *upstream_project_id)),
241
242            DapStoreMode::Local(_) => None,
243        }
244    }
245
246    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
247        self.downstream_client.as_ref()
248    }
249
250    pub fn add_remote_client(
251        &mut self,
252        session_id: SessionId,
253        ignore: Option<bool>,
254        cx: &mut Context<Self>,
255    ) {
256        if let DapStoreMode::Remote(remote) = &self.mode {
257            self.sessions.insert(
258                session_id,
259                cx.new(|_| {
260                    debugger::session::Session::remote(
261                        session_id,
262                        remote.upstream_client.clone(),
263                        remote.upstream_project_id,
264                        ignore.unwrap_or(false),
265                    )
266                }),
267            );
268        } else {
269            debug_assert!(false);
270        }
271    }
272
273    pub fn session_by_id(
274        &self,
275        session_id: impl Borrow<SessionId>,
276    ) -> Option<Entity<session::Session>> {
277        let session_id = session_id.borrow();
278        let client = self.sessions.get(session_id).cloned();
279
280        client
281    }
282    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
283        self.sessions.values()
284    }
285
286    pub fn capabilities_by_id(
287        &self,
288        session_id: impl Borrow<SessionId>,
289        cx: &App,
290    ) -> Option<Capabilities> {
291        let session_id = session_id.borrow();
292        self.sessions
293            .get(session_id)
294            .map(|client| client.read(cx).capabilities.clone())
295    }
296
297    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
298        &self.breakpoint_store
299    }
300
301    #[allow(dead_code)]
302    async fn handle_ignore_breakpoint_state(
303        this: Entity<Self>,
304        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
305        mut cx: AsyncApp,
306    ) -> Result<()> {
307        let session_id = SessionId::from_proto(envelope.payload.session_id);
308
309        this.update(&mut cx, |this, cx| {
310            if let Some(session) = this.session_by_id(&session_id) {
311                session.update(cx, |session, cx| {
312                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
313                })
314            } else {
315                Task::ready(())
316            }
317        })?
318        .await;
319
320        Ok(())
321    }
322
323    pub fn new_session(
324        &mut self,
325        mut config: DebugAdapterConfig,
326        worktree: &Entity<Worktree>,
327        parent_session: Option<Entity<Session>>,
328        cx: &mut Context<Self>,
329    ) -> (SessionId, Task<Result<Entity<Session>>>) {
330        let Some(local_store) = self.as_local() else {
331            unimplemented!("Starting session on remote side");
332        };
333
334        let delegate = DapAdapterDelegate::new(
335            local_store.fs.clone(),
336            worktree.read(cx).id(),
337            local_store.node_runtime.clone(),
338            local_store.http_client.clone(),
339            local_store.language_registry.clone(),
340            local_store.toolchain_store.clone(),
341            local_store.environment.update(cx, |env, cx| {
342                let worktree = worktree.read(cx);
343                env.get_environment(worktree.abs_path().into(), cx)
344            }),
345        );
346        let session_id = local_store.next_session_id();
347
348        if let Some(session) = &parent_session {
349            session.update(cx, |session, _| {
350                session.add_child_session_id(session_id);
351            });
352        }
353
354        let (initialized_tx, initialized_rx) = oneshot::channel();
355        let locator_store = local_store.locator_store.clone();
356        let debug_adapters = local_store.debug_adapters.clone();
357
358        let start_debugging_tx = local_store.start_debugging_tx.clone();
359
360        let task = cx.spawn(async move |this, cx| {
361            if config.locator.is_some() {
362                config = cx
363                    .background_spawn(async move {
364                        locator_store
365                            .resolve_debug_config(&mut config)
366                            .await
367                            .map(|_| config)
368                    })
369                    .await?;
370            }
371
372            let start_client_task = this.update(cx, |this, cx| {
373                Session::local(
374                    this.breakpoint_store.clone(),
375                    session_id,
376                    parent_session,
377                    delegate,
378                    config,
379                    start_debugging_tx.clone(),
380                    initialized_tx,
381                    debug_adapters,
382                    cx,
383                )
384            })?;
385
386            this.update(cx, |_, cx| {
387                create_new_session(session_id, initialized_rx, start_client_task, cx)
388            })?
389            .await
390        });
391
392        (session_id, task)
393    }
394
395    #[cfg(any(test, feature = "test-support"))]
396    pub fn new_fake_session(
397        &mut self,
398        config: DebugAdapterConfig,
399        worktree: &Entity<Worktree>,
400        parent_session: Option<Entity<Session>>,
401        caps: Capabilities,
402        fails: bool,
403        cx: &mut Context<Self>,
404    ) -> (SessionId, Task<Result<Entity<Session>>>) {
405        let Some(local_store) = self.as_local() else {
406            unimplemented!("Starting session on remote side");
407        };
408
409        let delegate = DapAdapterDelegate::new(
410            local_store.fs.clone(),
411            worktree.read(cx).id(),
412            local_store.node_runtime.clone(),
413            local_store.http_client.clone(),
414            local_store.language_registry.clone(),
415            local_store.toolchain_store.clone(),
416            local_store.environment.update(cx, |env, cx| {
417                let worktree = worktree.read(cx);
418                env.get_environment(Some(worktree.abs_path()), cx)
419            }),
420        );
421        let session_id = local_store.next_session_id();
422
423        if let Some(session) = &parent_session {
424            session.update(cx, |session, _| {
425                session.add_child_session_id(session_id);
426            });
427        }
428
429        let (initialized_tx, initialized_rx) = oneshot::channel();
430
431        let start_client_task = Session::fake(
432            self.breakpoint_store.clone(),
433            session_id,
434            parent_session,
435            delegate,
436            config,
437            local_store.start_debugging_tx.clone(),
438            initialized_tx,
439            caps,
440            fails,
441            cx,
442        );
443
444        let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
445        (session_id, task)
446    }
447
448    fn handle_start_debugging_request(
449        &mut self,
450        session_id: SessionId,
451        request: dap::messages::Request,
452        cx: &mut Context<Self>,
453    ) -> Task<Result<()>> {
454        let Some(local_store) = self.as_local() else {
455            unreachable!("Cannot response for non-local session");
456        };
457
458        let Some(parent_session) = self.session_by_id(session_id) else {
459            return Task::ready(Err(anyhow!("Session not found")));
460        };
461
462        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
463            request.arguments.unwrap_or_default(),
464        )
465        .expect("To parse StartDebuggingRequestArguments");
466        let worktree = local_store
467            .worktree_store
468            .update(cx, |this, _| this.worktrees().next())
469            .expect("worktree-less project");
470
471        let Some(config) = parent_session.read(cx).configuration() else {
472            unreachable!("there must be a config for local sessions");
473        };
474
475        let debug_config = DebugAdapterConfig {
476            label: config.label,
477            adapter: config.adapter,
478            request: DebugRequestDisposition::ReverseRequest(args),
479            initialize_args: config.initialize_args.clone(),
480            tcp_connection: config.tcp_connection.clone(),
481            locator: None,
482            stop_on_entry: config.stop_on_entry,
483        };
484
485        #[cfg(any(test, feature = "test-support"))]
486        let new_session_task = {
487            let caps = parent_session.read(cx).capabilities.clone();
488            self.new_fake_session(
489                debug_config,
490                &worktree,
491                Some(parent_session.clone()),
492                caps,
493                false,
494                cx,
495            )
496            .1
497        };
498        #[cfg(not(any(test, feature = "test-support")))]
499        let new_session_task = self
500            .new_session(debug_config, &worktree, Some(parent_session.clone()), cx)
501            .1;
502
503        let request_seq = request.seq;
504        cx.spawn(async move |_, cx| {
505            let (success, body) = match new_session_task.await {
506                Ok(_) => (true, None),
507                Err(error) => (
508                    false,
509                    Some(serde_json::to_value(ErrorResponse {
510                        error: Some(dap::Message {
511                            id: request_seq,
512                            format: error.to_string(),
513                            variables: None,
514                            send_telemetry: None,
515                            show_user: None,
516                            url: None,
517                            url_label: None,
518                        }),
519                    })?),
520                ),
521            };
522
523            parent_session
524                .update(cx, |session, cx| {
525                    session.respond_to_client(
526                        request_seq,
527                        success,
528                        StartDebugging::COMMAND.to_string(),
529                        body,
530                        cx,
531                    )
532                })?
533                .await
534        })
535    }
536
537    fn handle_run_in_terminal_request(
538        &mut self,
539        session_id: SessionId,
540        request: dap::messages::Request,
541        cx: &mut Context<Self>,
542    ) -> Task<Result<()>> {
543        let Some(session) = self.session_by_id(session_id) else {
544            return Task::ready(Err(anyhow!("Session not found")));
545        };
546
547        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
548            request.arguments.unwrap_or_default(),
549        )
550        .expect("To parse StartDebuggingRequestArguments");
551
552        let seq = request.seq;
553
554        let cwd = PathBuf::from(request_args.cwd);
555        match cwd.try_exists() {
556            Ok(true) => (),
557            Ok(false) | Err(_) => {
558                return session.update(cx, |session, cx| {
559                    session.respond_to_client(
560                        seq,
561                        false,
562                        RunInTerminal::COMMAND.to_string(),
563                        serde_json::to_value(dap::ErrorResponse {
564                            error: Some(dap::Message {
565                                id: seq,
566                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
567                                variables: None,
568                                send_telemetry: None,
569                                show_user: None,
570                                url: None,
571                                url_label: None,
572                            }),
573                        })
574                        .ok(),
575                        cx,
576                    )
577                });
578            }
579        }
580
581        let mut args = request_args.args.clone();
582
583        // Handle special case for NodeJS debug adapter
584        // If only the Node binary path is provided, we set the command to None
585        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
586        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
587        // This allows the NodeJS debug client to attach correctly
588        let command = if args.len() > 1 {
589            Some(args.remove(0))
590        } else {
591            None
592        };
593
594        let mut envs: HashMap<String, String> = Default::default();
595        if let Some(Value::Object(env)) = request_args.env {
596            for (key, value) in env {
597                let value_str = match (key.as_str(), value) {
598                    (_, Value::String(value)) => value,
599                    _ => continue,
600                };
601
602                envs.insert(key, value_str);
603            }
604        }
605
606        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
607
608        cx.emit(DapStoreEvent::RunInTerminal {
609            session_id,
610            title: request_args.title,
611            cwd,
612            command,
613            args,
614            envs,
615            sender: tx,
616        });
617        cx.notify();
618
619        let session = session.downgrade();
620        cx.spawn(async move |_, cx| {
621            let (success, body) = match rx.next().await {
622                Some(Ok(pid)) => (
623                    true,
624                    serde_json::to_value(dap::RunInTerminalResponse {
625                        process_id: None,
626                        shell_process_id: Some(pid as u64),
627                    })
628                    .ok(),
629                ),
630                Some(Err(error)) => (
631                    false,
632                    serde_json::to_value(dap::ErrorResponse {
633                        error: Some(dap::Message {
634                            id: seq,
635                            format: error.to_string(),
636                            variables: None,
637                            send_telemetry: None,
638                            show_user: None,
639                            url: None,
640                            url_label: None,
641                        }),
642                    })
643                    .ok(),
644                ),
645                None => (
646                    false,
647                    serde_json::to_value(dap::ErrorResponse {
648                        error: Some(dap::Message {
649                            id: seq,
650                            format: "failed to receive response from spawn terminal".to_string(),
651                            variables: None,
652                            send_telemetry: None,
653                            show_user: None,
654                            url: None,
655                            url_label: None,
656                        }),
657                    })
658                    .ok(),
659                ),
660            };
661
662            session
663                .update(cx, |session, cx| {
664                    session.respond_to_client(
665                        seq,
666                        success,
667                        RunInTerminal::COMMAND.to_string(),
668                        body,
669                        cx,
670                    )
671                })?
672                .await
673        })
674    }
675
676    pub fn evaluate(
677        &self,
678        session_id: &SessionId,
679        stack_frame_id: u64,
680        expression: String,
681        context: EvaluateArgumentsContext,
682        source: Option<Source>,
683        cx: &mut Context<Self>,
684    ) -> Task<Result<EvaluateResponse>> {
685        let Some(client) = self
686            .session_by_id(session_id)
687            .and_then(|client| client.read(cx).adapter_client())
688        else {
689            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
690        };
691
692        cx.background_executor().spawn(async move {
693            client
694                .request::<Evaluate>(EvaluateArguments {
695                    expression: expression.clone(),
696                    frame_id: Some(stack_frame_id),
697                    context: Some(context),
698                    format: None,
699                    line: None,
700                    column: None,
701                    source,
702                })
703                .await
704        })
705    }
706
707    pub fn completions(
708        &self,
709        session_id: &SessionId,
710        stack_frame_id: u64,
711        text: String,
712        completion_column: u64,
713        cx: &mut Context<Self>,
714    ) -> Task<Result<Vec<CompletionItem>>> {
715        let Some(client) = self
716            .session_by_id(session_id)
717            .and_then(|client| client.read(cx).adapter_client())
718        else {
719            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
720        };
721
722        cx.background_executor().spawn(async move {
723            Ok(client
724                .request::<Completions>(CompletionsArguments {
725                    frame_id: Some(stack_frame_id),
726                    line: None,
727                    text,
728                    column: completion_column,
729                })
730                .await?
731                .targets)
732        })
733    }
734
735    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
736        let mut tasks = vec![];
737        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
738            tasks.push(self.shutdown_session(session_id, cx));
739        }
740
741        cx.background_executor().spawn(async move {
742            futures::future::join_all(tasks).await;
743        })
744    }
745
746    pub fn shutdown_session(
747        &mut self,
748        session_id: SessionId,
749        cx: &mut Context<Self>,
750    ) -> Task<Result<()>> {
751        let Some(_) = self.as_local_mut() else {
752            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
753        };
754
755        let Some(session) = self.sessions.remove(&session_id) else {
756            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
757        };
758
759        let shutdown_children = session
760            .read(cx)
761            .child_session_ids()
762            .iter()
763            .map(|session_id| self.shutdown_session(*session_id, cx))
764            .collect::<Vec<_>>();
765
766        let shutdown_parent_task = if let Some(parent_session) = session
767            .read(cx)
768            .parent_id()
769            .and_then(|session_id| self.session_by_id(session_id))
770        {
771            let shutdown_id = parent_session.update(cx, |parent_session, _| {
772                parent_session.remove_child_session_id(session_id);
773
774                if parent_session.child_session_ids().len() == 0 {
775                    Some(parent_session.session_id())
776                } else {
777                    None
778                }
779            });
780
781            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
782        } else {
783            None
784        };
785
786        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
787
788        cx.background_spawn(async move {
789            if shutdown_children.len() > 0 {
790                let _ = join_all(shutdown_children).await;
791            }
792
793            shutdown_task.await;
794
795            if let Some(parent_task) = shutdown_parent_task {
796                parent_task.await?;
797            }
798
799            Ok(())
800        })
801    }
802
803    pub fn shared(
804        &mut self,
805        project_id: u64,
806        downstream_client: AnyProtoClient,
807        _: &mut Context<Self>,
808    ) {
809        self.downstream_client = Some((downstream_client.clone(), project_id));
810    }
811
812    pub fn unshared(&mut self, cx: &mut Context<Self>) {
813        self.downstream_client.take();
814
815        cx.notify();
816    }
817}
818
819fn create_new_session(
820    session_id: SessionId,
821    initialized_rx: oneshot::Receiver<()>,
822    start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
823    cx: &mut Context<DapStore>,
824) -> Task<Result<Entity<Session>>> {
825    let task = cx.spawn(async move |this, cx| {
826        let session = match start_client_task.await {
827            Ok(session) => session,
828            Err(error) => {
829                this.update(cx, |_, cx| {
830                    cx.emit(DapStoreEvent::Notification(error.to_string()));
831                })
832                .log_err();
833
834                return Err(error);
835            }
836        };
837
838        // we have to insert the session early, so we can handle reverse requests
839        // that need the session to be available
840        this.update(cx, |store, cx| {
841            store.sessions.insert(session_id, session.clone());
842            cx.emit(DapStoreEvent::DebugClientStarted(session_id));
843            cx.notify();
844        })?;
845
846        match {
847            session
848                .update(cx, |session, cx| session.request_initialize(cx))?
849                .await?;
850
851            session
852                .update(cx, |session, cx| {
853                    session.initialize_sequence(initialized_rx, cx)
854                })?
855                .await
856        } {
857            Ok(_) => {}
858            Err(error) => {
859                this.update(cx, |this, cx| {
860                    cx.emit(DapStoreEvent::Notification(error.to_string()));
861
862                    this.shutdown_session(session_id, cx)
863                })?
864                .await
865                .log_err();
866
867                return Err(error);
868            }
869        }
870
871        this.update(cx, |_, cx| {
872            cx.emit(DapStoreEvent::DebugSessionInitialized(session_id));
873        })?;
874
875        Ok(session)
876    });
877    task
878}
879
/// Bundle of services handed to a debug session; implements
/// `dap::adapters::DapDelegate` by exposing these fields.
#[derive(Clone)]
pub struct DapAdapterDelegate {
    // Returned by `fs()`.
    fs: Arc<dyn Fs>,
    // Returned by `worktree_id()`.
    worktree_id: WorktreeId,
    // Returned by `node_runtime()`.
    node_runtime: NodeRuntime,
    // Returned by `http_client()`.
    http_client: Arc<dyn HttpClient>,
    // Receives the adapter status reported through `update_status()`.
    language_registry: Arc<LanguageRegistry>,
    // Returned by `toolchain_store()`.
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    // Starts out empty; shared handle returned by `updated_adapters()`.
    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
    // Awaited by `shell_env()`; a `None` result is treated as an empty map.
    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
}
891
892impl DapAdapterDelegate {
893    pub fn new(
894        fs: Arc<dyn Fs>,
895        worktree_id: WorktreeId,
896        node_runtime: NodeRuntime,
897        http_client: Arc<dyn HttpClient>,
898        language_registry: Arc<LanguageRegistry>,
899        toolchain_store: Arc<dyn LanguageToolchainStore>,
900        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
901    ) -> Self {
902        Self {
903            fs,
904            worktree_id,
905            http_client,
906            node_runtime,
907            toolchain_store,
908            language_registry,
909            load_shell_env_task,
910            updated_adapters: Default::default(),
911        }
912    }
913}
914
915#[async_trait(?Send)]
916impl dap::adapters::DapDelegate for DapAdapterDelegate {
917    fn worktree_id(&self) -> WorktreeId {
918        self.worktree_id
919    }
920
921    fn http_client(&self) -> Arc<dyn HttpClient> {
922        self.http_client.clone()
923    }
924
925    fn node_runtime(&self) -> NodeRuntime {
926        self.node_runtime.clone()
927    }
928
929    fn fs(&self) -> Arc<dyn Fs> {
930        self.fs.clone()
931    }
932
933    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
934        self.updated_adapters.clone()
935    }
936
937    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
938        let name = SharedString::from(dap_name.to_string());
939        let status = match status {
940            DapStatus::None => BinaryStatus::None,
941            DapStatus::Downloading => BinaryStatus::Downloading,
942            DapStatus::Failed { error } => BinaryStatus::Failed { error },
943            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
944        };
945
946        self.language_registry
947            .update_dap_status(LanguageServerName(name), status);
948    }
949
950    fn which(&self, command: &OsStr) -> Option<PathBuf> {
951        which::which(command).ok()
952    }
953
954    async fn shell_env(&self) -> HashMap<String, String> {
955        let task = self.load_shell_env_task.clone();
956        task.await.unwrap_or_default()
957    }
958
959    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
960        self.toolchain_store.clone()
961    }
962}