transport.rs

  1use anyhow::{Context, Result, anyhow, bail};
  2use dap_types::{
  3    ErrorResponse,
  4    messages::{Message, Response},
  5};
  6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
  7use gpui::AsyncApp;
  8use settings::Settings as _;
  9use smallvec::SmallVec;
 10use smol::{
 11    channel::{Receiver, Sender, unbounded},
 12    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 13    lock::Mutex,
 14    net::{TcpListener, TcpStream},
 15    process::Child,
 16};
 17use std::{
 18    collections::HashMap,
 19    net::{Ipv4Addr, SocketAddrV4},
 20    process::Stdio,
 21    sync::Arc,
 22    time::Duration,
 23};
 24use task::TcpArgumentsTemplate;
 25use util::ResultExt as _;
 26
 27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
 28
/// Boxed, sendable callback that receives an [`IoKind`] tag and a string slice.
///
/// Within this file the handlers are invoked either with a single line read from
/// one of the adapter's streams or with a serialized DAP message.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
 30
/// Category a registered log handler is interested in.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Raw lines read from the adapter's log/stderr streams.
    Adapter,
    /// Serialized DAP messages written to, or read from, the adapter.
    Rpc,
}
 36
/// Tag handed to log handlers that identifies which stream the text belongs to.
pub enum IoKind {
    /// Text that was written to the adapter (outgoing messages).
    StdIn,
    /// Text that was read from the adapter (log lines or incoming messages).
    StdOut,
    /// Text that was read from the adapter's stderr stream.
    StdErr,
}
 42
