transport.rs

  1use anyhow::{anyhow, bail, Context, Result};
  2use dap_types::{
  3    messages::{Message, Response},
  4    ErrorResponse,
  5};
  6use futures::{channel::oneshot, select, AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _};
  7use gpui::AsyncApp;
  8use settings::Settings as _;
  9use smallvec::SmallVec;
 10use smol::{
 11    channel::{unbounded, Receiver, Sender},
 12    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 13    lock::Mutex,
 14    net::{TcpListener, TcpStream},
 15    process::Child,
 16};
 17use std::{
 18    collections::HashMap,
 19    net::{Ipv4Addr, SocketAddrV4},
 20    process::Stdio,
 21    sync::Arc,
 22    time::Duration,
 23};
 24use task::TCPHost;
 25use util::ResultExt as _;
 26
 27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
 28
 29pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
 30
 31#[derive(PartialEq, Eq, Clone, Copy)]
 32pub enum LogKind {
 33    Adapter,
 34    Rpc,
 35}
 36
 37pub enum IoKind {
 38    StdIn,
 39    StdOut,
 40    StdErr,
 41}
 42
 43pub struct TransportPipe {
 44    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 45    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 46    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 47    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 48}
 49
 50impl TransportPipe {
 51    pub fn new(
 52        input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 53        output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 54        stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 55        stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 56    ) -> Self {
 57        TransportPipe {
 58            input,
 59            output,
 60            stdout,
 61            stderr,
 62        }
 63    }
 64}
 65
 66type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
 67type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 68
 69pub enum Transport {
 70    Stdio(StdioTransport),
 71    Tcp(TcpTransport),
 72    #[cfg(any(test, feature = "test-support"))]
 73    Fake(FakeTransport),
 74}
 75
 76impl Transport {
 77    #[cfg(any(test, feature = "test-support"))]
 78    async fn start(_: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 79        #[cfg(any(test, feature = "test-support"))]
 80        return FakeTransport::start(cx)
 81            .await
 82            .map(|(transports, fake)| (transports, Self::Fake(fake)));
 83    }
 84
 85    #[cfg(not(any(test, feature = "test-support")))]
 86    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 87        if binary.connection.is_some() {
 88            TcpTransport::start(binary, cx)
 89                .await
 90                .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
 91        } else {
 92            StdioTransport::start(binary, cx)
 93                .await
 94                .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
 95        }
 96    }
 97
 98    fn has_adapter_logs(&self) -> bool {
 99        match self {
100            Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
101            Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
102            #[cfg(any(test, feature = "test-support"))]
103            Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
104        }
105    }
106
107    async fn kill(&self) -> Result<()> {
108        match self {
109            Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
110            Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
111            #[cfg(any(test, feature = "test-support"))]
112            Transport::Fake(fake_transport) => fake_transport.kill().await,
113        }
114    }
115
116    #[cfg(any(test, feature = "test-support"))]
117    pub(crate) fn as_fake(&self) -> &FakeTransport {
118        match self {
119            Transport::Fake(fake_transport) => fake_transport,
120            _ => panic!("Not a fake transport layer"),
121        }
122    }
123}
124
125pub(crate) struct TransportDelegate {
126    log_handlers: LogHandlers,
127    current_requests: Requests,
128    pending_requests: Requests,
129    transport: Transport,
130    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
131}
132
133impl TransportDelegate {
134    pub(crate) async fn start(
135        binary: &DebugAdapterBinary,
136        cx: AsyncApp,
137    ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
138        let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
139        let mut this = Self {
140            transport,
141            server_tx: Default::default(),
142            log_handlers: Default::default(),
143            current_requests: Default::default(),
144            pending_requests: Default::default(),
145        };
146        let messages = this.start_handlers(transport_pipes, cx).await?;
147        Ok((messages, this))
148    }
149
150    async fn start_handlers(
151        &mut self,
152        mut params: TransportPipe,
153        cx: AsyncApp,
154    ) -> Result<(Receiver<Message>, Sender<Message>)> {
155        let (client_tx, server_rx) = unbounded::<Message>();
156        let (server_tx, client_rx) = unbounded::<Message>();
157
158        let log_dap_communications =
159            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161                .unwrap_or(false);
162
163        let log_handler = if log_dap_communications {
164            Some(self.log_handlers.clone())
165        } else {
166            None
167        };
168
169        cx.update(|cx| {
170            if let Some(stdout) = params.stdout.take() {
171                cx.background_executor()
172                    .spawn(Self::handle_adapter_log(stdout, log_handler.clone()))
173                    .detach_and_log_err(cx);
174            }
175
176            cx.background_executor()
177                .spawn(Self::handle_output(
178                    params.output,
179                    client_tx,
180                    self.pending_requests.clone(),
181                    log_handler.clone(),
182                ))
183                .detach_and_log_err(cx);
184
185            if let Some(stderr) = params.stderr.take() {
186                cx.background_executor()
187                    .spawn(Self::handle_error(stderr, self.log_handlers.clone()))
188                    .detach_and_log_err(cx);
189            }
190
191            cx.background_executor()
192                .spawn(Self::handle_input(
193                    params.input,
194                    client_rx,
195                    self.current_requests.clone(),
196                    self.pending_requests.clone(),
197                    log_handler.clone(),
198                ))
199                .detach_and_log_err(cx);
200        })?;
201
202        {
203            let mut lock = self.server_tx.lock().await;
204            *lock = Some(server_tx.clone());
205        }
206
207        Ok((server_rx, server_tx))
208    }
209
210    pub(crate) async fn add_pending_request(
211        &self,
212        sequence_id: u64,
213        request: oneshot::Sender<Result<Response>>,
214    ) {
215        let mut pending_requests = self.pending_requests.lock().await;
216        pending_requests.insert(sequence_id, request);
217    }
218
219    pub(crate) async fn cancel_pending_request(&self, sequence_id: &u64) {
220        let mut pending_requests = self.pending_requests.lock().await;
221        pending_requests.remove(sequence_id);
222    }
223
224    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
225        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
226            server_tx
227                .send(message)
228                .await
229                .map_err(|e| anyhow!("Failed to send message: {}", e))
230        } else {
231            Err(anyhow!("Server tx already dropped"))
232        }
233    }
234
235    async fn handle_adapter_log<Stdout>(
236        stdout: Stdout,
237        log_handlers: Option<LogHandlers>,
238    ) -> Result<()>
239    where
240        Stdout: AsyncRead + Unpin + Send + 'static,
241    {
242        let mut reader = BufReader::new(stdout);
243        let mut line = String::new();
244
245        let result = loop {
246            line.truncate(0);
247
248            let bytes_read = match reader.read_line(&mut line).await {
249                Ok(bytes_read) => bytes_read,
250                Err(e) => break Err(e.into()),
251            };
252
253            if bytes_read == 0 {
254                break Err(anyhow!("Debugger log stream closed"));
255            }
256
257            if let Some(log_handlers) = log_handlers.as_ref() {
258                for (kind, handler) in log_handlers.lock().iter_mut() {
259                    if matches!(kind, LogKind::Adapter) {
260                        handler(IoKind::StdOut, line.as_str());
261                    }
262                }
263            }
264
265            smol::future::yield_now().await;
266        };
267
268        log::debug!("Handle adapter log dropped");
269
270        result
271    }
272
273    fn build_rpc_message(message: String) -> String {
274        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
275    }
276
277    async fn handle_input<Stdin>(
278        mut server_stdin: Stdin,
279        client_rx: Receiver<Message>,
280        current_requests: Requests,
281        pending_requests: Requests,
282        log_handlers: Option<LogHandlers>,
283    ) -> Result<()>
284    where
285        Stdin: AsyncWrite + Unpin + Send + 'static,
286    {
287        let result = loop {
288            match client_rx.recv().await {
289                Ok(message) => {
290                    if let Message::Request(request) = &message {
291                        if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
292                            pending_requests.lock().await.insert(request.seq, sender);
293                        }
294                    }
295
296                    let message = match serde_json::to_string(&message) {
297                        Ok(message) => message,
298                        Err(e) => break Err(e.into()),
299                    };
300
301                    if let Some(log_handlers) = log_handlers.as_ref() {
302                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
303                            if matches!(kind, LogKind::Rpc) {
304                                log_handler(IoKind::StdIn, &message);
305                            }
306                        }
307                    }
308
309                    if let Err(e) = server_stdin
310                        .write_all(Self::build_rpc_message(message).as_bytes())
311                        .await
312                    {
313                        break Err(e.into());
314                    }
315
316                    if let Err(e) = server_stdin.flush().await {
317                        break Err(e.into());
318                    }
319                }
320                Err(error) => break Err(error.into()),
321            }
322
323            smol::future::yield_now().await;
324        };
325
326        log::debug!("Handle adapter input dropped");
327
328        result
329    }
330
331    async fn handle_output<Stdout>(
332        server_stdout: Stdout,
333        client_tx: Sender<Message>,
334        pending_requests: Requests,
335        log_handlers: Option<LogHandlers>,
336    ) -> Result<()>
337    where
338        Stdout: AsyncRead + Unpin + Send + 'static,
339    {
340        let mut recv_buffer = String::new();
341        let mut reader = BufReader::new(server_stdout);
342
343        let result = loop {
344            let message =
345                Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
346                    .await;
347
348            match message {
349                Ok(Message::Response(res)) => {
350                    if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
351                        if let Err(e) = tx.send(Self::process_response(res)) {
352                            log::trace!("Did not send response `{:?}` for a cancelled", e);
353                        }
354                    } else {
355                        client_tx.send(Message::Response(res)).await?;
356                    };
357                }
358                Ok(message) => {
359                    client_tx.send(message).await?;
360                }
361                Err(e) => break Err(e),
362            }
363
364            smol::future::yield_now().await;
365        };
366
367        drop(client_tx);
368
369        log::debug!("Handle adapter output dropped");
370
371        result
372    }
373
374    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
375    where
376        Stderr: AsyncRead + Unpin + Send + 'static,
377    {
378        let mut buffer = String::new();
379
380        let mut reader = BufReader::new(stderr);
381
382        let result = loop {
383            match reader.read_line(&mut buffer).await {
384                Ok(0) => break Err(anyhow!("debugger error stream closed")),
385                Ok(_) => {
386                    for (kind, log_handler) in log_handlers.lock().iter_mut() {
387                        if matches!(kind, LogKind::Adapter) {
388                            log_handler(IoKind::StdErr, buffer.as_str());
389                        }
390                    }
391
392                    buffer.truncate(0);
393                }
394                Err(error) => break Err(error.into()),
395            }
396
397            smol::future::yield_now().await;
398        };
399
400        log::debug!("Handle adapter error dropped");
401
402        result
403    }
404
405    fn process_response(response: Response) -> Result<Response> {
406        if response.success {
407            Ok(response)
408        } else {
409            if let Some(error_message) = response
410                .body
411                .clone()
412                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
413                .and_then(|response| response.error.map(|msg| msg.format))
414                .or_else(|| response.message.clone())
415            {
416                return Err(anyhow!(error_message));
417            };
418
419            Err(anyhow!(
420                "Received error response from adapter. Response: {:?}",
421                response.clone()
422            ))
423        }
424    }
425
426    async fn receive_server_message<Stdout>(
427        reader: &mut BufReader<Stdout>,
428        buffer: &mut String,
429        log_handlers: Option<&LogHandlers>,
430    ) -> Result<Message>
431    where
432        Stdout: AsyncRead + Unpin + Send + 'static,
433    {
434        let mut content_length = None;
435        loop {
436            buffer.truncate(0);
437
438            if reader
439                .read_line(buffer)
440                .await
441                .with_context(|| "reading a message from server")?
442                == 0
443            {
444                return Err(anyhow!("debugger reader stream closed"));
445            };
446
447            if buffer == "\r\n" {
448                break;
449            }
450
451            let parts = buffer.trim().split_once(": ");
452
453            match parts {
454                Some(("Content-Length", value)) => {
455                    content_length = Some(value.parse().context("invalid content length")?);
456                }
457                _ => {}
458            }
459        }
460
461        let content_length = content_length.context("missing content length")?;
462
463        let mut content = vec![0; content_length];
464        reader
465            .read_exact(&mut content)
466            .await
467            .with_context(|| "reading after a loop")?;
468
469        let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
470
471        if let Some(log_handlers) = log_handlers {
472            for (kind, log_handler) in log_handlers.lock().iter_mut() {
473                if matches!(kind, LogKind::Rpc) {
474                    log_handler(IoKind::StdOut, &message);
475                }
476            }
477        }
478
479        Ok(serde_json::from_str::<Message>(message)?)
480    }
481
482    pub async fn shutdown(&self) -> Result<()> {
483        log::debug!("Start shutdown client");
484
485        if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
486            server_tx.close();
487        }
488
489        let mut current_requests = self.current_requests.lock().await;
490        let mut pending_requests = self.pending_requests.lock().await;
491
492        current_requests.clear();
493        pending_requests.clear();
494
495        let _ = self.transport.kill().await.log_err();
496
497        drop(current_requests);
498        drop(pending_requests);
499
500        log::debug!("Shutdown client completed");
501
502        anyhow::Ok(())
503    }
504
505    pub fn has_adapter_logs(&self) -> bool {
506        self.transport.has_adapter_logs()
507    }
508
509    pub fn transport(&self) -> &Transport {
510        &self.transport
511    }
512
513    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
514    where
515        F: 'static + Send + FnMut(IoKind, &str),
516    {
517        let mut log_handlers = self.log_handlers.lock();
518        log_handlers.push((kind, Box::new(f)));
519    }
520}
521
522pub struct TcpTransport {
523    pub port: u16,
524    pub host: Ipv4Addr,
525    pub timeout: u64,
526    process: Mutex<Child>,
527}
528
529impl TcpTransport {
530    /// Get an open port to use with the tcp client when not supplied by debug config
531    pub async fn port(host: &TCPHost) -> Result<u16> {
532        if let Some(port) = host.port {
533            Ok(port)
534        } else {
535            Ok(TcpListener::bind(SocketAddrV4::new(host.host(), 0))
536                .await?
537                .local_addr()?
538                .port())
539        }
540    }
541
542    #[allow(dead_code, reason = "This is used in non test builds of Zed")]
543    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
544        let Some(connection_args) = binary.connection.as_ref() else {
545            return Err(anyhow!("No connection arguments provided"));
546        };
547
548        let host = connection_args.host;
549        let port = connection_args.port;
550
551        let mut command = util::command::new_smol_command(&binary.command);
552
553        if let Some(cwd) = &binary.cwd {
554            command.current_dir(cwd);
555        }
556
557        if let Some(args) = &binary.arguments {
558            command.args(args);
559        }
560
561        if let Some(envs) = &binary.envs {
562            command.envs(envs);
563        }
564
565        command
566            .stdin(Stdio::null())
567            .stdout(Stdio::piped())
568            .stderr(Stdio::piped())
569            .kill_on_drop(true);
570
571        let mut process = command
572            .spawn()
573            .with_context(|| "failed to start debug adapter.")?;
574
575        let address = SocketAddrV4::new(host, port);
576
577        let timeout = connection_args.timeout.unwrap_or_else(|| {
578            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
579                .unwrap_or(2000u64)
580        });
581
582        let (rx, tx) = select! {
583            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
584                return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", host, port)))
585            },
586            result = cx.spawn(async move |cx| {
587                loop {
588                    match TcpStream::connect(address).await {
589                        Ok(stream) => return stream.split(),
590                        Err(_) => {
591                            cx.background_executor().timer(Duration::from_millis(100)).await;
592                        }
593                    }
594                }
595            }).fuse() => result
596        };
597        log::info!(
598            "Debug adapter has connected to TCP server {}:{}",
599            host,
600            port
601        );
602        let stdout = process.stdout.take();
603        let stderr = process.stderr.take();
604
605        let this = Self {
606            port,
607            host,
608            process: Mutex::new(process),
609            timeout,
610        };
611
612        let pipe = TransportPipe::new(
613            Box::new(tx),
614            Box::new(BufReader::new(rx)),
615            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
616            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
617        );
618
619        Ok((pipe, this))
620    }
621
622    fn has_adapter_logs(&self) -> bool {
623        true
624    }
625
626    async fn kill(&self) -> Result<()> {
627        self.process.lock().await.kill()?;
628
629        Ok(())
630    }
631}
632
633pub struct StdioTransport {
634    process: Mutex<Child>,
635}
636
637impl StdioTransport {
638    #[allow(dead_code, reason = "This is used in non test builds of Zed")]
639    async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
640        let mut command = util::command::new_smol_command(&binary.command);
641
642        if let Some(cwd) = &binary.cwd {
643            command.current_dir(cwd);
644        }
645
646        if let Some(args) = &binary.arguments {
647            command.args(args);
648        }
649
650        if let Some(envs) = &binary.envs {
651            command.envs(envs);
652        }
653
654        command
655            .stdin(Stdio::piped())
656            .stdout(Stdio::piped())
657            .stderr(Stdio::piped())
658            .kill_on_drop(true);
659
660        let mut process = command
661            .spawn()
662            .with_context(|| "failed to spawn command.")?;
663
664        let stdin = process
665            .stdin
666            .take()
667            .ok_or_else(|| anyhow!("Failed to open stdin"))?;
668        let stdout = process
669            .stdout
670            .take()
671            .ok_or_else(|| anyhow!("Failed to open stdout"))?;
672        let stderr = process
673            .stderr
674            .take()
675            .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
676
677        if stderr.is_none() {
678            bail!(
679                "Failed to connect to stderr for debug adapter command {}",
680                &binary.command
681            );
682        }
683
684        log::info!("Debug adapter has connected to stdio adapter");
685
686        let process = Mutex::new(process);
687
688        Ok((
689            TransportPipe::new(
690                Box::new(stdin),
691                Box::new(BufReader::new(stdout)),
692                None,
693                stderr,
694            ),
695            Self { process },
696        ))
697    }
698
699    fn has_adapter_logs(&self) -> bool {
700        false
701    }
702
703    async fn kill(&self) -> Result<()> {
704        self.process.lock().await.kill()?;
705        Ok(())
706    }
707}
708
709#[cfg(any(test, feature = "test-support"))]
710type RequestHandler = Box<
711    dyn Send
712        + FnMut(
713            u64,
714            serde_json::Value,
715            Arc<Mutex<async_pipe::PipeWriter>>,
716        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
717>;
718
719#[cfg(any(test, feature = "test-support"))]
720type ResponseHandler = Box<dyn Send + Fn(Response)>;
721
722#[cfg(any(test, feature = "test-support"))]
723pub struct FakeTransport {
724    // for sending fake response back from adapter side
725    request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
726    // for reverse request responses
727    response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
728}
729
730#[cfg(any(test, feature = "test-support"))]
731impl FakeTransport {
732    pub async fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
733    where
734        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
735    {
736        self.request_handlers.lock().await.insert(
737            R::COMMAND,
738            Box::new(
739                move |seq, args, writer: Arc<Mutex<async_pipe::PipeWriter>>| {
740                    let response = handler(seq, serde_json::from_value(args).unwrap());
741
742                    let message = serde_json::to_string(&Message::Response(Response {
743                        seq: seq + 1,
744                        request_seq: seq,
745                        success: response.as_ref().is_ok(),
746                        command: R::COMMAND.into(),
747                        body: util::maybe!({ serde_json::to_value(response.ok()?).ok() }),
748                        message: None,
749                    }))
750                    .unwrap();
751
752                    let writer = writer.clone();
753
754                    Box::pin(async move {
755                        let mut writer = writer.lock().await;
756                        writer
757                            .write_all(TransportDelegate::build_rpc_message(message).as_bytes())
758                            .await
759                            .unwrap();
760                        writer.flush().await.unwrap();
761                    })
762                },
763            ),
764        );
765    }
766
767    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
768    where
769        F: 'static + Send + Fn(Response),
770    {
771        self.response_handlers
772            .lock()
773            .await
774            .insert(R::COMMAND, Box::new(handler));
775    }
776
777    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
778        let this = Self {
779            request_handlers: Arc::new(Mutex::new(HashMap::default())),
780            response_handlers: Arc::new(Mutex::new(HashMap::default())),
781        };
782        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
783        use serde_json::json;
784
785        let (stdin_writer, stdin_reader) = async_pipe::pipe();
786        let (stdout_writer, stdout_reader) = async_pipe::pipe();
787
788        let request_handlers = this.request_handlers.clone();
789        let response_handlers = this.response_handlers.clone();
790        let stdout_writer = Arc::new(Mutex::new(stdout_writer));
791
792        cx.background_executor()
793            .spawn(async move {
794                let mut reader = BufReader::new(stdin_reader);
795                let mut buffer = String::new();
796
797                loop {
798                    let message =
799                        TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
800                            .await;
801
802                    match message {
803                        Err(error) => {
804                            break anyhow!(error);
805                        }
806                        Ok(message) => {
807                            match message {
808                                Message::Request(request) => {
809                                    // redirect reverse requests to stdout writer/reader
810                                    if request.command == RunInTerminal::COMMAND
811                                        || request.command == StartDebugging::COMMAND
812                                    {
813                                        let message =
814                                            serde_json::to_string(&Message::Request(request))
815                                                .unwrap();
816
817                                        let mut writer = stdout_writer.lock().await;
818                                        writer
819                                            .write_all(
820                                                TransportDelegate::build_rpc_message(message)
821                                                    .as_bytes(),
822                                            )
823                                            .await
824                                            .unwrap();
825                                        writer.flush().await.unwrap();
826                                    } else {
827                                        if let Some(handle) = request_handlers
828                                            .lock()
829                                            .await
830                                            .get_mut(request.command.as_str())
831                                        {
832                                            handle(
833                                                request.seq,
834                                                request.arguments.unwrap_or(json!({})),
835                                                stdout_writer.clone(),
836                                            )
837                                            .await;
838                                        } else {
839                                            log::error!(
840                                                "No request handler for {}",
841                                                request.command
842                                            );
843                                        }
844                                    }
845                                }
846                                Message::Event(event) => {
847                                    let message =
848                                        serde_json::to_string(&Message::Event(event)).unwrap();
849
850                                    let mut writer = stdout_writer.lock().await;
851                                    writer
852                                        .write_all(
853                                            TransportDelegate::build_rpc_message(message)
854                                                .as_bytes(),
855                                        )
856                                        .await
857                                        .unwrap();
858                                    writer.flush().await.unwrap();
859                                }
860                                Message::Response(response) => {
861                                    if let Some(handle) = response_handlers
862                                        .lock()
863                                        .await
864                                        .get(response.command.as_str())
865                                    {
866                                        handle(response);
867                                    } else {
868                                        log::error!("No response handler for {}", response.command);
869                                    }
870                                }
871                            }
872                        }
873                    }
874                }
875            })
876            .detach();
877
878        Ok((
879            TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
880            this,
881        ))
882    }
883
884    fn has_adapter_logs(&self) -> bool {
885        false
886    }
887
888    async fn kill(&self) -> Result<()> {
889        Ok(())
890    }
891}