1use anyhow::{Context, Result, anyhow, bail};
2use dap_types::{
3 ErrorResponse,
4 messages::{Message, Response},
5};
6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
7use gpui::AsyncApp;
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{Receiver, Sender, unbounded},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15 process::Child,
16};
17use std::{
18 collections::HashMap,
19 net::{Ipv4Addr, SocketAddrV4},
20 process::Stdio,
21 sync::Arc,
22 time::Duration,
23};
24use task::TcpArgumentsTemplate;
25use util::ResultExt as _;
26
27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
28
29pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
30
/// Category of log traffic a registered handler wants to receive.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum LogKind {
    /// Lines read from the adapter's stdout/stderr streams.
    Adapter,
    /// Serialized messages written to or read from the adapter.
    Rpc,
}
36
/// Identifies which stream a logged line or message belongs to.
pub enum IoKind {
    /// A message written to the adapter.
    StdIn,
    /// A line or message read from the adapter's output.
    StdOut,
    /// A line read from the adapter's error stream.
    StdErr,
}
42
43pub struct TransportPipe {
44 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
45 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
46 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
47 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
48}
49
50impl TransportPipe {
51 pub fn new(
52 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
53 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
54 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
55 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
56 ) -> Self {
57 TransportPipe {
58 input,
59 output,
60 stdout,
61 stderr,
62 }
63 }
64}
65
66type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
67type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
68
69pub enum Transport {
70 Stdio(StdioTransport),
71 Tcp(TcpTransport),
72 #[cfg(any(test, feature = "test-support"))]
73 Fake(FakeTransport),
74}
75
76impl Transport {
77 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
78 #[cfg(any(test, feature = "test-support"))]
79 if cfg!(any(test, feature = "test-support")) {
80 return FakeTransport::start(cx)
81 .await
82 .map(|(transports, fake)| (transports, Self::Fake(fake)));
83 }
84
85 if binary.connection.is_some() {
86 TcpTransport::start(binary, cx)
87 .await
88 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
89 } else {
90 StdioTransport::start(binary, cx)
91 .await
92 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
93 }
94 }
95
96 fn has_adapter_logs(&self) -> bool {
97 match self {
98 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
99 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100 #[cfg(any(test, feature = "test-support"))]
101 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102 }
103 }
104
105 async fn kill(&self) -> Result<()> {
106 match self {
107 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109 #[cfg(any(test, feature = "test-support"))]
110 Transport::Fake(fake_transport) => fake_transport.kill().await,
111 }
112 }
113
114 #[cfg(any(test, feature = "test-support"))]
115 pub(crate) fn as_fake(&self) -> &FakeTransport {
116 match self {
117 Transport::Fake(fake_transport) => fake_transport,
118 _ => panic!("Not a fake transport layer"),
119 }
120 }
121}
122
123pub(crate) struct TransportDelegate {
124 log_handlers: LogHandlers,
125 current_requests: Requests,
126 pending_requests: Requests,
127 transport: Transport,
128 server_tx: Arc<Mutex<Option<Sender<Message>>>>,
129}
130
131impl TransportDelegate {
132 pub(crate) async fn start(
133 binary: &DebugAdapterBinary,
134 cx: AsyncApp,
135 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
136 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
137 let mut this = Self {
138 transport,
139 server_tx: Default::default(),
140 log_handlers: Default::default(),
141 current_requests: Default::default(),
142 pending_requests: Default::default(),
143 };
144 let messages = this.start_handlers(transport_pipes, cx).await?;
145 Ok((messages, this))
146 }
147
148 async fn start_handlers(
149 &mut self,
150 mut params: TransportPipe,
151 cx: AsyncApp,
152 ) -> Result<(Receiver<Message>, Sender<Message>)> {
153 let (client_tx, server_rx) = unbounded::<Message>();
154 let (server_tx, client_rx) = unbounded::<Message>();
155
156 let log_dap_communications =
157 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
158 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
159 .unwrap_or(false);
160
161 let log_handler = if log_dap_communications {
162 Some(self.log_handlers.clone())
163 } else {
164 None
165 };
166
167 cx.update(|cx| {
168 if let Some(stdout) = params.stdout.take() {
169 cx.background_executor()
170 .spawn(Self::handle_adapter_log(stdout, log_handler.clone()))
171 .detach_and_log_err(cx);
172 }
173
174 cx.background_executor()
175 .spawn(Self::handle_output(
176 params.output,
177 client_tx,
178 self.pending_requests.clone(),
179 log_handler.clone(),
180 ))
181 .detach_and_log_err(cx);
182
183 if let Some(stderr) = params.stderr.take() {
184 cx.background_executor()
185 .spawn(Self::handle_error(stderr, self.log_handlers.clone()))
186 .detach_and_log_err(cx);
187 }
188
189 cx.background_executor()
190 .spawn(Self::handle_input(
191 params.input,
192 client_rx,
193 self.current_requests.clone(),
194 self.pending_requests.clone(),
195 log_handler.clone(),
196 ))
197 .detach_and_log_err(cx);
198 })?;
199
200 {
201 let mut lock = self.server_tx.lock().await;
202 *lock = Some(server_tx.clone());
203 }
204
205 Ok((server_rx, server_tx))
206 }
207
208 pub(crate) async fn add_pending_request(
209 &self,
210 sequence_id: u64,
211 request: oneshot::Sender<Result<Response>>,
212 ) {
213 let mut pending_requests = self.pending_requests.lock().await;
214 pending_requests.insert(sequence_id, request);
215 }
216
217 pub(crate) async fn cancel_pending_request(&self, sequence_id: &u64) {
218 let mut pending_requests = self.pending_requests.lock().await;
219 pending_requests.remove(sequence_id);
220 }
221
222 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
223 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
224 server_tx
225 .send(message)
226 .await
227 .map_err(|e| anyhow!("Failed to send message: {}", e))
228 } else {
229 Err(anyhow!("Server tx already dropped"))
230 }
231 }
232
233 async fn handle_adapter_log<Stdout>(
234 stdout: Stdout,
235 log_handlers: Option<LogHandlers>,
236 ) -> Result<()>
237 where
238 Stdout: AsyncRead + Unpin + Send + 'static,
239 {
240 let mut reader = BufReader::new(stdout);
241 let mut line = String::new();
242
243 let result = loop {
244 line.truncate(0);
245
246 let bytes_read = match reader.read_line(&mut line).await {
247 Ok(bytes_read) => bytes_read,
248 Err(e) => break Err(e.into()),
249 };
250
251 if bytes_read == 0 {
252 break Err(anyhow!("Debugger log stream closed"));
253 }
254
255 if let Some(log_handlers) = log_handlers.as_ref() {
256 for (kind, handler) in log_handlers.lock().iter_mut() {
257 if matches!(kind, LogKind::Adapter) {
258 handler(IoKind::StdOut, line.as_str());
259 }
260 }
261 }
262 };
263
264 log::debug!("Handle adapter log dropped");
265
266 result
267 }
268
/// Prefixes `message` with a `Content-Length` header giving its length in
/// bytes, followed by a blank line.
fn build_rpc_message(message: String) -> String {
    let header = format!("Content-Length: {}\r\n\r\n", message.len());
    header + &message
}
272
273 async fn handle_input<Stdin>(
274 mut server_stdin: Stdin,
275 client_rx: Receiver<Message>,
276 current_requests: Requests,
277 pending_requests: Requests,
278 log_handlers: Option<LogHandlers>,
279 ) -> Result<()>
280 where
281 Stdin: AsyncWrite + Unpin + Send + 'static,
282 {
283 let result = loop {
284 match client_rx.recv().await {
285 Ok(message) => {
286 if let Message::Request(request) = &message {
287 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
288 pending_requests.lock().await.insert(request.seq, sender);
289 }
290 }
291
292 let message = match serde_json::to_string(&message) {
293 Ok(message) => message,
294 Err(e) => break Err(e.into()),
295 };
296
297 if let Some(log_handlers) = log_handlers.as_ref() {
298 for (kind, log_handler) in log_handlers.lock().iter_mut() {
299 if matches!(kind, LogKind::Rpc) {
300 log_handler(IoKind::StdIn, &message);
301 }
302 }
303 }
304
305 if let Err(e) = server_stdin
306 .write_all(Self::build_rpc_message(message).as_bytes())
307 .await
308 {
309 break Err(e.into());
310 }
311
312 if let Err(e) = server_stdin.flush().await {
313 break Err(e.into());
314 }
315 }
316 Err(error) => break Err(error.into()),
317 }
318 };
319
320 log::debug!("Handle adapter input dropped");
321
322 result
323 }
324
325 async fn handle_output<Stdout>(
326 server_stdout: Stdout,
327 client_tx: Sender<Message>,
328 pending_requests: Requests,
329 log_handlers: Option<LogHandlers>,
330 ) -> Result<()>
331 where
332 Stdout: AsyncRead + Unpin + Send + 'static,
333 {
334 let mut recv_buffer = String::new();
335 let mut reader = BufReader::new(server_stdout);
336
337 let result = loop {
338 let message =
339 Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
340 .await;
341
342 match message {
343 Ok(Message::Response(res)) => {
344 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
345 if let Err(e) = tx.send(Self::process_response(res)) {
346 log::trace!("Did not send response `{:?}` for a cancelled", e);
347 }
348 } else {
349 client_tx.send(Message::Response(res)).await?;
350 };
351 }
352 Ok(message) => {
353 client_tx.send(message).await?;
354 }
355 Err(e) => break Err(e),
356 }
357 };
358
359 drop(client_tx);
360
361 log::debug!("Handle adapter output dropped");
362
363 result
364 }
365
366 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
367 where
368 Stderr: AsyncRead + Unpin + Send + 'static,
369 {
370 let mut buffer = String::new();
371
372 let mut reader = BufReader::new(stderr);
373
374 let result = loop {
375 match reader.read_line(&mut buffer).await {
376 Ok(0) => break Err(anyhow!("debugger error stream closed")),
377 Ok(_) => {
378 for (kind, log_handler) in log_handlers.lock().iter_mut() {
379 if matches!(kind, LogKind::Adapter) {
380 log_handler(IoKind::StdErr, buffer.as_str());
381 }
382 }
383
384 buffer.truncate(0);
385 }
386 Err(error) => break Err(error.into()),
387 }
388 };
389
390 log::debug!("Handle adapter error dropped");
391
392 result
393 }
394
395 fn process_response(response: Response) -> Result<Response> {
396 if response.success {
397 Ok(response)
398 } else {
399 if let Some(error_message) = response
400 .body
401 .clone()
402 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
403 .and_then(|response| response.error.map(|msg| msg.format))
404 .or_else(|| response.message.clone())
405 {
406 return Err(anyhow!(error_message));
407 };
408
409 Err(anyhow!(
410 "Received error response from adapter. Response: {:?}",
411 response.clone()
412 ))
413 }
414 }
415
416 async fn receive_server_message<Stdout>(
417 reader: &mut BufReader<Stdout>,
418 buffer: &mut String,
419 log_handlers: Option<&LogHandlers>,
420 ) -> Result<Message>
421 where
422 Stdout: AsyncRead + Unpin + Send + 'static,
423 {
424 let mut content_length = None;
425 loop {
426 buffer.truncate(0);
427
428 if reader
429 .read_line(buffer)
430 .await
431 .with_context(|| "reading a message from server")?
432 == 0
433 {
434 return Err(anyhow!("debugger reader stream closed"));
435 };
436
437 if buffer == "\r\n" {
438 break;
439 }
440
441 let parts = buffer.trim().split_once(": ");
442
443 match parts {
444 Some(("Content-Length", value)) => {
445 content_length = Some(value.parse().context("invalid content length")?);
446 }
447 _ => {}
448 }
449 }
450
451 let content_length = content_length.context("missing content length")?;
452
453 let mut content = vec![0; content_length];
454 reader
455 .read_exact(&mut content)
456 .await
457 .with_context(|| "reading after a loop")?;
458
459 let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
460
461 if let Some(log_handlers) = log_handlers {
462 for (kind, log_handler) in log_handlers.lock().iter_mut() {
463 if matches!(kind, LogKind::Rpc) {
464 log_handler(IoKind::StdOut, &message);
465 }
466 }
467 }
468
469 Ok(serde_json::from_str::<Message>(message)?)
470 }
471
472 pub async fn shutdown(&self) -> Result<()> {
473 log::debug!("Start shutdown client");
474
475 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
476 server_tx.close();
477 }
478
479 let mut current_requests = self.current_requests.lock().await;
480 let mut pending_requests = self.pending_requests.lock().await;
481
482 current_requests.clear();
483 pending_requests.clear();
484
485 let _ = self.transport.kill().await.log_err();
486
487 drop(current_requests);
488 drop(pending_requests);
489
490 log::debug!("Shutdown client completed");
491
492 anyhow::Ok(())
493 }
494
495 pub fn has_adapter_logs(&self) -> bool {
496 self.transport.has_adapter_logs()
497 }
498
499 pub fn transport(&self) -> &Transport {
500 &self.transport
501 }
502
503 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
504 where
505 F: 'static + Send + FnMut(IoKind, &str),
506 {
507 let mut log_handlers = self.log_handlers.lock();
508 log_handlers.push((kind, Box::new(f)));
509 }
510}
511
512pub struct TcpTransport {
513 pub port: u16,
514 pub host: Ipv4Addr,
515 pub timeout: u64,
516 process: Mutex<Child>,
517}
518
519impl TcpTransport {
520 /// Get an open port to use with the tcp client when not supplied by debug config
521 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
522 if let Some(port) = host.port {
523 Ok(port)
524 } else {
525 Self::unused_port(host.host()).await
526 }
527 }
528
529 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
530 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
531 .await?
532 .local_addr()?
533 .port())
534 }
535
536 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
537 let Some(connection_args) = binary.connection.as_ref() else {
538 return Err(anyhow!("No connection arguments provided"));
539 };
540
541 let host = connection_args.host;
542 let port = connection_args.port;
543
544 let mut command = util::command::new_smol_command(&binary.command);
545
546 if let Some(cwd) = &binary.cwd {
547 command.current_dir(cwd);
548 }
549
550 command.args(&binary.arguments);
551 command.envs(&binary.envs);
552
553 command
554 .stdin(Stdio::null())
555 .stdout(Stdio::piped())
556 .stderr(Stdio::piped())
557 .kill_on_drop(true);
558
559 let mut process = command
560 .spawn()
561 .with_context(|| "failed to start debug adapter.")?;
562
563 let address = SocketAddrV4::new(host, port);
564
565 let timeout = connection_args.timeout.unwrap_or_else(|| {
566 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
567 .unwrap_or(2000u64)
568 });
569
570 let (rx, tx) = select! {
571 _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
572 return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", host, port)))
573 },
574 result = cx.spawn(async move |cx| {
575 loop {
576 match TcpStream::connect(address).await {
577 Ok(stream) => return stream.split(),
578 Err(_) => {
579 cx.background_executor().timer(Duration::from_millis(100)).await;
580 }
581 }
582 }
583 }).fuse() => result
584 };
585 log::info!(
586 "Debug adapter has connected to TCP server {}:{}",
587 host,
588 port
589 );
590 let stdout = process.stdout.take();
591 let stderr = process.stderr.take();
592
593 let this = Self {
594 port,
595 host,
596 process: Mutex::new(process),
597 timeout,
598 };
599
600 let pipe = TransportPipe::new(
601 Box::new(tx),
602 Box::new(BufReader::new(rx)),
603 stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
604 stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
605 );
606
607 Ok((pipe, this))
608 }
609
610 fn has_adapter_logs(&self) -> bool {
611 true
612 }
613
614 async fn kill(&self) -> Result<()> {
615 self.process.lock().await.kill()?;
616
617 Ok(())
618 }
619}
620
621pub struct StdioTransport {
622 process: Mutex<Child>,
623}
624
625impl StdioTransport {
626 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
627 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
628 let mut command = util::command::new_smol_command(&binary.command);
629
630 if let Some(cwd) = &binary.cwd {
631 command.current_dir(cwd);
632 }
633
634 command.args(&binary.arguments);
635 command.envs(&binary.envs);
636
637 command
638 .stdin(Stdio::piped())
639 .stdout(Stdio::piped())
640 .stderr(Stdio::piped())
641 .kill_on_drop(true);
642
643 let mut process = command
644 .spawn()
645 .with_context(|| "failed to spawn command.")?;
646
647 let stdin = process
648 .stdin
649 .take()
650 .ok_or_else(|| anyhow!("Failed to open stdin"))?;
651 let stdout = process
652 .stdout
653 .take()
654 .ok_or_else(|| anyhow!("Failed to open stdout"))?;
655 let stderr = process
656 .stderr
657 .take()
658 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
659
660 if stderr.is_none() {
661 bail!(
662 "Failed to connect to stderr for debug adapter command {}",
663 &binary.command
664 );
665 }
666
667 log::info!("Debug adapter has connected to stdio adapter");
668
669 let process = Mutex::new(process);
670
671 Ok((
672 TransportPipe::new(
673 Box::new(stdin),
674 Box::new(BufReader::new(stdout)),
675 None,
676 stderr,
677 ),
678 Self { process },
679 ))
680 }
681
682 fn has_adapter_logs(&self) -> bool {
683 false
684 }
685
686 async fn kill(&self) -> Result<()> {
687 self.process.lock().await.kill()?;
688 Ok(())
689 }
690}
691
/// Fake-adapter callback that turns a request's sequence number and raw
/// arguments into the response to send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler = Box<dyn FnMut(u64, serde_json::Value) -> Response + Send>;

