transport.rs

   1use anyhow::{Context as _, Result, anyhow, bail};
   2#[cfg(any(test, feature = "test-support"))]
   3use async_pipe::{PipeReader, PipeWriter};
   4use dap_types::{
   5    ErrorResponse,
   6    messages::{Message, Response},
   7};
   8use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
   9use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
  10use parking_lot::Mutex;
  11use proto::ErrorExt;
  12use settings::Settings as _;
  13use smallvec::SmallVec;
  14use smol::{
  15    channel::{Receiver, Sender, unbounded},
  16    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
  17    net::{TcpListener, TcpStream},
  18};
  19use std::{
  20    collections::HashMap,
  21    net::{Ipv4Addr, SocketAddrV4},
  22    process::Stdio,
  23    sync::Arc,
  24    time::Duration,
  25};
  26use task::TcpArgumentsTemplate;
  27use util::{ConnectionResult, ResultExt, process::Child};
  28
  29use crate::{
  30    adapters::{DebugAdapterBinary, TcpArguments},
  31    client::DapMessageHandler,
  32    debugger_settings::DebuggerSettings,
  33};
  34
  35pub(crate) type IoMessage = str;
  36pub(crate) type Command = str;
  37pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
  38
  39#[derive(PartialEq, Eq, Clone, Copy)]
  40pub enum LogKind {
  41    Adapter,
  42    Rpc,
  43}
  44
  45#[derive(Clone, Copy)]
  46pub enum IoKind {
  47    StdIn,
  48    StdOut,
  49    StdErr,
  50}
  51
  52#[cfg(any(test, feature = "test-support"))]
  53pub enum RequestHandling<T> {
  54    Respond(T),
  55    Exit,
  56}
  57
  58type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
  59
  60pub trait Transport: Send + Sync {
  61    fn has_adapter_logs(&self) -> bool;
  62    fn tcp_arguments(&self) -> Option<TcpArguments>;
  63    fn connect(
  64        &mut self,
  65    ) -> Task<
  66        Result<(
  67            Box<dyn AsyncWrite + Unpin + Send + 'static>,
  68            Box<dyn AsyncRead + Unpin + Send + 'static>,
  69        )>,
  70    >;
  71    fn kill(&mut self);
  72    #[cfg(any(test, feature = "test-support"))]
  73    fn as_fake(&self) -> &FakeTransport {
  74        unreachable!()
  75    }
  76}
  77
  78async fn start(
  79    binary: &DebugAdapterBinary,
  80    log_handlers: LogHandlers,
  81    cx: &mut AsyncApp,
  82) -> Result<Box<dyn Transport>> {
  83    #[cfg(any(test, feature = "test-support"))]
  84    if cfg!(any(test, feature = "test-support")) {
  85        if let Some(connection) = binary.connection.clone() {
  86            return Ok(Box::new(FakeTransport::start_tcp(connection, cx).await?));
  87        } else {
  88            return Ok(Box::new(FakeTransport::start_stdio(cx).await?));
  89        }
  90    }
  91
  92    if binary.connection.is_some() {
  93        Ok(Box::new(
  94            TcpTransport::start(binary, log_handlers, cx).await?,
  95        ))
  96    } else {
  97        Ok(Box::new(
  98            StdioTransport::start(binary, log_handlers, cx).await?,
  99        ))
 100    }
 101}
 102
 103pub(crate) struct PendingRequests {
 104    inner: Option<HashMap<u64, oneshot::Sender<Result<Response>>>>,
 105}
 106
 107impl PendingRequests {
 108    fn new() -> Self {
 109        Self {
 110            inner: Some(HashMap::default()),
 111        }
 112    }
 113
 114    fn flush(&mut self, e: anyhow::Error) {
 115        let Some(inner) = self.inner.as_mut() else {
 116            return;
 117        };
 118        for (_, sender) in inner.drain() {
 119            sender.send(Err(e.cloned())).ok();
 120        }
 121    }
 122
 123    pub(crate) fn insert(
 124        &mut self,
 125        sequence_id: u64,
 126        callback_tx: oneshot::Sender<Result<Response>>,
 127    ) -> anyhow::Result<()> {
 128        let Some(inner) = self.inner.as_mut() else {
 129            bail!("client is closed")
 130        };
 131        inner.insert(sequence_id, callback_tx);
 132        Ok(())
 133    }
 134
 135    pub(crate) fn remove(
 136        &mut self,
 137        sequence_id: u64,
 138    ) -> anyhow::Result<Option<oneshot::Sender<Result<Response>>>> {
 139        let Some(inner) = self.inner.as_mut() else {
 140            bail!("client is closed");
 141        };
 142        Ok(inner.remove(&sequence_id))
 143    }
 144
 145    pub(crate) fn shutdown(&mut self) {
 146        self.flush(anyhow!("transport shutdown"));
 147        self.inner = None;
 148    }
 149}
 150
 151pub(crate) struct TransportDelegate {
 152    log_handlers: LogHandlers,
 153    pub(crate) pending_requests: Arc<Mutex<PendingRequests>>,
 154    pub(crate) transport: Mutex<Box<dyn Transport>>,
 155    pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
 156    tasks: Mutex<Vec<Task<()>>>,
 157}
 158
 159impl TransportDelegate {
 160    pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
 161        let log_handlers: LogHandlers = Default::default();
 162        let transport = start(binary, log_handlers.clone(), cx).await?;
 163        Ok(Self {
 164            transport: Mutex::new(transport),
 165            log_handlers,
 166            server_tx: Default::default(),
 167            pending_requests: Arc::new(Mutex::new(PendingRequests::new())),
 168            tasks: Default::default(),
 169        })
 170    }
 171
 172    pub async fn connect(
 173        &self,
 174        message_handler: DapMessageHandler,
 175        cx: &mut AsyncApp,
 176    ) -> Result<()> {
 177        let (server_tx, client_rx) = unbounded::<Message>();
 178        self.tasks.lock().clear();
 179
 180        let log_dap_communications =
 181            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications);
 182
 183        let connect = self.transport.lock().connect();
 184        let (input, output) = connect.await?;
 185
 186        let log_handler = if log_dap_communications {
 187            Some(self.log_handlers.clone())
 188        } else {
 189            None
 190        };
 191
 192        let pending_requests = self.pending_requests.clone();
 193        let output_log_handler = log_handler.clone();
 194        {
 195            let mut tasks = self.tasks.lock();
 196            tasks.push(cx.background_spawn(async move {
 197                match Self::recv_from_server(
 198                    output,
 199                    message_handler,
 200                    pending_requests.clone(),
 201                    output_log_handler,
 202                )
 203                .await
 204                {
 205                    Ok(()) => {
 206                        pending_requests
 207                            .lock()
 208                            .flush(anyhow!("debugger shutdown unexpectedly"));
 209                    }
 210                    Err(e) => {
 211                        pending_requests.lock().flush(e);
 212                    }
 213                }
 214            }));
 215
 216            tasks.push(cx.background_spawn(async move {
 217                match Self::send_to_server(input, client_rx, log_handler).await {
 218                    Ok(()) => {}
 219                    Err(e) => log::error!("Error handling debugger input: {e}"),
 220                }
 221            }));
 222        }
 223
 224        *self.server_tx.lock().await = Some(server_tx.clone());
 225
 226        Ok(())
 227    }
 228
 229    pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
 230        self.transport.lock().tcp_arguments()
 231    }
 232
 233    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
 234        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
 235            server_tx.send(message).await.context("sending message")
 236        } else {
 237            anyhow::bail!("Server tx already dropped")
 238        }
 239    }
 240
 241    async fn handle_adapter_log(
 242        stdout: impl AsyncRead + Unpin + Send + 'static,
 243        iokind: IoKind,
 244        log_handlers: LogHandlers,
 245    ) {
 246        let mut reader = BufReader::new(stdout);
 247        let mut line = String::new();
 248
 249        loop {
 250            line.truncate(0);
 251
 252            match reader.read_line(&mut line).await {
 253                Ok(0) => break,
 254                Ok(_) => {}
 255                Err(e) => {
 256                    log::debug!("handle_adapter_log: {}", e);
 257                    break;
 258                }
 259            }
 260
 261            // Clean up logs by trimming unnecessary whitespace/newlines before inserting into log.
 262            let line = line.trim();
 263
 264            log::debug!("stderr: {line}");
 265
 266            for (kind, handler) in log_handlers.lock().iter_mut() {
 267                if matches!(kind, LogKind::Adapter) {
 268                    handler(iokind, None, line);
 269                }
 270            }
 271        }
 272    }
 273
 274    fn build_rpc_message(message: String) -> String {
 275        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
 276    }
 277
 278    async fn send_to_server<Stdin>(
 279        mut server_stdin: Stdin,
 280        client_rx: Receiver<Message>,
 281        log_handlers: Option<LogHandlers>,
 282    ) -> Result<()>
 283    where
 284        Stdin: AsyncWrite + Unpin + Send + 'static,
 285    {
 286        let result = loop {
 287            match client_rx.recv().await {
 288                Ok(message) => {
 289                    let command = match &message {
 290                        Message::Request(request) => Some(request.command.as_str()),
 291                        Message::Response(response) => Some(response.command.as_str()),
 292                        _ => None,
 293                    };
 294
 295                    let message = match serde_json::to_string(&message) {
 296                        Ok(message) => message,
 297                        Err(e) => break Err(e.into()),
 298                    };
 299
 300                    if let Some(log_handlers) = log_handlers.as_ref() {
 301                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
 302                            if matches!(kind, LogKind::Rpc) {
 303                                log_handler(IoKind::StdIn, command, &message);
 304                            }
 305                        }
 306                    }
 307
 308                    if let Err(e) = server_stdin
 309                        .write_all(Self::build_rpc_message(message).as_bytes())
 310                        .await
 311                    {
 312                        break Err(e.into());
 313                    }
 314
 315                    if let Err(e) = server_stdin.flush().await {
 316                        break Err(e.into());
 317                    }
 318                }
 319                Err(error) => break Err(error.into()),
 320            }
 321        };
 322
 323        log::debug!("Handle adapter input dropped");
 324
 325        result
 326    }
 327
 328    async fn recv_from_server<Stdout>(
 329        server_stdout: Stdout,
 330        mut message_handler: DapMessageHandler,
 331        pending_requests: Arc<Mutex<PendingRequests>>,
 332        log_handlers: Option<LogHandlers>,
 333    ) -> Result<()>
 334    where
 335        Stdout: AsyncRead + Unpin + Send + 'static,
 336    {
 337        let mut recv_buffer = String::new();
 338        let mut reader = BufReader::new(server_stdout);
 339
 340        let result = loop {
 341            let result =
 342                Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
 343                    .await;
 344            match result {
 345                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
 346                ConnectionResult::ConnectionReset => {
 347                    log::info!("Debugger closed the connection");
 348                    return Ok(());
 349                }
 350                ConnectionResult::Result(Ok(Message::Response(res))) => {
 351                    let tx = pending_requests.lock().remove(res.request_seq)?;
 352                    if let Some(tx) = tx {
 353                        if let Err(e) = tx.send(Self::process_response(res)) {
 354                            log::trace!("Did not send response `{:?}` for a cancelled", e);
 355                        }
 356                    } else {
 357                        message_handler(Message::Response(res))
 358                    }
 359                }
 360                ConnectionResult::Result(Ok(message)) => message_handler(message),
 361                ConnectionResult::Result(Err(e)) => break Err(e),
 362            }
 363        };
 364
 365        log::debug!("Handle adapter output dropped");
 366
 367        result
 368    }
 369
 370    fn process_response(response: Response) -> Result<Response> {
 371        if response.success {
 372            Ok(response)
 373        } else {
 374            if let Some(error_message) = response
 375                .body
 376                .clone()
 377                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
 378                .and_then(|response| response.error.map(|msg| msg.format))
 379                .or_else(|| response.message.clone())
 380            {
 381                anyhow::bail!(error_message);
 382            };
 383
 384            anyhow::bail!(
 385                "Received error response from adapter. Response: {:?}",
 386                response
 387            );
 388        }
 389    }
 390
 391    async fn receive_server_message<Stdout>(
 392        reader: &mut BufReader<Stdout>,
 393        buffer: &mut String,
 394        log_handlers: Option<&LogHandlers>,
 395    ) -> ConnectionResult<Message>
 396    where
 397        Stdout: AsyncRead + Unpin + Send + 'static,
 398    {
 399        let mut content_length = None;
 400        loop {
 401            buffer.truncate(0);
 402            match reader.read_line(buffer).await {
 403                Ok(0) => return ConnectionResult::ConnectionReset,
 404                Ok(_) => {}
 405                Err(e) => return ConnectionResult::Result(Err(e.into())),
 406            };
 407
 408            if buffer == "\r\n" {
 409                break;
 410            }
 411
 412            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
 413                match value.parse().context("invalid content length") {
 414                    Ok(length) => content_length = Some(length),
 415                    Err(e) => return ConnectionResult::Result(Err(e)),
 416                }
 417            }
 418        }
 419
 420        let content_length = match content_length.context("missing content length") {
 421            Ok(length) => length,
 422            Err(e) => return ConnectionResult::Result(Err(e)),
 423        };
 424
 425        let mut content = vec![0; content_length];
 426        if let Err(e) = reader
 427            .read_exact(&mut content)
 428            .await
 429            .with_context(|| "reading after a loop")
 430        {
 431            return ConnectionResult::Result(Err(e));
 432        }
 433
 434        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
 435            Ok(str) => str,
 436            Err(e) => return ConnectionResult::Result(Err(e)),
 437        };
 438
 439        let message =
 440            serde_json::from_str::<Message>(message_str).context("deserializing server message");
 441
 442        if let Some(log_handlers) = log_handlers {
 443            let command = match &message {
 444                Ok(Message::Request(request)) => Some(request.command.as_str()),
 445                Ok(Message::Response(response)) => Some(response.command.as_str()),
 446                _ => None,
 447            };
 448
 449            for (kind, log_handler) in log_handlers.lock().iter_mut() {
 450                if matches!(kind, LogKind::Rpc) {
 451                    log_handler(IoKind::StdOut, command, message_str);
 452                }
 453            }
 454        }
 455
 456        ConnectionResult::Result(message)
 457    }
 458
 459    pub fn has_adapter_logs(&self) -> bool {
 460        self.transport.lock().has_adapter_logs()
 461    }
 462
 463    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
 464    where
 465        F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
 466    {
 467        let mut log_handlers = self.log_handlers.lock();
 468        log_handlers.push((kind, Box::new(f)));
 469    }
 470}
 471
 472pub struct TcpTransport {
 473    executor: BackgroundExecutor,
 474    pub port: u16,
 475    pub host: Ipv4Addr,
 476    pub timeout: u64,
 477    process: Arc<Mutex<Option<Child>>>,
 478    _stderr_task: Option<Task<()>>,
 479    _stdout_task: Option<Task<()>>,
 480}
 481
 482impl TcpTransport {
 483    /// Get an open port to use with the tcp client when not supplied by debug config
 484    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
 485        if let Some(port) = host.port {
 486            Ok(port)
 487        } else {
 488            Self::unused_port(host.host()).await
 489        }
 490    }
 491
 492    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
 493        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
 494            .await?
 495            .local_addr()?
 496            .port())
 497    }
 498
 499    async fn start(
 500        binary: &DebugAdapterBinary,
 501        log_handlers: LogHandlers,
 502        cx: &mut AsyncApp,
 503    ) -> Result<Self> {
 504        let connection_args = binary
 505            .connection
 506            .as_ref()
 507            .context("No connection arguments provided")?;
 508
 509        let host = connection_args.host;
 510        let port = connection_args.port;
 511
 512        let mut process = None;
 513        let mut stdout_task = None;
 514        let mut stderr_task = None;
 515
 516        if let Some(command) = &binary.command {
 517            let mut command = util::command::new_std_command(&command);
 518
 519            if let Some(cwd) = &binary.cwd {
 520                command.current_dir(cwd);
 521            }
 522
 523            command.args(&binary.arguments);
 524            command.envs(&binary.envs);
 525
 526            let mut p = Child::spawn(command, Stdio::null(), Stdio::piped(), Stdio::piped())
 527                .with_context(|| "failed to start debug adapter.")?;
 528
 529            stdout_task = p.stdout.take().map(|stdout| {
 530                cx.background_executor()
 531                    .spawn(TransportDelegate::handle_adapter_log(
 532                        stdout,
 533                        IoKind::StdOut,
 534                        log_handlers.clone(),
 535                    ))
 536            });
 537            stderr_task = p.stderr.take().map(|stderr| {
 538                cx.background_executor()
 539                    .spawn(TransportDelegate::handle_adapter_log(
 540                        stderr,
 541                        IoKind::StdErr,
 542                        log_handlers,
 543                    ))
 544            });
 545            process = Some(p);
 546        };
 547
 548        let timeout = connection_args
 549            .timeout
 550            .unwrap_or_else(|| cx.update(|cx| DebuggerSettings::get_global(cx).timeout));
 551
 552        log::info!(
 553            "Debug adapter has connected to TCP server {}:{}",
 554            host,
 555            port
 556        );
 557
 558        let this = Self {
 559            executor: cx.background_executor().clone(),
 560            port,
 561            host,
 562            process: Arc::new(Mutex::new(process)),
 563            timeout,
 564            _stdout_task: stdout_task,
 565            _stderr_task: stderr_task,
 566        };
 567
 568        Ok(this)
 569    }
 570}
 571
 572impl Transport for TcpTransport {
 573    fn has_adapter_logs(&self) -> bool {
 574        true
 575    }
 576
 577    fn kill(&mut self) {
 578        if let Some(process) = &mut *self.process.lock() {
 579            process.kill().log_err();
 580        }
 581    }
 582
 583    fn tcp_arguments(&self) -> Option<TcpArguments> {
 584        Some(TcpArguments {
 585            host: self.host,
 586            port: self.port,
 587            timeout: Some(self.timeout),
 588        })
 589    }
 590
 591    fn connect(
 592        &mut self,
 593    ) -> Task<
 594        Result<(
 595            Box<dyn AsyncWrite + Unpin + Send + 'static>,
 596            Box<dyn AsyncRead + Unpin + Send + 'static>,
 597        )>,
 598    > {
 599        let executor = self.executor.clone();
 600        let timeout = self.timeout;
 601        let address = SocketAddrV4::new(self.host, self.port);
 602        let process = self.process.clone();
 603        executor.clone().spawn(async move {
 604            select! {
 605                _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
 606                    anyhow::bail!("Connection to TCP DAP timeout {address}");
 607                },
 608                result = executor.clone().spawn(async move {
 609                    loop {
 610                        match TcpStream::connect(address).await {
 611                            Ok(stream) => {
 612                                let (read, write) = stream.split();
 613                                return Ok((Box::new(write) as _, Box::new(read) as _))
 614                            },
 615                            Err(_) => {
 616                                let has_process = process.lock().is_some();
 617                                if has_process {
 618                                    let status = process.lock().as_mut().unwrap().try_status();
 619                                    if let Ok(Some(_)) = status {
 620                                        let process = process.lock().take().unwrap().into_inner();
 621                                        let output = process.output().await?;
 622                                        let output = if output.stderr.is_empty() {
 623                                            String::from_utf8_lossy(&output.stdout).to_string()
 624                                        } else {
 625                                            String::from_utf8_lossy(&output.stderr).to_string()
 626                                        };
 627                                        anyhow::bail!("{output}\nerror: process exited before debugger attached.");
 628                                    }
 629                                }
 630
 631                                executor.timer(Duration::from_millis(100)).await;
 632                            }
 633                        }
 634                    }
 635                }).fuse() => result
 636            }
 637        })
 638    }
 639}
 640
 641impl Drop for TcpTransport {
 642    fn drop(&mut self) {
 643        if let Some(mut p) = self.process.lock().take() {
 644            p.kill().log_err();
 645        }
 646    }
 647}
 648
 649pub struct StdioTransport {
 650    process: Mutex<Child>,
 651    _stderr_task: Option<Task<()>>,
 652}
 653
 654impl StdioTransport {
 655    // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
 656    async fn start(
 657        binary: &DebugAdapterBinary,
 658        log_handlers: LogHandlers,
 659        cx: &mut AsyncApp,
 660    ) -> Result<Self> {
 661        let Some(binary_command) = &binary.command else {
 662            bail!(
 663                "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
 664            );
 665        };
 666        let mut command = util::command::new_std_command(&binary_command);
 667
 668        if let Some(cwd) = &binary.cwd {
 669            command.current_dir(cwd);
 670        }
 671
 672        command.args(&binary.arguments);
 673        command.envs(&binary.envs);
 674
 675        let mut process = Child::spawn(command, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
 676
 677        let _stderr_task = process.stderr.take().map(|stderr| {
 678            cx.background_spawn(TransportDelegate::handle_adapter_log(
 679                stderr,
 680                IoKind::StdErr,
 681                log_handlers,
 682            ))
 683        });
 684
 685        let process = Mutex::new(process);
 686
 687        Ok(Self {
 688            process,
 689            _stderr_task,
 690        })
 691    }
 692}
 693
 694impl Transport for StdioTransport {
 695    fn has_adapter_logs(&self) -> bool {
 696        true
 697    }
 698
 699    fn kill(&mut self) {
 700        self.process.lock().kill().log_err();
 701    }
 702
 703    fn connect(
 704        &mut self,
 705    ) -> Task<
 706        Result<(
 707            Box<dyn AsyncWrite + Unpin + Send + 'static>,
 708            Box<dyn AsyncRead + Unpin + Send + 'static>,
 709        )>,
 710    > {
 711        let result = util::maybe!({
 712            let mut process = self.process.lock();
 713            Ok((
 714                Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
 715                Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
 716            ))
 717        });
 718        Task::ready(result)
 719    }
 720
 721    fn tcp_arguments(&self) -> Option<TcpArguments> {
 722        None
 723    }
 724}
 725
 726impl Drop for StdioTransport {
 727    fn drop(&mut self) {
 728        self.process.lock().kill().log_err();
 729    }
 730}
 731
 732#[cfg(any(test, feature = "test-support"))]
 733type RequestHandler = Box<dyn Send + FnMut(u64, serde_json::Value) -> RequestHandling<Response>>;
 734
 735#[cfg(any(test, feature = "test-support"))]
 736type ResponseHandler = Box<dyn Send + Fn(Response)>;
 737
 738#[cfg(any(test, feature = "test-support"))]
 739pub struct FakeTransport {
 740    // for sending fake response back from adapter side
 741    request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
 742    // for reverse request responses
 743    response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
 744    message_handler: Option<Task<Result<()>>>,
 745    kind: FakeTransportKind,
 746}
 747
 748#[cfg(any(test, feature = "test-support"))]
 749pub enum FakeTransportKind {
 750    Stdio {
 751        stdin_writer: Option<PipeWriter>,
 752        stdout_reader: Option<PipeReader>,
 753    },
 754    Tcp {
 755        connection: TcpArguments,
 756        executor: BackgroundExecutor,
 757    },
 758}
 759
 760#[cfg(any(test, feature = "test-support"))]
 761impl FakeTransport {
 762    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
 763    where
 764        F: 'static
 765            + Send
 766            + FnMut(u64, R::Arguments) -> RequestHandling<Result<R::Response, ErrorResponse>>,
 767    {
 768        self.request_handlers.lock().insert(
 769            R::COMMAND,
 770            Box::new(move |seq, args| {
 771                let result = handler(seq, serde_json::from_value(args).unwrap());
 772                let RequestHandling::Respond(response) = result else {
 773                    return RequestHandling::Exit;
 774                };
 775                let response = match response {
 776                    Ok(response) => Response {
 777                        seq: seq + 1,
 778                        request_seq: seq,
 779                        success: true,
 780                        command: R::COMMAND.into(),
 781                        body: Some(serde_json::to_value(response).unwrap()),
 782                        message: None,
 783                    },
 784                    Err(response) => Response {
 785                        seq: seq + 1,
 786                        request_seq: seq,
 787                        success: false,
 788                        command: R::COMMAND.into(),
 789                        body: Some(serde_json::to_value(response).unwrap()),
 790                        message: None,
 791                    },
 792                };
 793                RequestHandling::Respond(response)
 794            }),
 795        );
 796    }
 797
 798    pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
 799    where
 800        F: 'static + Send + Fn(Response),
 801    {
 802        self.response_handlers
 803            .lock()
 804            .insert(R::COMMAND, Box::new(handler));
 805    }
 806
 807    async fn start_tcp(connection: TcpArguments, cx: &mut AsyncApp) -> Result<Self> {
 808        Ok(Self {
 809            request_handlers: Arc::new(Mutex::new(HashMap::default())),
 810            response_handlers: Arc::new(Mutex::new(HashMap::default())),
 811            message_handler: None,
 812            kind: FakeTransportKind::Tcp {
 813                connection,
 814                executor: cx.background_executor().clone(),
 815            },
 816        })
 817    }
 818
 819    async fn handle_messages(
 820        request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
 821        response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
 822        stdin_reader: PipeReader,
 823        stdout_writer: PipeWriter,
 824    ) -> Result<()> {
 825        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
 826        use serde_json::json;
 827
 828        let mut reader = BufReader::new(stdin_reader);
 829        let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
 830        let mut buffer = String::new();
 831
 832        loop {
 833            match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None).await {
 834                ConnectionResult::Timeout => {
 835                    anyhow::bail!("Timed out when connecting to debugger");
 836                }
 837                ConnectionResult::ConnectionReset => {
 838                    log::info!("Debugger closed the connection");
 839                    break Ok(());
 840                }
 841                ConnectionResult::Result(Err(e)) => break Err(e),
 842                ConnectionResult::Result(Ok(message)) => {
 843                    match message {
 844                        Message::Request(request) => {
 845                            // redirect reverse requests to stdout writer/reader
 846                            if request.command == RunInTerminal::COMMAND
 847                                || request.command == StartDebugging::COMMAND
 848                            {
 849                                let message =
 850                                    serde_json::to_string(&Message::Request(request)).unwrap();
 851
 852                                let mut writer = stdout_writer.lock().await;
 853                                writer
 854                                    .write_all(
 855                                        TransportDelegate::build_rpc_message(message).as_bytes(),
 856                                    )
 857                                    .await
 858                                    .unwrap();
 859                                writer.flush().await.unwrap();
 860                            } else {
 861                                let response = if let Some(handle) =
 862                                    request_handlers.lock().get_mut(request.command.as_str())
 863                                {
 864                                    handle(request.seq, request.arguments.unwrap_or(json!({})))
 865                                } else {
 866                                    panic!("No request handler for {}", request.command);
 867                                };
 868                                let response = match response {
 869                                    RequestHandling::Respond(response) => response,
 870                                    RequestHandling::Exit => {
 871                                        break Err(anyhow!("exit in response to request"));
 872                                    }
 873                                };
 874                                let success = response.success;
 875                                let message =
 876                                    serde_json::to_string(&Message::Response(response)).unwrap();
 877
 878                                let mut writer = stdout_writer.lock().await;
 879                                writer
 880                                    .write_all(
 881                                        TransportDelegate::build_rpc_message(message).as_bytes(),
 882                                    )
 883                                    .await
 884                                    .unwrap();
 885
 886                                if request.command == dap_types::requests::Initialize::COMMAND
 887                                    && success
 888                                {
 889                                    let message = serde_json::to_string(&Message::Event(Box::new(
 890                                        dap_types::messages::Events::Initialized(Some(
 891                                            Default::default(),
 892                                        )),
 893                                    )))
 894                                    .unwrap();
 895                                    writer
 896                                        .write_all(
 897                                            TransportDelegate::build_rpc_message(message)
 898                                                .as_bytes(),
 899                                        )
 900                                        .await
 901                                        .unwrap();
 902                                }
 903
 904                                writer.flush().await.unwrap();
 905                            }
 906                        }
 907                        Message::Event(event) => {
 908                            let message = serde_json::to_string(&Message::Event(event)).unwrap();
 909
 910                            let mut writer = stdout_writer.lock().await;
 911                            writer
 912                                .write_all(TransportDelegate::build_rpc_message(message).as_bytes())
 913                                .await
 914                                .unwrap();
 915                            writer.flush().await.unwrap();
 916                        }
 917                        Message::Response(response) => {
 918                            if let Some(handle) =
 919                                response_handlers.lock().get(response.command.as_str())
 920                            {
 921                                handle(response);
 922                            } else {
 923                                log::error!("No response handler for {}", response.command);
 924                            }
 925                        }
 926                    }
 927                }
 928            }
 929        }
 930    }
 931
 932    async fn start_stdio(cx: &mut AsyncApp) -> Result<Self> {
 933        let (stdin_writer, stdin_reader) = async_pipe::pipe();
 934        let (stdout_writer, stdout_reader) = async_pipe::pipe();
 935        let kind = FakeTransportKind::Stdio {
 936            stdin_writer: Some(stdin_writer),
 937            stdout_reader: Some(stdout_reader),
 938        };
 939
 940        let mut this = Self {
 941            request_handlers: Arc::new(Mutex::new(HashMap::default())),
 942            response_handlers: Arc::new(Mutex::new(HashMap::default())),
 943            message_handler: None,
 944            kind,
 945        };
 946
 947        let request_handlers = this.request_handlers.clone();
 948        let response_handlers = this.response_handlers.clone();
 949
 950        this.message_handler = Some(cx.background_spawn(Self::handle_messages(
 951            request_handlers,
 952            response_handlers,
 953            stdin_reader,
 954            stdout_writer,
 955        )));
 956
 957        Ok(this)
 958    }
 959}
 960
 961#[cfg(any(test, feature = "test-support"))]
 962impl Transport for FakeTransport {
 963    fn tcp_arguments(&self) -> Option<TcpArguments> {
 964        match &self.kind {
 965            FakeTransportKind::Stdio { .. } => None,
 966            FakeTransportKind::Tcp { connection, .. } => Some(connection.clone()),
 967        }
 968    }
 969
 970    fn connect(
 971        &mut self,
 972    ) -> Task<
 973        Result<(
 974            Box<dyn AsyncWrite + Unpin + Send + 'static>,
 975            Box<dyn AsyncRead + Unpin + Send + 'static>,
 976        )>,
 977    > {
 978        let result = match &mut self.kind {
 979            FakeTransportKind::Stdio {
 980                stdin_writer,
 981                stdout_reader,
 982            } => util::maybe!({
 983                Ok((
 984                    Box::new(stdin_writer.take().context("Cannot reconnect")?) as _,
 985                    Box::new(stdout_reader.take().context("Cannot reconnect")?) as _,
 986                ))
 987            }),
 988            FakeTransportKind::Tcp { executor, .. } => {
 989                let (stdin_writer, stdin_reader) = async_pipe::pipe();
 990                let (stdout_writer, stdout_reader) = async_pipe::pipe();
 991
 992                let request_handlers = self.request_handlers.clone();
 993                let response_handlers = self.response_handlers.clone();
 994
 995                self.message_handler = Some(executor.spawn(Self::handle_messages(
 996                    request_handlers,
 997                    response_handlers,
 998                    stdin_reader,
 999                    stdout_writer,
1000                )));
1001
1002                Ok((Box::new(stdin_writer) as _, Box::new(stdout_reader) as _))
1003            }
1004        };
1005        Task::ready(result)
1006    }
1007
1008    fn has_adapter_logs(&self) -> bool {
1009        false
1010    }
1011
1012    fn kill(&mut self) {
1013        self.message_handler.take();
1014    }
1015
1016    #[cfg(any(test, feature = "test-support"))]
1017    fn as_fake(&self) -> &FakeTransport {
1018        self
1019    }
1020}