transport.rs

  1use anyhow::{Context as _, Result, bail};
  2use dap_types::{
  3    ErrorResponse,
  4    messages::{Message, Response},
  5};
  6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
  7use gpui::AsyncApp;
  8use settings::Settings as _;
  9use smallvec::SmallVec;
 10use smol::{
 11    channel::{Receiver, Sender, unbounded},
 12    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 13    lock::Mutex,
 14    net::{TcpListener, TcpStream},
 15    process::Child,
 16};
 17use std::{
 18    collections::HashMap,
 19    net::{Ipv4Addr, SocketAddrV4},
 20    process::Stdio,
 21    sync::Arc,
 22    time::Duration,
 23};
 24use task::TcpArgumentsTemplate;
 25use util::{ResultExt as _, TryFutureExt};
 26
 27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
 28
/// Boxed callback that is handed the [`IoKind`] of a stream together with one
/// piece of text (a log line or a serialized message) seen on that stream.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
 30
/// Category a registered log handler listens for.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Lines read from the adapter's stdout/stderr log streams.
    Adapter,
    /// Serialized protocol messages written to or read from the adapter.
    Rpc,
}
 36
/// Tells a log handler which stream a piece of text was seen on.
pub enum IoKind {
    /// Text written towards the adapter (outgoing messages).
    StdIn,
    /// Text read from the adapter: incoming messages or stdout log lines.
    StdOut,
    /// Text read from the adapter's stderr stream.
    StdErr,
}
 42
