transport.rs

  1use anyhow::{Context as _, Result, bail};
  2use dap_types::{
  3    ErrorResponse,
  4    messages::{Message, Response},
  5};
  6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
  7use gpui::{AppContext as _, AsyncApp, Task};
  8use settings::Settings as _;
  9use smallvec::SmallVec;
 10use smol::{
 11    channel::{Receiver, Sender, unbounded},
 12    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 13    lock::Mutex,
 14    net::{TcpListener, TcpStream},
 15    process::Child,
 16};
 17use std::{
 18    collections::HashMap,
 19    net::{Ipv4Addr, SocketAddrV4},
 20    process::Stdio,
 21    sync::Arc,
 22    time::Duration,
 23};
 24use task::TcpArgumentsTemplate;
 25use util::{ConnectionResult, ResultExt as _};
 26
 27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
 28
/// Boxed, sendable callback invoked with the stream a piece of text is
/// associated with ([`IoKind`]) and the text itself.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
 30
/// Category a registered log handler subscribes to.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Raw lines read from the adapter's stdout/stderr log streams.
    Adapter,
    /// Serialized protocol messages sent to or received from the adapter.
    Rpc,
}
 36
/// Tells a log handler which stream the text it receives is associated with.
pub enum IoKind {
    /// Text being written to the adapter.
    StdIn,
    /// Text read from the adapter's output.
    StdOut,
    /// Text read from the adapter's error stream.
    StdErr,
}
 42
