dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    dap_command::EvaluateCommand,
  4    locators,
  5    session::{self, Session, SessionStateEvent},
  6};
  7use crate::{
  8    InlayHint, InlayHintLabel, ProjectEnvironment, ResolveState,
  9    project_settings::ProjectSettings,
 10    terminals::{SshCommand, wrap_for_ssh},
 11    worktree_store::WorktreeStore,
 12};
 13use anyhow::{Context as _, Result, anyhow};
 14use async_trait::async_trait;
 15use collections::HashMap;
 16use dap::{
 17    Capabilities, CompletionItem, CompletionsArguments, DapRegistry, DebugRequest,
 18    EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, Source, StackFrameId,
 19    adapters::{
 20        DapDelegate, DebugAdapterBinary, DebugAdapterName, DebugTaskDefinition, TcpArguments,
 21    },
 22    client::SessionId,
 23    inline_value::VariableLookupKind,
 24    messages::Message,
 25    requests::{Completions, Evaluate},
 26};
 27use fs::Fs;
 28use futures::{
 29    StreamExt,
 30    channel::mpsc::{self, UnboundedSender},
 31    future::{Shared, join_all},
 32};
 33use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 34use http_client::HttpClient;
 35use language::{Buffer, LanguageToolchainStore, language_settings::InlayHintKind};
 36use node_runtime::NodeRuntime;
 37
 38use remote::SshRemoteClient;
 39use rpc::{
 40    AnyProtoClient, TypedEnvelope,
 41    proto::{self},
 42};
 43use settings::{Settings, WorktreeId};
 44use std::{
 45    borrow::Borrow,
 46    collections::BTreeMap,
 47    ffi::OsStr,
 48    net::Ipv4Addr,
 49    path::{Path, PathBuf},
 50    sync::{Arc, Once},
 51};
 52use task::{DebugScenario, SpawnInTerminal, TaskContext, TaskTemplate};
 53use util::ResultExt as _;
 54use worktree::Worktree;
 55
/// Events that a [`DapStore`] emits to its subscribers.
#[derive(Debug)]
pub enum DapStoreEvent {
    /// Emitted when the session with this id reports `SessionStateEvent::Running`.
    DebugClientStarted(SessionId),
    /// Carries the id of a session that finished initializing.
    // NOTE(review): not emitted anywhere in this file; confirm the emitter elsewhere.
    DebugSessionInitialized(SessionId),
    /// Emitted by `DapStore::shutdown_session` for the session being shut down.
    DebugClientShutdown(SessionId),
    /// A protocol message associated with the given session.
    DebugClientEvent {
        session_id: SessionId,
        message: Message,
    },
    /// A free-form notification string.
    Notification(String),
    // NOTE(review): presumably signals that the remote side is ready; not emitted in this file.
    RemoteHasInitialized,
}
 68
/// Selects how the store resolves debug adapter binaries and runs locators.
enum DapStoreMode {
    /// Adapters and locators are resolved in this process.
    Local(LocalDapStore),
    /// Requests are forwarded to the upstream client of an SSH connection.
    Ssh(SshDapStore),
    /// Collaboration mode: binary and locator requests fail with
    /// "Debugging is not yet supported via collab".
    Collab,
}
 74
/// State needed to resolve debug adapters on the local machine.
///
/// Every field is handed to the `DapAdapterDelegate` built by `DapStore::delegate`.
pub struct LocalDapStore {
    fs: Arc<dyn Fs>,
    node_runtime: NodeRuntime,
    http_client: Arc<dyn HttpClient>,
    /// Also queried for the directory environment that is merged into a resolved
    /// adapter binary's environment.
    environment: Entity<ProjectEnvironment>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
}
 82