/// Bundle of I/O endpoints handed from a freshly started transport to the
/// handler tasks spawned by `TransportDelegate::start_handlers`.
pub struct TransportPipe {
    /// Sink that outgoing messages are written to.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Source that incoming messages are read from.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional stream whose lines are forwarded to adapter log handlers as stdout.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional stream whose lines are forwarded to adapter log handlers as stderr.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
 49
 50impl TransportPipe {
 51    pub fn new(
 52        input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 53        output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 54        stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 55        stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 56    ) -> Self {
 57        TransportPipe {
 58            input,
 59            output,
 60            stdout,
 61            stderr,
 62        }
 63    }
 64}
 65
/// Shared map from a request's sequence number to the one-shot sender that
/// delivers its response.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log handlers, each tagged with the [`LogKind`] it
/// wants to receive.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 68
/// The concrete mechanism used to exchange messages with a debug adapter.
pub enum Transport {
    /// Adapter process reached through its stdin/stdout pipes.
    Stdio(StdioTransport),
    /// Adapter process reached through a TCP connection.
    Tcp(TcpTransport),
    /// In-memory transport; only compiled for tests / `test-support`.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
 75
 76impl Transport {
 77    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 78        #[cfg(any(test, feature = "test-support"))]
 79        if cfg!(any(test, feature = "test-support")) {
 80            return FakeTransport::start(cx)
 81                .await
 82                .map(|(transports, fake)| (transports, Self::Fake(fake)));
 83        }
 84
 85        if binary.connection.is_some() {
 86            TcpTransport::start(binary, cx)
 87                .await
 88                .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
 89        } else {
 90            StdioTransport::start(binary, cx)
 91                .await
 92                .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
 93        }
 94    }
 95
 96    fn has_adapter_logs(&self) -> bool {
 97        match self {
 98            Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
 99            Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100            #[cfg(any(test, feature = "test-support"))]
101            Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102        }
103    }
104
105    async fn kill(&self) -> Result<()> {
106        match self {
107            Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108            Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109            #[cfg(any(test, feature = "test-support"))]
110            Transport::Fake(fake_transport) => fake_transport.kill().await,
111        }
112    }
113
114    #[cfg(any(test, feature = "test-support"))]
115    pub(crate) fn as_fake(&self) -> &FakeTransport {
116        match self {
117            Transport::Fake(fake_transport) => fake_transport,
118            _ => panic!("Not a fake transport layer"),
119        }
120    }
121}
122
/// Owns a [`Transport`] plus the background tasks and bookkeeping used to move
/// messages between the client and the debug adapter.
pub(crate) struct TransportDelegate {
    /// Handlers notified about adapter log lines and RPC traffic.
    log_handlers: LogHandlers,
    /// Senders consulted by the input task when a request is written; a match
    /// is moved into `pending_requests`.
    current_requests: Requests,
    /// Senders waiting for the response with the matching `request_seq`.
    pending_requests: Requests,
    /// The underlying transport (stdio, TCP, or fake).
    transport: Transport,
    /// Sending half of the outgoing message channel; taken (set to `None`) by
    /// `shutdown`.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
    /// Handles of the background I/O tasks spawned in `start_handlers`.
    _tasks: Vec<gpui::Task<Option<()>>>,
}
131
132impl TransportDelegate {
133    pub(crate) async fn start(
134        binary: &DebugAdapterBinary,
135        cx: AsyncApp,
136    ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
137        let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
138        let mut this = Self {
139            transport,
140            server_tx: Default::default(),
141            log_handlers: Default::default(),
142            current_requests: Default::default(),
143            pending_requests: Default::default(),
144            _tasks: Default::default(),
145        };
146        let messages = this.start_handlers(transport_pipes, cx).await?;
147        Ok((messages, this))
148    }
149
150    async fn start_handlers(
151        &mut self,
152        mut params: TransportPipe,
153        cx: AsyncApp,
154    ) -> Result<(Receiver<Message>, Sender<Message>)> {
155        let (client_tx, server_rx) = unbounded::<Message>();
156        let (server_tx, client_rx) = unbounded::<Message>();
157
158        let log_dap_communications =
159            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161                .unwrap_or(false);
162
163        let log_handler = if log_dap_communications {
164            Some(self.log_handlers.clone())
165        } else {
166            None
167        };
168
169        cx.update(|cx| {
170            if let Some(stdout) = params.stdout.take() {
171                self._tasks.push(
172                    cx.background_executor()
173                        .spawn(Self::handle_adapter_log(stdout, log_handler.clone()).log_err()),
174                );
175            }
176
177            self._tasks.push(
178                cx.background_executor().spawn(
179                    Self::handle_output(
180                        params.output,
181                        client_tx,
182                        self.pending_requests.clone(),
183                        log_handler.clone(),
184                    )
185                    .log_err(),
186                ),
187            );
188
189            if let Some(stderr) = params.stderr.take() {
190                self._tasks.push(
191                    cx.background_executor()
192                        .spawn(Self::handle_error(stderr, self.log_handlers.clone()).log_err()),
193                );
194            }
195
196            self._tasks.push(
197                cx.background_executor().spawn(
198                    Self::handle_input(
199                        params.input,
200                        client_rx,
201                        self.current_requests.clone(),
202                        self.pending_requests.clone(),
203                        log_handler.clone(),
204                    )
205                    .log_err(),
206                ),
207            );
208        })?;
209
210        {
211            let mut lock = self.server_tx.lock().await;
212            *lock = Some(server_tx.clone());
213        }
214
215        Ok((server_rx, server_tx))
216    }
217
218    pub(crate) async fn add_pending_request(
219        &self,
220        sequence_id: u64,
221        request: oneshot::Sender<Result<Response>>,
222    ) {
223        let mut pending_requests = self.pending_requests.lock().await;
224        pending_requests.insert(sequence_id, request);
225    }
226
227    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
228        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
229            server_tx.send(message).await.context("sending message")
230        } else {
231            anyhow::bail!("Server tx already dropped")
232        }
233    }
234
235    async fn handle_adapter_log<Stdout>(
236        stdout: Stdout,
237        log_handlers: Option<LogHandlers>,
238    ) -> Result<()>
239    where
240        Stdout: AsyncRead + Unpin + Send + 'static,
241    {
242        let mut reader = BufReader::new(stdout);
243        let mut line = String::new();
244
245        let result = loop {
246            line.truncate(0);
247
248            let bytes_read = match reader.read_line(&mut line).await {
249                Ok(bytes_read) => bytes_read,
250                Err(e) => break Err(e.into()),
251            };
252
253            if bytes_read == 0 {
254                anyhow::bail!("Debugger log stream closed");
255            }
256
257            if let Some(log_handlers) = log_handlers.as_ref() {
258                for (kind, handler) in log_handlers.lock().iter_mut() {
259                    if matches!(kind, LogKind::Adapter) {
260                        handler(IoKind::StdOut, line.as_str());
261                    }
262                }
263            }
264        };
265
266        log::debug!("Handle adapter log dropped");
267
268        result
269    }
270
271    fn build_rpc_message(message: String) -> String {
272        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
273    }
274
275    async fn handle_input<Stdin>(
276        mut server_stdin: Stdin,
277        client_rx: Receiver<Message>,
278        current_requests: Requests,
279        pending_requests: Requests,
280        log_handlers: Option<LogHandlers>,
281    ) -> Result<()>
282    where
283        Stdin: AsyncWrite + Unpin + Send + 'static,
284    {
285        let result = loop {
286            match client_rx.recv().await {
287                Ok(message) => {
288                    if let Message::Request(request) = &message {
289                        if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
290                            pending_requests.lock().await.insert(request.seq, sender);
291                        }
292                    }
293
294                    let message = match serde_json::to_string(&message) {
295                        Ok(message) => message,
296                        Err(e) => break Err(e.into()),
297                    };
298
299                    if let Some(log_handlers) = log_handlers.as_ref() {
300                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
301                            if matches!(kind, LogKind::Rpc) {
302                                log_handler(IoKind::StdIn, &message);
303                            }
304                        }
305                    }
306
307                    if let Err(e) = server_stdin
308                        .write_all(Self::build_rpc_message(message).as_bytes())
309                        .await
310                    {
311                        break Err(e.into());
312                    }
313
314                    if let Err(e) = server_stdin.flush().await {
315                        break Err(e.into());
316                    }
317                }
318                Err(error) => break Err(error.into()),
319            }
320        };
321
322        log::debug!("Handle adapter input dropped");
323
324        result
325    }
326
327    async fn handle_output<Stdout>(
328        server_stdout: Stdout,
329        client_tx: Sender<Message>,
330        pending_requests: Requests,
331        log_handlers: Option<LogHandlers>,
332    ) -> Result<()>
333    where
334        Stdout: AsyncRead + Unpin + Send + 'static,
335    {
336        let mut recv_buffer = String::new();
337        let mut reader = BufReader::new(server_stdout);
338
339        let result = loop {
340            let message =
341                Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
342                    .await;
343
344            match message {
345                Ok(Message::Response(res)) => {
346                    if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
347                        if let Err(e) = tx.send(Self::process_response(res)) {
348                            log::trace!("Did not send response `{:?}` for a cancelled", e);
349                        }
350                    } else {
351                        client_tx.send(Message::Response(res)).await?;
352                    };
353                }
354                Ok(message) => {
355                    client_tx.send(message).await?;
356                }
357                Err(e) => break Err(e),
358            }
359        };
360
361        drop(client_tx);
362
363        log::debug!("Handle adapter output dropped");
364
365        result
366    }
367
368    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
369    where
370        Stderr: AsyncRead + Unpin + Send + 'static,
371    {
372        log::debug!("Handle error started");
373        let mut buffer = String::new();
374
375        let mut reader = BufReader::new(stderr);
376
377        let result = loop {
378            match reader.read_line(&mut buffer).await {
379                Ok(0) => anyhow::bail!("debugger error stream closed"),
380                Ok(_) => {
381                    for (kind, log_handler) in log_handlers.lock().iter_mut() {
382                        if matches!(kind, LogKind::Adapter) {
383                            log_handler(IoKind::StdErr, buffer.as_str());
384                        }
385                    }
386
387                    buffer.truncate(0);
388                }
389                Err(error) => break Err(error.into()),
390            }
391        };
392
393        log::debug!("Handle adapter error dropped");
394
395        result
396    }
397
398    fn process_response(response: Response) -> Result<Response> {
399        if response.success {
400            Ok(response)
401        } else {
402            if let Some(error_message) = response
403                .body
404                .clone()
405                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
406                .and_then(|response| response.error.map(|msg| msg.format))
407                .or_else(|| response.message.clone())
408            {
409                anyhow::bail!(error_message);
410            };
411
412            anyhow::bail!(
413                "Received error response from adapter. Response: {:?}",
414                response
415            );
416        }
417    }
418
419    async fn receive_server_message<Stdout>(
420        reader: &mut BufReader<Stdout>,
421        buffer: &mut String,
422        log_handlers: Option<&LogHandlers>,
423    ) -> Result<Message>
424    where
425        Stdout: AsyncRead + Unpin + Send + 'static,
426    {
427        let mut content_length = None;
428        loop {
429            buffer.truncate(0);
430
431            if reader
432                .read_line(buffer)
433                .await
434                .with_context(|| "reading a message from server")?
435                == 0
436            {
437                anyhow::bail!("debugger reader stream closed, last string output: '{buffer}'");
438            };
439
440            if buffer == "\r\n" {
441                break;
442            }
443
444            let parts = buffer.trim().split_once(": ");
445
446            match parts {
447                Some(("Content-Length", value)) => {
448                    content_length = Some(value.parse().context("invalid content length")?);
449                }
450                _ => {}
451            }
452        }
453
454        let content_length = content_length.context("missing content length")?;
455
456        let mut content = vec![0; content_length];
457        reader
458            .read_exact(&mut content)
459            .await
460            .with_context(|| "reading after a loop")?;
461
462        let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
463
464        if let Some(log_handlers) = log_handlers {
465            for (kind, log_handler) in log_handlers.lock().iter_mut() {
466                if matches!(kind, LogKind::Rpc) {
467                    log_handler(IoKind::StdOut, &message);
468                }
469            }
470        }
471
472        Ok(serde_json::from_str::<Message>(message)?)
473    }
474
475    pub async fn shutdown(&self) -> Result<()> {
476        log::debug!("Start shutdown client");
477
478        if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
479            server_tx.close();
480        }
481
482        let mut current_requests = self.current_requests.lock().await;
483        let mut pending_requests = self.pending_requests.lock().await;
484
485        current_requests.clear();
486        pending_requests.clear();
487
488        let _ = self.transport.kill().await.log_err();
489
490        drop(current_requests);
491        drop(pending_requests);
492
493        log::debug!("Shutdown client completed");
494
495        anyhow::Ok(())
496    }
497
498    pub fn has_adapter_logs(&self) -> bool {
499        self.transport.has_adapter_logs()
500    }
501
502    pub fn transport(&self) -> &Transport {
503        &self.transport
504    }
505
506    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
507    where
508        F: 'static + Send + FnMut(IoKind, &str),
509    {
510        let mut log_handlers = self.log_handlers.lock();
511        log_handlers.push((kind, Box::new(f)));
512    }
513}
514
/// Transport that spawns the adapter process and talks to it over TCP.
pub struct TcpTransport {
    /// Port the adapter was reached on.
    pub port: u16,
    /// Host the adapter was reached on.
    pub host: Ipv4Addr,
    /// Connection timeout in milliseconds that was in effect while connecting.
    pub timeout: u64,
    /// The spawned adapter process.
    process: Mutex<Child>,
}
521
522impl TcpTransport {
523    /// Get an open port to use with the tcp client when not supplied by debug config
524    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
525        if let Some(port) = host.port {
526            Ok(port)
527        } else {
528            Self::unused_port(host.host()).await
529        }
530    }
531
532    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
533        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
534            .await?
535            .local_addr()?
536            .port())
537    }
538
    /// Spawns the adapter process described by `binary` and connects to it
    /// over TCP.
    ///
    /// The process gets no stdin, and its stdout and stderr are piped.
    /// Connection attempts are retried every 100 ms until one succeeds, the
    /// process is found to have exited, or the timeout elapses. The timeout
    /// comes from the connection arguments, then from `DebuggerSettings`, and
    /// finally defaults to 2000 ms.
    ///
    /// # Errors
    ///
    /// Fails when `binary.connection` is missing, the process cannot be
    /// spawned, the timeout elapses, or the process exits before a connection
    /// is made. In the last case the error text includes the process's stderr,
    /// or its stdout when stderr is empty.
    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
        let connection_args = binary
            .connection
            .as_ref()
            .context("No connection arguments provided")?;

        let host = connection_args.host;
        let port = connection_args.port;

        let mut command = util::command::new_std_command(&binary.command);
        // NOTE(review): going by its name, this makes the child start in its
        // own session; confirm against the `util` crate.
        util::set_pre_exec_to_start_new_session(&mut command);
        let mut command = smol::process::Command::from(command);

        if let Some(cwd) = &binary.cwd {
            command.current_dir(cwd);
        }

        command.args(&binary.arguments);
        command.envs(&binary.envs);

        command
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true);

        let mut process = command
            .spawn()
            .with_context(|| "failed to start debug adapter.")?;

        let address = SocketAddrV4::new(host, port);

        let timeout = connection_args.timeout.unwrap_or_else(|| {
            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
                .unwrap_or(2000u64)
        });

        // Race the connect-retry task against the timeout timer. `process` is
        // moved into the task and handed back alongside the split stream on
        // success.
        let (mut process, (rx, tx)) = select! {
            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
                anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
            },
            result = cx.spawn(async move |cx| {
                loop {
                    match TcpStream::connect(address).await {
                        Ok(stream) => return Ok((process, stream.split())),
                        Err(_) => {
                            // If the adapter has already exited there is no
                            // point retrying; report what it printed.
                            if let Ok(Some(_)) = process.try_status() {
                                let output = process.output().await?;
                                let output = if output.stderr.is_empty() {
                                    String::from_utf8_lossy(&output.stdout).to_string()
                                } else {
                                    String::from_utf8_lossy(&output.stderr).to_string()
                                };
                                anyhow::bail!("{output}\nerror: process exited before debugger attached.");
                            }
                            // Wait briefly before the next connection attempt.
                            cx.background_executor().timer(Duration::from_millis(100)).await;
                        }
                    }
                }
            }).fuse() => result?
        };

        log::info!(
            "Debug adapter has connected to TCP server {}:{}",
            host,
            port
        );
        // The process's own stdout/stderr become the adapter log streams;
        // protocol traffic goes over the TCP stream instead.
        let stdout = process.stdout.take();
        let stderr = process.stderr.take();

        let this = Self {
            port,
            host,
            process: Mutex::new(process),
            timeout,
        };

        let pipe = TransportPipe::new(
            Box::new(tx),
            Box::new(BufReader::new(rx)),
            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
        );

        Ok((pipe, this))
    }