/// The set of streams a started transport hands over to the delegate.
pub struct TransportPipe {
    /// Writable side: protocol messages for the adapter are written here.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Readable side: protocol messages from the adapter are read from here.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional stream whose lines are forwarded to adapter log handlers.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional error stream whose lines are forwarded to adapter log handlers.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
 49
 50impl TransportPipe {
 51    pub fn new(
 52        input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 53        output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 54        stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 55        stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 56    ) -> Self {
 57        TransportPipe {
 58            input,
 59            output,
 60            stdout,
 61            stderr,
 62        }
 63    }
 64}
 65
/// Shared map from a request sequence number to the one-shot sender that
/// delivers the outcome of that request.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log handlers, each tagged with the [`LogKind`]
/// it wants to receive.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 68
/// The concrete transport used to talk to a debug adapter.
pub enum Transport {
    /// Adapter process reached through its standard streams.
    Stdio(StdioTransport),
    /// Adapter process reached through a TCP connection.
    Tcp(TcpTransport),
    /// In-process fake, only compiled for tests / `test-support`.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
 75
 76impl Transport {
 77    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 78        #[cfg(any(test, feature = "test-support"))]
 79        if cfg!(any(test, feature = "test-support")) {
 80            return FakeTransport::start(cx)
 81                .await
 82                .map(|(transports, fake)| (transports, Self::Fake(fake)));
 83        }
 84
 85        if binary.connection.is_some() {
 86            TcpTransport::start(binary, cx)
 87                .await
 88                .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
 89        } else {
 90            StdioTransport::start(binary, cx)
 91                .await
 92                .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
 93        }
 94    }
 95
 96    fn has_adapter_logs(&self) -> bool {
 97        match self {
 98            Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
 99            Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100            #[cfg(any(test, feature = "test-support"))]
101            Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102        }
103    }
104
105    async fn kill(&self) -> Result<()> {
106        match self {
107            Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108            Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109            #[cfg(any(test, feature = "test-support"))]
110            Transport::Fake(fake_transport) => fake_transport.kill().await,
111        }
112    }
113
114    #[cfg(any(test, feature = "test-support"))]
115    pub(crate) fn as_fake(&self) -> &FakeTransport {
116        match self {
117            Transport::Fake(fake_transport) => fake_transport,
118            _ => panic!("Not a fake transport layer"),
119        }
120    }
121}
122
/// Owns a [`Transport`] plus the channels, request bookkeeping and background
/// tasks used to exchange messages with it.
pub(crate) struct TransportDelegate {
    /// Handlers notified about adapter log lines and protocol messages.
    log_handlers: LogHandlers,
    /// Response senders keyed by request `seq`; `handle_input` moves an entry
    /// into `pending_requests` when it writes the matching request.
    current_requests: Requests,
    /// Response senders keyed by request `seq`; `handle_output` resolves an
    /// entry when a response with that `request_seq` arrives.
    pending_requests: Requests,
    /// The underlying transport.
    transport: Transport,
    /// Sender feeding the channel that `handle_input` drains; taken and closed
    /// by `shutdown`.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
    /// Handles of the background tasks spawned by `start_handlers`.
    _tasks: Vec<Task<()>>,
}
131
132impl TransportDelegate {
133    pub(crate) async fn start(
134        binary: &DebugAdapterBinary,
135        cx: AsyncApp,
136    ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
137        let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
138        let mut this = Self {
139            transport,
140            server_tx: Default::default(),
141            log_handlers: Default::default(),
142            current_requests: Default::default(),
143            pending_requests: Default::default(),
144            _tasks: Vec::new(),
145        };
146        let messages = this.start_handlers(transport_pipes, cx).await?;
147        Ok((messages, this))
148    }
149
150    async fn start_handlers(
151        &mut self,
152        mut params: TransportPipe,
153        cx: AsyncApp,
154    ) -> Result<(Receiver<Message>, Sender<Message>)> {
155        let (client_tx, server_rx) = unbounded::<Message>();
156        let (server_tx, client_rx) = unbounded::<Message>();
157
158        let log_dap_communications =
159            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161                .unwrap_or(false);
162
163        let log_handler = if log_dap_communications {
164            Some(self.log_handlers.clone())
165        } else {
166            None
167        };
168
169        let adapter_log_handler = log_handler.clone();
170        cx.update(|cx| {
171            if let Some(stdout) = params.stdout.take() {
172                self._tasks.push(cx.background_spawn(async move {
173                    match Self::handle_adapter_log(stdout, adapter_log_handler).await {
174                        ConnectionResult::Timeout => {
175                            log::error!("Timed out when handling debugger log");
176                        }
177                        ConnectionResult::ConnectionReset => {
178                            log::info!("Debugger logs connection closed");
179                        }
180                        ConnectionResult::Result(Ok(())) => {}
181                        ConnectionResult::Result(Err(e)) => {
182                            log::error!("Error handling debugger log: {e}");
183                        }
184                    }
185                }));
186            }
187
188            let pending_requests = self.pending_requests.clone();
189            let output_log_handler = log_handler.clone();
190            self._tasks.push(cx.background_spawn(async move {
191                match Self::handle_output(
192                    params.output,
193                    client_tx,
194                    pending_requests,
195                    output_log_handler,
196                )
197                .await
198                {
199                    Ok(()) => {}
200                    Err(e) => log::error!("Error handling debugger output: {e}"),
201                }
202            }));
203
204            if let Some(stderr) = params.stderr.take() {
205                let log_handlers = self.log_handlers.clone();
206                self._tasks.push(cx.background_spawn(async move {
207                    match Self::handle_error(stderr, log_handlers).await {
208                        ConnectionResult::Timeout => {
209                            log::error!("Timed out reading debugger error stream")
210                        }
211                        ConnectionResult::ConnectionReset => {
212                            log::info!("Debugger closed its error stream")
213                        }
214                        ConnectionResult::Result(Ok(())) => {}
215                        ConnectionResult::Result(Err(e)) => {
216                            log::error!("Error handling debugger error: {e}")
217                        }
218                    }
219                }));
220            }
221
222            let current_requests = self.current_requests.clone();
223            let pending_requests = self.pending_requests.clone();
224            let log_handler = log_handler.clone();
225            self._tasks.push(cx.background_spawn(async move {
226                match Self::handle_input(
227                    params.input,
228                    client_rx,
229                    current_requests,
230                    pending_requests,
231                    log_handler,
232                )
233                .await
234                {
235                    Ok(()) => {}
236                    Err(e) => log::error!("Error handling debugger input: {e}"),
237                }
238            }));
239        })?;
240
241        {
242            let mut lock = self.server_tx.lock().await;
243            *lock = Some(server_tx.clone());
244        }
245
246        Ok((server_rx, server_tx))
247    }
248
249    pub(crate) async fn add_pending_request(
250        &self,
251        sequence_id: u64,
252        request: oneshot::Sender<Result<Response>>,
253    ) {
254        let mut pending_requests = self.pending_requests.lock().await;
255        pending_requests.insert(sequence_id, request);
256    }
257
258    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
259        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
260            server_tx.send(message).await.context("sending message")
261        } else {
262            anyhow::bail!("Server tx already dropped")
263        }
264    }
265
266    async fn handle_adapter_log<Stdout>(
267        stdout: Stdout,
268        log_handlers: Option<LogHandlers>,
269    ) -> ConnectionResult<()>
270    where
271        Stdout: AsyncRead + Unpin + Send + 'static,
272    {
273        let mut reader = BufReader::new(stdout);
274        let mut line = String::new();
275
276        let result = loop {
277            line.truncate(0);
278
279            match reader
280                .read_line(&mut line)
281                .await
282                .context("reading adapter log line")
283            {
284                Ok(0) => break ConnectionResult::ConnectionReset,
285                Ok(_) => {}
286                Err(e) => break ConnectionResult::Result(Err(e)),
287            }
288
289            if let Some(log_handlers) = log_handlers.as_ref() {
290                for (kind, handler) in log_handlers.lock().iter_mut() {
291                    if matches!(kind, LogKind::Adapter) {
292                        handler(IoKind::StdOut, line.as_str());
293                    }
294                }
295            }
296        };
297
298        log::debug!("Handle adapter log dropped");
299
300        result
301    }
302
303    fn build_rpc_message(message: String) -> String {
304        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
305    }
306
307    async fn handle_input<Stdin>(
308        mut server_stdin: Stdin,
309        client_rx: Receiver<Message>,
310        current_requests: Requests,
311        pending_requests: Requests,
312        log_handlers: Option<LogHandlers>,
313    ) -> Result<()>
314    where
315        Stdin: AsyncWrite + Unpin + Send + 'static,
316    {
317        let result = loop {
318            match client_rx.recv().await {
319                Ok(message) => {
320                    if let Message::Request(request) = &message {
321                        if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
322                            pending_requests.lock().await.insert(request.seq, sender);
323                        }
324                    }
325
326                    let message = match serde_json::to_string(&message) {
327                        Ok(message) => message,
328                        Err(e) => break Err(e.into()),
329                    };
330
331                    if let Some(log_handlers) = log_handlers.as_ref() {
332                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
333                            if matches!(kind, LogKind::Rpc) {
334                                log_handler(IoKind::StdIn, &message);
335                            }
336                        }
337                    }
338
339                    if let Err(e) = server_stdin
340                        .write_all(Self::build_rpc_message(message).as_bytes())
341                        .await
342                    {
343                        break Err(e.into());
344                    }
345
346                    if let Err(e) = server_stdin.flush().await {
347                        break Err(e.into());
348                    }
349                }
350                Err(error) => break Err(error.into()),
351            }
352        };
353
354        log::debug!("Handle adapter input dropped");
355
356        result
357    }
358
359    async fn handle_output<Stdout>(
360        server_stdout: Stdout,
361        client_tx: Sender<Message>,
362        pending_requests: Requests,
363        log_handlers: Option<LogHandlers>,
364    ) -> Result<()>
365    where
366        Stdout: AsyncRead + Unpin + Send + 'static,
367    {
368        let mut recv_buffer = String::new();
369        let mut reader = BufReader::new(server_stdout);
370
371        let result = loop {
372            match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
373                .await
374            {
375                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
376                ConnectionResult::ConnectionReset => {
377                    log::info!("Debugger closed the connection");
378                    return Ok(());
379                }
380                ConnectionResult::Result(Ok(Message::Response(res))) => {
381                    if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
382                        if let Err(e) = tx.send(Self::process_response(res)) {
383                            log::trace!("Did not send response `{:?}` for a cancelled", e);
384                        }
385                    } else {
386                        client_tx.send(Message::Response(res)).await?;
387                    }
388                }
389                ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
390                ConnectionResult::Result(Err(e)) => break Err(e),
391            }
392        };
393
394        drop(client_tx);
395        log::debug!("Handle adapter output dropped");
396
397        result
398    }
399
400    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
401    where
402        Stderr: AsyncRead + Unpin + Send + 'static,
403    {
404        log::debug!("Handle error started");
405        let mut buffer = String::new();
406
407        let mut reader = BufReader::new(stderr);
408
409        let result = loop {
410            match reader
411                .read_line(&mut buffer)
412                .await
413                .context("reading error log line")
414            {
415                Ok(0) => break ConnectionResult::ConnectionReset,
416                Ok(_) => {
417                    for (kind, log_handler) in log_handlers.lock().iter_mut() {
418                        if matches!(kind, LogKind::Adapter) {
419                            log_handler(IoKind::StdErr, buffer.as_str());
420                        }
421                    }
422
423                    buffer.truncate(0);
424                }
425                Err(error) => break ConnectionResult::Result(Err(error)),
426            }
427        };
428
429        log::debug!("Handle adapter error dropped");
430
431        result
432    }
433
434    fn process_response(response: Response) -> Result<Response> {
435        if response.success {
436            Ok(response)
437        } else {
438            if let Some(error_message) = response
439                .body
440                .clone()
441                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
442                .and_then(|response| response.error.map(|msg| msg.format))
443                .or_else(|| response.message.clone())
444            {
445                anyhow::bail!(error_message);
446            };
447
448            anyhow::bail!(
449                "Received error response from adapter. Response: {:?}",
450                response
451            );
452        }
453    }
454
455    async fn receive_server_message<Stdout>(
456        reader: &mut BufReader<Stdout>,
457        buffer: &mut String,
458        log_handlers: Option<&LogHandlers>,
459    ) -> ConnectionResult<Message>
460    where
461        Stdout: AsyncRead + Unpin + Send + 'static,
462    {
463        let mut content_length = None;
464        loop {
465            buffer.truncate(0);
466
467            match reader
468                .read_line(buffer)
469                .await
470                .with_context(|| "reading a message from server")
471            {
472                Ok(0) => return ConnectionResult::ConnectionReset,
473                Ok(_) => {}
474                Err(e) => return ConnectionResult::Result(Err(e)),
475            };
476
477            if buffer == "\r\n" {
478                break;
479            }
480
481            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
482                match value.parse().context("invalid content length") {
483                    Ok(length) => content_length = Some(length),
484                    Err(e) => return ConnectionResult::Result(Err(e)),
485                }
486            }
487        }
488
489        let content_length = match content_length.context("missing content length") {
490            Ok(length) => length,
491            Err(e) => return ConnectionResult::Result(Err(e)),
492        };
493
494        let mut content = vec![0; content_length];
495        if let Err(e) = reader
496            .read_exact(&mut content)
497            .await
498            .with_context(|| "reading after a loop")
499        {
500            return ConnectionResult::Result(Err(e));
501        }
502
503        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
504            Ok(str) => str,
505            Err(e) => return ConnectionResult::Result(Err(e)),
506        };
507
508        if let Some(log_handlers) = log_handlers {
509            for (kind, log_handler) in log_handlers.lock().iter_mut() {
510                if matches!(kind, LogKind::Rpc) {
511                    log_handler(IoKind::StdOut, message_str);
512                }
513            }
514        }
515
516        ConnectionResult::Result(
517            serde_json::from_str::<Message>(message_str).context("deserializing server message"),
518        )
519    }
520
521    pub async fn shutdown(&self) -> Result<()> {
522        log::debug!("Start shutdown client");
523
524        if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
525            server_tx.close();
526        }
527
528        let mut current_requests = self.current_requests.lock().await;
529        let mut pending_requests = self.pending_requests.lock().await;
530
531        current_requests.clear();
532        pending_requests.clear();
533
534        let _ = self.transport.kill().await.log_err();
535
536        drop(current_requests);
537        drop(pending_requests);
538
539        log::debug!("Shutdown client completed");
540
541        anyhow::Ok(())
542    }
543
    /// Returns the underlying transport's `has_adapter_logs` value.
    pub fn has_adapter_logs(&self) -> bool {
        self.transport.has_adapter_logs()
    }