/// State for a store whose debug adapters are resolved over an SSH connection.
pub struct SshDapStore {
    /// Read for the SSH arguments used to wrap the adapter command.
    ssh_client: Entity<SshRemoteClient>,
    /// Receives the `GetDebugAdapterBinary` and `RunDebugLocators` requests.
    upstream_client: AnyProtoClient,
    /// Sent as `project_id` in every upstream request.
    upstream_project_id: u64,
}
 88
/// Owns the project's debug sessions and resolves the adapters that back them.
pub struct DapStore {
    mode: DapStoreMode,
    /// Client and project id set by `shared` and cleared by `unshared`; adapter
    /// console output is forwarded to it while a binary is being resolved.
    downstream_client: Option<(AnyProtoClient, u64)>,
    breakpoint_store: Entity<BreakpointStore>,
    worktree_store: Entity<WorktreeStore>,
    /// Sessions created by `new_session` that have not been shut down yet.
    sessions: BTreeMap<SessionId, Entity<Session>>,
    /// Post-incremented by `new_session` to allocate the next `SessionId`.
    next_session_id: u32,
}
 97
// Allows `DapStore` to emit `DapStoreEvent`s through `cx.emit`.
impl EventEmitter<DapStoreEvent> for DapStore {}
 99
100impl DapStore {
101    pub fn init(client: &AnyProtoClient, cx: &mut App) {
102        static ADD_LOCATORS: Once = Once::new();
103        ADD_LOCATORS.call_once(|| {
104            let registry = DapRegistry::global(cx);
105            registry.add_locator(Arc::new(locators::cargo::CargoLocator {}));
106            registry.add_locator(Arc::new(locators::go::GoLocator {}));
107            registry.add_locator(Arc::new(locators::node::NodeLocator));
108            registry.add_locator(Arc::new(locators::python::PythonLocator));
109        });
110        client.add_entity_request_handler(Self::handle_run_debug_locator);
111        client.add_entity_request_handler(Self::handle_get_debug_adapter_binary);
112        client.add_entity_message_handler(Self::handle_log_to_debug_console);
113    }
114
115    #[expect(clippy::too_many_arguments)]
116    pub fn new_local(
117        http_client: Arc<dyn HttpClient>,
118        node_runtime: NodeRuntime,
119        fs: Arc<dyn Fs>,
120        environment: Entity<ProjectEnvironment>,
121        toolchain_store: Arc<dyn LanguageToolchainStore>,
122        worktree_store: Entity<WorktreeStore>,
123        breakpoint_store: Entity<BreakpointStore>,
124        cx: &mut Context<Self>,
125    ) -> Self {
126        let mode = DapStoreMode::Local(LocalDapStore {
127            fs,
128            environment,
129            http_client,
130            node_runtime,
131            toolchain_store,
132        });
133
134        Self::new(mode, breakpoint_store, worktree_store, cx)
135    }
136
137    pub fn new_ssh(
138        project_id: u64,
139        ssh_client: Entity<SshRemoteClient>,
140        breakpoint_store: Entity<BreakpointStore>,
141        worktree_store: Entity<WorktreeStore>,
142        cx: &mut Context<Self>,
143    ) -> Self {
144        let mode = DapStoreMode::Ssh(SshDapStore {
145            upstream_client: ssh_client.read(cx).proto_client(),
146            ssh_client,
147            upstream_project_id: project_id,
148        });
149
150        Self::new(mode, breakpoint_store, worktree_store, cx)
151    }
152
153    pub fn new_collab(
154        _project_id: u64,
155        _upstream_client: AnyProtoClient,
156        breakpoint_store: Entity<BreakpointStore>,
157        worktree_store: Entity<WorktreeStore>,
158        cx: &mut Context<Self>,
159    ) -> Self {
160        Self::new(DapStoreMode::Collab, breakpoint_store, worktree_store, cx)
161    }
162
163    fn new(
164        mode: DapStoreMode,
165        breakpoint_store: Entity<BreakpointStore>,
166        worktree_store: Entity<WorktreeStore>,
167        _cx: &mut Context<Self>,
168    ) -> Self {
169        Self {
170            mode,
171            next_session_id: 0,
172            downstream_client: None,
173            breakpoint_store,
174            worktree_store,
175            sessions: Default::default(),
176        }
177    }
178
179    pub fn get_debug_adapter_binary(
180        &mut self,
181        definition: DebugTaskDefinition,
182        session_id: SessionId,
183        worktree: &Entity<Worktree>,
184        console: UnboundedSender<String>,
185        cx: &mut Context<Self>,
186    ) -> Task<Result<DebugAdapterBinary>> {
187        match &self.mode {
188            DapStoreMode::Local(_) => {
189                let Some(adapter) = DapRegistry::global(cx).adapter(&definition.adapter) else {
190                    return Task::ready(Err(anyhow!("Failed to find a debug adapter")));
191                };
192
193                let user_installed_path = ProjectSettings::get_global(cx)
194                    .dap
195                    .get(&adapter.name())
196                    .and_then(|s| s.binary.as_ref().map(PathBuf::from));
197
198                let delegate = self.delegate(&worktree, console, cx);
199                let cwd: Arc<Path> = worktree.read(cx).abs_path().as_ref().into();
200
201                cx.spawn(async move |this, cx| {
202                    let mut binary = adapter
203                        .get_binary(&delegate, &definition, user_installed_path, cx)
204                        .await?;
205
206                    let env = this
207                        .update(cx, |this, cx| {
208                            this.as_local()
209                                .unwrap()
210                                .environment
211                                .update(cx, |environment, cx| {
212                                    environment.get_directory_environment(cwd, cx)
213                                })
214                        })?
215                        .await;
216
217                    if let Some(mut env) = env {
218                        env.extend(std::mem::take(&mut binary.envs));
219                        binary.envs = env;
220                    }
221
222                    Ok(binary)
223                })
224            }
225            DapStoreMode::Ssh(ssh) => {
226                let request = ssh.upstream_client.request(proto::GetDebugAdapterBinary {
227                    session_id: session_id.to_proto(),
228                    project_id: ssh.upstream_project_id,
229                    worktree_id: worktree.read(cx).id().to_proto(),
230                    definition: Some(definition.to_proto()),
231                });
232                let ssh_client = ssh.ssh_client.clone();
233
234                cx.spawn(async move |_, cx| {
235                    let response = request.await?;
236                    let binary = DebugAdapterBinary::from_proto(response)?;
237                    let mut ssh_command = ssh_client.read_with(cx, |ssh, _| {
238                        anyhow::Ok(SshCommand {
239                            arguments: ssh.ssh_args().context("SSH arguments not found")?,
240                        })
241                    })??;
242
243                    let mut connection = None;
244                    if let Some(c) = binary.connection {
245                        let local_bind_addr = Ipv4Addr::LOCALHOST;
246                        let port =
247                            dap::transport::TcpTransport::unused_port(local_bind_addr).await?;
248
249                        ssh_command.add_port_forwarding(port, c.host.to_string(), c.port);
250                        connection = Some(TcpArguments {
251                            port,
252                            host: local_bind_addr,
253                            timeout: c.timeout,
254                        })
255                    }
256
257                    let (program, args) = wrap_for_ssh(
258                        &ssh_command,
259                        binary
260                            .command
261                            .as_ref()
262                            .map(|command| (command, &binary.arguments)),
263                        binary.cwd.as_deref(),
264                        binary.envs,
265                        None,
266                    );
267
268                    Ok(DebugAdapterBinary {
269                        command: Some(program),
270                        arguments: args,
271                        envs: HashMap::default(),
272                        cwd: None,
273                        connection,
274                        request_args: binary.request_args,
275                    })
276                })
277            }
278            DapStoreMode::Collab => {
279                Task::ready(Err(anyhow!("Debugging is not yet supported via collab")))
280            }
281        }
282    }
283
284    pub fn debug_scenario_for_build_task(
285        &self,
286        build: TaskTemplate,
287        adapter: DebugAdapterName,
288        label: SharedString,
289        cx: &mut App,
290    ) -> Task<Option<DebugScenario>> {
291        let locators = DapRegistry::global(cx).locators();
292
293        cx.background_spawn(async move {
294            for locator in locators.values() {
295                if let Some(scenario) = locator.create_scenario(&build, &label, &adapter).await {
296                    return Some(scenario);
297                }
298            }
299            None
300        })
301    }
302
303    pub fn run_debug_locator(
304        &mut self,
305        locator_name: &str,
306        build_command: SpawnInTerminal,
307        cx: &mut Context<Self>,
308    ) -> Task<Result<DebugRequest>> {
309        match &self.mode {
310            DapStoreMode::Local(_) => {
311                // Pre-resolve args with existing environment.
312                let locators = DapRegistry::global(cx).locators();
313                let locator = locators.get(locator_name);
314
315                if let Some(locator) = locator.cloned() {
316                    cx.background_spawn(async move {
317                        let result = locator
318                            .run(build_command.clone())
319                            .await
320                            .log_with_level(log::Level::Error);
321                        if let Some(result) = result {
322                            return Ok(result);
323                        }
324
325                        anyhow::bail!(
326                            "None of the locators for task `{}` completed successfully",
327                            build_command.label
328                        )
329                    })
330                } else {
331                    Task::ready(Err(anyhow!(
332                        "Couldn't find any locator for task `{}`. Specify the `attach` or `launch` arguments in your debug scenario definition",
333                        build_command.label
334                    )))
335                }
336            }
337            DapStoreMode::Ssh(ssh) => {
338                let request = ssh.upstream_client.request(proto::RunDebugLocators {
339                    project_id: ssh.upstream_project_id,
340                    build_command: Some(build_command.to_proto()),
341                    locator: locator_name.to_owned(),
342                });
343                cx.background_spawn(async move {
344                    let response = request.await?;
345                    DebugRequest::from_proto(response)
346                })
347            }
348            DapStoreMode::Collab => {
349                Task::ready(Err(anyhow!("Debugging is not yet supported via collab")))
350            }
351        }
352    }
353
354    fn as_local(&self) -> Option<&LocalDapStore> {
355        match &self.mode {
356            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
357            _ => None,
358        }
359    }
360
361    pub fn new_session(
362        &mut self,
363        label: SharedString,
364        adapter: DebugAdapterName,
365        task_context: TaskContext,
366        parent_session: Option<Entity<Session>>,
367        cx: &mut Context<Self>,
368    ) -> Entity<Session> {
369        let session_id = SessionId(util::post_inc(&mut self.next_session_id));
370
371        if let Some(session) = &parent_session {
372            session.update(cx, |session, _| {
373                session.add_child_session_id(session_id);
374            });
375        }
376
377        let session = Session::new(
378            self.breakpoint_store.clone(),
379            session_id,
380            parent_session,
381            label,
382            adapter,
383            task_context,
384            cx,
385        );
386
387        self.sessions.insert(session_id, session.clone());
388        cx.notify();
389
390        cx.subscribe(&session, {
391            move |this: &mut DapStore, _, event: &SessionStateEvent, cx| match event {
392                SessionStateEvent::Shutdown => {
393                    this.shutdown_session(session_id, cx).detach_and_log_err(cx);
394                }
395                SessionStateEvent::Restart | SessionStateEvent::SpawnChildSession { .. } => {}
396                SessionStateEvent::Running => {
397                    cx.emit(DapStoreEvent::DebugClientStarted(session_id));
398                }
399            }
400        })
401        .detach();
402
403        session
404    }
405
406    pub fn boot_session(
407        &self,
408        session: Entity<Session>,
409        definition: DebugTaskDefinition,
410        worktree: Entity<Worktree>,
411        cx: &mut Context<Self>,
412    ) -> Task<Result<()>> {
413        let dap_store = cx.weak_entity();
414        let console = session.update(cx, |session, cx| session.console_output(cx));
415        let session_id = session.read(cx).session_id();
416
417        cx.spawn({
418            let session = session.clone();
419            async move |this, cx| {
420                let binary = this
421                    .update(cx, |this, cx| {
422                        this.get_debug_adapter_binary(
423                            definition.clone(),
424                            session_id,
425                            &worktree,
426                            console,
427                            cx,
428                        )
429                    })?
430                    .await?;
431                session
432                    .update(cx, |session, cx| {
433                        session.boot(binary, worktree, dap_store, cx)
434                    })?
435                    .await
436            }
437        })
438    }
439
440    pub fn session_by_id(
441        &self,
442        session_id: impl Borrow<SessionId>,
443    ) -> Option<Entity<session::Session>> {
444        let session_id = session_id.borrow();
445        let client = self.sessions.get(session_id).cloned();
446
447        client
448    }
449    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
450        self.sessions.values()
451    }
452
453    pub fn capabilities_by_id(
454        &self,
455        session_id: impl Borrow<SessionId>,
456        cx: &App,
457    ) -> Option<Capabilities> {
458        let session_id = session_id.borrow();
459        self.sessions
460            .get(session_id)
461            .map(|client| client.read(cx).capabilities.clone())
462    }
463
    /// Returns the breakpoint store this DAP store was constructed with.
    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
        &self.breakpoint_store
    }