/// Bundle of I/O endpoints handed over by a transport once it has been started.
pub struct TransportPipe {
    /// Sink that outgoing DAP messages are written to.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Source that incoming DAP messages are read from.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional extra stream that is read line by line and forwarded to adapter log handlers.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional stderr stream that is read line by line and forwarded to adapter log handlers.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
 49
 50impl TransportPipe {
 51    pub fn new(
 52        input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 53        output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 54        stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 55        stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 56    ) -> Self {
 57        TransportPipe {
 58            input,
 59            output,
 60            stdout,
 61            stderr,
 62        }
 63    }
 64}
 65
/// Shared map from a request sequence number to the one-shot sender that delivers its response.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log handlers, each paired with the [`LogKind`] it listens for.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 68
/// The concrete channel used to talk to a debug adapter.
pub enum Transport {
    /// Adapter process addressed through its stdin/stdout pipes.
    Stdio(StdioTransport),
    /// Adapter process addressed through a TCP connection.
    Tcp(TcpTransport),
    /// In-memory stand-in, only compiled for tests / the `test-support` feature.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
 75
 76impl Transport {
 77    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 78        #[cfg(any(test, feature = "test-support"))]
 79        if cfg!(any(test, feature = "test-support")) {
 80            return FakeTransport::start(cx)
 81                .await
 82                .map(|(transports, fake)| (transports, Self::Fake(fake)));
 83        }
 84
 85        if binary.connection.is_some() {
 86            TcpTransport::start(binary, cx)
 87                .await
 88                .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
 89        } else {
 90            StdioTransport::start(binary, cx)
 91                .await
 92                .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
 93        }
 94    }
 95
 96    fn has_adapter_logs(&self) -> bool {
 97        match self {
 98            Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
 99            Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100            #[cfg(any(test, feature = "test-support"))]
101            Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102        }
103    }
104
105    async fn kill(&self) -> Result<()> {
106        match self {
107            Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108            Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109            #[cfg(any(test, feature = "test-support"))]
110            Transport::Fake(fake_transport) => fake_transport.kill().await,
111        }
112    }
113
114    #[cfg(any(test, feature = "test-support"))]
115    pub(crate) fn as_fake(&self) -> &FakeTransport {
116        match self {
117            Transport::Fake(fake_transport) => fake_transport,
118            _ => panic!("Not a fake transport layer"),
119        }
120    }
121}
122
/// Owns a [`Transport`] together with the bookkeeping needed to route DAP messages.
pub(crate) struct TransportDelegate {
    /// Handlers notified about adapter log lines and RPC traffic.
    log_handlers: LogHandlers,
    /// Response senders that are moved into `pending_requests` once the matching
    /// request is written to the adapter.
    /// NOTE(review): nothing in this file inserts into this map — confirm who populates it.
    current_requests: Requests,
    /// Response senders waiting for the adapter to answer, keyed by request sequence number.
    pending_requests: Requests,
    /// The underlying transport used to reach the adapter.
    transport: Transport,
    /// Sender for outgoing messages; set by `start_handlers` and taken by `shutdown`.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
}
130
131impl TransportDelegate {
132    pub(crate) async fn start(
133        binary: &DebugAdapterBinary,
134        cx: AsyncApp,
135    ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
136        let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
137        let mut this = Self {
138            transport,
139            server_tx: Default::default(),
140            log_handlers: Default::default(),
141            current_requests: Default::default(),
142            pending_requests: Default::default(),
143        };
144        let messages = this.start_handlers(transport_pipes, cx).await?;
145        Ok((messages, this))
146    }
147
148    async fn start_handlers(
149        &mut self,
150        mut params: TransportPipe,
151        cx: AsyncApp,
152    ) -> Result<(Receiver<Message>, Sender<Message>)> {
153        let (client_tx, server_rx) = unbounded::<Message>();
154        let (server_tx, client_rx) = unbounded::<Message>();
155
156        let log_dap_communications =
157            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
158                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
159                .unwrap_or(false);
160
161        let log_handler = if log_dap_communications {
162            Some(self.log_handlers.clone())
163        } else {
164            None
165        };
166
167        cx.update(|cx| {
168            if let Some(stdout) = params.stdout.take() {
169                cx.background_executor()
170                    .spawn(Self::handle_adapter_log(stdout, log_handler.clone()))
171                    .detach_and_log_err(cx);
172            }
173
174            cx.background_executor()
175                .spawn(Self::handle_output(
176                    params.output,
177                    client_tx,
178                    self.pending_requests.clone(),
179                    log_handler.clone(),
180                ))
181                .detach_and_log_err(cx);
182
183            if let Some(stderr) = params.stderr.take() {
184                cx.background_executor()
185                    .spawn(Self::handle_error(stderr, self.log_handlers.clone()))
186                    .detach_and_log_err(cx);
187            }
188
189            cx.background_executor()
190                .spawn(Self::handle_input(
191                    params.input,
192                    client_rx,
193                    self.current_requests.clone(),
194                    self.pending_requests.clone(),
195                    log_handler.clone(),
196                ))
197                .detach_and_log_err(cx);
198        })?;
199
200        {
201            let mut lock = self.server_tx.lock().await;
202            *lock = Some(server_tx.clone());
203        }
204
205        Ok((server_rx, server_tx))
206    }
207
208    pub(crate) async fn add_pending_request(
209        &self,
210        sequence_id: u64,
211        request: oneshot::Sender<Result<Response>>,
212    ) {
213        let mut pending_requests = self.pending_requests.lock().await;
214        pending_requests.insert(sequence_id, request);
215    }
216
217    pub(crate) async fn cancel_pending_request(&self, sequence_id: &u64) {
218        let mut pending_requests = self.pending_requests.lock().await;
219        pending_requests.remove(sequence_id);
220    }
221
222    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
223        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
224            server_tx
225                .send(message)
226                .await
227                .map_err(|e| anyhow!("Failed to send message: {}", e))
228        } else {
229            Err(anyhow!("Server tx already dropped"))
230        }
231    }
232
233    async fn handle_adapter_log<Stdout>(
234        stdout: Stdout,
235        log_handlers: Option<LogHandlers>,
236    ) -> Result<()>
237    where
238        Stdout: AsyncRead + Unpin + Send + 'static,
239    {
240        let mut reader = BufReader::new(stdout);
241        let mut line = String::new();
242
243        let result = loop {
244            line.truncate(0);
245
246            let bytes_read = match reader.read_line(&mut line).await {
247                Ok(bytes_read) => bytes_read,
248                Err(e) => break Err(e.into()),
249            };
250
251            if bytes_read == 0 {
252                break Err(anyhow!("Debugger log stream closed"));
253            }
254
255            if let Some(log_handlers) = log_handlers.as_ref() {
256                for (kind, handler) in log_handlers.lock().iter_mut() {
257                    if matches!(kind, LogKind::Adapter) {
258                        handler(IoKind::StdOut, line.as_str());
259                    }
260                }
261            }
262        };
263
264        log::debug!("Handle adapter log dropped");
265
266        result
267    }
268
269    fn build_rpc_message(message: String) -> String {
270        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
271    }
272
273    async fn handle_input<Stdin>(
274        mut server_stdin: Stdin,
275        client_rx: Receiver<Message>,
276        current_requests: Requests,
277        pending_requests: Requests,
278        log_handlers: Option<LogHandlers>,
279    ) -> Result<()>
280    where
281        Stdin: AsyncWrite + Unpin + Send + 'static,
282    {
283        let result = loop {
284            match client_rx.recv().await {
285                Ok(message) => {
286                    if let Message::Request(request) = &message {
287                        if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
288                            pending_requests.lock().await.insert(request.seq, sender);
289                        }
290                    }
291
292                    let message = match serde_json::to_string(&message) {
293                        Ok(message) => message,
294                        Err(e) => break Err(e.into()),
295                    };
296
297                    if let Some(log_handlers) = log_handlers.as_ref() {
298                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
299                            if matches!(kind, LogKind::Rpc) {
300                                log_handler(IoKind::StdIn, &message);
301                            }
302                        }
303                    }
304
305                    if let Err(e) = server_stdin
306                        .write_all(Self::build_rpc_message(message).as_bytes())
307                        .await
308                    {
309                        break Err(e.into());
310                    }
311
312                    if let Err(e) = server_stdin.flush().await {
313                        break Err(e.into());
314                    }
315                }
316                Err(error) => break Err(error.into()),
317            }
318        };
319
320        log::debug!("Handle adapter input dropped");
321
322        result
323    }
324
325    async fn handle_output<Stdout>(
326        server_stdout: Stdout,
327        client_tx: Sender<Message>,
328        pending_requests: Requests,
329        log_handlers: Option<LogHandlers>,
330    ) -> Result<()>
331    where
332        Stdout: AsyncRead + Unpin + Send + 'static,
333    {
334        let mut recv_buffer = String::new();
335        let mut reader = BufReader::new(server_stdout);
336
337        let result = loop {
338            let message =
339                Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
340                    .await;
341
342            match message {
343                Ok(Message::Response(res)) => {
344                    if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
345                        if let Err(e) = tx.send(Self::process_response(res)) {
346                            log::trace!("Did not send response `{:?}` for a cancelled", e);
347                        }
348                    } else {
349                        client_tx.send(Message::Response(res)).await?;
350                    };
351                }
352                Ok(message) => {
353                    client_tx.send(message).await?;
354                }
355                Err(e) => break Err(e),
356            }
357        };
358
359        drop(client_tx);
360
361        log::debug!("Handle adapter output dropped");
362
363        result
364    }
365
366    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
367    where
368        Stderr: AsyncRead + Unpin + Send + 'static,
369    {
370        let mut buffer = String::new();
371
372        let mut reader = BufReader::new(stderr);
373
374        let result = loop {
375            match reader.read_line(&mut buffer).await {
376                Ok(0) => break Err(anyhow!("debugger error stream closed")),
377                Ok(_) => {
378                    for (kind, log_handler) in log_handlers.lock().iter_mut() {
379                        if matches!(kind, LogKind::Adapter) {
380                            log_handler(IoKind::StdErr, buffer.as_str());
381                        }
382                    }
383
384                    buffer.truncate(0);
385                }
386                Err(error) => break Err(error.into()),
387            }
388        };
389
390        log::debug!("Handle adapter error dropped");
391
392        result
393    }
394
395    fn process_response(response: Response) -> Result<Response> {
396        if response.success {
397            Ok(response)
398        } else {
399            if let Some(error_message) = response
400                .body
401                .clone()
402                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
403                .and_then(|response| response.error.map(|msg| msg.format))
404                .or_else(|| response.message.clone())
405            {
406                return Err(anyhow!(error_message));
407            };
408
409            Err(anyhow!(
410                "Received error response from adapter. Response: {:?}",
411                response.clone()
412            ))
413        }
414    }
415
416    async fn receive_server_message<Stdout>(
417        reader: &mut BufReader<Stdout>,
418        buffer: &mut String,
419        log_handlers: Option<&LogHandlers>,
420    ) -> Result<Message>
421    where
422        Stdout: AsyncRead + Unpin + Send + 'static,
423    {
424        let mut content_length = None;
425        loop {
426            buffer.truncate(0);
427
428            if reader
429                .read_line(buffer)
430                .await
431                .with_context(|| "reading a message from server")?
432                == 0
433            {
434                return Err(anyhow!("debugger reader stream closed"));
435            };
436
437            if buffer == "\r\n" {
438                break;
439            }
440
441            let parts = buffer.trim().split_once(": ");
442
443            match parts {
444                Some(("Content-Length", value)) => {
445                    content_length = Some(value.parse().context("invalid content length")?);
446                }
447                _ => {}
448            }
449        }
450
451        let content_length = content_length.context("missing content length")?;
452
453        let mut content = vec![0; content_length];
454        reader
455            .read_exact(&mut content)
456            .await
457            .with_context(|| "reading after a loop")?;
458
459        let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
460
461        if let Some(log_handlers) = log_handlers {
462            for (kind, log_handler) in log_handlers.lock().iter_mut() {
463                if matches!(kind, LogKind::Rpc) {
464                    log_handler(IoKind::StdOut, &message);
465                }
466            }
467        }
468
469        Ok(serde_json::from_str::<Message>(message)?)
470    }
471
472    pub async fn shutdown(&self) -> Result<()> {
473        log::debug!("Start shutdown client");
474
475        if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
476            server_tx.close();
477        }
478
479        let mut current_requests = self.current_requests.lock().await;
480        let mut pending_requests = self.pending_requests.lock().await;
481
482        current_requests.clear();
483        pending_requests.clear();
484
485        let _ = self.transport.kill().await.log_err();
486
487        drop(current_requests);
488        drop(pending_requests);
489
490        log::debug!("Shutdown client completed");
491
492        anyhow::Ok(())
493    }
494
495    pub fn has_adapter_logs(&self) -> bool {
496        self.transport.has_adapter_logs()
497    }
498
499    pub fn transport(&self) -> &Transport {
500        &self.transport
501    }
502
503    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
504    where
505        F: 'static + Send + FnMut(IoKind, &str),
506    {
507        let mut log_handlers = self.log_handlers.lock();
508        log_handlers.push((kind, Box::new(f)));
509    }
510}
511
/// Transport that spawns the adapter process and talks to it over TCP.
pub struct TcpTransport {
    /// Port the adapter was connected on.
    pub port: u16,
    /// Host the adapter was connected on.
    pub host: Ipv4Addr,
    /// Connection timeout that was applied, in milliseconds.
    pub timeout: u64,
    /// Handle to the spawned adapter process; used by `kill`.
    process: Mutex<Child>,
}
518
519impl TcpTransport {
520    /// Get an open port to use with the tcp client when not supplied by debug config
521    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
522        if let Some(port) = host.port {
523            Ok(port)
524        } else {
525            Self::unused_port(host.host()).await
526        }
527    }
528
529    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
530        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
531            .await?
532            .local_addr()?
533            .port())
534    }
535
536    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
537        let Some(connection_args) = binary.connection.as_ref() else {
538            return Err(anyhow!("No connection arguments provided"));
539        };
540
541        let host = connection_args.host;
542        let port = connection_args.port;
543
544        let mut command = util::command::new_smol_command(&binary.command);
545
546        if let Some(cwd) = &binary.cwd {
547            command.current_dir(cwd);
548        }
549
550        command.args(&binary.arguments);
551        command.envs(&binary.envs);
552
553        command
554            .stdin(Stdio::null())
555            .stdout(Stdio::piped())
556            .stderr(Stdio::piped())
557            .kill_on_drop(true);
558
559        let mut process = command
560            .spawn()
561            .with_context(|| "failed to start debug adapter.")?;
562
563        let address = SocketAddrV4::new(host, port);
564
565        let timeout = connection_args.timeout.unwrap_or_else(|| {
566            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
567                .unwrap_or(2000u64)
568        });
569
570        let (rx, tx) = select! {
571            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
572                return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", host, port)))
573            },
574            result = cx.spawn(async move |cx| {
575                loop {
576                    match TcpStream::connect(address).await {
577                        Ok(stream) => return stream.split(),
578                        Err(_) => {
579                            cx.background_executor().timer(Duration::from_millis(100)).await;
580                        }
581                    }
582                }
583            }).fuse() => result
584        };
585        log::info!(
586            "Debug adapter has connected to TCP server {}:{}",
587            host,
588            port
589        );
590        let stdout = process.stdout.take();
591        let stderr = process.stderr.take();
592
593        let this = Self {
594            port,
595            host,
596            process: Mutex::new(process),
597            timeout,
598        };
599
600        let pipe = TransportPipe::new(
601            Box::new(tx),
602            Box::new(BufReader::new(rx)),
603            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
604            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
605        );
606
607        Ok((pipe, this))
608    }
609
    /// Always `true` for the TCP transport.
    fn has_adapter_logs(&self) -> bool {
        true
    }