547
    /// Borrows the underlying [`Transport`].
    pub fn transport(&self) -> &Transport {
        &self.transport
    }
551
552    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
553    where
554        F: 'static + Send + FnMut(IoKind, &str),
555    {
556        let mut log_handlers = self.log_handlers.lock();
557        log_handlers.push((kind, Box::new(f)));
558    }
559}
560
/// Transport that spawns the adapter process and talks to it over TCP.
pub struct TcpTransport {
    /// Port the adapter connection was made to.
    pub port: u16,
    /// Host the adapter connection was made to.
    pub host: Ipv4Addr,
    /// Connection timeout in milliseconds that was used when starting.
    pub timeout: u64,
    /// The spawned adapter process.
    process: Mutex<Child>,
}
567
568impl TcpTransport {
569    /// Get an open port to use with the tcp client when not supplied by debug config
570    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
571        if let Some(port) = host.port {
572            Ok(port)
573        } else {
574            Self::unused_port(host.host()).await
575        }
576    }
577
578    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
579        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
580            .await?
581            .local_addr()?
582            .port())
583    }
584
    /// Spawns the adapter process described by `binary` and connects to it
    /// over TCP at the host/port given in `binary.connection`.
    ///
    /// The connection attempt is retried every 100ms until it succeeds, the
    /// process exits, or the timeout elapses. The timeout comes from the
    /// connection arguments, then the debugger settings, then 2000ms.
    ///
    /// # Errors
    ///
    /// Fails when no connection arguments are present, the process cannot be
    /// spawned, the process exits before a connection is made, or the timeout
    /// elapses.
    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
        let connection_args = binary
            .connection
            .as_ref()
            .context("No connection arguments provided")?;

        let host = connection_args.host;
        let port = connection_args.port;

        let mut command = util::command::new_std_command(&binary.command);
        util::set_pre_exec_to_start_new_session(&mut command);
        let mut command = smol::process::Command::from(command);

        if let Some(cwd) = &binary.cwd {
            command.current_dir(cwd);
        }

        command.args(&binary.arguments);
        command.envs(&binary.envs);

        // Protocol traffic goes over TCP, so stdin is unused; stdout and
        // stderr are piped so they can be read later.
        command
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true);

        let mut process = command
            .spawn()
            .with_context(|| "failed to start debug adapter.")?;

        let address = SocketAddrV4::new(host, port);

        let timeout = connection_args.timeout.unwrap_or_else(|| {
            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
                .unwrap_or(2000u64)
        });

        // Race the overall timeout against the connect-retry loop.
        let (mut process, (rx, tx)) = select! {
            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
                anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
            },
            result = cx.spawn(async move |cx| {
                loop {
                    match TcpStream::connect(address).await {
                        Ok(stream) => return Ok((process, stream.split())),
                        Err(_) => {
                            // If the process has already exited, report what it
                            // printed (stderr preferred, stdout otherwise).
                            if let Ok(Some(_)) = process.try_status() {
                                let output = process.output().await?;
                                let output = if output.stderr.is_empty() {
                                    String::from_utf8_lossy(&output.stdout).to_string()
                                } else {
                                    String::from_utf8_lossy(&output.stderr).to_string()
                                };
                                anyhow::bail!("{output}\nerror: process exited before debugger attached.");
                            }
                            cx.background_executor().timer(Duration::from_millis(100)).await;
                        }
                    }
                }
            }).fuse() => result?
        };

        log::info!(
            "Debug adapter has connected to TCP server {}:{}",
            host,
            port
        );
        let stdout = process.stdout.take();
        let stderr = process.stderr.take();

        let this = Self {
            port,
            host,
            process: Mutex::new(process),
            timeout,
        };

        // The TCP stream halves carry the protocol; the process's own
        // stdout/stderr are passed along as the optional extra streams.
        let pipe = TransportPipe::new(
            Box::new(tx),
            Box::new(BufReader::new(rx)),
            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
        );

        Ok((pipe, this))
    }
