transport.rs

  1use anyhow::{Context, Result, anyhow, bail};
  2use dap_types::{
  3    ErrorResponse,
  4    messages::{Message, Response},
  5};
  6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
  7use gpui::AsyncApp;
  8use settings::Settings as _;
  9use smallvec::SmallVec;
 10use smol::{
 11    channel::{Receiver, Sender, unbounded},
 12    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 13    lock::Mutex,
 14    net::{TcpListener, TcpStream},
 15    process::Child,
 16};
 17use std::{
 18    collections::HashMap,
 19    net::{Ipv4Addr, SocketAddrV4},
 20    process::Stdio,
 21    sync::Arc,
 22    time::Duration,
 23};
 24use task::TcpArgumentsTemplate;
 25use util::{ResultExt as _, TryFutureExt};
 26
 27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
 28
/// Boxed logging callback, invoked with the stream kind a piece of text belongs to
/// and the text itself. Stored in `LogHandlers` next to the `LogKind` it listens for.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
 30
/// Category of log traffic a registered handler is interested in.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Raw lines read from the adapter's stdout/stderr log streams.
    Adapter,
    /// Serialized DAP messages sent to or received from the adapter.
    Rpc,
}
 36
/// Tells a log handler which stream the text it receives is associated with.
pub enum IoKind {
    /// Text written to the adapter (outgoing messages are logged with this kind).
    StdIn,
    /// Text produced by the adapter: received messages or lines from its stdout log stream.
    StdOut,
    /// Lines read from the adapter's stderr.
    StdErr,
}
 42
