dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    // Will need to uncomment this once we implement rpc message handler again
  4    // dap_command::{
  5    //     ContinueCommand, DapCommand, DisconnectCommand, NextCommand, PauseCommand, RestartCommand,
  6    //     RestartStackFrameCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  7    //     TerminateCommand, TerminateThreadsCommand, VariablesCommand,
  8    // },
  9    session::{self, Session},
 10};
 11use crate::{debugger, worktree_store::WorktreeStore, ProjectEnvironment};
 12use anyhow::{anyhow, Result};
 13use async_trait::async_trait;
 14use collections::HashMap;
 15use dap::{
 16    adapters::{DapStatus, DebugAdapterName},
 17    client::SessionId,
 18    messages::Message,
 19    requests::{
 20        Completions, Evaluate, Request as _, RunInTerminal, SetExpression, SetVariable,
 21        StartDebugging,
 22    },
 23    Capabilities, CompletionItem, CompletionsArguments, ErrorResponse, EvaluateArguments,
 24    EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
 25    SetExpressionArguments, SetVariableArguments, Source, StartDebuggingRequestArguments,
 26    StartDebuggingRequestArgumentsRequest,
 27};
 28use fs::Fs;
 29use futures::{
 30    channel::{mpsc, oneshot},
 31    future::{join_all, Shared},
 32};
 33use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 34use http_client::HttpClient;
 35use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 36use lsp::LanguageServerName;
 37use node_runtime::NodeRuntime;
 38
 39use rpc::{
 40    proto::{self},
 41    AnyProtoClient, TypedEnvelope,
 42};
 43use serde_json::Value;
 44use settings::WorktreeId;
 45use smol::{lock::Mutex, stream::StreamExt};
 46use std::{
 47    borrow::Borrow,
 48    collections::{BTreeMap, HashSet},
 49    ffi::OsStr,
 50    path::PathBuf,
 51    sync::{atomic::Ordering::SeqCst, Arc},
 52};
 53use std::{collections::VecDeque, sync::atomic::AtomicU32};
 54use task::{AttachConfig, DebugAdapterConfig, DebugRequestType};
 55use util::ResultExt as _;
 56use worktree::Worktree;
 57
 58pub enum DapStoreEvent {
 59    DebugClientStarted(SessionId),
 60    DebugClientShutdown(SessionId),
 61    DebugClientEvent {
 62        session_id: SessionId,
 63        message: Message,
 64    },
 65    RunInTerminal {
 66        session_id: SessionId,
 67        title: Option<String>,
 68        cwd: PathBuf,
 69        command: Option<String>,
 70        args: Vec<String>,
 71        envs: HashMap<String, String>,
 72        sender: mpsc::Sender<Result<u32>>,
 73    },
 74    Notification(String),
 75    RemoteHasInitialized,
 76}
 77
 78#[allow(clippy::large_enum_variant)]
 79pub enum DapStoreMode {
 80    Local(LocalDapStore),   // ssh host and collab host
 81    Remote(RemoteDapStore), // collab guest
 82}
 83
 84pub struct LocalDapStore {
 85    fs: Arc<dyn Fs>,
 86    node_runtime: NodeRuntime,
 87    next_session_id: AtomicU32,
 88    http_client: Arc<dyn HttpClient>,
 89    worktree_store: Entity<WorktreeStore>,
 90    environment: Entity<ProjectEnvironment>,
 91    language_registry: Arc<LanguageRegistry>,
 92    toolchain_store: Arc<dyn LanguageToolchainStore>,
 93    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 94    _start_debugging_task: Task<()>,
 95}
 96
 97impl LocalDapStore {
 98    fn next_session_id(&self) -> SessionId {
 99        SessionId(self.next_session_id.fetch_add(1, SeqCst))
100    }
101}
102
103pub struct RemoteDapStore {
104    upstream_client: AnyProtoClient,
105    upstream_project_id: u64,
106    event_queue: Option<VecDeque<DapStoreEvent>>,
107}
108
109pub struct DapStore {
110    mode: DapStoreMode,
111    downstream_client: Option<(AnyProtoClient, u64)>,
112    breakpoint_store: Entity<BreakpointStore>,
113    sessions: BTreeMap<SessionId, Entity<Session>>,
114}
115
116impl EventEmitter<DapStoreEvent> for DapStore {}
117
118impl DapStore {
119    pub fn init(_client: &AnyProtoClient) {
120        // todo(debugger): Reenable these after we finish handle_dap_command refactor
121        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
122        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
123        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
124        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
125        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
126        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
127        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
128        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
129        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
130        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
131        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
132        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
133    }
134
135    #[expect(clippy::too_many_arguments)]
136    pub fn new_local(
137        http_client: Arc<dyn HttpClient>,
138        node_runtime: NodeRuntime,
139        fs: Arc<dyn Fs>,
140        language_registry: Arc<LanguageRegistry>,
141        environment: Entity<ProjectEnvironment>,
142        toolchain_store: Arc<dyn LanguageToolchainStore>,
143        breakpoint_store: Entity<BreakpointStore>,
144        worktree_store: Entity<WorktreeStore>,
145        cx: &mut Context<Self>,
146    ) -> Self {
147        cx.on_app_quit(Self::shutdown_sessions).detach();
148
149        let (start_debugging_tx, mut message_rx) =
150            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
151
152        let _start_debugging_task = cx.spawn(async move |this, cx| {
153            while let Some((session_id, message)) = message_rx.next().await {
154                match message {
155                    Message::Request(request) => {
156                        let _ = this
157                            .update(cx, |this, cx| {
158                                if request.command == StartDebugging::COMMAND {
159                                    this.handle_start_debugging_request(session_id, request, cx)
160                                        .detach_and_log_err(cx);
161                                } else if request.command == RunInTerminal::COMMAND {
162                                    this.handle_run_in_terminal_request(session_id, request, cx)
163                                        .detach_and_log_err(cx);
164                                }
165                            })
166                            .log_err();
167                    }
168                    _ => {}
169                }
170            }
171        });
172        Self {
173            mode: DapStoreMode::Local(LocalDapStore {
174                fs,
175                environment,
176                http_client,
177                node_runtime,
178                worktree_store,
179                toolchain_store,
180                language_registry,
181                start_debugging_tx,
182                _start_debugging_task,
183                next_session_id: Default::default(),
184            }),
185            downstream_client: None,
186            breakpoint_store,
187            sessions: Default::default(),
188        }
189    }
190
191    pub fn new_remote(
192        project_id: u64,
193        upstream_client: AnyProtoClient,
194        breakpoint_store: Entity<BreakpointStore>,
195    ) -> Self {
196        Self {
197            mode: DapStoreMode::Remote(RemoteDapStore {
198                upstream_client,
199                upstream_project_id: project_id,
200                event_queue: Some(VecDeque::default()),
201            }),
202            downstream_client: None,
203            breakpoint_store,
204            sessions: Default::default(),
205        }
206    }
207
208    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
209        match &self.mode {
210            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
211            _ => None,
212        }
213    }
214
215    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
216        if let DapStoreMode::Remote(remote) = &mut self.mode {
217            remote.event_queue.take()
218        } else {
219            None
220        }
221    }
222
223    pub fn as_local(&self) -> Option<&LocalDapStore> {
224        match &self.mode {
225            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
226            _ => None,
227        }
228    }
229
230    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
231        match &mut self.mode {
232            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
233            _ => None,
234        }
235    }
236
237    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
238        match &self.mode {
239            DapStoreMode::Remote(RemoteDapStore {
240                upstream_client,
241                upstream_project_id,
242                ..
243            }) => Some((upstream_client.clone(), *upstream_project_id)),
244
245            DapStoreMode::Local(_) => None,
246        }
247    }
248
249    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
250        self.downstream_client.as_ref()
251    }
252
253    pub fn add_remote_client(
254        &mut self,
255        session_id: SessionId,
256        ignore: Option<bool>,
257        cx: &mut Context<Self>,
258    ) {
259        if let DapStoreMode::Remote(remote) = &self.mode {
260            self.sessions.insert(
261                session_id,
262                cx.new(|_| {
263                    debugger::session::Session::remote(
264                        session_id,
265                        remote.upstream_client.clone(),
266                        remote.upstream_project_id,
267                        ignore.unwrap_or(false),
268                    )
269                }),
270            );
271        } else {
272            debug_assert!(false);
273        }
274    }
275
276    pub fn session_by_id(
277        &self,
278        session_id: impl Borrow<SessionId>,
279    ) -> Option<Entity<session::Session>> {
280        let session_id = session_id.borrow();
281        let client = self.sessions.get(session_id).cloned();
282
283        client
284    }
285    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
286        self.sessions.values()
287    }
288
289    pub fn capabilities_by_id(
290        &self,
291        session_id: impl Borrow<SessionId>,
292        cx: &App,
293    ) -> Option<Capabilities> {
294        let session_id = session_id.borrow();
295        self.sessions
296            .get(session_id)
297            .map(|client| client.read(cx).capabilities.clone())
298    }
299
300    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
301        &self.breakpoint_store
302    }
303
304    #[allow(dead_code)]
305    async fn handle_ignore_breakpoint_state(
306        this: Entity<Self>,
307        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
308        mut cx: AsyncApp,
309    ) -> Result<()> {
310        let session_id = SessionId::from_proto(envelope.payload.session_id);
311
312        this.update(&mut cx, |this, cx| {
313            if let Some(session) = this.session_by_id(&session_id) {
314                session.update(cx, |session, cx| {
315                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
316                })
317            } else {
318                Task::ready(())
319            }
320        })?
321        .await;
322
323        Ok(())
324    }
325
326    pub fn new_session(
327        &mut self,
328        config: DebugAdapterConfig,
329        worktree: &Entity<Worktree>,
330        parent_session: Option<Entity<Session>>,
331        cx: &mut Context<Self>,
332    ) -> (SessionId, Task<Result<Entity<Session>>>) {
333        let Some(local_store) = self.as_local() else {
334            unimplemented!("Starting session on remote side");
335        };
336
337        let delegate = DapAdapterDelegate::new(
338            local_store.fs.clone(),
339            worktree.read(cx).id(),
340            local_store.node_runtime.clone(),
341            local_store.http_client.clone(),
342            local_store.language_registry.clone(),
343            local_store.toolchain_store.clone(),
344            local_store.environment.update(cx, |env, cx| {
345                let worktree = worktree.read(cx);
346                env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
347            }),
348        );
349        let session_id = local_store.next_session_id();
350
351        if let Some(session) = &parent_session {
352            session.update(cx, |session, _| {
353                session.add_child_session_id(session_id);
354            });
355        }
356
357        let (initialized_tx, initialized_rx) = oneshot::channel();
358
359        let start_client_task = Session::local(
360            self.breakpoint_store.clone(),
361            session_id,
362            parent_session,
363            delegate,
364            config,
365            local_store.start_debugging_tx.clone(),
366            initialized_tx,
367            cx,
368        );
369
370        let task = cx.spawn(async move |this, cx| {
371            let session = match start_client_task.await {
372                Ok(session) => session,
373                Err(error) => {
374                    this.update(cx, |_, cx| {
375                        cx.emit(DapStoreEvent::Notification(error.to_string()));
376                    })
377                    .log_err();
378
379                    return Err(error);
380                }
381            };
382
383            // we have to insert the session early, so we can handle reverse requests
384            // that need the session to be available
385            this.update(cx, |store, cx| {
386                store.sessions.insert(session_id, session.clone());
387                cx.emit(DapStoreEvent::DebugClientStarted(session_id));
388                cx.notify();
389            })?;
390
391            match session
392                .update(cx, |session, cx| {
393                    session.initialize_sequence(initialized_rx, cx)
394                })?
395                .await
396            {
397                Ok(_) => {}
398                Err(error) => {
399                    this.update(cx, |this, cx| {
400                        cx.emit(DapStoreEvent::Notification(error.to_string()));
401
402                        this.shutdown_session(session_id, cx)
403                    })?
404                    .await
405                    .log_err();
406
407                    return Err(error);
408                }
409            }
410
411            Ok(session)
412        });
413        (session_id, task)
414    }
415
416    fn handle_start_debugging_request(
417        &mut self,
418        session_id: SessionId,
419        request: dap::messages::Request,
420        cx: &mut Context<Self>,
421    ) -> Task<Result<()>> {
422        let Some(local_store) = self.as_local() else {
423            unreachable!("Cannot response for non-local session");
424        };
425
426        let Some(parent_session) = self.session_by_id(session_id) else {
427            return Task::ready(Err(anyhow!("Session not found")));
428        };
429
430        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
431            request.arguments.unwrap_or_default(),
432        )
433        .expect("To parse StartDebuggingRequestArguments");
434
435        let worktree = local_store
436            .worktree_store
437            .update(cx, |this, _| this.worktrees().next())
438            .expect("worktree-less project");
439
440        let Some(config) = parent_session.read(cx).configuration() else {
441            unreachable!("there must be a config for local sessions");
442        };
443
444        let (_, new_session_task) = self.new_session(
445            DebugAdapterConfig {
446                label: config.label,
447                kind: config.kind,
448                request: match &args.request {
449                    StartDebuggingRequestArgumentsRequest::Launch => DebugRequestType::Launch,
450                    StartDebuggingRequestArgumentsRequest::Attach => {
451                        DebugRequestType::Attach(AttachConfig::default())
452                    }
453                },
454                program: config.program,
455                cwd: config.cwd,
456                initialize_args: Some(args.configuration),
457                supports_attach: config.supports_attach,
458            },
459            &worktree,
460            Some(parent_session.clone()),
461            cx,
462        );
463
464        let request_seq = request.seq;
465        cx.spawn(async move |_, cx| {
466            let (success, body) = match new_session_task.await {
467                Ok(_) => (true, None),
468                Err(error) => (
469                    false,
470                    Some(serde_json::to_value(ErrorResponse {
471                        error: Some(dap::Message {
472                            id: request_seq,
473                            format: error.to_string(),
474                            variables: None,
475                            send_telemetry: None,
476                            show_user: None,
477                            url: None,
478                            url_label: None,
479                        }),
480                    })?),
481                ),
482            };
483
484            parent_session
485                .update(cx, |session, cx| {
486                    session.respond_to_client(
487                        request_seq,
488                        success,
489                        StartDebugging::COMMAND.to_string(),
490                        body,
491                        cx,
492                    )
493                })?
494                .await
495        })
496    }
497
498    fn handle_run_in_terminal_request(
499        &mut self,
500        session_id: SessionId,
501        request: dap::messages::Request,
502        cx: &mut Context<Self>,
503    ) -> Task<Result<()>> {
504        let Some(session) = self.session_by_id(session_id) else {
505            return Task::ready(Err(anyhow!("Session not found")));
506        };
507
508        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
509            request.arguments.unwrap_or_default(),
510        )
511        .expect("To parse StartDebuggingRequestArguments");
512
513        let seq = request.seq;
514
515        let cwd = PathBuf::from(request_args.cwd);
516        match cwd.try_exists() {
517            Ok(true) => (),
518            Ok(false) | Err(_) => {
519                return session.update(cx, |session, cx| {
520                    session.respond_to_client(
521                        seq,
522                        false,
523                        RunInTerminal::COMMAND.to_string(),
524                        serde_json::to_value(dap::ErrorResponse {
525                            error: Some(dap::Message {
526                                id: seq,
527                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
528                                variables: None,
529                                send_telemetry: None,
530                                show_user: None,
531                                url: None,
532                                url_label: None,
533                            }),
534                        })
535                        .ok(),
536                        cx,
537                    )
538                })
539            }
540        }
541
542        let mut args = request_args.args.clone();
543
544        // Handle special case for NodeJS debug adapter
545        // If only the Node binary path is provided, we set the command to None
546        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
547        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
548        // This allows the NodeJS debug client to attach correctly
549        let command = if args.len() > 1 {
550            Some(args.remove(0))
551        } else {
552            None
553        };
554
555        let mut envs: HashMap<String, String> = Default::default();
556        if let Some(Value::Object(env)) = request_args.env {
557            for (key, value) in env {
558                let value_str = match (key.as_str(), value) {
559                    (_, Value::String(value)) => value,
560                    _ => continue,
561                };
562
563                envs.insert(key, value_str);
564            }
565        }
566
567        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
568
569        cx.emit(DapStoreEvent::RunInTerminal {
570            session_id,
571            title: request_args.title,
572            cwd,
573            command,
574            args,
575            envs,
576            sender: tx,
577        });
578        cx.notify();
579
580        let session = session.downgrade();
581        cx.spawn(async move |_, cx| {
582            let (success, body) = match rx.next().await {
583                Some(Ok(pid)) => (
584                    true,
585                    serde_json::to_value(dap::RunInTerminalResponse {
586                        process_id: None,
587                        shell_process_id: Some(pid as u64),
588                    })
589                    .ok(),
590                ),
591                Some(Err(error)) => (
592                    false,
593                    serde_json::to_value(dap::ErrorResponse {
594                        error: Some(dap::Message {
595                            id: seq,
596                            format: error.to_string(),
597                            variables: None,
598                            send_telemetry: None,
599                            show_user: None,
600                            url: None,
601                            url_label: None,
602                        }),
603                    })
604                    .ok(),
605                ),
606                None => (
607                    false,
608                    serde_json::to_value(dap::ErrorResponse {
609                        error: Some(dap::Message {
610                            id: seq,
611                            format: "failed to receive response from spawn terminal".to_string(),
612                            variables: None,
613                            send_telemetry: None,
614                            show_user: None,
615                            url: None,
616                            url_label: None,
617                        }),
618                    })
619                    .ok(),
620                ),
621            };
622
623            session
624                .update(cx, |session, cx| {
625                    session.respond_to_client(
626                        seq,
627                        success,
628                        RunInTerminal::COMMAND.to_string(),
629                        body,
630                        cx,
631                    )
632                })?
633                .await
634        })
635    }
636
637    pub fn evaluate(
638        &self,
639        session_id: &SessionId,
640        stack_frame_id: u64,
641        expression: String,
642        context: EvaluateArgumentsContext,
643        source: Option<Source>,
644        cx: &mut Context<Self>,
645    ) -> Task<Result<EvaluateResponse>> {
646        let Some(client) = self
647            .session_by_id(session_id)
648            .and_then(|client| client.read(cx).adapter_client())
649        else {
650            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
651        };
652
653        cx.background_executor().spawn(async move {
654            client
655                .request::<Evaluate>(EvaluateArguments {
656                    expression: expression.clone(),
657                    frame_id: Some(stack_frame_id),
658                    context: Some(context),
659                    format: None,
660                    line: None,
661                    column: None,
662                    source,
663                })
664                .await
665        })
666    }
667
668    pub fn completions(
669        &self,
670        session_id: &SessionId,
671        stack_frame_id: u64,
672        text: String,
673        completion_column: u64,
674        cx: &mut Context<Self>,
675    ) -> Task<Result<Vec<CompletionItem>>> {
676        let Some(client) = self
677            .session_by_id(session_id)
678            .and_then(|client| client.read(cx).adapter_client())
679        else {
680            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
681        };
682
683        cx.background_executor().spawn(async move {
684            Ok(client
685                .request::<Completions>(CompletionsArguments {
686                    frame_id: Some(stack_frame_id),
687                    line: None,
688                    text,
689                    column: completion_column,
690                })
691                .await?
692                .targets)
693        })
694    }
695
696    #[allow(clippy::too_many_arguments)]
697    pub fn set_variable_value(
698        &self,
699        session_id: &SessionId,
700        stack_frame_id: u64,
701        variables_reference: u64,
702        name: String,
703        value: String,
704        evaluate_name: Option<String>,
705        cx: &mut Context<Self>,
706    ) -> Task<Result<()>> {
707        let Some(client) = self
708            .session_by_id(session_id)
709            .and_then(|client| client.read(cx).adapter_client())
710        else {
711            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
712        };
713
714        let supports_set_expression = self
715            .capabilities_by_id(session_id, cx)
716            .and_then(|caps| caps.supports_set_expression)
717            .unwrap_or_default();
718
719        cx.background_executor().spawn(async move {
720            if let Some(evaluate_name) = supports_set_expression.then(|| evaluate_name).flatten() {
721                client
722                    .request::<SetExpression>(SetExpressionArguments {
723                        expression: evaluate_name,
724                        value,
725                        frame_id: Some(stack_frame_id),
726                        format: None,
727                    })
728                    .await?;
729            } else {
730                client
731                    .request::<SetVariable>(SetVariableArguments {
732                        variables_reference,
733                        name,
734                        value,
735                        format: None,
736                    })
737                    .await?;
738            }
739
740            Ok(())
741        })
742    }
743
744    // .. get the client and what not
745    // let _ = client.modules(); // This can fire a request to a dap adapter or be a cheap getter.
746    // client.wait_for_request(request::Modules); // This ensures that the request that we've fired off runs to completions
747    // let returned_value = client.modules(); // this is a cheap getter.
748
749    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
750        let mut tasks = vec![];
751        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
752            tasks.push(self.shutdown_session(session_id, cx));
753        }
754
755        cx.background_executor().spawn(async move {
756            futures::future::join_all(tasks).await;
757        })
758    }
759
760    pub fn shutdown_session(
761        &mut self,
762        session_id: SessionId,
763        cx: &mut Context<Self>,
764    ) -> Task<Result<()>> {
765        let Some(_) = self.as_local_mut() else {
766            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
767        };
768
769        let Some(session) = self.sessions.remove(&session_id) else {
770            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
771        };
772
773        let shutdown_children = session
774            .read(cx)
775            .child_session_ids()
776            .iter()
777            .map(|session_id| self.shutdown_session(*session_id, cx))
778            .collect::<Vec<_>>();
779
780        let shutdown_parent_task = if let Some(parent_session) = session
781            .read(cx)
782            .parent_id()
783            .and_then(|session_id| self.session_by_id(session_id))
784        {
785            let shutdown_id = parent_session.update(cx, |parent_session, _| {
786                parent_session.remove_child_session_id(session_id);
787
788                if parent_session.child_session_ids().len() == 0 {
789                    Some(parent_session.session_id())
790                } else {
791                    None
792                }
793            });
794
795            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
796        } else {
797            None
798        };
799
800        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
801
802        cx.background_spawn(async move {
803            if shutdown_children.len() > 0 {
804                let _ = join_all(shutdown_children).await;
805            }
806
807            shutdown_task.await;
808
809            if let Some(parent_task) = shutdown_parent_task {
810                parent_task.await?;
811            }
812
813            Ok(())
814        })
815    }
816
817    pub fn shared(
818        &mut self,
819        project_id: u64,
820        downstream_client: AnyProtoClient,
821        _: &mut Context<Self>,
822    ) {
823        self.downstream_client = Some((downstream_client.clone(), project_id));
824    }
825
826    pub fn unshared(&mut self, cx: &mut Context<Self>) {
827        self.downstream_client.take();
828
829        cx.notify();
830    }
831}
832
833#[derive(Clone)]
834pub struct DapAdapterDelegate {
835    fs: Arc<dyn Fs>,
836    worktree_id: WorktreeId,
837    node_runtime: NodeRuntime,
838    http_client: Arc<dyn HttpClient>,
839    language_registry: Arc<LanguageRegistry>,
840    toolchain_store: Arc<dyn LanguageToolchainStore>,
841    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
842    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
843}
844
845impl DapAdapterDelegate {
846    pub fn new(
847        fs: Arc<dyn Fs>,
848        worktree_id: WorktreeId,
849        node_runtime: NodeRuntime,
850        http_client: Arc<dyn HttpClient>,
851        language_registry: Arc<LanguageRegistry>,
852        toolchain_store: Arc<dyn LanguageToolchainStore>,
853        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
854    ) -> Self {
855        Self {
856            fs,
857            worktree_id,
858            http_client,
859            node_runtime,
860            toolchain_store,
861            language_registry,
862            load_shell_env_task,
863            updated_adapters: Default::default(),
864        }
865    }
866}
867
868#[async_trait(?Send)]
869impl dap::adapters::DapDelegate for DapAdapterDelegate {
870    fn worktree_id(&self) -> WorktreeId {
871        self.worktree_id
872    }
873
874    fn http_client(&self) -> Arc<dyn HttpClient> {
875        self.http_client.clone()
876    }
877
878    fn node_runtime(&self) -> NodeRuntime {
879        self.node_runtime.clone()
880    }
881
882    fn fs(&self) -> Arc<dyn Fs> {
883        self.fs.clone()
884    }
885
886    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
887        self.updated_adapters.clone()
888    }
889
890    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
891        let name = SharedString::from(dap_name.to_string());
892        let status = match status {
893            DapStatus::None => BinaryStatus::None,
894            DapStatus::Downloading => BinaryStatus::Downloading,
895            DapStatus::Failed { error } => BinaryStatus::Failed { error },
896            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
897        };
898
899        self.language_registry
900            .update_dap_status(LanguageServerName(name), status);
901    }
902
903    fn which(&self, command: &OsStr) -> Option<PathBuf> {
904        which::which(command).ok()
905    }
906
907    async fn shell_env(&self) -> HashMap<String, String> {
908        let task = self.load_shell_env_task.clone();
909        task.await.unwrap_or_default()
910    }
911
912    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
913        self.toolchain_store.clone()
914    }
915}