671
    /// Always `true` for the TCP transport.
    fn has_adapter_logs(&self) -> bool {
        true
    }
675
676    async fn kill(&self) -> Result<()> {
677        self.process.lock().await.kill()?;
678
679        Ok(())
680    }
681}
682
/// Transport that spawns the adapter process and talks to it over the
/// process's standard streams.
pub struct StdioTransport {
    /// The spawned adapter process.
    process: Mutex<Child>,
}
686
687impl StdioTransport {
688    #[allow(dead_code, reason = "This is used in non test builds of Zed")]
689    async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
690        let mut command = util::command::new_std_command(&binary.command);
691        util::set_pre_exec_to_start_new_session(&mut command);
692        let mut command = smol::process::Command::from(command);
693
694        if let Some(cwd) = &binary.cwd {
695            command.current_dir(cwd);
696        }
697
698        command.args(&binary.arguments);
699        command.envs(&binary.envs);
700
701        command
702            .stdin(Stdio::piped())
703            .stdout(Stdio::piped())
704            .stderr(Stdio::piped())
705            .kill_on_drop(true);
706
707        let mut process = command.spawn().with_context(|| {
708            format!(
709                "failed to spawn command `{} {}`.",
710                binary.command,
711                binary.arguments.join(" ")
712            )
713        })?;
714
715        let stdin = process.stdin.take().context("Failed to open stdin")?;
716        let stdout = process.stdout.take().context("Failed to open stdout")?;
717        let stderr = process
718            .stderr
719            .take()
720            .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
721
722        if stderr.is_none() {
723            bail!(
724                "Failed to connect to stderr for debug adapter command {}",
725                &binary.command
726            );
727        }
728
729        log::info!("Debug adapter has connected to stdio adapter");
730
731        let process = Mutex::new(process);
732
733        Ok((
734            TransportPipe::new(
735                Box::new(stdin),
736                Box::new(BufReader::new(stdout)),
737                None,
738                stderr,
739            ),
740            Self { process },
741        ))
742    }
743
    /// Always `false` for the stdio transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