/// The set of I/O endpoints a transport hands to `TransportDelegate::start_handlers`.
pub struct TransportPipe {
    /// Sink that framed outgoing messages are written to.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Source that framed incoming messages are read from.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional separate stdout stream, read line by line as adapter log output.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional stderr stream, read line by line as adapter log output.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
 49
 50impl TransportPipe {
 51    pub fn new(
 52        input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 53        output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 54        stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 55        stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 56    ) -> Self {
 57        TransportPipe {
 58            input,
 59            output,
 60            stdout,
 61            stderr,
 62        }
 63    }
 64}
 65
/// Shared map from a request's sequence number to the oneshot sender that
/// delivers the matching response (or error) to whoever is waiting for it.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of `(kind, handler)` pairs; a handler is only invoked for log
/// traffic whose kind matches. Stored inline for up to two handlers.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 68
/// The concrete channel used to talk to a debug adapter.
pub enum Transport {
    /// Adapter process spoken to over its stdin/stdout.
    Stdio(StdioTransport),
    /// Adapter process spoken to over a TCP connection.
    Tcp(TcpTransport),
    /// In-memory stand-in, only compiled for tests / the `test-support` feature.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
 75
 76impl Transport {
 77    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 78        #[cfg(any(test, feature = "test-support"))]
 79        if cfg!(any(test, feature = "test-support")) {
 80            return FakeTransport::start(cx)
 81                .await
 82                .map(|(transports, fake)| (transports, Self::Fake(fake)));
 83        }
 84
 85        if binary.connection.is_some() {
 86            TcpTransport::start(binary, cx)
 87                .await
 88                .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
 89        } else {
 90            StdioTransport::start(binary, cx)
 91                .await
 92                .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
 93        }
 94    }
 95
 96    fn has_adapter_logs(&self) -> bool {
 97        match self {
 98            Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
 99            Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100            #[cfg(any(test, feature = "test-support"))]
101            Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102        }
103    }
104
105    async fn kill(&self) -> Result<()> {
106        match self {
107            Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108            Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109            #[cfg(any(test, feature = "test-support"))]
110            Transport::Fake(fake_transport) => fake_transport.kill().await,
111        }
112    }
113
114    #[cfg(any(test, feature = "test-support"))]
115    pub(crate) fn as_fake(&self) -> &FakeTransport {
116        match self {
117            Transport::Fake(fake_transport) => fake_transport,
118            _ => panic!("Not a fake transport layer"),
119        }
120    }
121}
122
/// Owns a `Transport` plus the background tasks and bookkeeping that move
/// messages between the client and the debug adapter.
pub(crate) struct TransportDelegate {
    /// Registered log callbacks, shared with the background I/O tasks.
    log_handlers: LogHandlers,
    /// Requests that `handle_input` promotes into `pending_requests` when the
    /// matching request message is written out.
    // NOTE(review): nothing in this file inserts into this map — confirm who fills it.
    current_requests: Requests,
    /// Requests awaiting a response; `handle_output` completes them by `request_seq`.
    pending_requests: Requests,
    /// The underlying transport (used for `kill` and `has_adapter_logs`).
    transport: Transport,
    /// Sender for outgoing messages; taken and closed by `shutdown`.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
    /// Handles of the background tasks spawned by `start_handlers`.
    // NOTE(review): presumably held so the tasks live as long as the delegate — confirm.
    _tasks: Vec<gpui::Task<Option<()>>>,
}
131
132impl TransportDelegate {
133    pub(crate) async fn start(
134        binary: &DebugAdapterBinary,
135        cx: AsyncApp,
136    ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
137        let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
138        let mut this = Self {
139            transport,
140            server_tx: Default::default(),
141            log_handlers: Default::default(),
142            current_requests: Default::default(),
143            pending_requests: Default::default(),
144            _tasks: Default::default(),
145        };
146        let messages = this.start_handlers(transport_pipes, cx).await?;
147        Ok((messages, this))
148    }
149
150    async fn start_handlers(
151        &mut self,
152        mut params: TransportPipe,
153        cx: AsyncApp,
154    ) -> Result<(Receiver<Message>, Sender<Message>)> {
155        let (client_tx, server_rx) = unbounded::<Message>();
156        let (server_tx, client_rx) = unbounded::<Message>();
157
158        let log_dap_communications =
159            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161                .unwrap_or(false);
162
163        let log_handler = if log_dap_communications {
164            Some(self.log_handlers.clone())
165        } else {
166            None
167        };
168
169        cx.update(|cx| {
170            if let Some(stdout) = params.stdout.take() {
171                self._tasks.push(
172                    cx.background_executor()
173                        .spawn(Self::handle_adapter_log(stdout, log_handler.clone()).log_err()),
174                );
175            }
176
177            self._tasks.push(
178                cx.background_executor().spawn(
179                    Self::handle_output(
180                        params.output,
181                        client_tx,
182                        self.pending_requests.clone(),
183                        log_handler.clone(),
184                    )
185                    .log_err(),
186                ),
187            );
188
189            if let Some(stderr) = params.stderr.take() {
190                self._tasks.push(
191                    cx.background_executor()
192                        .spawn(Self::handle_error(stderr, self.log_handlers.clone()).log_err()),
193                );
194            }
195
196            self._tasks.push(
197                cx.background_executor().spawn(
198                    Self::handle_input(
199                        params.input,
200                        client_rx,
201                        self.current_requests.clone(),
202                        self.pending_requests.clone(),
203                        log_handler.clone(),
204                    )
205                    .log_err(),
206                ),
207            );
208        })?;
209
210        {
211            let mut lock = self.server_tx.lock().await;
212            *lock = Some(server_tx.clone());
213        }
214
215        Ok((server_rx, server_tx))
216    }
217
218    pub(crate) async fn add_pending_request(
219        &self,
220        sequence_id: u64,
221        request: oneshot::Sender<Result<Response>>,
222    ) {
223        let mut pending_requests = self.pending_requests.lock().await;
224        pending_requests.insert(sequence_id, request);
225    }
226
227    pub(crate) async fn cancel_pending_request(&self, sequence_id: &u64) {
228        let mut pending_requests = self.pending_requests.lock().await;
229        pending_requests.remove(sequence_id);
230    }
231
232    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
233        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
234            server_tx
235                .send(message)
236                .await
237                .map_err(|e| anyhow!("Failed to send message: {}", e))
238        } else {
239            Err(anyhow!("Server tx already dropped"))
240        }
241    }
242
243    async fn handle_adapter_log<Stdout>(
244        stdout: Stdout,
245        log_handlers: Option<LogHandlers>,
246    ) -> Result<()>
247    where
248        Stdout: AsyncRead + Unpin + Send + 'static,
249    {
250        let mut reader = BufReader::new(stdout);
251        let mut line = String::new();
252
253        let result = loop {
254            line.truncate(0);
255
256            let bytes_read = match reader.read_line(&mut line).await {
257                Ok(bytes_read) => bytes_read,
258                Err(e) => break Err(e.into()),
259            };
260
261            if bytes_read == 0 {
262                break Err(anyhow!("Debugger log stream closed"));
263            }
264
265            if let Some(log_handlers) = log_handlers.as_ref() {
266                for (kind, handler) in log_handlers.lock().iter_mut() {
267                    if matches!(kind, LogKind::Adapter) {
268                        handler(IoKind::StdOut, line.as_str());
269                    }
270                }
271            }
272        };
273
274        log::debug!("Handle adapter log dropped");
275
276        result
277    }
278
279    fn build_rpc_message(message: String) -> String {
280        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
281    }
282
283    async fn handle_input<Stdin>(
284        mut server_stdin: Stdin,
285        client_rx: Receiver<Message>,
286        current_requests: Requests,
287        pending_requests: Requests,
288        log_handlers: Option<LogHandlers>,
289    ) -> Result<()>
290    where
291        Stdin: AsyncWrite + Unpin + Send + 'static,
292    {
293        let result = loop {
294            match client_rx.recv().await {
295                Ok(message) => {
296                    if let Message::Request(request) = &message {
297                        if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
298                            pending_requests.lock().await.insert(request.seq, sender);
299                        }
300                    }
301
302                    let message = match serde_json::to_string(&message) {
303                        Ok(message) => message,
304                        Err(e) => break Err(e.into()),
305                    };
306
307                    if let Some(log_handlers) = log_handlers.as_ref() {
308                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
309                            if matches!(kind, LogKind::Rpc) {
310                                log_handler(IoKind::StdIn, &message);
311                            }
312                        }
313                    }
314
315                    if let Err(e) = server_stdin
316                        .write_all(Self::build_rpc_message(message).as_bytes())
317                        .await
318                    {
319                        break Err(e.into());
320                    }
321
322                    if let Err(e) = server_stdin.flush().await {
323                        break Err(e.into());
324                    }
325                }
326                Err(error) => break Err(error.into()),
327            }
328        };
329
330        log::debug!("Handle adapter input dropped");
331
332        result
333    }
334
335    async fn handle_output<Stdout>(
336        server_stdout: Stdout,
337        client_tx: Sender<Message>,
338        pending_requests: Requests,
339        log_handlers: Option<LogHandlers>,
340    ) -> Result<()>
341    where
342        Stdout: AsyncRead + Unpin + Send + 'static,
343    {
344        let mut recv_buffer = String::new();
345        let mut reader = BufReader::new(server_stdout);
346
347        let result = loop {
348            let message =
349                Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
350                    .await;
351
352            match message {
353                Ok(Message::Response(res)) => {
354                    if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
355                        if let Err(e) = tx.send(Self::process_response(res)) {
356                            log::trace!("Did not send response `{:?}` for a cancelled", e);
357                        }
358                    } else {
359                        client_tx.send(Message::Response(res)).await?;
360                    };
361                }
362                Ok(message) => {
363                    client_tx.send(message).await?;
364                }
365                Err(e) => break Err(e),
366            }
367        };
368
369        drop(client_tx);
370
371        log::debug!("Handle adapter output dropped");
372
373        result
374    }
375
376    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
377    where
378        Stderr: AsyncRead + Unpin + Send + 'static,
379    {
380        log::debug!("Handle error started");
381        let mut buffer = String::new();
382
383        let mut reader = BufReader::new(stderr);
384
385        let result = loop {
386            match reader.read_line(&mut buffer).await {
387                Ok(0) => break Err(anyhow!("debugger error stream closed")),
388                Ok(_) => {
389                    for (kind, log_handler) in log_handlers.lock().iter_mut() {
390                        if matches!(kind, LogKind::Adapter) {
391                            log_handler(IoKind::StdErr, buffer.as_str());
392                        }
393                    }
394
395                    buffer.truncate(0);
396                }
397                Err(error) => break Err(error.into()),
398            }
399        };
400
401        log::debug!("Handle adapter error dropped");
402
403        result
404    }
405
406    fn process_response(response: Response) -> Result<Response> {
407        if response.success {
408            Ok(response)
409        } else {
410            if let Some(error_message) = response
411                .body
412                .clone()
413                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
414                .and_then(|response| response.error.map(|msg| msg.format))
415                .or_else(|| response.message.clone())
416            {
417                return Err(anyhow!(error_message));
418            };
419
420            Err(anyhow!(
421                "Received error response from adapter. Response: {:?}",
422                response.clone()
423            ))
424        }
425    }
426
427    async fn receive_server_message<Stdout>(
428        reader: &mut BufReader<Stdout>,
429        buffer: &mut String,
430        log_handlers: Option<&LogHandlers>,
431    ) -> Result<Message>
432    where
433        Stdout: AsyncRead + Unpin + Send + 'static,
434    {
435        let mut content_length = None;
436        loop {
437            buffer.truncate(0);
438
439            if reader
440                .read_line(buffer)
441                .await
442                .with_context(|| "reading a message from server")?
443                == 0
444            {
445                return Err(anyhow!("debugger reader stream closed"));
446            };
447
448            if buffer == "\r\n" {
449                break;
450            }
451
452            let parts = buffer.trim().split_once(": ");
453
454            match parts {
455                Some(("Content-Length", value)) => {
456                    content_length = Some(value.parse().context("invalid content length")?);
457                }
458                _ => {}
459            }
460        }
461
462        let content_length = content_length.context("missing content length")?;
463
464        let mut content = vec![0; content_length];
465        reader
466            .read_exact(&mut content)
467            .await
468            .with_context(|| "reading after a loop")?;
469
470        let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
471
472        if let Some(log_handlers) = log_handlers {
473            for (kind, log_handler) in log_handlers.lock().iter_mut() {
474                if matches!(kind, LogKind::Rpc) {
475                    log_handler(IoKind::StdOut, &message);
476                }
477            }
478        }
479
480        Ok(serde_json::from_str::<Message>(message)?)
481    }
482
483    pub async fn shutdown(&self) -> Result<()> {
484        log::debug!("Start shutdown client");
485
486        if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
487            server_tx.close();
488        }
489
490        let mut current_requests = self.current_requests.lock().await;
491        let mut pending_requests = self.pending_requests.lock().await;
492
493        current_requests.clear();
494        pending_requests.clear();
495
496        let _ = self.transport.kill().await.log_err();
497
498        drop(current_requests);
499        drop(pending_requests);
500
501        log::debug!("Shutdown client completed");
502
503        anyhow::Ok(())
504    }
505
    /// Reports the underlying transport's `has_adapter_logs` value.
    pub fn has_adapter_logs(&self) -> bool {
        self.transport.has_adapter_logs()
    }
