dap_store.rs

   1use super::{
   2    breakpoint_store::BreakpointStore,
   3    locators::DapLocator,
   4    session::{self, Session, SessionStateEvent},
   5};
   6use crate::{
   7    ProjectEnvironment, debugger, project_settings::ProjectSettings, worktree_store::WorktreeStore,
   8};
   9use anyhow::{Result, anyhow};
  10use async_trait::async_trait;
  11use collections::HashMap;
  12use dap::{
  13    Capabilities, CompletionItem, CompletionsArguments, DapRegistry, ErrorResponse,
  14    EvaluateArguments, EvaluateArgumentsContext, EvaluateResponse, RunInTerminalRequestArguments,
  15    Source, StartDebuggingRequestArguments,
  16    adapters::{DapStatus, DebugAdapterBinary, DebugAdapterName},
  17    client::SessionId,
  18    messages::Message,
  19    requests::{Completions, Evaluate, Request as _, RunInTerminal, StartDebugging},
  20};
  21use fs::Fs;
  22use futures::{
  23    channel::{mpsc, oneshot},
  24    future::{Shared, join_all},
  25};
  26use gpui::{
  27    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Task, WeakEntity,
  28};
  29use http_client::HttpClient;
  30use language::{BinaryStatus, LanguageRegistry, LanguageToolchainStore};
  31use lsp::LanguageServerName;
  32use node_runtime::NodeRuntime;
  33
  34use rpc::{
  35    AnyProtoClient, TypedEnvelope,
  36    proto::{self},
  37};
  38use serde_json::Value;
  39use settings::{Settings, WorktreeId};
  40use smol::{lock::Mutex, stream::StreamExt};
  41use std::{
  42    borrow::Borrow,
  43    collections::{BTreeMap, HashSet},
  44    ffi::OsStr,
  45    path::{Path, PathBuf},
  46    sync::Arc,
  47};
  48use task::{DebugTaskDefinition, DebugTaskTemplate};
  49use util::ResultExt as _;
  50use worktree::Worktree;
  51
  52pub enum DapStoreEvent {
  53    DebugClientStarted(SessionId),
  54    DebugSessionInitialized(SessionId),
  55    DebugClientShutdown(SessionId),
  56    DebugClientEvent {
  57        session_id: SessionId,
  58        message: Message,
  59    },
  60    RunInTerminal {
  61        session_id: SessionId,
  62        title: Option<String>,
  63        cwd: Option<Arc<Path>>,
  64        command: Option<String>,
  65        args: Vec<String>,
  66        envs: HashMap<String, String>,
  67        sender: mpsc::Sender<Result<u32>>,
  68    },
  69    Notification(String),
  70    RemoteHasInitialized,
  71}
  72
  73#[allow(clippy::large_enum_variant)]
  74enum DapStoreMode {
  75    Local(LocalDapStore),
  76    Ssh(SshDapStore),
  77    Collab,
  78}
  79
  80pub struct LocalDapStore {
  81    fs: Arc<dyn Fs>,
  82    node_runtime: NodeRuntime,
  83    http_client: Arc<dyn HttpClient>,
  84    environment: Entity<ProjectEnvironment>,
  85    language_registry: Arc<LanguageRegistry>,
  86    worktree_store: Entity<WorktreeStore>,
  87    toolchain_store: Arc<dyn LanguageToolchainStore>,
  88    locators: HashMap<String, Arc<dyn DapLocator>>,
  89}
  90
  91pub struct SshDapStore {
  92    upstream_client: AnyProtoClient,
  93    upstream_project_id: u64,
  94}
  95
  96pub struct DapStore {
  97    mode: DapStoreMode,
  98    downstream_client: Option<(AnyProtoClient, u64)>,
  99    breakpoint_store: Entity<BreakpointStore>,
 100    sessions: BTreeMap<SessionId, Entity<Session>>,
 101    next_session_id: u32,
 102    start_debugging_tx: futures::channel::mpsc::UnboundedSender<(SessionId, Message)>,
 103    _start_debugging_task: Task<()>,
 104}
 105
 106impl EventEmitter<DapStoreEvent> for DapStore {}
 107
 108impl DapStore {
 109    pub fn init(client: &AnyProtoClient) {
 110        client.add_entity_request_handler(Self::handle_run_debug_locator);
 111        client.add_entity_request_handler(Self::handle_get_debug_adapter_binary);
 112    }
 113
 114    #[expect(clippy::too_many_arguments)]
 115    pub fn new_local(
 116        http_client: Arc<dyn HttpClient>,
 117        node_runtime: NodeRuntime,
 118        fs: Arc<dyn Fs>,
 119        language_registry: Arc<LanguageRegistry>,
 120        environment: Entity<ProjectEnvironment>,
 121        toolchain_store: Arc<dyn LanguageToolchainStore>,
 122        worktree_store: Entity<WorktreeStore>,
 123        breakpoint_store: Entity<BreakpointStore>,
 124        cx: &mut Context<Self>,
 125    ) -> Self {
 126        cx.on_app_quit(Self::shutdown_sessions).detach();
 127
 128        let locators = HashMap::from_iter([(
 129            "cargo".to_string(),
 130            Arc::new(super::locators::cargo::CargoLocator {}) as _,
 131        )]);
 132
 133        let mode = DapStoreMode::Local(LocalDapStore {
 134            fs,
 135            environment,
 136            http_client,
 137            node_runtime,
 138            toolchain_store,
 139            worktree_store,
 140            language_registry,
 141            locators,
 142        });
 143
 144        Self::new(mode, breakpoint_store, cx)
 145    }
 146
 147    pub fn new_ssh(
 148        project_id: u64,
 149        upstream_client: AnyProtoClient,
 150        breakpoint_store: Entity<BreakpointStore>,
 151        cx: &mut Context<Self>,
 152    ) -> Self {
 153        let mode = DapStoreMode::Ssh(SshDapStore {
 154            upstream_client,
 155            upstream_project_id: project_id,
 156        });
 157
 158        Self::new(mode, breakpoint_store, cx)
 159    }
 160
 161    pub fn new_collab(
 162        _project_id: u64,
 163        _upstream_client: AnyProtoClient,
 164        breakpoint_store: Entity<BreakpointStore>,
 165        cx: &mut Context<Self>,
 166    ) -> Self {
 167        Self::new(DapStoreMode::Collab, breakpoint_store, cx)
 168    }
 169
 170    fn new(
 171        mode: DapStoreMode,
 172        breakpoint_store: Entity<BreakpointStore>,
 173        cx: &mut Context<Self>,
 174    ) -> Self {
 175        let (start_debugging_tx, mut message_rx) =
 176            futures::channel::mpsc::unbounded::<(SessionId, Message)>();
 177        let task = cx.spawn(async move |this, cx| {
 178            while let Some((session_id, message)) = message_rx.next().await {
 179                match message {
 180                    Message::Request(request) => {
 181                        let _ = this
 182                            .update(cx, |this, cx| {
 183                                if request.command == StartDebugging::COMMAND {
 184                                    this.handle_start_debugging_request(session_id, request, cx)
 185                                        .detach_and_log_err(cx);
 186                                } else if request.command == RunInTerminal::COMMAND {
 187                                    this.handle_run_in_terminal_request(session_id, request, cx)
 188                                        .detach_and_log_err(cx);
 189                                }
 190                            })
 191                            .log_err();
 192                    }
 193                    _ => {}
 194                }
 195            }
 196        });
 197
 198        Self {
 199            mode,
 200            _start_debugging_task: task,
 201            start_debugging_tx,
 202            next_session_id: 0,
 203            downstream_client: None,
 204            breakpoint_store,
 205            sessions: Default::default(),
 206        }
 207    }
 208
 209    pub fn get_debug_adapter_binary(
 210        &mut self,
 211        definition: DebugTaskDefinition,
 212        cx: &mut Context<Self>,
 213    ) -> Task<Result<DebugAdapterBinary>> {
 214        match &self.mode {
 215            DapStoreMode::Local(local) => {
 216                let Some(worktree) = local.worktree_store.read(cx).visible_worktrees(cx).next()
 217                else {
 218                    return Task::ready(Err(anyhow!("Failed to find a worktree")));
 219                };
 220                let Some(adapter) = DapRegistry::global(cx).adapter(&definition.adapter) else {
 221                    return Task::ready(Err(anyhow!("Failed to find a debug adapter")));
 222                };
 223
 224                let user_installed_path = ProjectSettings::get_global(cx)
 225                    .dap
 226                    .get(&adapter.name())
 227                    .and_then(|s| s.binary.as_ref().map(PathBuf::from));
 228
 229                let delegate = self.delegate(&worktree, cx);
 230                let cwd: Arc<Path> = definition
 231                    .cwd()
 232                    .unwrap_or(worktree.read(cx).abs_path().as_ref())
 233                    .into();
 234
 235                cx.spawn(async move |this, cx| {
 236                    let mut binary = adapter
 237                        .get_binary(&delegate, &definition, user_installed_path, cx)
 238                        .await?;
 239
 240                    let env = this
 241                        .update(cx, |this, cx| {
 242                            this.as_local()
 243                                .unwrap()
 244                                .environment
 245                                .update(cx, |environment, cx| {
 246                                    environment.get_directory_environment(cwd, cx)
 247                                })
 248                        })?
 249                        .await;
 250
 251                    if let Some(mut env) = env {
 252                        env.extend(std::mem::take(&mut binary.envs));
 253                        binary.envs = env;
 254                    }
 255
 256                    Ok(binary)
 257                })
 258            }
 259            DapStoreMode::Ssh(ssh) => {
 260                let request = ssh.upstream_client.request(proto::GetDebugAdapterBinary {
 261                    project_id: ssh.upstream_project_id,
 262                    task: Some(definition.to_proto()),
 263                });
 264
 265                cx.background_spawn(async move {
 266                    let response = request.await?;
 267                    DebugAdapterBinary::from_proto(response)
 268                })
 269            }
 270            DapStoreMode::Collab => {
 271                Task::ready(Err(anyhow!("Debugging is not yet supported via collab")))
 272            }
 273        }
 274    }
 275
 276    pub fn run_debug_locator(
 277        &mut self,
 278        template: DebugTaskTemplate,
 279        cx: &mut Context<Self>,
 280    ) -> Task<Result<DebugTaskDefinition>> {
 281        let Some(locator_name) = template.locator else {
 282            return Task::ready(Ok(template.definition));
 283        };
 284
 285        match &self.mode {
 286            DapStoreMode::Local(local) => {
 287                if let Some(locator) = local.locators.get(&locator_name).cloned() {
 288                    cx.background_spawn(
 289                        async move { locator.run_locator(template.definition).await },
 290                    )
 291                } else {
 292                    Task::ready(Err(anyhow!("Couldn't find locator {}", locator_name)))
 293                }
 294            }
 295            DapStoreMode::Ssh(ssh) => {
 296                let request = ssh.upstream_client.request(proto::RunDebugLocator {
 297                    project_id: ssh.upstream_project_id,
 298                    locator: locator_name,
 299                    task: Some(template.definition.to_proto()),
 300                });
 301                cx.background_spawn(async move {
 302                    let response = request.await?;
 303                    DebugTaskDefinition::from_proto(response)
 304                })
 305            }
 306            DapStoreMode::Collab => {
 307                Task::ready(Err(anyhow!("Debugging is not yet supported via collab")))
 308            }
 309        }
 310    }
 311
 312    fn as_local(&self) -> Option<&LocalDapStore> {
 313        match &self.mode {
 314            DapStoreMode::Local(local_dap_store) => Some(local_dap_store),
 315            _ => None,
 316        }
 317    }
 318
 319    pub fn add_remote_client(
 320        &mut self,
 321        session_id: SessionId,
 322        ignore: Option<bool>,
 323        cx: &mut Context<Self>,
 324    ) {
 325        if let DapStoreMode::Ssh(remote) = &self.mode {
 326            self.sessions.insert(
 327                session_id,
 328                cx.new(|_| {
 329                    debugger::session::Session::remote(
 330                        session_id,
 331                        remote.upstream_client.clone(),
 332                        remote.upstream_project_id,
 333                        ignore.unwrap_or(false),
 334                    )
 335                }),
 336            );
 337        } else {
 338            debug_assert!(false);
 339        }
 340    }
 341
 342    pub fn session_by_id(
 343        &self,
 344        session_id: impl Borrow<SessionId>,
 345    ) -> Option<Entity<session::Session>> {
 346        let session_id = session_id.borrow();
 347        let client = self.sessions.get(session_id).cloned();
 348
 349        client
 350    }
 351    pub fn sessions(&self) -> impl Iterator<Item = &Entity<Session>> {
 352        self.sessions.values()
 353    }
 354
 355    pub fn capabilities_by_id(
 356        &self,
 357        session_id: impl Borrow<SessionId>,
 358        cx: &App,
 359    ) -> Option<Capabilities> {
 360        let session_id = session_id.borrow();
 361        self.sessions
 362            .get(session_id)
 363            .map(|client| client.read(cx).capabilities.clone())
 364    }
 365
 366    pub fn breakpoint_store(&self) -> &Entity<BreakpointStore> {
 367        &self.breakpoint_store
 368    }
 369
 370    #[allow(dead_code)]
 371    async fn handle_ignore_breakpoint_state(
 372        this: Entity<Self>,
 373        envelope: TypedEnvelope<proto::IgnoreBreakpointState>,
 374        mut cx: AsyncApp,
 375    ) -> Result<()> {
 376        let session_id = SessionId::from_proto(envelope.payload.session_id);
 377
 378        this.update(&mut cx, |this, cx| {
 379            if let Some(session) = this.session_by_id(&session_id) {
 380                session.update(cx, |session, cx| {
 381                    session.set_ignore_breakpoints(envelope.payload.ignore, cx)
 382                })
 383            } else {
 384                Task::ready(HashMap::default())
 385            }
 386        })?
 387        .await;
 388
 389        Ok(())
 390    }
 391
 392    fn delegate(&self, worktree: &Entity<Worktree>, cx: &mut App) -> DapAdapterDelegate {
 393        let Some(local_store) = self.as_local() else {
 394            unimplemented!("Starting session on remote side");
 395        };
 396
 397        DapAdapterDelegate::new(
 398            local_store.fs.clone(),
 399            worktree.read(cx).id(),
 400            local_store.node_runtime.clone(),
 401            local_store.http_client.clone(),
 402            local_store.language_registry.clone(),
 403            local_store.toolchain_store.clone(),
 404            local_store.environment.update(cx, |env, cx| {
 405                env.get_worktree_environment(worktree.clone(), cx)
 406            }),
 407        )
 408    }
 409
 410    pub fn new_session(
 411        &mut self,
 412        binary: DebugAdapterBinary,
 413        config: DebugTaskDefinition,
 414        worktree: WeakEntity<Worktree>,
 415        parent_session: Option<Entity<Session>>,
 416        cx: &mut Context<Self>,
 417    ) -> (SessionId, Task<Result<Entity<Session>>>) {
 418        let session_id = SessionId(util::post_inc(&mut self.next_session_id));
 419
 420        if let Some(session) = &parent_session {
 421            session.update(cx, |session, _| {
 422                session.add_child_session_id(session_id);
 423            });
 424        }
 425
 426        let (initialized_tx, initialized_rx) = oneshot::channel();
 427
 428        let start_debugging_tx = self.start_debugging_tx.clone();
 429
 430        let task = cx.spawn(async move |this, cx| {
 431            let start_client_task = this.update(cx, |this, cx| {
 432                Session::local(
 433                    this.breakpoint_store.clone(),
 434                    worktree.clone(),
 435                    session_id,
 436                    parent_session,
 437                    binary,
 438                    config,
 439                    start_debugging_tx.clone(),
 440                    initialized_tx,
 441                    cx,
 442                )
 443            })?;
 444
 445            let ret = this
 446                .update(cx, |_, cx| {
 447                    create_new_session(session_id, initialized_rx, start_client_task, worktree, cx)
 448                })?
 449                .await;
 450            ret
 451        });
 452
 453        (session_id, task)
 454    }
 455
 456    fn handle_start_debugging_request(
 457        &mut self,
 458        session_id: SessionId,
 459        request: dap::messages::Request,
 460        cx: &mut Context<Self>,
 461    ) -> Task<Result<()>> {
 462        let Some(parent_session) = self.session_by_id(session_id) else {
 463            return Task::ready(Err(anyhow!("Session not found")));
 464        };
 465
 466        let Some(worktree) = parent_session
 467            .read(cx)
 468            .as_local()
 469            .map(|local| local.worktree().clone())
 470        else {
 471            return Task::ready(Err(anyhow!(
 472                "Cannot handle start debugging request from remote end"
 473            )));
 474        };
 475
 476        let args = serde_json::from_value::<StartDebuggingRequestArguments>(
 477            request.arguments.unwrap_or_default(),
 478        )
 479        .expect("To parse StartDebuggingRequestArguments");
 480        let mut binary = parent_session.read(cx).binary().clone();
 481        let config = parent_session.read(cx).configuration().unwrap().clone();
 482        binary.request_args = args;
 483
 484        let new_session_task = self
 485            .new_session(binary, config, worktree, Some(parent_session.clone()), cx)
 486            .1;
 487
 488        let request_seq = request.seq;
 489        cx.spawn(async move |_, cx| {
 490            let (success, body) = match new_session_task.await {
 491                Ok(_) => (true, None),
 492                Err(error) => (
 493                    false,
 494                    Some(serde_json::to_value(ErrorResponse {
 495                        error: Some(dap::Message {
 496                            id: request_seq,
 497                            format: error.to_string(),
 498                            variables: None,
 499                            send_telemetry: None,
 500                            show_user: None,
 501                            url: None,
 502                            url_label: None,
 503                        }),
 504                    })?),
 505                ),
 506            };
 507
 508            parent_session
 509                .update(cx, |session, cx| {
 510                    session.respond_to_client(
 511                        request_seq,
 512                        success,
 513                        StartDebugging::COMMAND.to_string(),
 514                        body,
 515                        cx,
 516                    )
 517                })?
 518                .await
 519        })
 520    }
 521
 522    fn handle_run_in_terminal_request(
 523        &mut self,
 524        session_id: SessionId,
 525        request: dap::messages::Request,
 526        cx: &mut Context<Self>,
 527    ) -> Task<Result<()>> {
 528        let Some(session) = self.session_by_id(session_id) else {
 529            return Task::ready(Err(anyhow!("Session not found")));
 530        };
 531
 532        let request_args = serde_json::from_value::<RunInTerminalRequestArguments>(
 533            request.arguments.unwrap_or_default(),
 534        )
 535        .expect("To parse StartDebuggingRequestArguments");
 536
 537        let seq = request.seq;
 538
 539        let cwd = Path::new(&request_args.cwd);
 540
 541        match cwd.try_exists() {
 542            Ok(false) | Err(_) if !request_args.cwd.is_empty() => {
 543                return session.update(cx, |session, cx| {
 544                    session.respond_to_client(
 545                        seq,
 546                        false,
 547                        RunInTerminal::COMMAND.to_string(),
 548                        serde_json::to_value(dap::ErrorResponse {
 549                            error: Some(dap::Message {
 550                                id: seq,
 551                                format: format!("Received invalid/unknown cwd: {cwd:?}"),
 552                                variables: None,
 553                                send_telemetry: None,
 554                                show_user: None,
 555                                url: None,
 556                                url_label: None,
 557                            }),
 558                        })
 559                        .ok(),
 560                        cx,
 561                    )
 562                });
 563            }
 564            _ => (),
 565        }
 566        let mut args = request_args.args.clone();
 567
 568        // Handle special case for NodeJS debug adapter
 569        // If only the Node binary path is provided, we set the command to None
 570        // This prevents the NodeJS REPL from appearing, which is not the desired behavior
 571        // The expected usage is for users to provide their own Node command, e.g., `node test.js`
 572        // This allows the NodeJS debug client to attach correctly
 573        let command = if args.len() > 1 {
 574            Some(args.remove(0))
 575        } else {
 576            None
 577        };
 578
 579        let mut envs: HashMap<String, String> = Default::default();
 580        if let Some(Value::Object(env)) = request_args.env {
 581            for (key, value) in env {
 582                let value_str = match (key.as_str(), value) {
 583                    (_, Value::String(value)) => value,
 584                    _ => continue,
 585                };
 586
 587                envs.insert(key, value_str);
 588            }
 589        }
 590
 591        let (tx, mut rx) = mpsc::channel::<Result<u32>>(1);
 592        let cwd = Some(cwd)
 593            .filter(|cwd| cwd.as_os_str().len() > 0)
 594            .map(Arc::from)
 595            .or_else(|| {
 596                self.session_by_id(session_id)
 597                    .and_then(|session| session.read(cx).binary().cwd.as_deref().map(Arc::from))
 598            });
 599        cx.emit(DapStoreEvent::RunInTerminal {
 600            session_id,
 601            title: request_args.title,
 602            cwd,
 603            command,
 604            args,
 605            envs,
 606            sender: tx,
 607        });
 608        cx.notify();
 609
 610        let session = session.downgrade();
 611        cx.spawn(async move |_, cx| {
 612            let (success, body) = match rx.next().await {
 613                Some(Ok(pid)) => (
 614                    true,
 615                    serde_json::to_value(dap::RunInTerminalResponse {
 616                        process_id: None,
 617                        shell_process_id: Some(pid as u64),
 618                    })
 619                    .ok(),
 620                ),
 621                Some(Err(error)) => (
 622                    false,
 623                    serde_json::to_value(dap::ErrorResponse {
 624                        error: Some(dap::Message {
 625                            id: seq,
 626                            format: error.to_string(),
 627                            variables: None,
 628                            send_telemetry: None,
 629                            show_user: None,
 630                            url: None,
 631                            url_label: None,
 632                        }),
 633                    })
 634                    .ok(),
 635                ),
 636                None => (
 637                    false,
 638                    serde_json::to_value(dap::ErrorResponse {
 639                        error: Some(dap::Message {
 640                            id: seq,
 641                            format: "failed to receive response from spawn terminal".to_string(),
 642                            variables: None,
 643                            send_telemetry: None,
 644                            show_user: None,
 645                            url: None,
 646                            url_label: None,
 647                        }),
 648                    })
 649                    .ok(),
 650                ),
 651            };
 652
 653            session
 654                .update(cx, |session, cx| {
 655                    session.respond_to_client(
 656                        seq,
 657                        success,
 658                        RunInTerminal::COMMAND.to_string(),
 659                        body,
 660                        cx,
 661                    )
 662                })?
 663                .await
 664        })
 665    }
 666
 667    pub fn evaluate(
 668        &self,
 669        session_id: &SessionId,
 670        stack_frame_id: u64,
 671        expression: String,
 672        context: EvaluateArgumentsContext,
 673        source: Option<Source>,
 674        cx: &mut Context<Self>,
 675    ) -> Task<Result<EvaluateResponse>> {
 676        let Some(client) = self
 677            .session_by_id(session_id)
 678            .and_then(|client| client.read(cx).adapter_client())
 679        else {
 680            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
 681        };
 682
 683        cx.background_executor().spawn(async move {
 684            client
 685                .request::<Evaluate>(EvaluateArguments {
 686                    expression: expression.clone(),
 687                    frame_id: Some(stack_frame_id),
 688                    context: Some(context),
 689                    format: None,
 690                    line: None,
 691                    column: None,
 692                    source,
 693                })
 694                .await
 695        })
 696    }
 697
 698    pub fn completions(
 699        &self,
 700        session_id: &SessionId,
 701        stack_frame_id: u64,
 702        text: String,
 703        completion_column: u64,
 704        cx: &mut Context<Self>,
 705    ) -> Task<Result<Vec<CompletionItem>>> {
 706        let Some(client) = self
 707            .session_by_id(session_id)
 708            .and_then(|client| client.read(cx).adapter_client())
 709        else {
 710            return Task::ready(Err(anyhow!("Could not find client: {:?}", session_id)));
 711        };
 712
 713        cx.background_executor().spawn(async move {
 714            Ok(client
 715                .request::<Completions>(CompletionsArguments {
 716                    frame_id: Some(stack_frame_id),
 717                    line: None,
 718                    text,
 719                    column: completion_column,
 720                })
 721                .await?
 722                .targets)
 723        })
 724    }
 725
 726    pub fn shutdown_sessions(&mut self, cx: &mut Context<Self>) -> Task<()> {
 727        let mut tasks = vec![];
 728        for session_id in self.sessions.keys().cloned().collect::<Vec<_>>() {
 729            tasks.push(self.shutdown_session(session_id, cx));
 730        }
 731
 732        cx.background_executor().spawn(async move {
 733            futures::future::join_all(tasks).await;
 734        })
 735    }
 736
 737    pub fn shutdown_session(
 738        &mut self,
 739        session_id: SessionId,
 740        cx: &mut Context<Self>,
 741    ) -> Task<Result<()>> {
 742        let Some(session) = self.sessions.remove(&session_id) else {
 743            return Task::ready(Err(anyhow!("Could not find session: {:?}", session_id)));
 744        };
 745
 746        let shutdown_children = session
 747            .read(cx)
 748            .child_session_ids()
 749            .iter()
 750            .map(|session_id| self.shutdown_session(*session_id, cx))
 751            .collect::<Vec<_>>();
 752
 753        let shutdown_parent_task = if let Some(parent_session) = session
 754            .read(cx)
 755            .parent_id()
 756            .and_then(|session_id| self.session_by_id(session_id))
 757        {
 758            let shutdown_id = parent_session.update(cx, |parent_session, _| {
 759                parent_session.remove_child_session_id(session_id);
 760
 761                if parent_session.child_session_ids().len() == 0 {
 762                    Some(parent_session.session_id())
 763                } else {
 764                    None
 765                }
 766            });
 767
 768            shutdown_id.map(|session_id| self.shutdown_session(session_id, cx))
 769        } else {
 770            None
 771        };
 772
 773        let shutdown_task = session.update(cx, |this, cx| this.shutdown(cx));
 774
 775        cx.background_spawn(async move {
 776            if shutdown_children.len() > 0 {
 777                let _ = join_all(shutdown_children).await;
 778            }
 779
 780            shutdown_task.await;
 781
 782            if let Some(parent_task) = shutdown_parent_task {
 783                parent_task.await?;
 784            }
 785
 786            Ok(())
 787        })
 788    }
 789
 790    pub fn shared(
 791        &mut self,
 792        project_id: u64,
 793        downstream_client: AnyProtoClient,
 794        _: &mut Context<Self>,
 795    ) {
 796        self.downstream_client = Some((downstream_client.clone(), project_id));
 797    }
 798
 799    pub fn unshared(&mut self, cx: &mut Context<Self>) {
 800        self.downstream_client.take();
 801
 802        cx.notify();
 803    }
 804
 805    async fn handle_run_debug_locator(
 806        this: Entity<Self>,
 807        envelope: TypedEnvelope<proto::RunDebugLocator>,
 808        mut cx: AsyncApp,
 809    ) -> Result<proto::DebugTaskDefinition> {
 810        let template = DebugTaskTemplate {
 811            locator: Some(envelope.payload.locator),
 812            definition: DebugTaskDefinition::from_proto(
 813                envelope
 814                    .payload
 815                    .task
 816                    .ok_or_else(|| anyhow!("missing definition"))?,
 817            )?,
 818        };
 819        let definition = this
 820            .update(&mut cx, |this, cx| this.run_debug_locator(template, cx))?
 821            .await?;
 822        Ok(definition.to_proto())
 823    }
 824
 825    async fn handle_get_debug_adapter_binary(
 826        this: Entity<Self>,
 827        envelope: TypedEnvelope<proto::GetDebugAdapterBinary>,
 828        mut cx: AsyncApp,
 829    ) -> Result<proto::DebugAdapterBinary> {
 830        let definition = DebugTaskDefinition::from_proto(
 831            envelope
 832                .payload
 833                .task
 834                .ok_or_else(|| anyhow!("missing definition"))?,
 835        )?;
 836        let binary = this
 837            .update(&mut cx, |this, cx| {
 838                this.get_debug_adapter_binary(definition, cx)
 839            })?
 840            .await?;
 841        Ok(binary.to_proto())
 842    }
 843}
 844
 845fn create_new_session(
 846    session_id: SessionId,
 847    initialized_rx: oneshot::Receiver<()>,
 848    start_client_task: Task<Result<Entity<Session>, anyhow::Error>>,
 849    worktree: WeakEntity<Worktree>,
 850    cx: &mut Context<DapStore>,
 851) -> Task<Result<Entity<Session>>> {
 852    let task = cx.spawn(async move |this, cx| {
 853        let session = match start_client_task.await {
 854            Ok(session) => session,
 855            Err(error) => {
 856                this.update(cx, |_, cx| {
 857                    cx.emit(DapStoreEvent::Notification(error.to_string()));
 858                })
 859                .log_err();
 860
 861                return Err(error);
 862            }
 863        };
 864
 865        // we have to insert the session early, so we can handle reverse requests
 866        // that need the session to be available
 867        this.update(cx, |store, cx| {
 868            store.sessions.insert(session_id, session.clone());
 869            cx.emit(DapStoreEvent::DebugClientStarted(session_id));
 870            cx.notify();
 871        })?;
 872        let seq_result = async || {
 873            session
 874                .update(cx, |session, cx| session.request_initialize(cx))?
 875                .await?;
 876
 877            session
 878                .update(cx, |session, cx| {
 879                    session.initialize_sequence(initialized_rx, this.clone(), cx)
 880                })?
 881                .await
 882        };
 883        match seq_result().await {
 884            Ok(_) => {}
 885            Err(error) => {
 886                this.update(cx, |this, cx| {
 887                    cx.emit(DapStoreEvent::Notification(error.to_string()));
 888                    this.shutdown_session(session_id, cx)
 889                })?
 890                .await
 891                .log_err();
 892
 893                return Err(error);
 894            }
 895        }
 896
 897        this.update(cx, |_, cx| {
 898            cx.subscribe(
 899                &session,
 900                move |this: &mut DapStore, session, event: &SessionStateEvent, cx| match event {
 901                    SessionStateEvent::Shutdown => {
 902                        this.shutdown_session(session_id, cx).detach_and_log_err(cx);
 903                    }
 904                    SessionStateEvent::Restart => {
 905                        let mut curr_session = session;
 906                        while let Some(parent_id) = curr_session.read(cx).parent_id() {
 907                            if let Some(parent_session) = this.sessions.get(&parent_id).cloned() {
 908                                curr_session = parent_session;
 909                            } else {
 910                                log::error!("Failed to get parent session from parent session id");
 911                                break;
 912                            }
 913                        }
 914
 915                        let Some((config, binary)) = curr_session.read_with(cx, |session, _| {
 916                            session
 917                                .configuration()
 918                                .map(|config| (config, session.root_binary().clone()))
 919                        }) else {
 920                            log::error!("Failed to get debug config from session");
 921                            return;
 922                        };
 923
 924                        let session_id = curr_session.read(cx).session_id();
 925
 926                        let task = curr_session.update(cx, |session, cx| session.shutdown(cx));
 927
 928                        let worktree = worktree.clone();
 929                        cx.spawn(async move |this, cx| {
 930                            task.await;
 931
 932                            this.update(cx, |this, cx| {
 933                                this.sessions.remove(&session_id);
 934                                this.new_session(
 935                                    binary.as_ref().clone(),
 936                                    config,
 937                                    worktree,
 938                                    None,
 939                                    cx,
 940                                )
 941                            })?
 942                            .1
 943                            .await?;
 944
 945                            anyhow::Ok(())
 946                        })
 947                        .detach_and_log_err(cx);
 948                    }
 949                },
 950            )
 951            .detach();
 952            cx.emit(DapStoreEvent::DebugSessionInitialized(session_id));
 953        })?;
 954
 955        Ok(session)
 956    });
 957    task
 958}
 959
 960#[derive(Clone)]
 961pub struct DapAdapterDelegate {
 962    fs: Arc<dyn Fs>,
 963    worktree_id: WorktreeId,
 964    node_runtime: NodeRuntime,
 965    http_client: Arc<dyn HttpClient>,
 966    language_registry: Arc<LanguageRegistry>,
 967    toolchain_store: Arc<dyn LanguageToolchainStore>,
 968    updated_adapters: Arc<Mutex<HashSet<DebugAdapterName>>>,
 969    load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
 970}
 971
 972impl DapAdapterDelegate {
 973    pub fn new(
 974        fs: Arc<dyn Fs>,
 975        worktree_id: WorktreeId,
 976        node_runtime: NodeRuntime,
 977        http_client: Arc<dyn HttpClient>,
 978        language_registry: Arc<LanguageRegistry>,
 979        toolchain_store: Arc<dyn LanguageToolchainStore>,
 980        load_shell_env_task: Shared<Task<Option<HashMap<String, String>>>>,
 981    ) -> Self {
 982        Self {
 983            fs,
 984            worktree_id,
 985            http_client,
 986            node_runtime,
 987            toolchain_store,
 988            language_registry,
 989            load_shell_env_task,
 990            updated_adapters: Default::default(),
 991        }
 992    }
 993}
 994
 995#[async_trait(?Send)]
 996impl dap::adapters::DapDelegate for DapAdapterDelegate {
 997    fn worktree_id(&self) -> WorktreeId {
 998        self.worktree_id
 999    }
1000
1001    fn http_client(&self) -> Arc<dyn HttpClient> {
1002        self.http_client.clone()
1003    }
1004
1005    fn node_runtime(&self) -> NodeRuntime {
1006        self.node_runtime.clone()
1007    }
1008
1009    fn fs(&self) -> Arc<dyn Fs> {
1010        self.fs.clone()
1011    }
1012
1013    fn updated_adapters(&self) -> Arc<Mutex<HashSet<DebugAdapterName>>> {
1014        self.updated_adapters.clone()
1015    }
1016
1017    fn update_status(&self, dap_name: DebugAdapterName, status: dap::adapters::DapStatus) {
1018        let name = SharedString::from(dap_name.to_string());
1019        let status = match status {
1020            DapStatus::None => BinaryStatus::None,
1021            DapStatus::Downloading => BinaryStatus::Downloading,
1022            DapStatus::Failed { error } => BinaryStatus::Failed { error },
1023            DapStatus::CheckingForUpdate => BinaryStatus::CheckingForUpdate,
1024        };
1025
1026        self.language_registry
1027            .update_dap_status(LanguageServerName(name), status);
1028    }
1029
1030    fn which(&self, command: &OsStr) -> Option<PathBuf> {
1031        which::which(command).ok()
1032    }
1033
1034    async fn shell_env(&self) -> HashMap<String, String> {
1035        let task = self.load_shell_env_task.clone();
1036        task.await.unwrap_or_default()
1037    }
1038
1039    fn toolchain_store(&self) -> Arc<dyn LanguageToolchainStore> {
1040        self.toolchain_store.clone()
1041    }
1042}