dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    locators::DapLocator,
  4    session::{self, Session, SessionStateEvent},
  5};
  6use crate::{
  7    ProjectEnvironment,
  8    project_settings::ProjectSettings,
  9    terminals::{SshCommand, wrap_for_ssh},
 10    worktree_store::WorktreeStore,
 11};
 12use anyhow::{Result, anyhow};
 13use async_trait::async_trait;
 14use collections::HashMap;
 15use dap::{
 16    Capabilities, CompletionItem, CompletionsArguments, DapRegistry, EvaluateArguments,
 17    EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments, Source,
 18    StartDebuggingRequestArguments,
 19    adapters::{DapStatus, DebugAdapterBinary, DebugAdapterName, TcpArguments},
 20    client::SessionId,
 21    messages::Message,
 22    requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
 23};
 24use fs::Fs;
 25use futures::{
 26    channel::mpsc,
 27    future::{Shared, join_all},
 28};
 29use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 30use http_client::HttpClient;
 31use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 32use lsp::LanguageServerName;
 33use node_runtime::NodeRuntime;
 34
 35use remote::SshRemoteClient;
 36use rpc::{
 37    AnyProtoClient, TypedEnvelope,
 38    proto::{self},
 39};
 40use serde_json::Value;
 41use settings::{Settings, WorktreeId};
 42use smol::{lock::Mutex, stream::StreamExt};
 43use std::{
 44    borrow::Borrow,
 45    collections::{BTreeMap, HashSet},
 46    ffi::OsStr,
 47    net::Ipv4Addr,
 48    path::{Path, PathBuf},
 49    sync::Arc,
 50};
 51use task::{DebugTaskDefinition, DebugTaskTemplate};
 52use util::ResultExt as _;
 53use worktree::Worktree;
 54
 55pub enum DapStoreEvent {
 56    DebugClientStarted(SessionId),
 57    DebugSessionInitialized(SessionId),
 58    DebugClientShutdown(SessionId),
 59    DebugClientEvent {
 60        session_id: SessionId,
 61        message: Message,
 62    },
 63    RunInTerminal {
 64        session_id: SessionId,
 65        title: Option<String>,
 66        cwd: Option<Arc<Path>>,
 67        command: Option<String>,
 68        args: Vec<String>,
 69        envs: HashMap<String, String>,
 70        sender: mpsc::Sender<Result<u32>>,
 71    },
 72    SpawnChildSession {
 73        request: StartDebuggingRequestArguments,
 74        parent_session: Entity<Session>,
 75    },
 76    Notification(String),
 77    RemoteHasInitialized,
 78}
 79
 80#[allow(clippy::large_enum_variant)]
 81enum DapStoreMode {
 82    Local(LocalDapStore),
 83    Ssh(SshDapStore),
 84    Collab,
 85}
 86
 87pub struct LocalDapStore {
 88    fs: Arc<dyn Fs>,
 89    node_runtime: NodeRuntime,
 90    http_client: Arc<dyn HttpClient>,
 91    environment: Entity<ProjectEnvironment>,
 92    language_registry: Arc<LanguageRegistry>,
 93    toolchain_store: Arc<dyn LanguageToolchainStore>,
 94    locators: HashMap<String, Arc<dyn DapLocator>>,
 95}
 96
 97pub struct SshDapStore {
 98    ssh_client: Entity<SshRemoteClient>,
 99    upstream_client: AnyProtoClient,
100    upstream_project_id: u64,
101}
102
103pub struct DapStore {
104    mode: DapStoreMode,
105    downstream_client: Option<(AnyProtoClient, u64)>,
106    breakpoint_store: Entity<BreakpointStore>,
107    worktree_store: Entity<WorktreeStore>,
108    sessions: BTreeMap<SessionId, Entity<Session>>,
109    next_session_id: u32,
110    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
111    _start_debugging_task: Task<()>,
112}
113
114impl EventEmitter<DapStoreEvent> for DapStore {}
115
116impl DapStore {
117    pub fn init(client: &AnyProtoClient) {
118        client.add_entity_request_handler(Self::handle_run_debug_locator);
119        client.add_entity_request_handler(Self::handle_get_debug_adapter_binary);
120    }
121
122    #[expect(clippy::too_many_arguments)]
123    pub fn new_local(
124        http_client: Arc<dyn HttpClient>,
125        node_runtime: NodeRuntime,
126        fs: Arc<dyn Fs>,
127        language_registry: Arc<LanguageRegistry>,
128        environment: Entity<ProjectEnvironment>,
129        toolchain_store: Arc<dyn LanguageToolchainStore>,
130        worktree_store: Entity<WorktreeStore>,
131        breakpoint_store: Entity<BreakpointStore>,
132        cx: &mut Context<Self>,
133    ) -> Self {
134        cx.on_app_quit(Self::shutdown_sessions).detach();
135
136        let locators = HashMap::from_iter([(
137            "cargo".to_string(),
138            Arc::new(super::locators::cargo::CargoLocator {}) as _,
139        )]);
140
141        let mode = DapStoreMode::Local(LocalDapStore {
142            fs,
143            environment,
144            http_client,
145            node_runtime,
146            toolchain_store,
147            language_registry,
148            locators,
149        });
150
151        Self::new(mode, breakpoint_store, worktree_store, cx)
152    }
153
154    pub fn new_ssh(
155        project_id: u64,
156        ssh_client: Entity<SshRemoteClient>,
157        breakpoint_store: Entity<BreakpointStore>,
158        worktree_store: Entity<WorktreeStore>,
159        cx: &mut Context<Self>,
160    ) -> Self {
161        let mode = DapStoreMode::Ssh(SshDapStore {
162            upstream_client: ssh_client.read(cx).proto_client(),
163            ssh_client,
164            upstream_project_id: project_id,
165        });
166
167        Self::new(mode, breakpoint_store, worktree_store, cx)
168    }
169
170    pub fn new_collab(
171        _project_id: u64,
172        _upstream_client: AnyProtoClient,
173        breakpoint_store: Entity<BreakpointStore>,
174        worktree_store: Entity<WorktreeStore>,
175        cx: &mut Context<Self>,
176    ) -> Self {
177        Self::new(DapStoreMode::Collab, breakpoint_store, worktree_store, cx)
178    }
179
180    fn new(
181        mode: DapStoreMode,
182        breakpoint_store: Entity<BreakpointStore>,
183        worktree_store: Entity<WorktreeStore>,
184        cx: &mut Context<Self>,
185    ) -> Self {
186        let (start_debugging_tx, mut message_rx) =
187            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
188        let task = cx.spawn(async move |this, cx| {
189            while let Some((session_id, message)) = message_rx.next().await {
190                match message {
191                    Message::Request(request) => {
192                        let _ = this
193                            .update(cx, |this, cx| {
194                                if request.command == StartDebugging::COMMAND {
195                                    this.handle_start_debugging_request(session_id, request, cx)
196                                        .detach_and_log_err(cx);
197                                } else if request.command == RunInTerminal::COMMAND {
198                                    this.handle_run_in_terminal_request(session_id, request, cx)
199                                        .detach_and_log_err(cx);
200                                }
201                            })
202                            .log_err();
203                    }
204                    _ => {}
205                }
206            }
207        });
208
209        Self {
210            mode,
211            _start_debugging_task: task,
212            start_debugging_tx,
213            next_session_id: 0,
214            downstream_client: None,
215            breakpoint_store,
216            worktree_store,
217            sessions: Default::default(),
218        }
219    }
220
221    pub fn get_debug_adapter_binary(
222        &mut self,
223        definition: DebugTaskDefinition,
224        cx: &mut Context<Self>,
225    ) -> Task<Result<DebugAdapterBinary>> {
226        match &self.mode {
227            DapStoreMode::Local(_) => {
228                let Some(worktree) = self.worktree_store.read(cx).visible_worktrees(cx).next()
229                else {
230                    return Task::ready(Err(anyhow!("Failed to find a worktree")));
231                };
232                let Some(adapter) = DapRegistry::global(cx).adapter(&definition.adapter) else {
233                    return Task::ready(Err(anyhow!("Failed to find a debug adapter")));
234                };
235
236                let user_installed_path = ProjectSettings::get_global(cx)
237                    .dap
238                    .get(&adapter.name())
239                    .and_then(|s| s.binary.as_ref().map(PathBuf::from));
240
241                let delegate = self.delegate(&worktree, cx);
242                let cwd: Arc<Path> = definition
243                    .cwd()
244                    .unwrap_or(worktree.read(cx).abs_path().as_ref())
245                    .into();
246
247                cx.spawn(async move |this, cx| {
248                    let mut binary = adapter
249                        .get_binary(&delegate, &definition, user_installed_path, cx)
250                        .await?;
251
252                    let env = this
253                        .update(cx, |this, cx| {
254                            this.as_local()
255                                .unwrap()
256                                .environment
257                                .update(cx, |environment, cx| {
258                                    environment.get_directory_environment(cwd, cx)
259                                })
260                        })?
261                        .await;
262
263                    if let Some(mut env) = env {
264                        env.extend(std::mem::take(&mut binary.envs));
265                        binary.envs = env;
266                    }
267
268                    Ok(binary)
269                })
270            }
271            DapStoreMode::Ssh(ssh) => {
272                let request = ssh.upstream_client.request(proto::GetDebugAdapterBinary {
273                    project_id: ssh.upstream_project_id,
274                    task: Some(definition.to_proto()),
275                });
276                let ssh_client = ssh.ssh_client.clone();
277
278                cx.spawn(async move |_, cx| {
279                    let response = request.await?;
280                    let binary = DebugAdapterBinary::from_proto(response)?;
281                    let mut ssh_command = ssh_client.update(cx, |ssh, _| {
282                        anyhow::Ok(SshCommand {
283                            arguments: ssh
284                                .ssh_args()
285                                .ok_or_else(|| anyhow!("SSH arguments not found"))?,
286                        })
287                    })??;
288
289                    let mut connection = None;
290                    if let Some(c) = binary.connection {
291                        let local_bind_addr = Ipv4Addr::new(127, 0, 0, 1);
292                        let port =
293                            dap::transport::TcpTransport::unused_port(local_bind_addr).await?;
294
295                        ssh_command.add_port_forwarding(port, c.host.to_string(), c.port);
296                        connection = Some(TcpArguments {
297                            port: c.port,
298                            host: local_bind_addr,
299                            timeout: c.timeout,
300                        })
301                    }
302
303                    let (program, args) = wrap_for_ssh(
304                        &ssh_command,
305                        Some((&binary.command, &binary.arguments)),
306                        binary.cwd.as_deref(),
307                        binary.envs,
308                        None,
309                    );
310
311                    Ok(DebugAdapterBinary {
312                        command: program,
313                        arguments: args,
314                        envs: HashMap::default(),
315                        cwd: None,
316                        connection,
317                        request_args: binary.request_args,
318                    })
319                })
320            }
321            DapStoreMode::Collab => {
322                Task::ready(Err(anyhow!("Debugging is not yet supported via collab")))
323            }
324        }
325    }
326
327    pub fn run_debug_locator(
328        &mut self,
329        template: DebugTaskTemplate,
330        cx: &mut Context<Self>,
331    ) -> Task<Result<DebugTaskDefinition>> {
332        let Some(locator_name) = template.locator else {
333            return Task::ready(Ok(template.definition));
334        };
335
336        match &self.mode {
337            DapStoreMode::Local(local) => {
338                if let Some(locator) = local.locators.get(&locator_name).cloned() {
339                    cx.background_spawn(
340                        async move { locator.run_locator(template.definition).await },
341                    )
342                } else {
343                    Task::ready(Err(anyhow!("Couldn't find locator {}", locator_name)))
344                }
345            }
346            DapStoreMode::Ssh(ssh) => {
347                let request = ssh.upstream_client.request(proto::RunDebugLocator {
348                    project_id: ssh.upstream_project_id,
349                    locator: locator_name,
350                    task: Some(template.definition.to_proto()),
351                });
352                cx.background_spawn(async move {
353                    let response = request.await?;
354                    DebugTaskDefinition::from_proto(response)
355                })
356            }
357            DapStoreMode::Collab => {
358                Task::ready(Err(anyhow!("Debugging is not yet supported via collab")))
359            }
360        }
361    }
362
363    fn as_local(&self) -> Option<&LocalDapStore> {
364        match &self.mode {
365            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
366            _ => None,
367        }
368    }
369
370    pub fn new_session(
371        &mut self,
372        template: DebugTaskDefinition,
373        parent_session: Option<Entity<Session>>,
374        cx: &mut Context<Self>,
375    ) -> Entity<Session> {
376        let session_id = SessionId(util::post_inc(&mut self.next_session_id));
377
378        if let Some(session) = &parent_session {
379            session.update(cx, |session, _| {
380                session.add_child_session_id(session_id);
381            });
382        }
383
384        let start_debugging_tx = self.start_debugging_tx.clone();
385
386        let session = Session::new(
387            self.breakpoint_store.clone(),
388            session_id,
389            parent_session,
390            template.clone(),
391            start_debugging_tx,
392            cx,
393        );
394
395        self.sessions.insert(session_id, session.clone());
396        cx.notify();
397
398        cx.subscribe(&session, {
399            move |this: &mut DapStore, _, event: &SessionStateEvent, cx| match event {
400                SessionStateEvent::Shutdown => {
401                    this.shutdown_session(session_id, cx).detach_and_log_err(cx);
402                }
403                SessionStateEvent::Restart => {}
404                SessionStateEvent::Running => {
405                    cx.emit(DapStoreEvent::DebugClientStarted(session_id));
406                }
407            }
408        })
409        .detach();
410
411        session
412    }
413
414    pub fn boot_session(
415        &self,
416        session: Entity<Session>,
417        cx: &mut Context<Self>,
418    ) -> Task<Result<()>> {
419        let Some(worktree) = self.worktree_store.read(cx).visible_worktrees(cx).next() else {
420            return Task::ready(Err(anyhow!("Failed to find a worktree")));
421        };
422
423        let dap_store = cx.weak_entity();
424        let breakpoint_store = self.breakpoint_store.clone();
425        let definition = session.read(cx).definition();
426
427        cx.spawn({
428            let session = session.clone();
429            async move |this, cx| {
430                let binary = this
431                    .update(cx, |this, cx| {
432                        this.get_debug_adapter_binary(definition.clone(), cx)
433                    })?
434                    .await?;
435
436                session
437                    .update(cx, |session, cx| {
438                        session.boot(binary, worktree, breakpoint_store, dap_store, cx)
439                    })?
440                    .await
441            }
442        })
443    }
444
445    pub fn session_by_id(
446        &self,
447        session_id: impl Borrow<SessionId>,
448    ) -> Option<Entity<session::Session>> {
449        let session_id = session_id.borrow();
450        let client = self.sessions.get(session_id).cloned();
451
452        client
453    }
454    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
455        self.sessions.values()
456    }
457
458    pub fn capabilities_by_id(
459        &self,
460        session_id: impl Borrow<SessionId>,
461        cx: &App,
462    ) -> Option<Capabilities> {
463        let session_id = session_id.borrow();
464        self.sessions
465            .get(session_id)
466            .map(|client| client.read(cx).capabilities.clone())
467    }
468
469    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
470        &self.breakpoint_store
471    }
472
473    pub fn worktree_store(&self) -> &Entity<WorktreeStore> {
474        &self.worktree_store
475    }
476
477    #[allow(dead_code)]
478    async fn handle_ignore_breakpoint_state(
479        this: Entity<Self>,
480        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
481        mut cx: AsyncApp,
482    ) -> Result<()> {
483        let session_id = SessionId::from_proto(envelope.payload.session_id);
484
485        this.update(&mut cx, |this, cx| {
486            if let Some(session) = this.session_by_id(&session_id) {
487                session.update(cx, |session, cx| {
488                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
489                })
490            } else {
491                Task::ready(HashMap::default())
492            }
493        })?
494        .await;
495
496        Ok(())
497    }
498
499    fn delegate(&self, worktree: &Entity<Worktree>, cx: &mut App) -> DapAdapterDelegate {
500        let Some(local_store) = self.as_local() else {
501            unimplemented!("Starting session on remote side");
502        };
503
504        DapAdapterDelegate::new(
505            local_store.fs.clone(),
506            worktree.read(cx).id(),
507            local_store.node_runtime.clone(),
508            local_store.http_client.clone(),
509            local_store.language_registry.clone(),
510            local_store.toolchain_store.clone(),
511            local_store.environment.update(cx, |env, cx| {
512                env.get_worktree_environment(worktree.clone(), cx)
513            }),
514        )
515    }
516
517    fn handle_start_debugging_request(
518        &mut self,
519        session_id: SessionId,
520        request: dap::messages::Request,
521        cx: &mut Context<Self>,
522    ) -> Task<Result<()>> {
523        let Some(parent_session) = self.session_by_id(session_id) else {
524            return Task::ready(Err(anyhow!("Session not found")));
525        };
526        let request_seq = request.seq;
527
528        let launch_request: Option<Result<StartDebuggingRequestArguments, _>> = request
529            .arguments
530            .as_ref()
531            .map(|value| serde_json::from_value(value.clone()));
532
533        let mut success = true;
534        if let Some(Ok(request)) = launch_request {
535            cx.emit(DapStoreEvent::SpawnChildSession {
536                request,
537                parent_session: parent_session.clone(),
538            });
539        } else {
540            log::error!(
541                "Failed to parse launch request arguments: {:?}",
542                request.arguments
543            );
544            success = false;
545        }
546
547        cx.spawn(async move |_, cx| {
548            parent_session
549                .update(cx, |session, cx| {
550                    session.respond_to_client(
551                        request_seq,
552                        success,
553                        StartDebugging::COMMAND.to_string(),
554                        None,
555                        cx,
556                    )
557                })?
558                .await
559        })
560    }
561
562    fn handle_run_in_terminal_request(
563        &mut self,
564        session_id: SessionId,
565        request: dap::messages::Request,
566        cx: &mut Context<Self>,
567    ) -> Task<Result<()>> {
568        let Some(session) = self.session_by_id(session_id) else {
569            return Task::ready(Err(anyhow!("Session not found")));
570        };
571
572        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
573            request.arguments.unwrap_or_default(),
574        )
575        .expect("To parse StartDebuggingRequestArguments");
576
577        let seq = request.seq;
578
579        let cwd = Path::new(&request_args.cwd);
580
581        match cwd.try_exists() {
582            Ok(false) | Err(_) if !request_args.cwd.is_empty() => {
583                return session.update(cx, |session, cx| {
584                    session.respond_to_client(
585                        seq,
586                        false,
587                        RunInTerminal::COMMAND.to_string(),
588                        serde_json::to_value(dap::ErrorResponse {
589                            error: Some(dap::Message {
590                                id: seq,
591                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
592                                variables: None,
593                                send_telemetry: None,
594                                show_user: None,
595                                url: None,
596                                url_label: None,
597                            }),
598                        })
599                        .ok(),
600                        cx,
601                    )
602                });
603            }
604            _ => (),
605        }
606        let mut args = request_args.args.clone();
607
608        // Handle special case for NodeJS debug adapter
609        // If only the Node binary path is provided, we set the command to None
610        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
611        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
612        // This allows the NodeJS debug client to attach correctly
613        let command = if args.len() > 1 {
614            Some(args.remove(0))
615        } else {
616            None
617        };
618
619        let mut envs: HashMap<String, String> = Default::default();
620        if let Some(Value::Object(env)) = request_args.env {
621            for (key, value) in env {
622                let value_str = match (key.as_str(), value) {
623                    (_, Value::String(value)) => value,
624                    _ => continue,
625                };
626
627                envs.insert(key, value_str);
628            }
629        }
630
631        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
632        let cwd = Some(cwd)
633            .filter(|cwd| cwd.as_os_str().len() > 0)
634            .map(Arc::from)
635            .or_else(|| {
636                self.session_by_id(session_id)
637                    .and_then(|session| session.read(cx).binary().cwd.as_deref().map(Arc::from))
638            });
639        cx.emit(DapStoreEvent::RunInTerminal {
640            session_id,
641            title: request_args.title,
642            cwd,
643            command,
644            args,
645            envs,
646            sender: tx,
647        });
648        cx.notify();
649
650        let session = session.downgrade();
651        cx.spawn(async move |_, cx| {
652            let (success, body) = match rx.next().await {
653                Some(Ok(pid)) => (
654                    true,
655                    serde_json::to_value(dap::RunInTerminalResponse {
656                        process_id: None,
657                        shell_process_id: Some(pid as u64),
658                    })
659                    .ok(),
660                ),
661                Some(Err(error)) => (
662                    false,
663                    serde_json::to_value(dap::ErrorResponse {
664                        error: Some(dap::Message {
665                            id: seq,
666                            format: error.to_string(),
667                            variables: None,
668                            send_telemetry: None,
669                            show_user: None,
670                            url: None,
671                            url_label: None,
672                        }),
673                    })
674                    .ok(),
675                ),
676                None => (
677                    false,
678                    serde_json::to_value(dap::ErrorResponse {
679                        error: Some(dap::Message {
680                            id: seq,
681                            format: "failed to receive response from spawn terminal".to_string(),
682                            variables: None,
683                            send_telemetry: None,
684                            show_user: None,
685                            url: None,
686                            url_label: None,
687                        }),
688                    })
689                    .ok(),
690                ),
691            };
692
693            session
694                .update(cx, |session, cx| {
695                    session.respond_to_client(
696                        seq,
697                        success,
698                        RunInTerminal::COMMAND.to_string(),
699                        body,
700                        cx,
701                    )
702                })?
703                .await
704        })
705    }
706
707    pub fn evaluate(
708        &self,
709        session_id: &SessionId,
710        stack_frame_id: u64,
711        expression: String,
712        context: EvaluateArgumentsContext,
713        source: Option<Source>,
714        cx: &mut Context<Self>,
715    ) -> Task<Result<EvaluateResponse>> {
716        let Some(client) = self
717            .session_by_id(session_id)
718            .and_then(|client| client.read(cx).adapter_client())
719        else {
720            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
721        };
722
723        cx.background_executor().spawn(async move {
724            client
725                .request::<Evaluate>(EvaluateArguments {
726                    expression: expression.clone(),
727                    frame_id: Some(stack_frame_id),
728                    context: Some(context),
729                    format: None,
730                    line: None,
731                    column: None,
732                    source,
733                })
734                .await
735        })
736    }
737
738    pub fn completions(
739        &self,
740        session_id: &SessionId,
741        stack_frame_id: u64,
742        text: String,
743        completion_column: u64,
744        cx: &mut Context<Self>,
745    ) -> Task<Result<Vec<CompletionItem>>> {
746        let Some(client) = self
747            .session_by_id(session_id)
748            .and_then(|client| client.read(cx).adapter_client())
749        else {
750            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
751        };
752
753        cx.background_executor().spawn(async move {
754            Ok(client
755                .request::<Completions>(CompletionsArguments {
756                    frame_id: Some(stack_frame_id),
757                    line: None,
758                    text,
759                    column: completion_column,
760                })
761                .await?
762                .targets)
763        })
764    }
765
766    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
767        let mut tasks = vec![];
768        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
769            tasks.push(self.shutdown_session(session_id, cx));
770        }
771
772        cx.background_executor().spawn(async move {
773            futures::future::join_all(tasks).await;
774        })
775    }
776
777    pub fn shutdown_session(
778        &mut self,
779        session_id: SessionId,
780        cx: &mut Context<Self>,
781    ) -> Task<Result<()>> {
782        let Some(session) = self.sessions.remove(&session_id) else {
783            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
784        };
785
786        let shutdown_children = session
787            .read(cx)
788            .child_session_ids()
789            .iter()
790            .map(|session_id| self.shutdown_session(*session_id, cx))
791            .collect::<Vec<_>>();
792
793        let shutdown_parent_task = if let Some(parent_session) = session
794            .read(cx)
795            .parent_id(cx)
796            .and_then(|session_id| self.session_by_id(session_id))
797        {
798            let shutdown_id = parent_session.update(cx, |parent_session, _| {
799                parent_session.remove_child_session_id(session_id);
800
801                if parent_session.child_session_ids().len() == 0 {
802                    Some(parent_session.session_id())
803                } else {
804                    None
805                }
806            });
807
808            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
809        } else {
810            None
811        };
812
813        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
814
815        cx.background_spawn(async move {
816            if shutdown_children.len() > 0 {
817                let _ = join_all(shutdown_children).await;
818            }
819
820            shutdown_task.await;
821
822            if let Some(parent_task) = shutdown_parent_task {
823                parent_task.await?;
824            }
825
826            Ok(())
827        })
828    }
829
830    pub fn shared(
831        &mut self,
832        project_id: u64,
833        downstream_client: AnyProtoClient,
834        _: &mut Context<Self>,
835    ) {
836        self.downstream_client = Some((downstream_client.clone(), project_id));
837    }
838
839    pub fn unshared(&mut self, cx: &mut Context<Self>) {
840        self.downstream_client.take();
841
842        cx.notify();
843    }
844
845    async fn handle_run_debug_locator(
846        this: Entity<Self>,
847        envelope: TypedEnvelope<proto::RunDebugLocator>,
848        mut cx: AsyncApp,
849    ) -> Result<proto::DebugTaskDefinition> {
850        let template = DebugTaskTemplate {
851            locator: Some(envelope.payload.locator),
852            definition: DebugTaskDefinition::from_proto(
853                envelope
854                    .payload
855                    .task
856                    .ok_or_else(|| anyhow!("missing definition"))?,
857            )?,
858        };
859        let definition = this
860            .update(&mut cx, |this, cx| this.run_debug_locator(template, cx))?
861            .await?;
862        Ok(definition.to_proto())
863    }
864
865    async fn handle_get_debug_adapter_binary(
866        this: Entity<Self>,
867        envelope: TypedEnvelope<proto::GetDebugAdapterBinary>,
868        mut cx: AsyncApp,
869    ) -> Result<proto::DebugAdapterBinary> {
870        let definition = DebugTaskDefinition::from_proto(
871            envelope
872                .payload
873                .task
874                .ok_or_else(|| anyhow!("missing definition"))?,
875        )?;
876        let binary = this
877            .update(&mut cx, |this, cx| {
878                this.get_debug_adapter_binary(definition, cx)
879            })?
880            .await?;
881        Ok(binary.to_proto())
882    }
883}
884
885#[derive(Clone)]
886pub struct DapAdapterDelegate {
887    fs: Arc<dyn Fs>,
888    worktree_id: WorktreeId,
889    node_runtime: NodeRuntime,
890    http_client: Arc<dyn HttpClient>,
891    language_registry: Arc<LanguageRegistry>,
892    toolchain_store: Arc<dyn LanguageToolchainStore>,
893    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
894    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
895}
896
897impl DapAdapterDelegate {
898    pub fn new(
899        fs: Arc<dyn Fs>,
900        worktree_id: WorktreeId,
901        node_runtime: NodeRuntime,
902        http_client: Arc<dyn HttpClient>,
903        language_registry: Arc<LanguageRegistry>,
904        toolchain_store: Arc<dyn LanguageToolchainStore>,
905        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
906    ) -> Self {
907        Self {
908            fs,
909            worktree_id,
910            http_client,
911            node_runtime,
912            toolchain_store,
913            language_registry,
914            load_shell_env_task,
915            updated_adapters: Default::default(),
916        }
917    }
918}
919
920#[async_trait(?Send)]
921impl dap::adapters::DapDelegate for DapAdapterDelegate {
922    fn worktree_id(&self) -> WorktreeId {
923        self.worktree_id
924    }
925
926    fn http_client(&self) -> Arc<dyn HttpClient> {
927        self.http_client.clone()
928    }
929
930    fn node_runtime(&self) -> NodeRuntime {
931        self.node_runtime.clone()
932    }
933
934    fn fs(&self) -> Arc<dyn Fs> {
935        self.fs.clone()
936    }
937
938    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
939        self.updated_adapters.clone()
940    }
941
942    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
943        let name = SharedString::from(dap_name.to_string());
944        let status = match status {
945            DapStatus::None => BinaryStatus::None,
946            DapStatus::Downloading => BinaryStatus::Downloading,
947            DapStatus::Failed { error } => BinaryStatus::Failed { error },
948            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
949        };
950
951        self.language_registry
952            .update_dap_status(LanguageServerName(name), status);
953    }
954
955    fn which(&self, command: &OsStr) -> Option<PathBuf> {
956        which::which(command).ok()
957    }
958
959    async fn shell_env(&self) -> HashMap<String, String> {
960        let task = self.load_shell_env_task.clone();
961        task.await.unwrap_or_default()
962    }
963
964    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
965        self.toolchain_store.clone()
966    }
967}