transport.rs

   1use anyhow::{Context as _, Result, anyhow, bail};
   2#[cfg(any(test, feature = "test-support"))]
   3use async_pipe::{PipeReader, PipeWriter};
   4use dap_types::{
   5    ErrorResponse,
   6    messages::{Message, Response},
   7};
   8use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
   9use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
  10use parking_lot::Mutex;
  11use proto::ErrorExt;
  12use settings::Settings as _;
  13use smallvec::SmallVec;
  14use smol::{
  15    channel::{Receiver, Sender, unbounded},
  16    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
  17    net::{TcpListener, TcpStream},
  18};
  19use std::{
  20    collections::HashMap,
  21    net::{Ipv4Addr, SocketAddrV4},
  22    process::Stdio,
  23    sync::Arc,
  24    time::Duration,
  25};
  26use task::TcpArgumentsTemplate;
  27use util::ConnectionResult;
  28
  29use crate::{
  30    adapters::{DebugAdapterBinary, TcpArguments},
  31    client::DapMessageHandler,
  32    debugger_settings::DebuggerSettings,
  33};
  34
  35pub(crate) type IoMessage = str;
  36pub(crate) type Command = str;
  37pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
  38
  39#[derive(PartialEq, Eq, Clone, Copy)]
  40pub enum LogKind {
  41    Adapter,
  42    Rpc,
  43}
  44
  45#[derive(Clone, Copy)]
  46pub enum IoKind {
  47    StdIn,
  48    StdOut,
  49    StdErr,
  50}
  51
  52#[cfg(any(test, feature = "test-support"))]
  53pub enum RequestHandling<T> {
  54    Respond(T),
  55    Exit,
  56}
  57
  58type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
  59
  60pub trait Transport: Send + Sync {
  61    fn has_adapter_logs(&self) -> bool;
  62    fn tcp_arguments(&self) -> Option<TcpArguments>;
  63    fn connect(
  64        &mut self,
  65    ) -> Task<
  66        Result<(
  67            Box<dyn AsyncWrite + Unpin + Send + 'static>,
  68            Box<dyn AsyncRead + Unpin + Send + 'static>,
  69        )>,
  70    >;
  71    fn kill(&mut self);
  72    #[cfg(any(test, feature = "test-support"))]
  73    fn as_fake(&self) -> &FakeTransport {
  74        unreachable!()
  75    }
  76}
  77
  78async fn start(
  79    binary: &DebugAdapterBinary,
  80    log_handlers: LogHandlers,
  81    cx: &mut AsyncApp,
  82) -> Result<Box<dyn Transport>> {
  83    #[cfg(any(test, feature = "test-support"))]
  84    if cfg!(any(test, feature = "test-support")) {
  85        if let Some(connection) = binary.connection.clone() {
  86            return Ok(Box::new(FakeTransport::start_tcp(connection, cx).await?));
  87        } else {
  88            return Ok(Box::new(FakeTransport::start_stdio(cx).await?));
  89        }
  90    }
  91
  92    if binary.connection.is_some() {
  93        Ok(Box::new(
  94            TcpTransport::start(binary, log_handlers, cx).await?,
  95        ))
  96    } else {
  97        Ok(Box::new(
  98            StdioTransport::start(binary, log_handlers, cx).await?,
  99        ))
 100    }
 101}
 102
 103pub(crate) struct PendingRequests {
 104    inner: Option<HashMap<u64, oneshot::Sender<Result<Response>>>>,
 105}
 106
 107impl PendingRequests {
 108    fn new() -> Self {
 109        Self {
 110            inner: Some(HashMap::default()),
 111        }
 112    }
 113
 114    fn flush(&mut self, e: anyhow::Error) {
 115        let Some(inner) = self.inner.as_mut() else {
 116            return;
 117        };
 118        for (_, sender) in inner.drain() {
 119            sender.send(Err(e.cloned())).ok();
 120        }
 121    }
 122
 123    pub(crate) fn insert(
 124        &mut self,
 125        sequence_id: u64,
 126        callback_tx: oneshot::Sender<Result<Response>>,
 127    ) -> anyhow::Result<()> {
 128        let Some(inner) = self.inner.as_mut() else {
 129            bail!("client is closed")
 130        };
 131        inner.insert(sequence_id, callback_tx);
 132        Ok(())
 133    }
 134
 135    pub(crate) fn remove(
 136        &mut self,
 137        sequence_id: u64,
 138    ) -> anyhow::Result<Option<oneshot::Sender<Result<Response>>>> {
 139        let Some(inner) = self.inner.as_mut() else {
 140            bail!("client is closed");
 141        };
 142        Ok(inner.remove(&sequence_id))
 143    }
 144
 145    pub(crate) fn shutdown(&mut self) {
 146        self.flush(anyhow!("transport shutdown"));
 147        self.inner = None;
 148    }
 149}
 150
 151pub(crate) struct TransportDelegate {
 152    log_handlers: LogHandlers,
 153    pub(crate) pending_requests: Arc<Mutex<PendingRequests>>,
 154    pub(crate) transport: Mutex<Box<dyn Transport>>,
 155    pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
 156    tasks: Mutex<Vec<Task<()>>>,
 157}
 158
 159impl TransportDelegate {
 160    pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
 161        let log_handlers: LogHandlers = Default::default();
 162        let transport = start(binary, log_handlers.clone(), cx).await?;
 163        Ok(Self {
 164            transport: Mutex::new(transport),
 165            log_handlers,
 166            server_tx: Default::default(),
 167            pending_requests: Arc::new(Mutex::new(PendingRequests::new())),
 168            tasks: Default::default(),
 169        })
 170    }
 171
 172    pub async fn connect(
 173        &self,
 174        message_handler: DapMessageHandler,
 175        cx: &mut AsyncApp,
 176    ) -> Result<()> {
 177        let (server_tx, client_rx) = unbounded::<Message>();
 178        self.tasks.lock().clear();
 179
 180        let log_dap_communications =
 181            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
 182                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
 183                .unwrap_or(false);
 184
 185        let connect = self.transport.lock().connect();
 186        let (input, output) = connect.await?;
 187
 188        let log_handler = if log_dap_communications {
 189            Some(self.log_handlers.clone())
 190        } else {
 191            None
 192        };
 193
 194        let pending_requests = self.pending_requests.clone();
 195        let output_log_handler = log_handler.clone();
 196        {
 197            let mut tasks = self.tasks.lock();
 198            tasks.push(cx.background_spawn(async move {
 199                match Self::recv_from_server(
 200                    output,
 201                    message_handler,
 202                    pending_requests.clone(),
 203                    output_log_handler,
 204                )
 205                .await
 206                {
 207                    Ok(()) => {
 208                        pending_requests
 209                            .lock()
 210                            .flush(anyhow!("debugger shutdown unexpectedly"));
 211                    }
 212                    Err(e) => {
 213                        pending_requests.lock().flush(e);
 214                    }
 215                }
 216            }));
 217
 218            tasks.push(cx.background_spawn(async move {
 219                match Self::send_to_server(input, client_rx, log_handler).await {
 220                    Ok(()) => {}
 221                    Err(e) => log::error!("Error handling debugger input: {e}"),
 222                }
 223            }));
 224        }
 225
 226        {
 227            let mut lock = self.server_tx.lock().await;
 228            *lock = Some(server_tx.clone());
 229        }
 230
 231        Ok(())
 232    }
 233
 234    pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
 235        self.transport.lock().tcp_arguments()
 236    }
 237
 238    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
 239        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
 240            server_tx.send(message).await.context("sending message")
 241        } else {
 242            anyhow::bail!("Server tx already dropped")
 243        }
 244    }
 245
 246    async fn handle_adapter_log(
 247        stdout: impl AsyncRead + Unpin + Send + 'static,
 248        iokind: IoKind,
 249        log_handlers: LogHandlers,
 250    ) {
 251        let mut reader = BufReader::new(stdout);
 252        let mut line = String::new();
 253
 254        loop {
 255            line.truncate(0);
 256
 257            match reader.read_line(&mut line).await {
 258                Ok(0) => break,
 259                Ok(_) => {}
 260                Err(e) => {
 261                    log::debug!("handle_adapter_log: {}", e);
 262                    break;
 263                }
 264            }
 265
 266            for (kind, handler) in log_handlers.lock().iter_mut() {
 267                if matches!(kind, LogKind::Adapter) {
 268                    handler(iokind, None, line.as_str());
 269                }
 270            }
 271        }
 272    }
 273
 274    fn build_rpc_message(message: String) -> String {
 275        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
 276    }
 277
 278    async fn send_to_server<Stdin>(
 279        mut server_stdin: Stdin,
 280        client_rx: Receiver<Message>,
 281        log_handlers: Option<LogHandlers>,
 282    ) -> Result<()>
 283    where
 284        Stdin: AsyncWrite + Unpin + Send + 'static,
 285    {
 286        let result = loop {
 287            match client_rx.recv().await {
 288                Ok(message) => {
 289                    let command = match &message {
 290                        Message::Request(request) => Some(request.command.as_str()),
 291                        Message::Response(response) => Some(response.command.as_str()),
 292                        _ => None,
 293                    };
 294
 295                    let message = match serde_json::to_string(&message) {
 296                        Ok(message) => message,
 297                        Err(e) => break Err(e.into()),
 298                    };
 299
 300                    if let Some(log_handlers) = log_handlers.as_ref() {
 301                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
 302                            if matches!(kind, LogKind::Rpc) {
 303                                log_handler(IoKind::StdIn, command, &message);
 304                            }
 305                        }
 306                    }
 307
 308                    if let Err(e) = server_stdin
 309                        .write_all(Self::build_rpc_message(message).as_bytes())
 310                        .await
 311                    {
 312                        break Err(e.into());
 313                    }
 314
 315                    if let Err(e) = server_stdin.flush().await {
 316                        break Err(e.into());
 317                    }
 318                }
 319                Err(error) => break Err(error.into()),
 320            }
 321        };
 322
 323        log::debug!("Handle adapter input dropped");
 324
 325        result
 326    }
 327
 328    async fn recv_from_server<Stdout>(
 329        server_stdout: Stdout,
 330        mut message_handler: DapMessageHandler,
 331        pending_requests: Arc<Mutex<PendingRequests>>,
 332        log_handlers: Option<LogHandlers>,
 333    ) -> Result<()>
 334    where
 335        Stdout: AsyncRead + Unpin + Send + 'static,
 336    {
 337        let mut recv_buffer = String::new();
 338        let mut reader = BufReader::new(server_stdout);
 339
 340        let result = loop {
 341            let result =
 342                Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
 343                    .await;
 344            match result {
 345                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
 346                ConnectionResult::ConnectionReset => {
 347                    log::info!("Debugger closed the connection");
 348                    return Ok(());
 349                }
 350                ConnectionResult::Result(Ok(Message::Response(res))) => {
 351                    let tx = pending_requests.lock().remove(res.request_seq)?;
 352                    if let Some(tx) = tx {
 353                        if let Err(e) = tx.send(Self::process_response(res)) {
 354                            log::trace!("Did not send response `{:?}` for a cancelled", e);
 355                        }
 356                    } else {
 357                        message_handler(Message::Response(res))
 358                    }
 359                }
 360                ConnectionResult::Result(Ok(message)) => message_handler(message),
 361                ConnectionResult::Result(Err(e)) => break Err(e),
 362            }
 363        };
 364
 365        log::debug!("Handle adapter output dropped");
 366
 367        result
 368    }
 369
 370    fn process_response(response: Response) -> Result<Response> {
 371        if response.success {
 372            Ok(response)
 373        } else {
 374            if let Some(error_message) = response
 375                .body
 376                .clone()
 377                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
 378                .and_then(|response| response.error.map(|msg| msg.format))
 379                .or_else(|| response.message.clone())
 380            {
 381                anyhow::bail!(error_message);
 382            };
 383
 384            anyhow::bail!(
 385                "Received error response from adapter. Response: {:?}",
 386                response
 387            );
 388        }
 389    }
 390
 391    async fn receive_server_message<Stdout>(
 392        reader: &mut BufReader<Stdout>,
 393        buffer: &mut String,
 394        log_handlers: Option<&LogHandlers>,
 395    ) -> ConnectionResult<Message>
 396    where
 397        Stdout: AsyncRead + Unpin + Send + 'static,
 398    {
 399        let mut content_length = None;
 400        loop {
 401            buffer.truncate(0);
 402            match reader.read_line(buffer).await {
 403                Ok(0) => return ConnectionResult::ConnectionReset,
 404                Ok(_) => {}
 405                Err(e) => return ConnectionResult::Result(Err(e.into())),
 406            };
 407
 408            if buffer == "\r\n" {
 409                break;
 410            }
 411
 412            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
 413                match value.parse().context("invalid content length") {
 414                    Ok(length) => content_length = Some(length),
 415                    Err(e) => return ConnectionResult::Result(Err(e)),
 416                }
 417            }
 418        }
 419
 420        let content_length = match content_length.context("missing content length") {
 421            Ok(length) => length,
 422            Err(e) => return ConnectionResult::Result(Err(e)),
 423        };
 424
 425        let mut content = vec![0; content_length];
 426        if let Err(e) = reader
 427            .read_exact(&mut content)
 428            .await
 429            .with_context(|| "reading after a loop")
 430        {
 431            return ConnectionResult::Result(Err(e));
 432        }
 433
 434        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
 435            Ok(str) => str,
 436            Err(e) => return ConnectionResult::Result(Err(e)),
 437        };
 438
 439        let message =
 440            serde_json::from_str::<Message>(message_str).context("deserializing server message");
 441
 442        if let Some(log_handlers) = log_handlers {
 443            let command = match &message {
 444                Ok(Message::Request(request)) => Some(request.command.as_str()),
 445                Ok(Message::Response(response)) => Some(response.command.as_str()),
 446                _ => None,
 447            };
 448
 449            for (kind, log_handler) in log_handlers.lock().iter_mut() {
 450                if matches!(kind, LogKind::Rpc) {
 451                    log_handler(IoKind::StdOut, command, message_str);
 452                }
 453            }
 454        }
 455
 456        ConnectionResult::Result(message)
 457    }
 458
 459    pub fn has_adapter_logs(&self) -> bool {
 460        self.transport.lock().has_adapter_logs()
 461    }
 462
 463    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
 464    where
 465        F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
 466    {
 467        let mut log_handlers = self.log_handlers.lock();
 468        log_handlers.push((kind, Box::new(f)));
 469    }
 470}
 471
 472pub struct TcpTransport {
 473    executor: BackgroundExecutor,
 474    pub port: u16,
 475    pub host: Ipv4Addr,
 476    pub timeout: u64,
 477    process: Arc<Mutex<Option<Child>>>,
 478    _stderr_task: Option<Task<()>>,
 479    _stdout_task: Option<Task<()>>,
 480}
 481
 482impl TcpTransport {
 483    /// Get an open port to use with the tcp client when not supplied by debug config
 484    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
 485        if let Some(port) = host.port {
 486            Ok(port)
 487        } else {
 488            Self::unused_port(host.host()).await
 489        }
 490    }
 491
 492    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
 493        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
 494            .await?
 495            .local_addr()?
 496            .port())
 497    }
 498
 499    async fn start(
 500        binary: &DebugAdapterBinary,
 501        log_handlers: LogHandlers,
 502        cx: &mut AsyncApp,
 503    ) -> Result<Self> {
 504        let connection_args = binary
 505            .connection
 506            .as_ref()
 507            .context("No connection arguments provided")?;
 508
 509        let host = connection_args.host;
 510        let port = connection_args.port;
 511
 512        let mut process = None;
 513        let mut stdout_task = None;
 514        let mut stderr_task = None;
 515
 516        if let Some(command) = &binary.command {
 517            let mut command = util::command::new_std_command(&command);
 518
 519            if let Some(cwd) = &binary.cwd {
 520                command.current_dir(cwd);
 521            }
 522
 523            command.args(&binary.arguments);
 524            command.envs(&binary.envs);
 525
 526            let mut p = Child::spawn(command, Stdio::null())
 527                .with_context(|| "failed to start debug adapter.")?;
 528
 529            stdout_task = p.stdout.take().map(|stdout| {
 530                cx.background_executor()
 531                    .spawn(TransportDelegate::handle_adapter_log(
 532                        stdout,
 533                        IoKind::StdOut,
 534                        log_handlers.clone(),
 535                    ))
 536            });
 537            stderr_task = p.stderr.take().map(|stderr| {
 538                cx.background_executor()
 539                    .spawn(TransportDelegate::handle_adapter_log(
 540                        stderr,
 541                        IoKind::StdErr,
 542                        log_handlers,
 543                    ))
 544            });
 545            process = Some(p);
 546        };
 547
 548        let timeout = connection_args.timeout.unwrap_or_else(|| {
 549            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
 550                .unwrap_or(20000u64)
 551        });
 552
 553        log::info!(
 554            "Debug adapter has connected to TCP server {}:{}",
 555            host,
 556            port
 557        );
 558
 559        let this = Self {
 560            executor: cx.background_executor().clone(),
 561            port,
 562            host,
 563            process: Arc::new(Mutex::new(process)),
 564            timeout,
 565            _stdout_task: stdout_task,
 566            _stderr_task: stderr_task,
 567        };
 568
 569        Ok(this)
 570    }
 571}
 572
 573impl Transport for TcpTransport {
 574    fn has_adapter_logs(&self) -> bool {
 575        true
 576    }
 577
 578    fn kill(&mut self) {
 579        if let Some(process) = &mut *self.process.lock() {
 580            process.kill();
 581        }
 582    }
 583
 584    fn tcp_arguments(&self) -> Option<TcpArguments> {
 585        Some(TcpArguments {
 586            host: self.host,
 587            port: self.port,
 588            timeout: Some(self.timeout),
 589        })
 590    }
 591
 592    fn connect(
 593        &mut self,
 594    ) -> Task<
 595        Result<(
 596            Box<dyn AsyncWrite + Unpin + Send + 'static>,
 597            Box<dyn AsyncRead + Unpin + Send + 'static>,
 598        )>,
 599    > {
 600        let executor = self.executor.clone();
 601        let timeout = self.timeout;
 602        let address = SocketAddrV4::new(self.host, self.port);
 603        let process = self.process.clone();
 604        executor.clone().spawn(async move {
 605            select! {
 606                _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
 607                    anyhow::bail!("Connection to TCP DAP timeout {address}");
 608                },
 609                result = executor.clone().spawn(async move {
 610                    loop {
 611                        match TcpStream::connect(address).await {
 612                            Ok(stream) => {
 613                                let (read, write) = stream.split();
 614                                return Ok((Box::new(write) as _, Box::new(read) as _))
 615                            },
 616                            Err(_) => {
 617                                let has_process = process.lock().is_some();
 618                                if has_process {
 619                                    let status = process.lock().as_mut().unwrap().try_status();
 620                                    if let Ok(Some(_)) = status {
 621                                        let process = process.lock().take().unwrap().into_inner();
 622                                        let output = process.output().await?;
 623                                        let output = if output.stderr.is_empty() {
 624                                            String::from_utf8_lossy(&output.stdout).to_string()
 625                                        } else {
 626                                            String::from_utf8_lossy(&output.stderr).to_string()
 627                                        };
 628                                        anyhow::bail!("{output}\nerror: process exited before debugger attached.");
 629                                    }
 630                                }
 631
 632                                executor.timer(Duration::from_millis(100)).await;
 633                            }
 634                        }
 635                    }
 636                }).fuse() => result
 637            }
 638        })
 639    }
 640}
 641
 642impl Drop for TcpTransport {
 643    fn drop(&mut self) {
 644        if let Some(mut p) = self.process.lock().take() {
 645            p.kill()
 646        }
 647    }
 648}
 649
 650pub struct StdioTransport {
 651    process: Mutex<Option<Child>>,
 652    _stderr_task: Option<Task<()>>,
 653}
 654
 655impl StdioTransport {
 656    // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
 657    async fn start(
 658        binary: &DebugAdapterBinary,
 659        log_handlers: LogHandlers,
 660        cx: &mut AsyncApp,
 661    ) -> Result<Self> {
 662        let Some(binary_command) = &binary.command else {
 663            bail!(
 664                "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
 665            );
 666        };
 667        let mut command = util::command::new_std_command(&binary_command);
 668
 669        if let Some(cwd) = &binary.cwd {
 670            command.current_dir(cwd);
 671        }
 672
 673        command.args(&binary.arguments);
 674        command.envs(&binary.envs);
 675
 676        let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
 677            format!(
 678                "failed to spawn command `{} {}`.",
 679                binary_command,
 680                binary.arguments.join(" ")
 681            )
 682        })?;
 683
 684        let err_task = process.stderr.take().map(|stderr| {
 685            cx.background_spawn(TransportDelegate::handle_adapter_log(
 686                stderr,
 687                IoKind::StdErr,
 688                log_handlers,
 689            ))
 690        });
 691
 692        let process = Mutex::new(Some(process));
 693
 694        Ok(Self {
 695            process,
 696            _stderr_task: err_task,
 697        })
 698    }
 699}
 700
 701impl Transport for StdioTransport {
 702    fn has_adapter_logs(&self) -> bool {
 703        false
 704    }
 705
 706    fn kill(&mut self) {
 707        if let Some(process) = &mut *self.process.lock() {
 708            process.kill();
 709        }
 710    }
 711
 712    fn connect(
 713        &mut self,
 714    ) -> Task<
 715        Result<(
 716            Box<dyn AsyncWrite + Unpin + Send + 'static>,
 717            Box<dyn AsyncRead + Unpin + Send + 'static>,
 718        )>,
 719    > {
 720        let result = util::maybe!({
 721            let mut guard = self.process.lock();
 722            let process = guard.as_mut().context("oops")?;
 723            Ok((
 724                Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
 725                Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
 726            ))
 727        });
 728        Task::ready(result)
 729    }
 730
 731    fn tcp_arguments(&self) -> Option<TcpArguments> {
 732        None
 733    }
 734}
 735
 736impl Drop for StdioTransport {
 737    fn drop(&mut self) {
 738        if let Some(process) = &mut *self.process.lock() {
 739            process.kill();
 740        }
 741    }
 742}
 743
 744#[cfg(any(test, feature = "test-support"))]
 745type RequestHandler = Box<dyn Send + FnMut(u64, serde_json::Value) -> RequestHandling<Response>>;
 746
 747#[cfg(any(test, feature = "test-support"))]
 748type ResponseHandler = Box<dyn Send + Fn(Response)>;
 749
 750#[cfg(any(test, feature = "test-support"))]
 751pub struct FakeTransport {
 752    // for sending fake response back from adapter side
 753    request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
 754    // for reverse request responses
 755    response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
 756    message_handler: Option<Task<Result<()>>>,
 757    kind: FakeTransportKind,
 758}
 759
 760#[cfg(any(test, feature = "test-support"))]
 761pub enum FakeTransportKind {
 762    Stdio {
 763        stdin_writer: Option<PipeWriter>,
 764        stdout_reader: Option<PipeReader>,
 765    },
 766    Tcp {
 767        connection: TcpArguments,
 768        executor: BackgroundExecutor,
 769    },
 770}
 771
 772#[cfg(any(test, feature = "test-support"))]
 773impl FakeTransport {
 774    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
 775    where
 776        F: 'static
 777            + Send
 778            + FnMut(u64, R::Arguments) -> RequestHandling<Result<R::Response, ErrorResponse>>,
 779    {
 780        self.request_handlers.lock().insert(
 781            R::COMMAND,
 782            Box::new(move |seq, args| {
 783                let result = handler(seq, serde_json::from_value(args).unwrap());
 784                let RequestHandling::Respond(response) = result else {
 785                    return RequestHandling::Exit;
 786                };
 787                let response = match response {
 788                    Ok(response) => Response {
 789                        seq: seq + 1,
 790                        request_seq: seq,
 791                        success: true,
 792                        command: R::COMMAND.into(),
 793                        body: Some(serde_json::to_value(response).unwrap()),
 794                        message: None,
 795                    },
 796                    Err(response) => Response {
 797                        seq: seq + 1,
 798                        request_seq: seq,
 799                        success: false,
 800                        command: R::COMMAND.into(),
 801                        body: Some(serde_json::to_value(response).unwrap()),
 802                        message: None,
 803                    },
 804                };
 805                RequestHandling::Respond(response)
 806            }),
 807        );
 808    }
 809
 810    pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
 811    where
 812        F: 'static + Send + Fn(Response),
 813    {
 814        self.response_handlers
 815            .lock()
 816            .insert(R::COMMAND, Box::new(handler));
 817    }
 818
 819    async fn start_tcp(connection: TcpArguments, cx: &mut AsyncApp) -> Result<Self> {
 820        Ok(Self {
 821            request_handlers: Arc::new(Mutex::new(HashMap::default())),
 822            response_handlers: Arc::new(Mutex::new(HashMap::default())),
 823            message_handler: None,
 824            kind: FakeTransportKind::Tcp {
 825                connection,
 826                executor: cx.background_executor().clone(),
 827            },
 828        })
 829    }
 830
 831    async fn handle_messages(
 832        request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
 833        response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
 834        stdin_reader: PipeReader,
 835        stdout_writer: PipeWriter,
 836    ) -> Result<()> {
 837        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
 838        use serde_json::json;
 839
 840        let mut reader = BufReader::new(stdin_reader);
 841        let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
 842        let mut buffer = String::new();
 843
 844        loop {
 845            match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None).await {
 846                ConnectionResult::Timeout => {
 847                    anyhow::bail!("Timed out when connecting to debugger");
 848                }
 849                ConnectionResult::ConnectionReset => {
 850                    log::info!("Debugger closed the connection");
 851                    break Ok(());
 852                }
 853                ConnectionResult::Result(Err(e)) => break Err(e),
 854                ConnectionResult::Result(Ok(message)) => {
 855                    match message {
 856                        Message::Request(request) => {
 857                            // redirect reverse requests to stdout writer/reader
 858                            if request.command == RunInTerminal::COMMAND
 859                                || request.command == StartDebugging::COMMAND
 860                            {
 861                                let message =
 862                                    serde_json::to_string(&Message::Request(request)).unwrap();
 863
 864                                let mut writer = stdout_writer.lock().await;
 865                                writer
 866                                    .write_all(
 867                                        TransportDelegate::build_rpc_message(message).as_bytes(),
 868                                    )
 869                                    .await
 870                                    .unwrap();
 871                                writer.flush().await.unwrap();
 872                            } else {
 873                                let response = if let Some(handle) =
 874                                    request_handlers.lock().get_mut(request.command.as_str())
 875                                {
 876                                    handle(request.seq, request.arguments.unwrap_or(json!({})))
 877                                } else {
 878                                    panic!("No request handler for {}", request.command);
 879                                };
 880                                let response = match response {
 881                                    RequestHandling::Respond(response) => response,
 882                                    RequestHandling::Exit => {
 883                                        break Err(anyhow!("exit in response to request"));
 884                                    }
 885                                };
 886                                let success = response.success;
 887                                let message =
 888                                    serde_json::to_string(&Message::Response(response)).unwrap();
 889
 890                                let mut writer = stdout_writer.lock().await;
 891                                writer
 892                                    .write_all(
 893                                        TransportDelegate::build_rpc_message(message).as_bytes(),
 894                                    )
 895                                    .await
 896                                    .unwrap();
 897
 898                                if request.command == dap_types::requests::Initialize::COMMAND
 899                                    && success
 900                                {
 901                                    let message = serde_json::to_string(&Message::Event(Box::new(
 902                                        dap_types::messages::Events::Initialized(Some(
 903                                            Default::default(),
 904                                        )),
 905                                    )))
 906                                    .unwrap();
 907                                    writer
 908                                        .write_all(
 909                                            TransportDelegate::build_rpc_message(message)
 910                                                .as_bytes(),
 911                                        )
 912                                        .await
 913                                        .unwrap();
 914                                }
 915
 916                                writer.flush().await.unwrap();
 917                            }
 918                        }
 919                        Message::Event(event) => {
 920                            let message = serde_json::to_string(&Message::Event(event)).unwrap();
 921
 922                            let mut writer = stdout_writer.lock().await;
 923                            writer
 924                                .write_all(TransportDelegate::build_rpc_message(message).as_bytes())
 925                                .await
 926                                .unwrap();
 927                            writer.flush().await.unwrap();
 928                        }
 929                        Message::Response(response) => {
 930                            if let Some(handle) =
 931                                response_handlers.lock().get(response.command.as_str())
 932                            {
 933                                handle(response);
 934                            } else {
 935                                log::error!("No response handler for {}", response.command);
 936                            }
 937                        }
 938                    }
 939                }
 940            }
 941        }
 942    }
 943
 944    async fn start_stdio(cx: &mut AsyncApp) -> Result<Self> {
 945        let (stdin_writer, stdin_reader) = async_pipe::pipe();
 946        let (stdout_writer, stdout_reader) = async_pipe::pipe();
 947        let kind = FakeTransportKind::Stdio {
 948            stdin_writer: Some(stdin_writer),
 949            stdout_reader: Some(stdout_reader),
 950        };
 951
 952        let mut this = Self {
 953            request_handlers: Arc::new(Mutex::new(HashMap::default())),
 954            response_handlers: Arc::new(Mutex::new(HashMap::default())),
 955            message_handler: None,
 956            kind,
 957        };
 958
 959        let request_handlers = this.request_handlers.clone();
 960        let response_handlers = this.response_handlers.clone();
 961
 962        this.message_handler = Some(cx.background_spawn(Self::handle_messages(
 963            request_handlers,
 964            response_handlers,
 965            stdin_reader,
 966            stdout_writer,
 967        )));
 968
 969        Ok(this)
 970    }
 971}
 972
 973#[cfg(any(test, feature = "test-support"))]
 974impl Transport for FakeTransport {
 975    fn tcp_arguments(&self) -> Option<TcpArguments> {
 976        match &self.kind {
 977            FakeTransportKind::Stdio { .. } => None,
 978            FakeTransportKind::Tcp { connection, .. } => Some(connection.clone()),
 979        }
 980    }
 981
 982    fn connect(
 983        &mut self,
 984    ) -> Task<
 985        Result<(
 986            Box<dyn AsyncWrite + Unpin + Send + 'static>,
 987            Box<dyn AsyncRead + Unpin + Send + 'static>,
 988        )>,
 989    > {
 990        let result = match &mut self.kind {
 991            FakeTransportKind::Stdio {
 992                stdin_writer,
 993                stdout_reader,
 994            } => util::maybe!({
 995                Ok((
 996                    Box::new(stdin_writer.take().context("Cannot reconnect")?) as _,
 997                    Box::new(stdout_reader.take().context("Cannot reconnect")?) as _,
 998                ))
 999            }),
1000            FakeTransportKind::Tcp { executor, .. } => {
1001                let (stdin_writer, stdin_reader) = async_pipe::pipe();
1002                let (stdout_writer, stdout_reader) = async_pipe::pipe();
1003
1004                let request_handlers = self.request_handlers.clone();
1005                let response_handlers = self.response_handlers.clone();
1006
1007                self.message_handler = Some(executor.spawn(Self::handle_messages(
1008                    request_handlers,
1009                    response_handlers,
1010                    stdin_reader,
1011                    stdout_writer,
1012                )));
1013
1014                Ok((Box::new(stdin_writer) as _, Box::new(stdout_reader) as _))
1015            }
1016        };
1017        Task::ready(result)
1018    }
1019
1020    fn has_adapter_logs(&self) -> bool {
1021        false
1022    }
1023
1024    fn kill(&mut self) {
1025        self.message_handler.take();
1026    }
1027
1028    #[cfg(any(test, feature = "test-support"))]
1029    fn as_fake(&self) -> &FakeTransport {
1030        self
1031    }
1032}
1033
1034struct Child {
1035    process: smol::process::Child,
1036}
1037
1038impl std::ops::Deref for Child {
1039    type Target = smol::process::Child;
1040
1041    fn deref(&self) -> &Self::Target {
1042        &self.process
1043    }
1044}
1045
1046impl std::ops::DerefMut for Child {
1047    fn deref_mut(&mut self) -> &mut Self::Target {
1048        &mut self.process
1049    }
1050}
1051
1052impl Child {
1053    fn into_inner(self) -> smol::process::Child {
1054        self.process
1055    }
1056
1057    #[cfg(not(windows))]
1058    fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
1059        util::set_pre_exec_to_start_new_session(&mut command);
1060        let process = smol::process::Command::from(command)
1061            .stdin(stdin)
1062            .stdout(Stdio::piped())
1063            .stderr(Stdio::piped())
1064            .spawn()?;
1065        Ok(Self { process })
1066    }
1067
1068    #[cfg(windows)]
1069    fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
1070        // TODO(windows): create a job object and add the child process handle to it,
1071        // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
1072        let process = smol::process::Command::from(command)
1073            .stdin(stdin)
1074            .stdout(Stdio::piped())
1075            .stderr(Stdio::piped())
1076            .spawn()?;
1077        Ok(Self { process })
1078    }
1079
1080    #[cfg(not(windows))]
1081    fn kill(&mut self) {
1082        let pid = self.process.id();
1083        unsafe {
1084            libc::killpg(pid as i32, libc::SIGKILL);
1085        }
1086    }
1087
1088    #[cfg(windows)]
1089    fn kill(&mut self) {
1090        // TODO(windows): terminate the job object in kill
1091        let _ = self.process.kill();
1092    }
1093}