625
626    fn has_adapter_logs(&self) -> bool {
627        true
628    }
629
630    async fn kill(&self) -> Result<()> {
631        self.process.lock().await.kill()?;
632
633        Ok(())
634    }
635}
636
/// Transport that spawns the adapter process and talks to it over the
/// process's stdin/stdout pipes.
pub struct StdioTransport {
    /// The spawned adapter process.
    process: Mutex<Child>,
}
640
641impl StdioTransport {
642    #[allow(dead_code, reason = "This is used in non test builds of Zed")]
643    async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
644        let mut command = util::command::new_std_command(&binary.command);
645        util::set_pre_exec_to_start_new_session(&mut command);
646        let mut command = smol::process::Command::from(command);
647
648        if let Some(cwd) = &binary.cwd {
649            command.current_dir(cwd);
650        }
651
652        command.args(&binary.arguments);
653        command.envs(&binary.envs);
654
655        command
656            .stdin(Stdio::piped())
657            .stdout(Stdio::piped())
658            .stderr(Stdio::piped())
659            .kill_on_drop(true);
660
661        let mut process = command.spawn().with_context(|| {
662            format!(
663                "failed to spawn command `{} {}`.",
664                binary.command,
665                binary.arguments.join(" ")
666            )
667        })?;
668
669        let stdin = process.stdin.take().context("Failed to open stdin")?;
670        let stdout = process.stdout.take().context("Failed to open stdout")?;
671        let stderr = process
672            .stderr
673            .take()
674            .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
675
676        if stderr.is_none() {
677            bail!(
678                "Failed to connect to stderr for debug adapter command {}",
679                &binary.command
680            );
681        }
682
683        log::info!("Debug adapter has connected to stdio adapter");
684
685        let process = Mutex::new(process);
686
687        Ok((
688            TransportPipe::new(
689                Box::new(stdin),
690                Box::new(BufReader::new(stdout)),
691                None,
692                stderr,
693            ),
694            Self { process },
695        ))
696    }
697
698    fn has_adapter_logs(&self) -> bool {
699        false
700    }
701
702    async fn kill(&self) -> Result<()> {
703        self.process.lock().await.kill()?;
704        Ok(())
705    }
706}
707
/// Test-only callback that builds the fake adapter's response from a request's
/// sequence number and its raw JSON arguments.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
711
/// Test-only callback invoked with a response that was sent to the fake adapter.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
714
/// Test-only transport that answers traffic using handlers registered at
/// runtime, with no real adapter process behind it.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // Handlers that produce the fake adapter's response to a request, keyed
    // by command name.
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    // Handlers that receive responses to reverse requests, keyed by command
    // name.
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
722
723#[cfg(any(test, feature = "test-support"))]
724impl FakeTransport {
725    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
726    where
727        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
728    {
729        self.request_handlers.lock().insert(
730            R::COMMAND,
731            Box::new(move |seq, args| {
732                let result = handler(seq, serde_json::from_value(args).unwrap());
733                let response = match result {
734                    Ok(response) => Response {
735                        seq: seq + 1,
736                        request_seq: seq,
737                        success: true,
738                        command: R::COMMAND.into(),
739                        body: Some(serde_json::to_value(response).unwrap()),
740                        message: None,
741                    },
742                    Err(response) => Response {
743                        seq: seq + 1,
744                        request_seq: seq,
745                        success: false,
746                        command: R::COMMAND.into(),
747                        body: Some(serde_json::to_value(response).unwrap()),
748                        message: None,
749                    },
750                };
751                response
752            }),
753        );
754    }
755
756    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
757    where
758        F: 'static + Send + Fn(Response),
759    {
760        self.response_handlers
761            .lock()
762            .insert(R::COMMAND, Box::new(handler));
763    }
764
765    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
766        let this = Self {
767            request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
768            response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
769        };
770        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
771        use serde_json::json;
772
773        let (stdin_writer, stdin_reader) = async_pipe::pipe();
774        let (stdout_writer, stdout_reader) = async_pipe::pipe();
775
776        let request_handlers = this.request_handlers.clone();
777        let response_handlers = this.response_handlers.clone();
778        let stdout_writer = Arc::new(Mutex::new(stdout_writer));
779
780        cx.background_executor()
781            .spawn(async move {
782                let mut reader = BufReader::new(stdin_reader);
783                let mut buffer = String::new();
784
785                loop {
786                    let message =
787                        TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
788                            .await;
789
790                    match message {
791                        Err(error) => {
792                            break anyhow::anyhow!(error);
793                        }
794                        Ok(message) => {
795                            match message {
796                                Message::Request(request) => {
797                                    // redirect reverse requests to stdout writer/reader
798                                    if request.command == RunInTerminal::COMMAND
799                                        || request.command == StartDebugging::COMMAND
800                                    {
801                                        let message =
802                                            serde_json::to_string(&Message::Request(request))
803                                                .unwrap();
804
805                                        let mut writer = stdout_writer.lock().await;
806                                        writer
807                                            .write_all(
808                                                TransportDelegate::build_rpc_message(message)
809                                                    .as_bytes(),
810                                            )
811                                            .await
812                                            .unwrap();
813                                        writer.flush().await.unwrap();
814                                    } else {
815                                        let response = if let Some(handle) = request_handlers
816                                            .lock()
817                                            .get_mut(request.command.as_str())
818                                        {
819                                            handle(
820                                                request.seq,
821                                                request.arguments.unwrap_or(json!({})),
822                                            )
823                                        } else {
824                                            panic!("No request handler for {}", request.command);
825                                        };
826                                        let message =
827                                            serde_json::to_string(&Message::Response(response))
828                                                .unwrap();
829
830                                        let mut writer = stdout_writer.lock().await;
831
832                                        writer
833                                            .write_all(
834                                                TransportDelegate::build_rpc_message(message)
835                                                    .as_bytes(),
836                                            )
837                                            .await
838                                            .unwrap();
839                                        writer.flush().await.unwrap();
840                                    }
841                                }
842                                Message::Event(event) => {
843                                    let message =
844                                        serde_json::to_string(&Message::Event(event)).unwrap();
845
846                                    let mut writer = stdout_writer.lock().await;
847                                    writer
848                                        .write_all(
849                                            TransportDelegate::build_rpc_message(message)
850                                                .as_bytes(),
851                                        )
852                                        .await
853                                        .unwrap();
854                                    writer.flush().await.unwrap();
855                                }
856                                Message::Response(response) => {
857                                    if let Some(handle) =
858                                        response_handlers.lock().get(response.command.as_str())
859                                    {
860                                        handle(response);
861                                    } else {
862                                        log::error!("No response handler for {}", response.command);
863                                    }
864                                }
865                            }
866                        }
867                    }
868                }
869            })
870            .detach();
871
872        Ok((
873            TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
874            this,
875        ))
876    }
877
878    fn has_adapter_logs(&self) -> bool {
879        false
880    }
881
882    async fn kill(&self) -> Result<()> {
883        Ok(())
884    }
885}