509
    /// Borrows the underlying transport.
    pub fn transport(&self) -> &Transport {
        &self.transport
    }
513
514    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
515    where
516        F: 'static + Send + FnMut(IoKind, &str),
517    {
518        let mut log_handlers = self.log_handlers.lock();
519        log_handlers.push((kind, Box::new(f)));
520    }
521}
522
/// Transport that spawns the adapter process and talks to it over TCP.
pub struct TcpTransport {
    /// Port the adapter was connected to.
    pub port: u16,
    /// Host the adapter was connected to.
    pub host: Ipv4Addr,
    /// Connection timeout that was applied, in milliseconds.
    pub timeout: u64,
    /// The spawned adapter process.
    process: Mutex<Child>,
}
529
530impl TcpTransport {
531    /// Get an open port to use with the tcp client when not supplied by debug config
532    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
533        if let Some(port) = host.port {
534            Ok(port)
535        } else {
536            Self::unused_port(host.host()).await
537        }
538    }
539
540    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
541        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
542            .await?
543            .local_addr()?
544            .port())
545    }
546
547    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
548        let Some(connection_args) = binary.connection.as_ref() else {
549            return Err(anyhow!("No connection arguments provided"));
550        };
551
552        let host = connection_args.host;
553        let port = connection_args.port;
554
555        let mut command = util::command::new_std_command(&binary.command);
556        util::set_pre_exec_to_start_new_session(&mut command);
557        let mut command = smol::process::Command::from(command);
558
559        if let Some(cwd) = &binary.cwd {
560            command.current_dir(cwd);
561        }
562
563        command.args(&binary.arguments);
564        command.envs(&binary.envs);
565
566        command
567            .stdin(Stdio::null())
568            .stdout(Stdio::piped())
569            .stderr(Stdio::piped())
570            .kill_on_drop(true);
571
572        let mut process = command
573            .spawn()
574            .with_context(|| "failed to start debug adapter.")?;
575
576        let address = SocketAddrV4::new(host, port);
577
578        let timeout = connection_args.timeout.unwrap_or_else(|| {
579            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
580                .unwrap_or(2000u64)
581        });
582
583        let (rx, tx) = select! {
584            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
585                return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", host, port)))
586            },
587            result = cx.spawn(async move |cx| {
588                loop {
589                    match TcpStream::connect(address).await {
590                        Ok(stream) => return stream.split(),
591                        Err(_) => {
592                            cx.background_executor().timer(Duration::from_millis(100)).await;
593                        }
594                    }
595                }
596            }).fuse() => result
597        };
598        log::info!(
599            "Debug adapter has connected to TCP server {}:{}",
600            host,
601            port
602        );
603        let stdout = process.stdout.take();
604        let stderr = process.stderr.take();
605
606        let this = Self {
607            port,
608            host,
609            process: Mutex::new(process),
610            timeout,
611        };
612
613        let pipe = TransportPipe::new(
614            Box::new(tx),
615            Box::new(BufReader::new(rx)),
616            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
617            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
618        );
619
620        Ok((pipe, this))
621    }
622
    /// Always `true` for the TCP transport.
    fn has_adapter_logs(&self) -> bool {
        true
    }