747
748    async fn kill(&self) -> Result<()> {
749        self.process.lock().await.kill()?;
750        Ok(())
751    }
752}
753
/// Fake-adapter callback: given a request's `seq` and raw JSON arguments it
/// produces the response to send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;

/// Fake-adapter callback invoked with a response received from the client.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
760
/// Test-only transport that answers requests with registered handlers
/// instead of running a real adapter. Both maps are keyed by command name.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // for sending fake response back from adapter side
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    // for reverse request responses
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
768
769#[cfg(any(test, feature = "test-support"))]
770impl FakeTransport {
771    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
772    where
773        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
774    {
775        self.request_handlers.lock().insert(
776            R::COMMAND,
777            Box::new(move |seq, args| {
778                let result = handler(seq, serde_json::from_value(args).unwrap());
779                let response = match result {
780                    Ok(response) => Response {
781                        seq: seq + 1,
782                        request_seq: seq,
783                        success: true,
784                        command: R::COMMAND.into(),
785                        body: Some(serde_json::to_value(response).unwrap()),
786                        message: None,
787                    },
788                    Err(response) => Response {
789                        seq: seq + 1,
790                        request_seq: seq,
791                        success: false,
792                        command: R::COMMAND.into(),
793                        body: Some(serde_json::to_value(response).unwrap()),
794                        message: None,
795                    },
796                };
797                response
798            }),
799        );
800    }
801
802    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
803    where
804        F: 'static + Send + Fn(Response),
805    {
806        self.response_handlers
807            .lock()
808            .insert(R::COMMAND, Box::new(handler));
809    }
810
    /// Creates a fake transport backed by two `async_pipe` pipes and spawns a
    /// detached background task that plays the adapter's role.
    ///
    /// The task reads framed messages written to the "stdin" pipe and:
    /// - echoes `RunInTerminal` / `StartDebugging` requests back on the
    ///   "stdout" pipe,
    /// - answers any other request with its registered request handler
    ///   (panicking when none is registered),
    /// - echoes events back on the "stdout" pipe,
    /// - passes responses to the registered response handler, logging an
    ///   error when none is registered.
    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
        let this = Self {
            request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
            response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
        };
        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
        use serde_json::json;

        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        let request_handlers = this.request_handlers.clone();
        let response_handlers = this.response_handlers.clone();
        let stdout_writer = Arc::new(Mutex::new(stdout_writer));

        cx.background_spawn(async move {
            let mut reader = BufReader::new(stdin_reader);
            let mut buffer = String::new();

            loop {
                // Reuse the delegate's framing parser; no log handlers here.
                match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
                    .await
                {
                    ConnectionResult::Timeout => {
                        anyhow::bail!("Timed out when connecting to debugger");
                    }
                    ConnectionResult::ConnectionReset => {
                        log::info!("Debugger closed the connection");
                        break Ok(());
                    }
                    ConnectionResult::Result(Err(e)) => break Err(e),
                    ConnectionResult::Result(Ok(message)) => {
                        match message {
                            Message::Request(request) => {
                                // redirect reverse requests to stdout writer/reader
                                if request.command == RunInTerminal::COMMAND
                                    || request.command == StartDebugging::COMMAND
                                {
                                    let message =
                                        serde_json::to_string(&Message::Request(request)).unwrap();

                                    let mut writer = stdout_writer.lock().await;
                                    writer
                                        .write_all(
                                            TransportDelegate::build_rpc_message(message)
                                                .as_bytes(),
                                        )
                                        .await
                                        .unwrap();
                                    writer.flush().await.unwrap();
                                } else {
                                    // Missing arguments are passed to the handler as `{}`.
                                    let response = if let Some(handle) =
                                        request_handlers.lock().get_mut(request.command.as_str())
                                    {
                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
                                    } else {
                                        panic!("No request handler for {}", request.command);
                                    };
                                    let message =
                                        serde_json::to_string(&Message::Response(response))
                                            .unwrap();

                                    let mut writer = stdout_writer.lock().await;

                                    writer
                                        .write_all(
                                            TransportDelegate::build_rpc_message(message)
                                                .as_bytes(),
                                        )
                                        .await
                                        .unwrap();
                                    writer.flush().await.unwrap();
                                }
                            }
                            Message::Event(event) => {
                                // Events are echoed back unchanged.
                                let message =
                                    serde_json::to_string(&Message::Event(event)).unwrap();

                                let mut writer = stdout_writer.lock().await;
                                writer
                                    .write_all(
                                        TransportDelegate::build_rpc_message(message).as_bytes(),
                                    )
                                    .await
                                    .unwrap();
                                writer.flush().await.unwrap();
                            }
                            Message::Response(response) => {
                                if let Some(handle) =
                                    response_handlers.lock().get(response.command.as_str())
                                {
                                    handle(response);
                                } else {
                                    log::error!("No response handler for {}", response.command);
                                }
                            }
                        }
                    }
                }
            }
        })
        .detach();

        Ok((
            TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
            this,
        ))
    }
919
    /// Always `false` for the fake transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
923
    /// No-op: always returns `Ok(())`.
    async fn kill(&self) -> Result<()> {
        Ok(())
    }
927}