transport.rs

   1use anyhow::{Context as _, Result, anyhow, bail};
   2use dap_types::{
   3    ErrorResponse,
   4    messages::{Message, Response},
   5};
   6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
   7use gpui::{AppContext as _, AsyncApp, Task};
   8use settings::Settings as _;
   9use smallvec::SmallVec;
  10use smol::{
  11    channel::{Receiver, Sender, unbounded},
  12    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
  13    lock::Mutex,
  14    net::{TcpListener, TcpStream},
  15};
  16use std::{
  17    collections::HashMap,
  18    net::{Ipv4Addr, SocketAddrV4},
  19    process::Stdio,
  20    sync::Arc,
  21    time::Duration,
  22};
  23use task::TcpArgumentsTemplate;
  24use util::ConnectionResult;
  25
  26use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
  27
  28pub(crate) type IoMessage = str;
  29pub(crate) type Command = str;
  30pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
  31
  32#[derive(PartialEq, Eq, Clone, Copy)]
  33pub enum LogKind {
  34    Adapter,
  35    Rpc,
  36}
  37
  38pub enum IoKind {
  39    StdIn,
  40    StdOut,
  41    StdErr,
  42}
  43
  44pub struct TransportPipe {
  45    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
  46    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
  47    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
  48    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
  49}
  50
  51impl TransportPipe {
  52    pub fn new(
  53        input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
  54        output: Box<dyn AsyncRead + Unpin + Send + 'static>,
  55        stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
  56        stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
  57    ) -> Self {
  58        TransportPipe {
  59            input,
  60            output,
  61            stdout,
  62            stderr,
  63        }
  64    }
  65}
  66
  67type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
  68type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
  69
  70pub enum Transport {
  71    Stdio(StdioTransport),
  72    Tcp(TcpTransport),
  73    #[cfg(any(test, feature = "test-support"))]
  74    Fake(FakeTransport),
  75}
  76
  77impl Transport {
  78    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
  79        #[cfg(any(test, feature = "test-support"))]
  80        if cfg!(any(test, feature = "test-support")) {
  81            return FakeTransport::start(cx)
  82                .await
  83                .map(|(transports, fake)| (transports, Self::Fake(fake)));
  84        }
  85
  86        if binary.connection.is_some() {
  87            TcpTransport::start(binary, cx)
  88                .await
  89                .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
  90                .context("Tried to connect to a debug adapter via TCP transport layer")
  91        } else {
  92            StdioTransport::start(binary, cx)
  93                .await
  94                .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
  95                .context("Tried to connect to a debug adapter via stdin/stdout transport layer")
  96        }
  97    }
  98
  99    fn has_adapter_logs(&self) -> bool {
 100        match self {
 101            Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
 102            Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
 103            #[cfg(any(test, feature = "test-support"))]
 104            Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
 105        }
 106    }
 107
 108    async fn kill(&self) {
 109        match self {
 110            Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
 111            Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
 112            #[cfg(any(test, feature = "test-support"))]
 113            Transport::Fake(fake_transport) => fake_transport.kill().await,
 114        }
 115    }
 116
 117    #[cfg(any(test, feature = "test-support"))]
 118    pub(crate) fn as_fake(&self) -> &FakeTransport {
 119        match self {
 120            Transport::Fake(fake_transport) => fake_transport,
 121            _ => panic!("Not a fake transport layer"),
 122        }
 123    }
 124}
 125
 126pub(crate) struct TransportDelegate {
 127    log_handlers: LogHandlers,
 128    current_requests: Requests,
 129    pending_requests: Requests,
 130    transport: Transport,
 131    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
 132    _tasks: Vec<Task<()>>,
 133}
 134
 135impl TransportDelegate {
 136    pub(crate) async fn start(
 137        binary: &DebugAdapterBinary,
 138        cx: AsyncApp,
 139    ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
 140        let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
 141        let mut this = Self {
 142            transport,
 143            server_tx: Default::default(),
 144            log_handlers: Default::default(),
 145            current_requests: Default::default(),
 146            pending_requests: Default::default(),
 147            _tasks: Vec::new(),
 148        };
 149        let messages = this.start_handlers(transport_pipes, cx).await?;
 150        Ok((messages, this))
 151    }
 152
 153    async fn start_handlers(
 154        &mut self,
 155        mut params: TransportPipe,
 156        cx: AsyncApp,
 157    ) -> Result<(Receiver<Message>, Sender<Message>)> {
 158        let (client_tx, server_rx) = unbounded::<Message>();
 159        let (server_tx, client_rx) = unbounded::<Message>();
 160
 161        let log_dap_communications =
 162            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
 163                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
 164                .unwrap_or(false);
 165
 166        let log_handler = if log_dap_communications {
 167            Some(self.log_handlers.clone())
 168        } else {
 169            None
 170        };
 171
 172        let adapter_log_handler = log_handler.clone();
 173        cx.update(|cx| {
 174            if let Some(stdout) = params.stdout.take() {
 175                self._tasks.push(cx.background_spawn(async move {
 176                    match Self::handle_adapter_log(stdout, adapter_log_handler).await {
 177                        ConnectionResult::Timeout => {
 178                            log::error!("Timed out when handling debugger log");
 179                        }
 180                        ConnectionResult::ConnectionReset => {
 181                            log::info!("Debugger logs connection closed");
 182                        }
 183                        ConnectionResult::Result(Ok(())) => {}
 184                        ConnectionResult::Result(Err(e)) => {
 185                            log::error!("Error handling debugger log: {e}");
 186                        }
 187                    }
 188                }));
 189            }
 190
 191            let pending_requests = self.pending_requests.clone();
 192            let output_log_handler = log_handler.clone();
 193            self._tasks.push(cx.background_spawn(async move {
 194                match Self::handle_output(
 195                    params.output,
 196                    client_tx,
 197                    pending_requests.clone(),
 198                    output_log_handler,
 199                )
 200                .await
 201                {
 202                    Ok(()) => {}
 203                    Err(e) => log::error!("Error handling debugger output: {e}"),
 204                }
 205                let mut pending_requests = pending_requests.lock().await;
 206                pending_requests.drain().for_each(|(_, request)| {
 207                    request
 208                        .send(Err(anyhow!("debugger shutdown unexpectedly")))
 209                        .ok();
 210                });
 211            }));
 212
 213            if let Some(stderr) = params.stderr.take() {
 214                let log_handlers = self.log_handlers.clone();
 215                self._tasks.push(cx.background_spawn(async move {
 216                    match Self::handle_error(stderr, log_handlers).await {
 217                        ConnectionResult::Timeout => {
 218                            log::error!("Timed out reading debugger error stream")
 219                        }
 220                        ConnectionResult::ConnectionReset => {
 221                            log::info!("Debugger closed its error stream")
 222                        }
 223                        ConnectionResult::Result(Ok(())) => {}
 224                        ConnectionResult::Result(Err(e)) => {
 225                            log::error!("Error handling debugger error: {e}")
 226                        }
 227                    }
 228                }));
 229            }
 230
 231            let current_requests = self.current_requests.clone();
 232            let pending_requests = self.pending_requests.clone();
 233            let log_handler = log_handler.clone();
 234            self._tasks.push(cx.background_spawn(async move {
 235                match Self::handle_input(
 236                    params.input,
 237                    client_rx,
 238                    current_requests,
 239                    pending_requests,
 240                    log_handler,
 241                )
 242                .await
 243                {
 244                    Ok(()) => {}
 245                    Err(e) => log::error!("Error handling debugger input: {e}"),
 246                }
 247            }));
 248        })?;
 249
 250        {
 251            let mut lock = self.server_tx.lock().await;
 252            *lock = Some(server_tx.clone());
 253        }
 254
 255        Ok((server_rx, server_tx))
 256    }
 257
 258    pub(crate) async fn add_pending_request(
 259        &self,
 260        sequence_id: u64,
 261        request: oneshot::Sender<Result<Response>>,
 262    ) {
 263        let mut pending_requests = self.pending_requests.lock().await;
 264        pending_requests.insert(sequence_id, request);
 265    }
 266
 267    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
 268        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
 269            server_tx.send(message).await.context("sending message")
 270        } else {
 271            anyhow::bail!("Server tx already dropped")
 272        }
 273    }
 274
 275    async fn handle_adapter_log<Stdout>(
 276        stdout: Stdout,
 277        log_handlers: Option<LogHandlers>,
 278    ) -> ConnectionResult<()>
 279    where
 280        Stdout: AsyncRead + Unpin + Send + 'static,
 281    {
 282        let mut reader = BufReader::new(stdout);
 283        let mut line = String::new();
 284
 285        let result = loop {
 286            line.truncate(0);
 287
 288            match reader
 289                .read_line(&mut line)
 290                .await
 291                .context("reading adapter log line")
 292            {
 293                Ok(0) => break ConnectionResult::ConnectionReset,
 294                Ok(_) => {}
 295                Err(e) => break ConnectionResult::Result(Err(e)),
 296            }
 297
 298            if let Some(log_handlers) = log_handlers.as_ref() {
 299                for (kind, handler) in log_handlers.lock().iter_mut() {
 300                    if matches!(kind, LogKind::Adapter) {
 301                        handler(IoKind::StdOut, None, line.as_str());
 302                    }
 303                }
 304            }
 305        };
 306
 307        log::debug!("Handle adapter log dropped");
 308
 309        result
 310    }
 311
 312    fn build_rpc_message(message: String) -> String {
 313        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
 314    }
 315
 316    async fn handle_input<Stdin>(
 317        mut server_stdin: Stdin,
 318        client_rx: Receiver<Message>,
 319        current_requests: Requests,
 320        pending_requests: Requests,
 321        log_handlers: Option<LogHandlers>,
 322    ) -> Result<()>
 323    where
 324        Stdin: AsyncWrite + Unpin + Send + 'static,
 325    {
 326        let result = loop {
 327            match client_rx.recv().await {
 328                Ok(message) => {
 329                    if let Message::Request(request) = &message {
 330                        if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
 331                            pending_requests.lock().await.insert(request.seq, sender);
 332                        }
 333                    }
 334
 335                    let command = match &message {
 336                        Message::Request(request) => Some(request.command.as_str()),
 337                        Message::Response(response) => Some(response.command.as_str()),
 338                        _ => None,
 339                    };
 340
 341                    let message = match serde_json::to_string(&message) {
 342                        Ok(message) => message,
 343                        Err(e) => break Err(e.into()),
 344                    };
 345
 346                    if let Some(log_handlers) = log_handlers.as_ref() {
 347                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
 348                            if matches!(kind, LogKind::Rpc) {
 349                                log_handler(IoKind::StdIn, command, &message);
 350                            }
 351                        }
 352                    }
 353
 354                    if let Err(e) = server_stdin
 355                        .write_all(Self::build_rpc_message(message).as_bytes())
 356                        .await
 357                    {
 358                        break Err(e.into());
 359                    }
 360
 361                    if let Err(e) = server_stdin.flush().await {
 362                        break Err(e.into());
 363                    }
 364                }
 365                Err(error) => break Err(error.into()),
 366            }
 367        };
 368
 369        log::debug!("Handle adapter input dropped");
 370
 371        result
 372    }
 373
 374    async fn handle_output<Stdout>(
 375        server_stdout: Stdout,
 376        client_tx: Sender<Message>,
 377        pending_requests: Requests,
 378        log_handlers: Option<LogHandlers>,
 379    ) -> Result<()>
 380    where
 381        Stdout: AsyncRead + Unpin + Send + 'static,
 382    {
 383        let mut recv_buffer = String::new();
 384        let mut reader = BufReader::new(server_stdout);
 385
 386        let result = loop {
 387            match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
 388                .await
 389            {
 390                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
 391                ConnectionResult::ConnectionReset => {
 392                    log::info!("Debugger closed the connection");
 393                    return Ok(());
 394                }
 395                ConnectionResult::Result(Ok(Message::Response(res))) => {
 396                    if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
 397                        if let Err(e) = tx.send(Self::process_response(res)) {
 398                            log::trace!("Did not send response `{:?}` for a cancelled", e);
 399                        }
 400                    } else {
 401                        client_tx.send(Message::Response(res)).await?;
 402                    }
 403                }
 404                ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
 405                ConnectionResult::Result(Err(e)) => break Err(e),
 406            }
 407        };
 408
 409        drop(client_tx);
 410        log::debug!("Handle adapter output dropped");
 411
 412        result
 413    }
 414
 415    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
 416    where
 417        Stderr: AsyncRead + Unpin + Send + 'static,
 418    {
 419        log::debug!("Handle error started");
 420        let mut buffer = String::new();
 421
 422        let mut reader = BufReader::new(stderr);
 423
 424        let result = loop {
 425            match reader
 426                .read_line(&mut buffer)
 427                .await
 428                .context("reading error log line")
 429            {
 430                Ok(0) => break ConnectionResult::ConnectionReset,
 431                Ok(_) => {
 432                    for (kind, log_handler) in log_handlers.lock().iter_mut() {
 433                        if matches!(kind, LogKind::Adapter) {
 434                            log_handler(IoKind::StdErr, None, buffer.as_str());
 435                        }
 436                    }
 437
 438                    buffer.truncate(0);
 439                }
 440                Err(error) => break ConnectionResult::Result(Err(error)),
 441            }
 442        };
 443
 444        log::debug!("Handle adapter error dropped");
 445
 446        result
 447    }
 448
 449    fn process_response(response: Response) -> Result<Response> {
 450        if response.success {
 451            Ok(response)
 452        } else {
 453            if let Some(error_message) = response
 454                .body
 455                .clone()
 456                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
 457                .and_then(|response| response.error.map(|msg| msg.format))
 458                .or_else(|| response.message.clone())
 459            {
 460                anyhow::bail!(error_message);
 461            };
 462
 463            anyhow::bail!(
 464                "Received error response from adapter. Response: {:?}",
 465                response
 466            );
 467        }
 468    }
 469
 470    async fn receive_server_message<Stdout>(
 471        reader: &mut BufReader<Stdout>,
 472        buffer: &mut String,
 473        log_handlers: Option<&LogHandlers>,
 474    ) -> ConnectionResult<Message>
 475    where
 476        Stdout: AsyncRead + Unpin + Send + 'static,
 477    {
 478        let mut content_length = None;
 479        loop {
 480            buffer.truncate(0);
 481
 482            match reader
 483                .read_line(buffer)
 484                .await
 485                .with_context(|| "reading a message from server")
 486            {
 487                Ok(0) => return ConnectionResult::ConnectionReset,
 488                Ok(_) => {}
 489                Err(e) => return ConnectionResult::Result(Err(e)),
 490            };
 491
 492            if buffer == "\r\n" {
 493                break;
 494            }
 495
 496            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
 497                match value.parse().context("invalid content length") {
 498                    Ok(length) => content_length = Some(length),
 499                    Err(e) => return ConnectionResult::Result(Err(e)),
 500                }
 501            }
 502        }
 503
 504        let content_length = match content_length.context("missing content length") {
 505            Ok(length) => length,
 506            Err(e) => return ConnectionResult::Result(Err(e)),
 507        };
 508
 509        let mut content = vec![0; content_length];
 510        if let Err(e) = reader
 511            .read_exact(&mut content)
 512            .await
 513            .with_context(|| "reading after a loop")
 514        {
 515            return ConnectionResult::Result(Err(e));
 516        }
 517
 518        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
 519            Ok(str) => str,
 520            Err(e) => return ConnectionResult::Result(Err(e)),
 521        };
 522
 523        let message =
 524            serde_json::from_str::<Message>(message_str).context("deserializing server message");
 525
 526        if let Some(log_handlers) = log_handlers {
 527            let command = match &message {
 528                Ok(Message::Request(request)) => Some(request.command.as_str()),
 529                Ok(Message::Response(response)) => Some(response.command.as_str()),
 530                _ => None,
 531            };
 532
 533            for (kind, log_handler) in log_handlers.lock().iter_mut() {
 534                if matches!(kind, LogKind::Rpc) {
 535                    log_handler(IoKind::StdOut, command, message_str);
 536                }
 537            }
 538        }
 539
 540        ConnectionResult::Result(message)
 541    }
 542
 543    pub async fn shutdown(&self) -> Result<()> {
 544        log::debug!("Start shutdown client");
 545
 546        if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
 547            server_tx.close();
 548        }
 549
 550        let mut current_requests = self.current_requests.lock().await;
 551        let mut pending_requests = self.pending_requests.lock().await;
 552
 553        current_requests.clear();
 554        pending_requests.clear();
 555
 556        self.transport.kill().await;
 557
 558        drop(current_requests);
 559        drop(pending_requests);
 560
 561        log::debug!("Shutdown client completed");
 562
 563        anyhow::Ok(())
 564    }
 565
 566    pub fn has_adapter_logs(&self) -> bool {
 567        self.transport.has_adapter_logs()
 568    }
 569
 570    pub fn transport(&self) -> &Transport {
 571        &self.transport
 572    }
 573
 574    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
 575    where
 576        F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
 577    {
 578        let mut log_handlers = self.log_handlers.lock();
 579        log_handlers.push((kind, Box::new(f)));
 580    }
 581}
 582
 583pub struct TcpTransport {
 584    pub port: u16,
 585    pub host: Ipv4Addr,
 586    pub timeout: u64,
 587    process: Option<Mutex<Child>>,
 588}
 589
 590impl TcpTransport {
 591    /// Get an open port to use with the tcp client when not supplied by debug config
 592    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
 593        if let Some(port) = host.port {
 594            Ok(port)
 595        } else {
 596            Self::unused_port(host.host()).await
 597        }
 598    }
 599
 600    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
 601        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
 602            .await?
 603            .local_addr()?
 604            .port())
 605    }
 606
 607    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 608        let connection_args = binary
 609            .connection
 610            .as_ref()
 611            .context("No connection arguments provided")?;
 612
 613        let host = connection_args.host;
 614        let port = connection_args.port;
 615
 616        let mut process = if let Some(command) = &binary.command {
 617            let mut command = util::command::new_std_command(&command);
 618
 619            if let Some(cwd) = &binary.cwd {
 620                command.current_dir(cwd);
 621            }
 622
 623            command.args(&binary.arguments);
 624            command.envs(&binary.envs);
 625
 626            Some(
 627                Child::spawn(command, Stdio::null())
 628                    .with_context(|| "failed to start debug adapter.")?,
 629            )
 630        } else {
 631            None
 632        };
 633
 634        let address = SocketAddrV4::new(host, port);
 635
 636        let timeout = connection_args.timeout.unwrap_or_else(|| {
 637            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
 638                .unwrap_or(2000u64)
 639        });
 640
 641        let (mut process, (rx, tx)) = select! {
 642            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
 643                anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
 644            },
 645            result = cx.spawn(async move |cx| {
 646                loop {
 647                    match TcpStream::connect(address).await {
 648                        Ok(stream) => return Ok((process, stream.split())),
 649                        Err(_) => {
 650                            if let Some(p) = &mut process {
 651                                if let Ok(Some(_)) = p.try_status() {
 652                                    let output = process.take().unwrap().into_inner().output().await?;
 653                                    let output = if output.stderr.is_empty() {
 654                                        String::from_utf8_lossy(&output.stdout).to_string()
 655                                    } else {
 656                                        String::from_utf8_lossy(&output.stderr).to_string()
 657                                    };
 658                                    anyhow::bail!("{output}\nerror: process exited before debugger attached.");
 659                                }
 660                            }
 661
 662                            cx.background_executor().timer(Duration::from_millis(100)).await;
 663                        }
 664                    }
 665                }
 666            }).fuse() => result?
 667        };
 668
 669        log::info!(
 670            "Debug adapter has connected to TCP server {}:{}",
 671            host,
 672            port
 673        );
 674        let stdout = process.as_mut().and_then(|p| p.stdout.take());
 675        let stderr = process.as_mut().and_then(|p| p.stderr.take());
 676
 677        let this = Self {
 678            port,
 679            host,
 680            process: process.map(Mutex::new),
 681            timeout,
 682        };
 683
 684        let pipe = TransportPipe::new(
 685            Box::new(tx),
 686            Box::new(BufReader::new(rx)),
 687            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
 688            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
 689        );
 690
 691        Ok((pipe, this))
 692    }
 693
 694    fn has_adapter_logs(&self) -> bool {
 695        true
 696    }
 697
 698    async fn kill(&self) {
 699        if let Some(process) = &self.process {
 700            let mut process = process.lock().await;
 701            Child::kill(&mut process);
 702        }
 703    }
 704}
 705
 706impl Drop for TcpTransport {
 707    fn drop(&mut self) {
 708        if let Some(mut p) = self.process.take() {
 709            p.get_mut().kill();
 710        }
 711    }
 712}
 713
 714pub struct StdioTransport {
 715    process: Mutex<Child>,
 716}
 717
 718impl StdioTransport {
 719    #[allow(dead_code, reason = "This is used in non test builds of Zed")]
 720    async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
 721        let Some(binary_command) = &binary.command else {
 722            bail!(
 723                "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
 724            );
 725        };
 726        let mut command = util::command::new_std_command(&binary_command);
 727
 728        if let Some(cwd) = &binary.cwd {
 729            command.current_dir(cwd);
 730        }
 731
 732        command.args(&binary.arguments);
 733        command.envs(&binary.envs);
 734
 735        let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
 736            format!(
 737                "failed to spawn command `{} {}`.",
 738                binary_command,
 739                binary.arguments.join(" ")
 740            )
 741        })?;
 742
 743        let stdin = process.stdin.take().context("Failed to open stdin")?;
 744        let stdout = process.stdout.take().context("Failed to open stdout")?;
 745        let stderr = process
 746            .stderr
 747            .take()
 748            .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
 749
 750        if stderr.is_none() {
 751            bail!(
 752                "Failed to connect to stderr for debug adapter command {}",
 753                &binary_command
 754            );
 755        }
 756
 757        log::info!("Debug adapter has connected to stdio adapter");
 758
 759        let process = Mutex::new(process);
 760
 761        Ok((
 762            TransportPipe::new(
 763                Box::new(stdin),
 764                Box::new(BufReader::new(stdout)),
 765                None,
 766                stderr,
 767            ),
 768            Self { process },
 769        ))
 770    }
 771
 772    fn has_adapter_logs(&self) -> bool {
 773        false
 774    }
 775
 776    async fn kill(&self) {
 777        let mut process = self.process.lock().await;
 778        Child::kill(&mut process);
 779    }
 780}
 781
 782impl Drop for StdioTransport {
 783    fn drop(&mut self) {
 784        self.process.get_mut().kill();
 785    }
 786}
 787
 788#[cfg(any(test, feature = "test-support"))]
 789type RequestHandler =
 790    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
 791
 792#[cfg(any(test, feature = "test-support"))]
 793type ResponseHandler = Box<dyn Send + Fn(Response)>;
 794
 795#[cfg(any(test, feature = "test-support"))]
 796pub struct FakeTransport {
 797    // for sending fake response back from adapter side
 798    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
 799    // for reverse request responses
 800    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
 801}
 802
 803#[cfg(any(test, feature = "test-support"))]
 804impl FakeTransport {
 805    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
 806    where
 807        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
 808    {
 809        self.request_handlers.lock().insert(
 810            R::COMMAND,
 811            Box::new(move |seq, args| {
 812                let result = handler(seq, serde_json::from_value(args).unwrap());
 813                let response = match result {
 814                    Ok(response) => Response {
 815                        seq: seq + 1,
 816                        request_seq: seq,
 817                        success: true,
 818                        command: R::COMMAND.into(),
 819                        body: Some(serde_json::to_value(response).unwrap()),
 820                        message: None,
 821                    },
 822                    Err(response) => Response {
 823                        seq: seq + 1,
 824                        request_seq: seq,
 825                        success: false,
 826                        command: R::COMMAND.into(),
 827                        body: Some(serde_json::to_value(response).unwrap()),
 828                        message: None,
 829                    },
 830                };
 831                response
 832            }),
 833        );
 834    }
 835
 836    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
 837    where
 838        F: 'static + Send + Fn(Response),
 839    {
 840        self.response_handlers
 841            .lock()
 842            .insert(R::COMMAND, Box::new(handler));
 843    }
 844
 845    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 846        let this = Self {
 847            request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
 848            response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
 849        };
 850        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
 851        use serde_json::json;
 852
 853        let (stdin_writer, stdin_reader) = async_pipe::pipe();
 854        let (stdout_writer, stdout_reader) = async_pipe::pipe();
 855
 856        let request_handlers = this.request_handlers.clone();
 857        let response_handlers = this.response_handlers.clone();
 858        let stdout_writer = Arc::new(Mutex::new(stdout_writer));
 859
 860        cx.background_spawn(async move {
 861            let mut reader = BufReader::new(stdin_reader);
 862            let mut buffer = String::new();
 863
 864            loop {
 865                match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
 866                    .await
 867                {
 868                    ConnectionResult::Timeout => {
 869                        anyhow::bail!("Timed out when connecting to debugger");
 870                    }
 871                    ConnectionResult::ConnectionReset => {
 872                        log::info!("Debugger closed the connection");
 873                        break Ok(());
 874                    }
 875                    ConnectionResult::Result(Err(e)) => break Err(e),
 876                    ConnectionResult::Result(Ok(message)) => {
 877                        match message {
 878                            Message::Request(request) => {
 879                                // redirect reverse requests to stdout writer/reader
 880                                if request.command == RunInTerminal::COMMAND
 881                                    || request.command == StartDebugging::COMMAND
 882                                {
 883                                    let message =
 884                                        serde_json::to_string(&Message::Request(request)).unwrap();
 885
 886                                    let mut writer = stdout_writer.lock().await;
 887                                    writer
 888                                        .write_all(
 889                                            TransportDelegate::build_rpc_message(message)
 890                                                .as_bytes(),
 891                                        )
 892                                        .await
 893                                        .unwrap();
 894                                    writer.flush().await.unwrap();
 895                                } else {
 896                                    let response = if let Some(handle) =
 897                                        request_handlers.lock().get_mut(request.command.as_str())
 898                                    {
 899                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
 900                                    } else {
 901                                        panic!("No request handler for {}", request.command);
 902                                    };
 903                                    let message =
 904                                        serde_json::to_string(&Message::Response(response))
 905                                            .unwrap();
 906
 907                                    let mut writer = stdout_writer.lock().await;
 908
 909                                    writer
 910                                        .write_all(
 911                                            TransportDelegate::build_rpc_message(message)
 912                                                .as_bytes(),
 913                                        )
 914                                        .await
 915                                        .unwrap();
 916                                    writer.flush().await.unwrap();
 917                                }
 918                            }
 919                            Message::Event(event) => {
 920                                let message =
 921                                    serde_json::to_string(&Message::Event(event)).unwrap();
 922
 923                                let mut writer = stdout_writer.lock().await;
 924                                writer
 925                                    .write_all(
 926                                        TransportDelegate::build_rpc_message(message).as_bytes(),
 927                                    )
 928                                    .await
 929                                    .unwrap();
 930                                writer.flush().await.unwrap();
 931                            }
 932                            Message::Response(response) => {
 933                                if let Some(handle) =
 934                                    response_handlers.lock().get(response.command.as_str())
 935                                {
 936                                    handle(response);
 937                                } else {
 938                                    log::error!("No response handler for {}", response.command);
 939                                }
 940                            }
 941                        }
 942                    }
 943                }
 944            }
 945        })
 946        .detach();
 947
 948        Ok((
 949            TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
 950            this,
 951        ))
 952    }
 953
 954    fn has_adapter_logs(&self) -> bool {
 955        false
 956    }
 957
 958    async fn kill(&self) {}
 959}
 960
 961struct Child {
 962    process: smol::process::Child,
 963}
 964
 965impl std::ops::Deref for Child {
 966    type Target = smol::process::Child;
 967
 968    fn deref(&self) -> &Self::Target {
 969        &self.process
 970    }
 971}
 972
 973impl std::ops::DerefMut for Child {
 974    fn deref_mut(&mut self) -> &mut Self::Target {
 975        &mut self.process
 976    }
 977}
 978
 979impl Child {
 980    fn into_inner(self) -> smol::process::Child {
 981        self.process
 982    }
 983
 984    #[cfg(not(windows))]
 985    fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
 986        util::set_pre_exec_to_start_new_session(&mut command);
 987        let process = smol::process::Command::from(command)
 988            .stdin(stdin)
 989            .stdout(Stdio::piped())
 990            .stderr(Stdio::piped())
 991            .spawn()?;
 992        Ok(Self { process })
 993    }
 994
 995    #[cfg(windows)]
 996    fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
 997        // TODO(windows): create a job object and add the child process handle to it,
 998        // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
 999        let process = smol::process::Command::from(command)
1000            .stdin(stdin)
1001            .stdout(Stdio::piped())
1002            .stderr(Stdio::piped())
1003            .spawn()?;
1004        Ok(Self { process })
1005    }
1006
1007    #[cfg(not(windows))]
1008    fn kill(&mut self) {
1009        let pid = self.process.id();
1010        unsafe {
1011            libc::killpg(pid as i32, libc::SIGKILL);
1012        }
1013    }
1014
1015    #[cfg(windows)]
1016    fn kill(&mut self) {
1017        // TODO(windows): terminate the job object in kill
1018        let _ = self.process.kill();
1019    }
1020}