467
    /// Returns the worktree store this DAP store was constructed with.
    pub fn worktree_store(&self) -> &Entity<WorktreeStore> {
        &self.worktree_store
    }
471
472    #[allow(dead_code)]
473    async fn handle_ignore_breakpoint_state(
474        this: Entity<Self>,
475        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
476        mut cx: AsyncApp,
477    ) -> Result<()> {
478        let session_id = SessionId::from_proto(envelope.payload.session_id);
479
480        this.update(&mut cx, |this, cx| {
481            if let Some(session) = this.session_by_id(&session_id) {
482                session.update(cx, |session, cx| {
483                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
484                })
485            } else {
486                Task::ready(HashMap::default())
487            }
488        })?
489        .await;
490
491        Ok(())
492    }
493
494    fn delegate(
495        &self,
496        worktree: &Entity<Worktree>,
497        console: UnboundedSender<String>,
498        cx: &mut App,
499    ) -> Arc<dyn DapDelegate> {
500        let Some(local_store) = self.as_local() else {
501            unimplemented!("Starting session on remote side");
502        };
503
504        Arc::new(DapAdapterDelegate::new(
505            local_store.fs.clone(),
506            worktree.read(cx).snapshot(),
507            console,
508            local_store.node_runtime.clone(),
509            local_store.http_client.clone(),
510            local_store.toolchain_store.clone(),
511            local_store.environment.update(cx, |env, cx| {
512                env.get_worktree_environment(worktree.clone(), cx)
513            }),
514        ))
515    }
516
517    pub fn evaluate(
518        &self,
519        session_id: &SessionId,
520        stack_frame_id: u64,
521        expression: String,
522        context: EvaluateArgumentsContext,
523        source: Option<Source>,
524        cx: &mut Context<Self>,
525    ) -> Task<Result<EvaluateResponse>> {
526        let Some(client) = self
527            .session_by_id(session_id)
528            .and_then(|client| client.read(cx).adapter_client())
529        else {
530            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
531        };
532
533        cx.background_executor().spawn(async move {
534            client
535                .request::<Evaluate>(EvaluateArguments {
536                    expression: expression.clone(),
537                    frame_id: Some(stack_frame_id),
538                    context: Some(context),
539                    format: None,
540                    line: None,
541                    column: None,
542                    source,
543                })
544                .await
545        })
546    }
547
548    pub fn completions(
549        &self,
550        session_id: &SessionId,
551        stack_frame_id: u64,
552        text: String,
553        completion_column: u64,
554        cx: &mut Context<Self>,
555    ) -> Task<Result<Vec<CompletionItem>>> {
556        let Some(client) = self
557            .session_by_id(session_id)
558            .and_then(|client| client.read(cx).adapter_client())
559        else {
560            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
561        };
562
563        cx.background_executor().spawn(async move {
564            Ok(client
565                .request::<Completions>(CompletionsArguments {
566                    frame_id: Some(stack_frame_id),
567                    line: None,
568                    text,
569                    column: completion_column,
570                })
571                .await?
572                .targets)
573        })
574    }
575
576    pub fn resolve_inline_value_locations(
577        &self,
578        session: Entity<Session>,
579        stack_frame_id: StackFrameId,
580        buffer_handle: Entity<Buffer>,
581        inline_value_locations: Vec<dap::inline_value::InlineValueLocation>,
582        cx: &mut Context<Self>,
583    ) -> Task<Result<Vec<InlayHint>>> {
584        let snapshot = buffer_handle.read(cx).snapshot();
585        let all_variables = session.read(cx).variables_by_stack_frame_id(stack_frame_id);
586
587        fn format_value(mut value: String) -> String {
588            const LIMIT: usize = 100;
589
590            if value.len() > LIMIT {
591                let mut index = LIMIT;
592                // If index isn't a char boundary truncate will cause a panic
593                while !value.is_char_boundary(index) {
594                    index -= 1;
595                }
596                value.truncate(index);
597                value.push_str("...");
598            }
599
600            format!(": {}", value)
601        }
602
603        cx.spawn(async move |_, cx| {
604            let mut inlay_hints = Vec::with_capacity(inline_value_locations.len());
605            for inline_value_location in inline_value_locations.iter() {
606                let point = snapshot.point_to_point_utf16(language::Point::new(
607                    inline_value_location.row as u32,
608                    inline_value_location.column as u32,
609                ));
610                let position = snapshot.anchor_after(point);
611
612                match inline_value_location.lookup {
613                    VariableLookupKind::Variable => {
614                        let Some(variable) = all_variables
615                            .iter()
616                            .find(|variable| variable.name == inline_value_location.variable_name)
617                        else {
618                            continue;
619                        };
620
621                        inlay_hints.push(InlayHint {
622                            position,
623                            label: InlayHintLabel::String(format_value(variable.value.clone())),
624                            kind: Some(InlayHintKind::Type),
625                            padding_left: false,
626                            padding_right: false,
627                            tooltip: None,
628                            resolve_state: ResolveState::Resolved,
629                        });
630                    }
631                    VariableLookupKind::Expression => {
632                        let Ok(eval_task) = session.read_with(cx, |session, _| {
633                            session.mode.request_dap(EvaluateCommand {
634                                expression: inline_value_location.variable_name.clone(),
635                                frame_id: Some(stack_frame_id),
636                                source: None,
637                                context: Some(EvaluateArgumentsContext::Variables),
638                            })
639                        }) else {
640                            continue;
641                        };
642
643                        if let Some(response) = eval_task.await.log_err() {
644                            inlay_hints.push(InlayHint {
645                                position,
646                                label: InlayHintLabel::String(format_value(response.result)),
647                                kind: Some(InlayHintKind::Type),
648                                padding_left: false,
649                                padding_right: false,
650                                tooltip: None,
651                                resolve_state: ResolveState::Resolved,
652                            });
653                        };
654                    }
655                };
656            }
657
658            Ok(inlay_hints)
659        })
660    }
661
662    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
663        let mut tasks = vec![];
664        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
665            tasks.push(self.shutdown_session(session_id, cx));
666        }
667
668        cx.background_executor().spawn(async move {
669            futures::future::join_all(tasks).await;
670        })
671    }
672
673    pub fn shutdown_session(
674        &mut self,
675        session_id: SessionId,
676        cx: &mut Context<Self>,
677    ) -> Task<Result<()>> {
678        let Some(session) = self.sessions.remove(&session_id) else {
679            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
680        };
681
682        let shutdown_children = session
683            .read(cx)
684            .child_session_ids()
685            .iter()
686            .map(|session_id| self.shutdown_session(*session_id, cx))
687            .collect::<Vec<_>>();
688
689        let shutdown_parent_task = if let Some(parent_session) = session
690            .read(cx)
691            .parent_id(cx)
692            .and_then(|session_id| self.session_by_id(session_id))
693        {
694            let shutdown_id = parent_session.update(cx, |parent_session, _| {
695                parent_session.remove_child_session_id(session_id);
696
697                if parent_session.child_session_ids().len() == 0 {
698                    Some(parent_session.session_id())
699                } else {
700                    None
701                }
702            });
703
704            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
705        } else {
706            None
707        };
708
709        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
710
711        cx.emit(DapStoreEvent::DebugClientShutdown(session_id));
712
713        cx.background_spawn(async move {
714            if shutdown_children.len() > 0 {
715                let _ = join_all(shutdown_children).await;
716            }
717
718            shutdown_task.await;
719
720            if let Some(parent_task) = shutdown_parent_task {
721                parent_task.await?;
722            }
723
724            Ok(())
725        })
726    }
727
728    pub fn shared(
729        &mut self,
730        project_id: u64,
731        downstream_client: AnyProtoClient,
732        _: &mut Context<Self>,
733    ) {
734        self.downstream_client = Some((downstream_client.clone(), project_id));
735    }
736
737    pub fn unshared(&mut self, cx: &mut Context<Self>) {
738        self.downstream_client.take();
739
740        cx.notify();
741    }
742
743    async fn handle_run_debug_locator(
744        this: Entity<Self>,
745        envelope: TypedEnvelope<proto::RunDebugLocators>,
746        mut cx: AsyncApp,
747    ) -> Result<proto::DebugRequest> {
748        let task = envelope
749            .payload
750            .build_command
751            .context("missing definition")?;
752        let build_task = SpawnInTerminal::from_proto(task);
753        let locator = envelope.payload.locator;
754        let request = this
755            .update(&mut cx, |this, cx| {
756                this.run_debug_locator(&locator, build_task, cx)
757            })?
758            .await?;
759
760        Ok(request.to_proto())
761    }
762
763    async fn handle_get_debug_adapter_binary(
764        this: Entity<Self>,
765        envelope: TypedEnvelope<proto::GetDebugAdapterBinary>,
766        mut cx: AsyncApp,
767    ) -> Result<proto::DebugAdapterBinary> {
768        let definition = DebugTaskDefinition::from_proto(
769            envelope.payload.definition.context("missing definition")?,
770        )?;
771        let (tx, mut rx) = mpsc::unbounded();
772        let session_id = envelope.payload.session_id;
773        cx.spawn({
774            let this = this.clone();
775            async move |cx| {
776                while let Some(message) = rx.next().await {
777                    this.read_with(cx, |this, _| {
778                        if let Some((downstream, project_id)) = this.downstream_client.clone() {
779                            downstream
780                                .send(proto::LogToDebugConsole {
781                                    project_id,
782                                    session_id,
783                                    message,
784                                })
785                                .ok();
786                        }
787                    })
788                    .ok();
789                }
790            }
791        })
792        .detach();
793
794        let worktree = this
795            .update(&mut cx, |this, cx| {
796                this.worktree_store
797                    .read(cx)
798                    .worktree_for_id(WorktreeId::from_proto(envelope.payload.worktree_id), cx)
799            })?
800            .context("Failed to find worktree with a given ID")?;
801        let binary = this
802            .update(&mut cx, |this, cx| {
803                this.get_debug_adapter_binary(
804                    definition,
805                    SessionId::from_proto(session_id),
806                    &worktree,
807                    tx,
808                    cx,
809                )
810            })?
811            .await?;
812        Ok(binary.to_proto())
813    }
814
815    async fn handle_log_to_debug_console(
816        this: Entity<Self>,
817        envelope: TypedEnvelope<proto::LogToDebugConsole>,
818        mut cx: AsyncApp,
819    ) -> Result<()> {
820        let session_id = SessionId::from_proto(envelope.payload.session_id);
821        this.update(&mut cx, |this, cx| {
822            let Some(session) = this.sessions.get(&session_id) else {
823                return;
824            };
825            session.update(cx, |session, cx| {
826                session
827                    .console_output(cx)
828                    .unbounded_send(envelope.payload.message)
829                    .ok();
830            })
831        })
832    }
833}
834
/// The [`DapDelegate`] implementation handed to debug adapters: it exposes one
/// worktree snapshot together with the services an adapter may need.
#[derive(Clone)]
pub struct DapAdapterDelegate {
    fs: Arc<dyn Fs>,
    /// Receives the messages passed to `output_to_console`.
    console: mpsc::UnboundedSender<String>,
    /// Snapshot used for the worktree id, the root path and file lookups.
    worktree: worktree::Snapshot,
    node_runtime: NodeRuntime,
    http_client: Arc<dyn HttpClient>,
    toolchain_store: Arc<dyn LanguageToolchainStore>,
    /// Awaited by `shell_env`; a `None` result is treated as an empty environment.
    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
}
845
846impl DapAdapterDelegate {
847    pub fn new(
848        fs: Arc<dyn Fs>,
849        worktree: worktree::Snapshot,
850        status: mpsc::UnboundedSender<String>,
851        node_runtime: NodeRuntime,
852        http_client: Arc<dyn HttpClient>,
853        toolchain_store: Arc<dyn LanguageToolchainStore>,
854        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
855    ) -> Self {
856        Self {
857            fs,
858            console: status,
859            worktree,
860            http_client,
861            node_runtime,
862            toolchain_store,
863            load_shell_env_task,
864        }
865    }
866}
867
868#[async_trait]
869impl dap::adapters::DapDelegate for DapAdapterDelegate {
870    fn worktree_id(&self) -> WorktreeId {
871        self.worktree.id()
872    }
873
874    fn worktree_root_path(&self) -> &Path {
875        &self.worktree.abs_path()
876    }
877    fn http_client(&self) -> Arc<dyn HttpClient> {
878        self.http_client.clone()
879    }
880
881    fn node_runtime(&self) -> NodeRuntime {
882        self.node_runtime.clone()
883    }
884
885    fn fs(&self) -> Arc<dyn Fs> {
886        self.fs.clone()
887    }
888
889    fn output_to_console(&self, msg: String) {
890        self.console.unbounded_send(msg).ok();
891    }
892
893    async fn which(&self, command: &OsStr) -> Option<PathBuf> {
894        let worktree_abs_path = self.worktree.abs_path();
895        let shell_path = self.shell_env().await.get("PATH").cloned();
896        which::which_in(command, shell_path.as_ref(), worktree_abs_path).ok()
897    }
898
899    async fn shell_env(&self) -> HashMap<String, String> {
900        let task = self.load_shell_env_task.clone();
901        task.await.unwrap_or_default()
902    }
903
904    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
905        self.toolchain_store.clone()
906    }
907    async fn read_text_file(&self, path: PathBuf) -> Result<String> {
908        let entry = self
909            .worktree
910            .entry_for_path(&path)
911            .with_context(|| format!("no worktree entry for path {path:?}"))?;
912        let abs_path = self
913            .worktree
914            .absolutize(&entry.path)
915            .with_context(|| format!("cannot absolutize path {path:?}"))?;
916
917        self.fs.load(&abs_path).await
918    }
919}