613
614    async fn kill(&self) -> Result<()> {
615        self.process.lock().await.kill()?;
616
617        Ok(())
618    }
619}
620
/// Transport that talks to the adapter process over its stdin/stdout pipes.
pub struct StdioTransport {
    /// Handle to the spawned adapter process; used by `kill`.
    process: Mutex<Child>,
}
624
625impl StdioTransport {
626    #[allow(dead_code, reason = "This is used in non test builds of Zed")]
627    async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
628        let mut command = util::command::new_smol_command(&binary.command);
629
630        if let Some(cwd) = &binary.cwd {
631            command.current_dir(cwd);
632        }
633
634        command.args(&binary.arguments);
635        command.envs(&binary.envs);
636
637        command
638            .stdin(Stdio::piped())
639            .stdout(Stdio::piped())
640            .stderr(Stdio::piped())
641            .kill_on_drop(true);
642
643        let mut process = command
644            .spawn()
645            .with_context(|| "failed to spawn command.")?;
646
647        let stdin = process
648            .stdin
649            .take()
650            .ok_or_else(|| anyhow!("Failed to open stdin"))?;
651        let stdout = process
652            .stdout
653            .take()
654            .ok_or_else(|| anyhow!("Failed to open stdout"))?;
655        let stderr = process
656            .stderr
657            .take()
658            .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
659
660        if stderr.is_none() {
661            bail!(
662                "Failed to connect to stderr for debug adapter command {}",
663                &binary.command
664            );
665        }
666
667        log::info!("Debug adapter has connected to stdio adapter");
668
669        let process = Mutex::new(process);
670
671        Ok((
672            TransportPipe::new(
673                Box::new(stdin),
674                Box::new(BufReader::new(stdout)),
675                None,
676                stderr,
677            ),
678            Self { process },
679        ))
680    }
681
    /// Always `false` for the stdio transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