/// Fake-adapter callback invoked with a response the client sent.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Fn(Response) + Send>;
698
/// In-process stand-in for a debug adapter, available in test builds.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    /// Handlers that produce fake adapter responses, keyed by command name.
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    /// Handlers for responses to reverse requests, keyed by command name.
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
706
707#[cfg(any(test, feature = "test-support"))]
708impl FakeTransport {
709 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
710 where
711 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
712 {
713 self.request_handlers.lock().insert(
714 R::COMMAND,
715 Box::new(move |seq, args| {
716 let result = handler(seq, serde_json::from_value(args).unwrap());
717 let response = match result {
718 Ok(response) => Response {
719 seq: seq + 1,
720 request_seq: seq,
721 success: true,
722 command: R::COMMAND.into(),
723 body: Some(serde_json::to_value(response).unwrap()),
724 message: None,
725 },
726 Err(response) => Response {
727 seq: seq + 1,
728 request_seq: seq,
729 success: false,
730 command: R::COMMAND.into(),
731 body: Some(serde_json::to_value(response).unwrap()),
732 message: None,
733 },
734 };
735 response
736 }),
737 );
738 }
739
740 pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
741 where
742 F: 'static + Send + Fn(Response),
743 {
744 self.response_handlers
745 .lock()
746 .insert(R::COMMAND, Box::new(handler));
747 }
748
749 async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
750 let this = Self {
751 request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
752 response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
753 };
754 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
755 use serde_json::json;
756
757 let (stdin_writer, stdin_reader) = async_pipe::pipe();
758 let (stdout_writer, stdout_reader) = async_pipe::pipe();
759
760 let request_handlers = this.request_handlers.clone();
761 let response_handlers = this.response_handlers.clone();
762 let stdout_writer = Arc::new(Mutex::new(stdout_writer));
763
764 cx.background_executor()
765 .spawn(async move {
766 let mut reader = BufReader::new(stdin_reader);
767 let mut buffer = String::new();
768
769 loop {
770 let message =
771 TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
772 .await;
773
774 match message {
775 Err(error) => {
776 break anyhow!(error);
777 }
778 Ok(message) => {
779 match message {
780 Message::Request(request) => {
781 // redirect reverse requests to stdout writer/reader
782 if request.command == RunInTerminal::COMMAND
783 || request.command == StartDebugging::COMMAND
784 {
785 let message =
786 serde_json::to_string(&Message::Request(request))
787 .unwrap();
788
789 let mut writer = stdout_writer.lock().await;
790 writer
791 .write_all(
792 TransportDelegate::build_rpc_message(message)
793 .as_bytes(),
794 )
795 .await
796 .unwrap();
797 writer.flush().await.unwrap();
798 } else {
799 let response = if let Some(handle) = request_handlers
800 .lock()
801 .get_mut(request.command.as_str())
802 {
803 handle(
804 request.seq,
805 request.arguments.unwrap_or(json!({})),
806 )
807 } else {
808 panic!("No request handler for {}", request.command);
809 };
810 let message =
811 serde_json::to_string(&Message::Response(response))
812 .unwrap();
813
814 let mut writer = stdout_writer.lock().await;
815
816 writer
817 .write_all(
818 TransportDelegate::build_rpc_message(message)
819 .as_bytes(),
820 )
821 .await
822 .unwrap();
823 writer.flush().await.unwrap();
824 }
825 }
826 Message::Event(event) => {
827 let message =
828 serde_json::to_string(&Message::Event(event)).unwrap();
829
830 let mut writer = stdout_writer.lock().await;
831 writer
832 .write_all(
833 TransportDelegate::build_rpc_message(message)
834 .as_bytes(),
835 )
836 .await
837 .unwrap();
838 writer.flush().await.unwrap();
839 }
840 Message::Response(response) => {
841 if let Some(handle) =
842 response_handlers.lock().get(response.command.as_str())
843 {
844 handle(response);
845 } else {
846 log::error!("No response handler for {}", response.command);
847 }
848 }
849 }
850 }
851 }
852 }
853 })
854 .detach();
855
856 Ok((
857 TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
858 this,
859 ))
860 }
861
862 fn has_adapter_logs(&self) -> bool {
863 false
864 }
865
866 async fn kill(&self) -> Result<()> {
867 Ok(())
868 }
869}