dap_store.rs

  1use super::{
  2    breakpoint_store::BreakpointStore,
  3    // Will need to uncomment this once we implement rpc message handler again
  4    // dap_command::{
  5    //     ContinueCommand, DapCommand, DisconnectCommand, NextCommand, PauseCommand, RestartCommand,
  6    //     RestartStackFrameCommand, StepBackCommand, StepCommand, StepInCommand, StepOutCommand,
  7    //     TerminateCommand, TerminateThreadsCommand, VariablesCommand,
  8    // },
  9    session::{self, Session},
 10};
 11use crate::{debugger, worktree_store::WorktreeStore, ProjectEnvironment};
 12use anyhow::{anyhow, Result};
 13use async_trait::async_trait;
 14use collections::HashMap;
 15use dap::{
 16    adapters::{DapStatus, DebugAdapterName},
 17    client::SessionId,
 18    messages::Message,
 19    requests::{
 20        Completions, Evaluate, Request as _, RunInTerminal, SetExpression, SetVariable,
 21        StartDebugging,
 22    },
 23    Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
 24    EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
 25    SetExpressionArguments, SetVariableArguments, Source, StartDebuggingRequestArguments,
 26};
 27use fs::Fs;
 28use futures::{
 29    channel::{mpsc, oneshot},
 30    future::{join_all, Shared},
 31};
 32use gpui::{App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task};
 33use http_client::HttpClient;
 34use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
 35use lsp::LanguageServerName;
 36use node_runtime::NodeRuntime;
 37
 38use rpc::{
 39    proto::{self},
 40    AnyProtoClient, TypedEnvelope,
 41};
 42use serde_json::Value;
 43use settings::WorktreeId;
 44use smol::{lock::Mutex, stream::StreamExt};
 45use std::{
 46    borrow::Borrow,
 47    collections::{BTreeMap, HashSet},
 48    ffi::OsStr,
 49    path::PathBuf,
 50    sync::{atomic::Ordering::SeqCst, Arc},
 51};
 52use std::{collections::VecDeque, sync::atomic::AtomicU32};
 53use task::{DebugAdapterConfig, DebugRequestDisposition};
 54use util::ResultExt as _;
 55use worktree::Worktree;
 56
 57pub enum DapStoreEvent {
 58    DebugClientStarted(SessionId),
 59    DebugClientShutdown(SessionId),
 60    DebugClientEvent {
 61        session_id: SessionId,
 62        message: Message,
 63    },
 64    RunInTerminal {
 65        session_id: SessionId,
 66        title: Option<String>,
 67        cwd: PathBuf,
 68        command: Option<String>,
 69        args: Vec<String>,
 70        envs: HashMap<String, String>,
 71        sender: mpsc::Sender<Result<u32>>,
 72    },
 73    Notification(String),
 74    RemoteHasInitialized,
 75}
 76
 77#[allow(clippy::large_enum_variant)]
 78pub enum DapStoreMode {
 79    Local(LocalDapStore),   // ssh host and collab host
 80    Remote(RemoteDapStore), // collab guest
 81}
 82
 83pub struct LocalDapStore {
 84    fs: Arc<dyn Fs>,
 85    node_runtime: NodeRuntime,
 86    next_session_id: AtomicU32,
 87    http_client: Arc<dyn HttpClient>,
 88    worktree_store: Entity<WorktreeStore>,
 89    environment: Entity<ProjectEnvironment>,
 90    language_registry: Arc<LanguageRegistry>,
 91    debug_adapters: Arc<DapRegistry>,
 92    toolchain_store: Arc<dyn LanguageToolchainStore>,
 93    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 94    _start_debugging_task: Task<()>,
 95}
 96
 97impl LocalDapStore {
 98    fn next_session_id(&self) -> SessionId {
 99        SessionId(self.next_session_id.fetch_add(1, SeqCst))
100    }
101}
102
103pub struct RemoteDapStore {
104    upstream_client: AnyProtoClient,
105    upstream_project_id: u64,
106    event_queue: Option<VecDeque<DapStoreEvent>>,
107}
108
109pub struct DapStore {
110    mode: DapStoreMode,
111    downstream_client: Option<(AnyProtoClient, u64)>,
112    breakpoint_store: Entity<BreakpointStore>,
113    sessions: BTreeMap<SessionId, Entity<Session>>,
114}
115
116impl EventEmitter<DapStoreEvent> for DapStore {}
117
118impl DapStore {
119    pub fn init(_client: &AnyProtoClient) {
120        // todo(debugger): Reenable these after we finish handle_dap_command refactor
121        // client.add_entity_request_handler(Self::handle_dap_command::<NextCommand>);
122        // client.add_entity_request_handler(Self::handle_dap_command::<StepInCommand>);
123        // client.add_entity_request_handler(Self::handle_dap_command::<StepOutCommand>);
124        // client.add_entity_request_handler(Self::handle_dap_command::<StepBackCommand>);
125        // client.add_entity_request_handler(Self::handle_dap_command::<ContinueCommand>);
126        // client.add_entity_request_handler(Self::handle_dap_command::<PauseCommand>);
127        // client.add_entity_request_handler(Self::handle_dap_command::<DisconnectCommand>);
128        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateThreadsCommand>);
129        // client.add_entity_request_handler(Self::handle_dap_command::<TerminateCommand>);
130        // client.add_entity_request_handler(Self::handle_dap_command::<RestartCommand>);
131        // client.add_entity_request_handler(Self::handle_dap_command::<VariablesCommand>);
132        // client.add_entity_request_handler(Self::handle_dap_command::<RestartStackFrameCommand>);
133    }
134
135    #[expect(clippy::too_many_arguments)]
136    pub fn new_local(
137        http_client: Arc<dyn HttpClient>,
138        node_runtime: NodeRuntime,
139        fs: Arc<dyn Fs>,
140        language_registry: Arc<LanguageRegistry>,
141        debug_adapters: Arc<DapRegistry>,
142        environment: Entity<ProjectEnvironment>,
143        toolchain_store: Arc<dyn LanguageToolchainStore>,
144        breakpoint_store: Entity<BreakpointStore>,
145        worktree_store: Entity<WorktreeStore>,
146        cx: &mut Context<Self>,
147    ) -> Self {
148        cx.on_app_quit(Self::shutdown_sessions).detach();
149
150        let (start_debugging_tx, mut message_rx) =
151            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
152
153        let _start_debugging_task = cx.spawn(async move |this, cx| {
154            while let Some((session_id, message)) = message_rx.next().await {
155                match message {
156                    Message::Request(request) => {
157                        let _ = this
158                            .update(cx, |this, cx| {
159                                if request.command == StartDebugging::COMMAND {
160                                    this.handle_start_debugging_request(session_id, request, cx)
161                                        .detach_and_log_err(cx);
162                                } else if request.command == RunInTerminal::COMMAND {
163                                    this.handle_run_in_terminal_request(session_id, request, cx)
164                                        .detach_and_log_err(cx);
165                                }
166                            })
167                            .log_err();
168                    }
169                    _ => {}
170                }
171            }
172        });
173        Self {
174            mode: DapStoreMode::Local(LocalDapStore {
175                fs,
176                environment,
177                http_client,
178                node_runtime,
179                worktree_store,
180                toolchain_store,
181                language_registry,
182                debug_adapters,
183                start_debugging_tx,
184                _start_debugging_task,
185                next_session_id: Default::default(),
186            }),
187            downstream_client: None,
188            breakpoint_store,
189            sessions: Default::default(),
190        }
191    }
192
193    pub fn new_remote(
194        project_id: u64,
195        upstream_client: AnyProtoClient,
196        breakpoint_store: Entity<BreakpointStore>,
197    ) -> Self {
198        Self {
199            mode: DapStoreMode::Remote(RemoteDapStore {
200                upstream_client,
201                upstream_project_id: project_id,
202                event_queue: Some(VecDeque::default()),
203            }),
204            downstream_client: None,
205            breakpoint_store,
206            sessions: Default::default(),
207        }
208    }
209
210    pub fn as_remote(&self) -> Option<&RemoteDapStore> {
211        match &self.mode {
212            DapStoreMode::Remote(remote_dap_store) => Some(remote_dap_store),
213            _ => None,
214        }
215    }
216
217    pub fn remote_event_queue(&mut self) -> Option<VecDeque<DapStoreEvent>> {
218        if let DapStoreMode::Remote(remote) = &mut self.mode {
219            remote.event_queue.take()
220        } else {
221            None
222        }
223    }
224
225    pub fn as_local(&self) -> Option<&LocalDapStore> {
226        match &self.mode {
227            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
228            _ => None,
229        }
230    }
231
232    pub fn as_local_mut(&mut self) -> Option<&mut LocalDapStore> {
233        match &mut self.mode {
234            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
235            _ => None,
236        }
237    }
238
239    pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> {
240        match &self.mode {
241            DapStoreMode::Remote(RemoteDapStore {
242                upstream_client,
243                upstream_project_id,
244                ..
245            }) => Some((upstream_client.clone(), *upstream_project_id)),
246
247            DapStoreMode::Local(_) => None,
248        }
249    }
250
251    pub fn downstream_client(&self) -> Option<&(AnyProtoClient, u64)> {
252        self.downstream_client.as_ref()
253    }
254
255    pub fn add_remote_client(
256        &mut self,
257        session_id: SessionId,
258        ignore: Option<bool>,
259        cx: &mut Context<Self>,
260    ) {
261        if let DapStoreMode::Remote(remote) = &self.mode {
262            self.sessions.insert(
263                session_id,
264                cx.new(|_| {
265                    debugger::session::Session::remote(
266                        session_id,
267                        remote.upstream_client.clone(),
268                        remote.upstream_project_id,
269                        ignore.unwrap_or(false),
270                    )
271                }),
272            );
273        } else {
274            debug_assert!(false);
275        }
276    }
277
278    pub fn session_by_id(
279        &self,
280        session_id: impl Borrow<SessionId>,
281    ) -> Option<Entity<session::Session>> {
282        let session_id = session_id.borrow();
283        let client = self.sessions.get(session_id).cloned();
284
285        client
286    }
287    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
288        self.sessions.values()
289    }
290
291    pub fn capabilities_by_id(
292        &self,
293        session_id: impl Borrow<SessionId>,
294        cx: &App,
295    ) -> Option<Capabilities> {
296        let session_id = session_id.borrow();
297        self.sessions
298            .get(session_id)
299            .map(|client| client.read(cx).capabilities.clone())
300    }
301
302    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
303        &self.breakpoint_store
304    }
305
306    #[allow(dead_code)]
307    async fn handle_ignore_breakpoint_state(
308        this: Entity<Self>,
309        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
310        mut cx: AsyncApp,
311    ) -> Result<()> {
312        let session_id = SessionId::from_proto(envelope.payload.session_id);
313
314        this.update(&mut cx, |this, cx| {
315            if let Some(session) = this.session_by_id(&session_id) {
316                session.update(cx, |session, cx| {
317                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
318                })
319            } else {
320                Task::ready(())
321            }
322        })?
323        .await;
324
325        Ok(())
326    }
327
328    pub fn new_session(
329        &mut self,
330        config: DebugAdapterConfig,
331        worktree: &Entity<Worktree>,
332        parent_session: Option<Entity<Session>>,
333        cx: &mut Context<Self>,
334    ) -> (SessionId, Task<Result<Entity<Session>>>) {
335        let Some(local_store) = self.as_local() else {
336            unimplemented!("Starting session on remote side");
337        };
338
339        let delegate = DapAdapterDelegate::new(
340            local_store.fs.clone(),
341            worktree.read(cx).id(),
342            local_store.node_runtime.clone(),
343            local_store.http_client.clone(),
344            local_store.language_registry.clone(),
345            local_store.toolchain_store.clone(),
346            local_store.environment.update(cx, |env, cx| {
347                let worktree = worktree.read(cx);
348                env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
349            }),
350        );
351        let session_id = local_store.next_session_id();
352
353        if let Some(session) = &parent_session {
354            session.update(cx, |session, _| {
355                session.add_child_session_id(session_id);
356            });
357        }
358
359        let (initialized_tx, initialized_rx) = oneshot::channel();
360
361        let start_client_task = Session::local(
362            self.breakpoint_store.clone(),
363            session_id,
364            parent_session,
365            delegate,
366            config,
367            local_store.start_debugging_tx.clone(),
368            initialized_tx,
369            local_store.debug_adapters.clone(),
370            cx,
371        );
372
373        let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
374        (session_id, task)
375    }
376    #[cfg(any(test, feature = "test-support"))]
377    pub fn new_fake_session(
378        &mut self,
379        config: DebugAdapterConfig,
380        worktree: &Entity<Worktree>,
381        parent_session: Option<Entity<Session>>,
382        caps: Capabilities,
383        fails: bool,
384        cx: &mut Context<Self>,
385    ) -> (SessionId, Task<Result<Entity<Session>>>) {
386        let Some(local_store) = self.as_local() else {
387            unimplemented!("Starting session on remote side");
388        };
389
390        let delegate = DapAdapterDelegate::new(
391            local_store.fs.clone(),
392            worktree.read(cx).id(),
393            local_store.node_runtime.clone(),
394            local_store.http_client.clone(),
395            local_store.language_registry.clone(),
396            local_store.toolchain_store.clone(),
397            local_store.environment.update(cx, |env, cx| {
398                let worktree = worktree.read(cx);
399                env.get_environment(Some(worktree.id()), Some(worktree.abs_path()), cx)
400            }),
401        );
402        let session_id = local_store.next_session_id();
403
404        if let Some(session) = &parent_session {
405            session.update(cx, |session, _| {
406                session.add_child_session_id(session_id);
407            });
408        }
409
410        let (initialized_tx, initialized_rx) = oneshot::channel();
411
412        let start_client_task = Session::fake(
413            self.breakpoint_store.clone(),
414            session_id,
415            parent_session,
416            delegate,
417            config,
418            local_store.start_debugging_tx.clone(),
419            initialized_tx,
420            caps,
421            fails,
422            cx,
423        );
424
425        let task = create_new_session(session_id, initialized_rx, start_client_task, cx);
426        (session_id, task)
427    }
428
429    fn handle_start_debugging_request(
430        &mut self,
431        session_id: SessionId,
432        request: dap::messages::Request,
433        cx: &mut Context<Self>,
434    ) -> Task<Result<()>> {
435        let Some(local_store) = self.as_local() else {
436            unreachable!("Cannot response for non-local session");
437        };
438
439        let Some(parent_session) = self.session_by_id(session_id) else {
440            return Task::ready(Err(anyhow!("Session not found")));
441        };
442
443        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
444            request.arguments.unwrap_or_default(),
445        )
446        .expect("To parse StartDebuggingRequestArguments");
447        let worktree = local_store
448            .worktree_store
449            .update(cx, |this, _| this.worktrees().next())
450            .expect("worktree-less project");
451
452        let Some(config) = parent_session.read(cx).configuration() else {
453            unreachable!("there must be a config for local sessions");
454        };
455
456        let debug_config = DebugAdapterConfig {
457            label: config.label,
458            adapter: config.adapter,
459            request: DebugRequestDisposition::ReverseRequest(args),
460            initialize_args: config.initialize_args.clone(),
461            tcp_connection: config.tcp_connection.clone(),
462        };
463        #[cfg(any(test, feature = "test-support"))]
464        let new_session_task = {
465            let caps = parent_session.read(cx).capabilities.clone();
466            self.new_fake_session(
467                debug_config,
468                &worktree,
469                Some(parent_session.clone()),
470                caps,
471                false,
472                cx,
473            )
474            .1
475        };
476        #[cfg(not(any(test, feature = "test-support")))]
477        let new_session_task = self
478            .new_session(debug_config, &worktree, Some(parent_session.clone()), cx)
479            .1;
480
481        let request_seq = request.seq;
482        cx.spawn(async move |_, cx| {
483            let (success, body) = match new_session_task.await {
484                Ok(_) => (true, None),
485                Err(error) => (
486                    false,
487                    Some(serde_json::to_value(ErrorResponse {
488                        error: Some(dap::Message {
489                            id: request_seq,
490                            format: error.to_string(),
491                            variables: None,
492                            send_telemetry: None,
493                            show_user: None,
494                            url: None,
495                            url_label: None,
496                        }),
497                    })?),
498                ),
499            };
500
501            parent_session
502                .update(cx, |session, cx| {
503                    session.respond_to_client(
504                        request_seq,
505                        success,
506                        StartDebugging::COMMAND.to_string(),
507                        body,
508                        cx,
509                    )
510                })?
511                .await
512        })
513    }
514
515    fn handle_run_in_terminal_request(
516        &mut self,
517        session_id: SessionId,
518        request: dap::messages::Request,
519        cx: &mut Context<Self>,
520    ) -> Task<Result<()>> {
521        let Some(session) = self.session_by_id(session_id) else {
522            return Task::ready(Err(anyhow!("Session not found")));
523        };
524
525        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
526            request.arguments.unwrap_or_default(),
527        )
528        .expect("To parse StartDebuggingRequestArguments");
529
530        let seq = request.seq;
531
532        let cwd = PathBuf::from(request_args.cwd);
533        match cwd.try_exists() {
534            Ok(true) => (),
535            Ok(false) | Err(_) => {
536                return session.update(cx, |session, cx| {
537                    session.respond_to_client(
538                        seq,
539                        false,
540                        RunInTerminal::COMMAND.to_string(),
541                        serde_json::to_value(dap::ErrorResponse {
542                            error: Some(dap::Message {
543                                id: seq,
544                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
545                                variables: None,
546                                send_telemetry: None,
547                                show_user: None,
548                                url: None,
549                                url_label: None,
550                            }),
551                        })
552                        .ok(),
553                        cx,
554                    )
555                })
556            }
557        }
558
559        let mut args = request_args.args.clone();
560
561        // Handle special case for NodeJS debug adapter
562        // If only the Node binary path is provided, we set the command to None
563        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
564        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
565        // This allows the NodeJS debug client to attach correctly
566        let command = if args.len() > 1 {
567            Some(args.remove(0))
568        } else {
569            None
570        };
571
572        let mut envs: HashMap<String, String> = Default::default();
573        if let Some(Value::Object(env)) = request_args.env {
574            for (key, value) in env {
575                let value_str = match (key.as_str(), value) {
576                    (_, Value::String(value)) => value,
577                    _ => continue,
578                };
579
580                envs.insert(key, value_str);
581            }
582        }
583
584        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
585
586        cx.emit(DapStoreEvent::RunInTerminal {
587            session_id,
588            title: request_args.title,
589            cwd,
590            command,
591            args,
592            envs,
593            sender: tx,
594        });
595        cx.notify();
596
597        let session = session.downgrade();
598        cx.spawn(async move |_, cx| {
599            let (success, body) = match rx.next().await {
600                Some(Ok(pid)) => (
601                    true,
602                    serde_json::to_value(dap::RunInTerminalResponse {
603                        process_id: None,
604                        shell_process_id: Some(pid as u64),
605                    })
606                    .ok(),
607                ),
608                Some(Err(error)) => (
609                    false,
610                    serde_json::to_value(dap::ErrorResponse {
611                        error: Some(dap::Message {
612                            id: seq,
613                            format: error.to_string(),
614                            variables: None,
615                            send_telemetry: None,
616                            show_user: None,
617                            url: None,
618                            url_label: None,
619                        }),
620                    })
621                    .ok(),
622                ),
623                None => (
624                    false,
625                    serde_json::to_value(dap::ErrorResponse {
626                        error: Some(dap::Message {
627                            id: seq,
628                            format: "failed to receive response from spawn terminal".to_string(),
629                            variables: None,
630                            send_telemetry: None,
631                            show_user: None,
632                            url: None,
633                            url_label: None,
634                        }),
635                    })
636                    .ok(),
637                ),
638            };
639
640            session
641                .update(cx, |session, cx| {
642                    session.respond_to_client(
643                        seq,
644                        success,
645                        RunInTerminal::COMMAND.to_string(),
646                        body,
647                        cx,
648                    )
649                })?
650                .await
651        })
652    }
653
654    pub fn evaluate(
655        &self,
656        session_id: &SessionId,
657        stack_frame_id: u64,
658        expression: String,
659        context: EvaluateArgumentsContext,
660        source: Option<Source>,
661        cx: &mut Context<Self>,
662    ) -> Task<Result<EvaluateResponse>> {
663        let Some(client) = self
664            .session_by_id(session_id)
665            .and_then(|client| client.read(cx).adapter_client())
666        else {
667            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
668        };
669
670        cx.background_executor().spawn(async move {
671            client
672                .request::<Evaluate>(EvaluateArguments {
673                    expression: expression.clone(),
674                    frame_id: Some(stack_frame_id),
675                    context: Some(context),
676                    format: None,
677                    line: None,
678                    column: None,
679                    source,
680                })
681                .await
682        })
683    }
684
685    pub fn completions(
686        &self,
687        session_id: &SessionId,
688        stack_frame_id: u64,
689        text: String,
690        completion_column: u64,
691        cx: &mut Context<Self>,
692    ) -> Task<Result<Vec<CompletionItem>>> {
693        let Some(client) = self
694            .session_by_id(session_id)
695            .and_then(|client| client.read(cx).adapter_client())
696        else {
697            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
698        };
699
700        cx.background_executor().spawn(async move {
701            Ok(client
702                .request::<Completions>(CompletionsArguments {
703                    frame_id: Some(stack_frame_id),
704                    line: None,
705                    text,
706                    column: completion_column,
707                })
708                .await?
709                .targets)
710        })
711    }
712
713    #[allow(clippy::too_many_arguments)]
714    pub fn set_variable_value(
715        &self,
716        session_id: &SessionId,
717        stack_frame_id: u64,
718        variables_reference: u64,
719        name: String,
720        value: String,
721        evaluate_name: Option<String>,
722        cx: &mut Context<Self>,
723    ) -> Task<Result<()>> {
724        let Some(client) = self
725            .session_by_id(session_id)
726            .and_then(|client| client.read(cx).adapter_client())
727        else {
728            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
729        };
730
731        let supports_set_expression = self
732            .capabilities_by_id(session_id, cx)
733            .and_then(|caps| caps.supports_set_expression)
734            .unwrap_or_default();
735
736        cx.background_executor().spawn(async move {
737            if let Some(evaluate_name) = supports_set_expression.then(|| evaluate_name).flatten() {
738                client
739                    .request::<SetExpression>(SetExpressionArguments {
740                        expression: evaluate_name,
741                        value,
742                        frame_id: Some(stack_frame_id),
743                        format: None,
744                    })
745                    .await?;
746            } else {
747                client
748                    .request::<SetVariable>(SetVariableArguments {
749                        variables_reference,
750                        name,
751                        value,
752                        format: None,
753                    })
754                    .await?;
755            }
756
757            Ok(())
758        })
759    }
760
761    // .. get the client and what not
762    // let _ = client.modules(); // This can fire a request to a dap adapter or be a cheap getter.
763    // client.wait_for_request(request::Modules); // This ensures that the request that we've fired off runs to completions
764    // let returned_value = client.modules(); // this is a cheap getter.
765
766    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
767        let mut tasks = vec![];
768        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
769            tasks.push(self.shutdown_session(session_id, cx));
770        }
771
772        cx.background_executor().spawn(async move {
773            futures::future::join_all(tasks).await;
774        })
775    }
776
777    pub fn shutdown_session(
778        &mut self,
779        session_id: SessionId,
780        cx: &mut Context<Self>,
781    ) -> Task<Result<()>> {
782        let Some(_) = self.as_local_mut() else {
783            return Task::ready(Err(anyhow!("Cannot shutdown session on remote side")));
784        };
785
786        let Some(session) = self.sessions.remove(&session_id) else {
787            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
788        };
789
790        let shutdown_children = session
791            .read(cx)
792            .child_session_ids()
793            .iter()
794            .map(|session_id| self.shutdown_session(*session_id, cx))
795            .collect::<Vec<_>>();
796
797        let shutdown_parent_task = if let Some(parent_session) = session
798            .read(cx)
799            .parent_id()
800            .and_then(|session_id| self.session_by_id(session_id))
801        {
802            let shutdown_id = parent_session.update(cx, |parent_session, _| {
803                parent_session.remove_child_session_id(session_id);
804
805                if parent_session.child_session_ids().len() == 0 {
806                    Some(parent_session.session_id())
807                } else {
808                    None
809                }
810            });
811
812            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
813        } else {
814            None
815        };
816
817        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
818
819        cx.background_spawn(async move {
820            if shutdown_children.len() > 0 {
821                let _ = join_all(shutdown_children).await;
822            }
823
824            shutdown_task.await;
825
826            if let Some(parent_task) = shutdown_parent_task {
827                parent_task.await?;
828            }
829
830            Ok(())
831        })
832    }
833
834    pub fn shared(
835        &mut self,
836        project_id: u64,
837        downstream_client: AnyProtoClient,
838        _: &mut Context<Self>,
839    ) {
840        self.downstream_client = Some((downstream_client.clone(), project_id));
841    }
842
843    pub fn unshared(&mut self, cx: &mut Context<Self>) {
844        self.downstream_client.take();
845
846        cx.notify();
847    }
848}
849
850fn create_new_session(
851    session_id: SessionId,
852    initialized_rx: oneshot::Receiver<()>,
853    start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
854    cx: &mut Context<DapStore>,
855) -> Task<Result<Entity<Session>>> {
856    let task = cx.spawn(async move |this, cx| {
857        let session = match start_client_task.await {
858            Ok(session) => session,
859            Err(error) => {
860                this.update(cx, |_, cx| {
861                    cx.emit(DapStoreEvent::Notification(error.to_string()));
862                })
863                .log_err();
864
865                return Err(error);
866            }
867        };
868
869        // we have to insert the session early, so we can handle reverse requests
870        // that need the session to be available
871        this.update(cx, |store, cx| {
872            store.sessions.insert(session_id, session.clone());
873            cx.emit(DapStoreEvent::DebugClientStarted(session_id));
874            cx.notify();
875        })?;
876
877        match session
878            .update(cx, |session, cx| {
879                session.initialize_sequence(initialized_rx, cx)
880            })?
881            .await
882        {
883            Ok(_) => {}
884            Err(error) => {
885                this.update(cx, |this, cx| {
886                    cx.emit(DapStoreEvent::Notification(error.to_string()));
887
888                    this.shutdown_session(session_id, cx)
889                })?
890                .await
891                .log_err();
892
893                return Err(error);
894            }
895        }
896
897        Ok(session)
898    });
899    task
900}
901
902#[derive(Clone)]
903pub struct DapAdapterDelegate {
904    fs: Arc<dyn Fs>,
905    worktree_id: WorktreeId,
906    node_runtime: NodeRuntime,
907    http_client: Arc<dyn HttpClient>,
908    language_registry: Arc<LanguageRegistry>,
909    toolchain_store: Arc<dyn LanguageToolchainStore>,
910    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
911    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
912}
913
914impl DapAdapterDelegate {
915    pub fn new(
916        fs: Arc<dyn Fs>,
917        worktree_id: WorktreeId,
918        node_runtime: NodeRuntime,
919        http_client: Arc<dyn HttpClient>,
920        language_registry: Arc<LanguageRegistry>,
921        toolchain_store: Arc<dyn LanguageToolchainStore>,
922        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
923    ) -> Self {
924        Self {
925            fs,
926            worktree_id,
927            http_client,
928            node_runtime,
929            toolchain_store,
930            language_registry,
931            load_shell_env_task,
932            updated_adapters: Default::default(),
933        }
934    }
935}
936
937#[async_trait(?Send)]
938impl dap::adapters::DapDelegate for DapAdapterDelegate {
939    fn worktree_id(&self) -> WorktreeId {
940        self.worktree_id
941    }
942
943    fn http_client(&self) -> Arc<dyn HttpClient> {
944        self.http_client.clone()
945    }
946
947    fn node_runtime(&self) -> NodeRuntime {
948        self.node_runtime.clone()
949    }
950
951    fn fs(&self) -> Arc<dyn Fs> {
952        self.fs.clone()
953    }
954
955    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
956        self.updated_adapters.clone()
957    }
958
959    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
960        let name = SharedString::from(dap_name.to_string());
961        let status = match status {
962            DapStatus::None => BinaryStatus::None,
963            DapStatus::Downloading => BinaryStatus::Downloading,
964            DapStatus::Failed { error } => BinaryStatus::Failed { error },
965            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
966        };
967
968        self.language_registry
969            .update_dap_status(LanguageServerName(name), status);
970    }
971
972    fn which(&self, command: &OsStr) -> Option<PathBuf> {
973        which::which(command).ok()
974    }
975
976    async fn shell_env(&self) -> HashMap<String, String> {
977        let task = self.load_shell_env_task.clone();
978        task.await.unwrap_or_default()
979    }
980
981    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
982        self.toolchain_store.clone()
983    }
984}