685
686    async fn kill(&self) -> Result<()> {
687        self.process.lock().await.kill()?;
688        Ok(())
689    }
690}
691
/// Test-only callback that turns a request (sequence number plus raw JSON arguments)
/// into the response the fake adapter should send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;

/// Test-only callback invoked with a response that was sent to the fake adapter.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
698
/// In-memory transport used in tests in place of a real debug adapter.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // Handlers that produce the fake adapter's response to a request, keyed by command name.
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    // Handlers called with responses to reverse requests, keyed by command name.
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
706
707#[cfg(any(test, feature = "test-support"))]
708impl FakeTransport {
709    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
710    where
711        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
712    {
713        self.request_handlers.lock().insert(
714            R::COMMAND,
715            Box::new(move |seq, args| {
716                let result = handler(seq, serde_json::from_value(args).unwrap());
717                let response = match result {
718                    Ok(response) => Response {
719                        seq: seq + 1,
720                        request_seq: seq,
721                        success: true,
722                        command: R::COMMAND.into(),
723                        body: Some(serde_json::to_value(response).unwrap()),
724                        message: None,
725                    },
726                    Err(response) => Response {
727                        seq: seq + 1,
728                        request_seq: seq,
729                        success: false,
730                        command: R::COMMAND.into(),
731                        body: Some(serde_json::to_value(response).unwrap()),
732                        message: None,
733                    },
734                };
735                response
736            }),
737        );
738    }
739
740    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
741    where
742        F: 'static + Send + Fn(Response),
743    {
744        self.response_handlers
745            .lock()
746            .insert(R::COMMAND, Box::new(handler));
747    }
748
749    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
750        let this = Self {
751            request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
752            response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
753        };
754        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
755        use serde_json::json;
756
757        let (stdin_writer, stdin_reader) = async_pipe::pipe();
758        let (stdout_writer, stdout_reader) = async_pipe::pipe();
759
760        let request_handlers = this.request_handlers.clone();
761        let response_handlers = this.response_handlers.clone();
762        let stdout_writer = Arc::new(Mutex::new(stdout_writer));
763
764        cx.background_executor()
765            .spawn(async move {
766                let mut reader = BufReader::new(stdin_reader);
767                let mut buffer = String::new();
768
769                loop {
770                    let message =
771                        TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
772                            .await;
773
774                    match message {
775                        Err(error) => {
776                            break anyhow!(error);
777                        }
778                        Ok(message) => {
779                            match message {
780                                Message::Request(request) => {
781                                    // redirect reverse requests to stdout writer/reader
782                                    if request.command == RunInTerminal::COMMAND
783                                        || request.command == StartDebugging::COMMAND
784                                    {
785                                        let message =
786                                            serde_json::to_string(&Message::Request(request))
787                                                .unwrap();
788
789                                        let mut writer = stdout_writer.lock().await;
790                                        writer
791                                            .write_all(
792                                                TransportDelegate::build_rpc_message(message)
793                                                    .as_bytes(),
794                                            )
795                                            .await
796                                            .unwrap();
797                                        writer.flush().await.unwrap();
798                                    } else {
799                                        let response = if let Some(handle) = request_handlers
800                                            .lock()
801                                            .get_mut(request.command.as_str())
802                                        {
803                                            handle(
804                                                request.seq,
805                                                request.arguments.unwrap_or(json!({})),
806                                            )
807                                        } else {
808                                            panic!("No request handler for {}", request.command);
809                                        };
810                                        let message =
811                                            serde_json::to_string(&Message::Response(response))
812                                                .unwrap();
813
814                                        let mut writer = stdout_writer.lock().await;
815
816                                        writer
817                                            .write_all(
818                                                TransportDelegate::build_rpc_message(message)
819                                                    .as_bytes(),
820                                            )
821                                            .await
822                                            .unwrap();
823                                        writer.flush().await.unwrap();
824                                    }
825                                }
826                                Message::Event(event) => {
827                                    let message =
828                                        serde_json::to_string(&Message::Event(event)).unwrap();
829
830                                    let mut writer = stdout_writer.lock().await;
831                                    writer
832                                        .write_all(
833                                            TransportDelegate::build_rpc_message(message)
834                                                .as_bytes(),
835                                        )
836                                        .await
837                                        .unwrap();
838                                    writer.flush().await.unwrap();
839                                }
840                                Message::Response(response) => {
841                                    if let Some(handle) =
842                                        response_handlers.lock().get(response.command.as_str())
843                                    {
844                                        handle(response);
845                                    } else {
846                                        log::error!("No response handler for {}", response.command);
847                                    }
848                                }
849                            }
850                        }
851                    }
852                }
853            })
854            .detach();
855
856        Ok((
857            TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
858            this,
859        ))
860    }
861
    /// Always `false` for the fake transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
865
    /// Does nothing and always succeeds; the fake transport holds no process handle.
    async fn kill(&self) -> Result<()> {
        Ok(())
    }
869}