626
627    async fn kill(&self) -> Result<()> {
628        self.process.lock().await.kill()?;
629
630        Ok(())
631    }
632}
633
/// Transport that spawns the adapter process and talks to it over stdin/stdout.
pub struct StdioTransport {
    /// The spawned adapter process.
    process: Mutex<Child>,
}
637
638impl StdioTransport {
639    #[allow(dead_code, reason = "This is used in non test builds of Zed")]
640    async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
641        let mut command = util::command::new_std_command(&binary.command);
642        util::set_pre_exec_to_start_new_session(&mut command);
643        let mut command = smol::process::Command::from(command);
644
645        if let Some(cwd) = &binary.cwd {
646            command.current_dir(cwd);
647        }
648
649        command.args(&binary.arguments);
650        command.envs(&binary.envs);
651
652        command
653            .stdin(Stdio::piped())
654            .stdout(Stdio::piped())
655            .stderr(Stdio::piped())
656            .kill_on_drop(true);
657
658        let mut process = command
659            .spawn()
660            .with_context(|| "failed to spawn command.")?;
661
662        let stdin = process
663            .stdin
664            .take()
665            .ok_or_else(|| anyhow!("Failed to open stdin"))?;
666        let stdout = process
667            .stdout
668            .take()
669            .ok_or_else(|| anyhow!("Failed to open stdout"))?;
670        let stderr = process
671            .stderr
672            .take()
673            .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
674
675        if stderr.is_none() {
676            bail!(
677                "Failed to connect to stderr for debug adapter command {}",
678                &binary.command
679            );
680        }
681
682        log::info!("Debug adapter has connected to stdio adapter");
683
684        let process = Mutex::new(process);
685
686        Ok((
687            TransportPipe::new(
688                Box::new(stdin),
689                Box::new(BufReader::new(stdout)),
690                None,
691                stderr,
692            ),
693            Self { process },
694        ))
695    }
696
    /// Always `false` for the stdio transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
700
701    async fn kill(&self) -> Result<()> {
702        self.process.lock().await.kill()?;
703        Ok(())
704    }
705}
706
/// Test-only callback that answers a request: receives the request's sequence
/// number and raw JSON arguments and produces the response to send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
710
/// Test-only callback invoked with a response that the client sent to the fake adapter.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
713
/// In-memory stand-in for a debug adapter, driven by handlers registered per command.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    /// Handlers, keyed by command name, that produce the fake adapter's
    /// response to an incoming request.
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    /// Handlers, keyed by command name, that receive the client's responses
    /// to reverse requests.
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
721
722#[cfg(any(test, feature = "test-support"))]
723impl FakeTransport {
724    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
725    where
726        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
727    {
728        self.request_handlers.lock().insert(
729            R::COMMAND,
730            Box::new(move |seq, args| {
731                let result = handler(seq, serde_json::from_value(args).unwrap());
732                let response = match result {
733                    Ok(response) => Response {
734                        seq: seq + 1,
735                        request_seq: seq,
736                        success: true,
737                        command: R::COMMAND.into(),
738                        body: Some(serde_json::to_value(response).unwrap()),
739                        message: None,
740                    },
741                    Err(response) => Response {
742                        seq: seq + 1,
743                        request_seq: seq,
744                        success: false,
745                        command: R::COMMAND.into(),
746                        body: Some(serde_json::to_value(response).unwrap()),
747                        message: None,
748                    },
749                };
750                response
751            }),
752        );
753    }
754
755    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
756    where
757        F: 'static + Send + Fn(Response),
758    {
759        self.response_handlers
760            .lock()
761            .insert(R::COMMAND, Box::new(handler));
762    }
763
764    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
765        let this = Self {
766            request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
767            response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
768        };
769        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
770        use serde_json::json;
771
772        let (stdin_writer, stdin_reader) = async_pipe::pipe();
773        let (stdout_writer, stdout_reader) = async_pipe::pipe();
774
775        let request_handlers = this.request_handlers.clone();
776        let response_handlers = this.response_handlers.clone();
777        let stdout_writer = Arc::new(Mutex::new(stdout_writer));
778
779        cx.background_executor()
780            .spawn(async move {
781                let mut reader = BufReader::new(stdin_reader);
782                let mut buffer = String::new();
783
784                loop {
785                    let message =
786                        TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
787                            .await;
788
789                    match message {
790                        Err(error) => {
791                            break anyhow!(error);
792                        }
793                        Ok(message) => {
794                            match message {
795                                Message::Request(request) => {
796                                    // redirect reverse requests to stdout writer/reader
797                                    if request.command == RunInTerminal::COMMAND
798                                        || request.command == StartDebugging::COMMAND
799                                    {
800                                        let message =
801                                            serde_json::to_string(&Message::Request(request))
802                                                .unwrap();
803
804                                        let mut writer = stdout_writer.lock().await;
805                                        writer
806                                            .write_all(
807                                                TransportDelegate::build_rpc_message(message)
808                                                    .as_bytes(),
809                                            )
810                                            .await
811                                            .unwrap();
812                                        writer.flush().await.unwrap();
813                                    } else {
814                                        let response = if let Some(handle) = request_handlers
815                                            .lock()
816                                            .get_mut(request.command.as_str())
817                                        {
818                                            handle(
819                                                request.seq,
820                                                request.arguments.unwrap_or(json!({})),
821                                            )
822                                        } else {
823                                            panic!("No request handler for {}", request.command);
824                                        };
825                                        let message =
826                                            serde_json::to_string(&Message::Response(response))
827                                                .unwrap();
828
829                                        let mut writer = stdout_writer.lock().await;
830
831                                        writer
832                                            .write_all(
833                                                TransportDelegate::build_rpc_message(message)
834                                                    .as_bytes(),
835                                            )
836                                            .await
837                                            .unwrap();
838                                        writer.flush().await.unwrap();
839                                    }
840                                }
841                                Message::Event(event) => {
842                                    let message =
843                                        serde_json::to_string(&Message::Event(event)).unwrap();
844
845                                    let mut writer = stdout_writer.lock().await;
846                                    writer
847                                        .write_all(
848                                            TransportDelegate::build_rpc_message(message)
849                                                .as_bytes(),
850                                        )
851                                        .await
852                                        .unwrap();
853                                    writer.flush().await.unwrap();
854                                }
855                                Message::Response(response) => {
856                                    if let Some(handle) =
857                                        response_handlers.lock().get(response.command.as_str())
858                                    {
859                                        handle(response);
860                                    } else {
861                                        log::error!("No response handler for {}", response.command);
862                                    }
863                                }
864                            }
865                        }
866                    }
867                }
868            })
869            .detach();
870
871        Ok((
872            TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
873            this,
874        ))
875    }
876
    /// Always `false` for the fake transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
880
    /// No-op: the fake transport does nothing here and always returns `Ok(())`.
    async fn kill(&self) -> Result<()> {
        Ok(())
    }
884}