1use anyhow::{Context, Result, anyhow, bail};
2use dap_types::{
3 ErrorResponse,
4 messages::{Message, Response},
5};
6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
7use gpui::AsyncApp;
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{Receiver, Sender, unbounded},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15 process::Child,
16};
17use std::{
18 collections::HashMap,
19 net::{Ipv4Addr, SocketAddrV4},
20 process::Stdio,
21 sync::Arc,
22 time::Duration,
23};
24use task::TCPHost;
25use util::ResultExt as _;
26
27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
28
29pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
30
31#[derive(PartialEq, Eq, Clone, Copy)]
32pub enum LogKind {
33 Adapter,
34 Rpc,
35}
36
37pub enum IoKind {
38 StdIn,
39 StdOut,
40 StdErr,
41}
42
43pub struct TransportPipe {
44 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
45 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
46 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
47 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
48}
49
50impl TransportPipe {
51 pub fn new(
52 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
53 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
54 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
55 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
56 ) -> Self {
57 TransportPipe {
58 input,
59 output,
60 stdout,
61 stderr,
62 }
63 }
64}
65
66type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
67type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
68
69pub enum Transport {
70 Stdio(StdioTransport),
71 Tcp(TcpTransport),
72 #[cfg(any(test, feature = "test-support"))]
73 Fake(FakeTransport),
74}
75
76impl Transport {
77 #[cfg(any(test, feature = "test-support"))]
78 async fn start(_: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
79 #[cfg(any(test, feature = "test-support"))]
80 return FakeTransport::start(cx)
81 .await
82 .map(|(transports, fake)| (transports, Self::Fake(fake)));
83 }
84
85 #[cfg(not(any(test, feature = "test-support")))]
86 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
87 if binary.connection.is_some() {
88 TcpTransport::start(binary, cx)
89 .await
90 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
91 } else {
92 StdioTransport::start(binary, cx)
93 .await
94 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
95 }
96 }
97
98 fn has_adapter_logs(&self) -> bool {
99 match self {
100 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
101 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
102 #[cfg(any(test, feature = "test-support"))]
103 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
104 }
105 }
106
107 async fn kill(&self) -> Result<()> {
108 match self {
109 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
110 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
111 #[cfg(any(test, feature = "test-support"))]
112 Transport::Fake(fake_transport) => fake_transport.kill().await,
113 }
114 }
115
116 #[cfg(any(test, feature = "test-support"))]
117 pub(crate) fn as_fake(&self) -> &FakeTransport {
118 match self {
119 Transport::Fake(fake_transport) => fake_transport,
120 _ => panic!("Not a fake transport layer"),
121 }
122 }
123}
124
125pub(crate) struct TransportDelegate {
126 log_handlers: LogHandlers,
127 current_requests: Requests,
128 pending_requests: Requests,
129 transport: Transport,
130 server_tx: Arc<Mutex<Option<Sender<Message>>>>,
131}
132
133impl TransportDelegate {
134 pub(crate) async fn start(
135 binary: &DebugAdapterBinary,
136 cx: AsyncApp,
137 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
138 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
139 let mut this = Self {
140 transport,
141 server_tx: Default::default(),
142 log_handlers: Default::default(),
143 current_requests: Default::default(),
144 pending_requests: Default::default(),
145 };
146 let messages = this.start_handlers(transport_pipes, cx).await?;
147 Ok((messages, this))
148 }
149
150 async fn start_handlers(
151 &mut self,
152 mut params: TransportPipe,
153 cx: AsyncApp,
154 ) -> Result<(Receiver<Message>, Sender<Message>)> {
155 let (client_tx, server_rx) = unbounded::<Message>();
156 let (server_tx, client_rx) = unbounded::<Message>();
157
158 let log_dap_communications =
159 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161 .unwrap_or(false);
162
163 let log_handler = if log_dap_communications {
164 Some(self.log_handlers.clone())
165 } else {
166 None
167 };
168
169 cx.update(|cx| {
170 if let Some(stdout) = params.stdout.take() {
171 cx.background_executor()
172 .spawn(Self::handle_adapter_log(stdout, log_handler.clone()))
173 .detach_and_log_err(cx);
174 }
175
176 cx.background_executor()
177 .spawn(Self::handle_output(
178 params.output,
179 client_tx,
180 self.pending_requests.clone(),
181 log_handler.clone(),
182 ))
183 .detach_and_log_err(cx);
184
185 if let Some(stderr) = params.stderr.take() {
186 cx.background_executor()
187 .spawn(Self::handle_error(stderr, self.log_handlers.clone()))
188 .detach_and_log_err(cx);
189 }
190
191 cx.background_executor()
192 .spawn(Self::handle_input(
193 params.input,
194 client_rx,
195 self.current_requests.clone(),
196 self.pending_requests.clone(),
197 log_handler.clone(),
198 ))
199 .detach_and_log_err(cx);
200 })?;
201
202 {
203 let mut lock = self.server_tx.lock().await;
204 *lock = Some(server_tx.clone());
205 }
206
207 Ok((server_rx, server_tx))
208 }
209
210 pub(crate) async fn add_pending_request(
211 &self,
212 sequence_id: u64,
213 request: oneshot::Sender<Result<Response>>,
214 ) {
215 let mut pending_requests = self.pending_requests.lock().await;
216 pending_requests.insert(sequence_id, request);
217 }
218
219 pub(crate) async fn cancel_pending_request(&self, sequence_id: &u64) {
220 let mut pending_requests = self.pending_requests.lock().await;
221 pending_requests.remove(sequence_id);
222 }
223
224 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
225 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
226 server_tx
227 .send(message)
228 .await
229 .map_err(|e| anyhow!("Failed to send message: {}", e))
230 } else {
231 Err(anyhow!("Server tx already dropped"))
232 }
233 }
234
235 async fn handle_adapter_log<Stdout>(
236 stdout: Stdout,
237 log_handlers: Option<LogHandlers>,
238 ) -> Result<()>
239 where
240 Stdout: AsyncRead + Unpin + Send + 'static,
241 {
242 let mut reader = BufReader::new(stdout);
243 let mut line = String::new();
244
245 let result = loop {
246 line.truncate(0);
247
248 let bytes_read = match reader.read_line(&mut line).await {
249 Ok(bytes_read) => bytes_read,
250 Err(e) => break Err(e.into()),
251 };
252
253 if bytes_read == 0 {
254 break Err(anyhow!("Debugger log stream closed"));
255 }
256
257 if let Some(log_handlers) = log_handlers.as_ref() {
258 for (kind, handler) in log_handlers.lock().iter_mut() {
259 if matches!(kind, LogKind::Adapter) {
260 handler(IoKind::StdOut, line.as_str());
261 }
262 }
263 }
264 };
265
266 log::debug!("Handle adapter log dropped");
267
268 result
269 }
270
271 fn build_rpc_message(message: String) -> String {
272 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
273 }
274
275 async fn handle_input<Stdin>(
276 mut server_stdin: Stdin,
277 client_rx: Receiver<Message>,
278 current_requests: Requests,
279 pending_requests: Requests,
280 log_handlers: Option<LogHandlers>,
281 ) -> Result<()>
282 where
283 Stdin: AsyncWrite + Unpin + Send + 'static,
284 {
285 let result = loop {
286 match client_rx.recv().await {
287 Ok(message) => {
288 if let Message::Request(request) = &message {
289 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
290 pending_requests.lock().await.insert(request.seq, sender);
291 }
292 }
293
294 let message = match serde_json::to_string(&message) {
295 Ok(message) => message,
296 Err(e) => break Err(e.into()),
297 };
298
299 if let Some(log_handlers) = log_handlers.as_ref() {
300 for (kind, log_handler) in log_handlers.lock().iter_mut() {
301 if matches!(kind, LogKind::Rpc) {
302 log_handler(IoKind::StdIn, &message);
303 }
304 }
305 }
306
307 if let Err(e) = server_stdin
308 .write_all(Self::build_rpc_message(message).as_bytes())
309 .await
310 {
311 break Err(e.into());
312 }
313
314 if let Err(e) = server_stdin.flush().await {
315 break Err(e.into());
316 }
317 }
318 Err(error) => break Err(error.into()),
319 }
320 };
321
322 log::debug!("Handle adapter input dropped");
323
324 result
325 }
326
327 async fn handle_output<Stdout>(
328 server_stdout: Stdout,
329 client_tx: Sender<Message>,
330 pending_requests: Requests,
331 log_handlers: Option<LogHandlers>,
332 ) -> Result<()>
333 where
334 Stdout: AsyncRead + Unpin + Send + 'static,
335 {
336 let mut recv_buffer = String::new();
337 let mut reader = BufReader::new(server_stdout);
338
339 let result = loop {
340 let message =
341 Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
342 .await;
343
344 match message {
345 Ok(Message::Response(res)) => {
346 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
347 if let Err(e) = tx.send(Self::process_response(res)) {
348 log::trace!("Did not send response `{:?}` for a cancelled", e);
349 }
350 } else {
351 client_tx.send(Message::Response(res)).await?;
352 };
353 }
354 Ok(message) => {
355 client_tx.send(message).await?;
356 }
357 Err(e) => break Err(e),
358 }
359 };
360
361 drop(client_tx);
362
363 log::debug!("Handle adapter output dropped");
364
365 result
366 }
367
368 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
369 where
370 Stderr: AsyncRead + Unpin + Send + 'static,
371 {
372 let mut buffer = String::new();
373
374 let mut reader = BufReader::new(stderr);
375
376 let result = loop {
377 match reader.read_line(&mut buffer).await {
378 Ok(0) => break Err(anyhow!("debugger error stream closed")),
379 Ok(_) => {
380 for (kind, log_handler) in log_handlers.lock().iter_mut() {
381 if matches!(kind, LogKind::Adapter) {
382 log_handler(IoKind::StdErr, buffer.as_str());
383 }
384 }
385
386 buffer.truncate(0);
387 }
388 Err(error) => break Err(error.into()),
389 }
390 };
391
392 log::debug!("Handle adapter error dropped");
393
394 result
395 }
396
397 fn process_response(response: Response) -> Result<Response> {
398 if response.success {
399 Ok(response)
400 } else {
401 if let Some(error_message) = response
402 .body
403 .clone()
404 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
405 .and_then(|response| response.error.map(|msg| msg.format))
406 .or_else(|| response.message.clone())
407 {
408 return Err(anyhow!(error_message));
409 };
410
411 Err(anyhow!(
412 "Received error response from adapter. Response: {:?}",
413 response.clone()
414 ))
415 }
416 }
417
418 async fn receive_server_message<Stdout>(
419 reader: &mut BufReader<Stdout>,
420 buffer: &mut String,
421 log_handlers: Option<&LogHandlers>,
422 ) -> Result<Message>
423 where
424 Stdout: AsyncRead + Unpin + Send + 'static,
425 {
426 let mut content_length = None;
427 loop {
428 buffer.truncate(0);
429
430 if reader
431 .read_line(buffer)
432 .await
433 .with_context(|| "reading a message from server")?
434 == 0
435 {
436 return Err(anyhow!("debugger reader stream closed"));
437 };
438
439 if buffer == "\r\n" {
440 break;
441 }
442
443 let parts = buffer.trim().split_once(": ");
444
445 match parts {
446 Some(("Content-Length", value)) => {
447 content_length = Some(value.parse().context("invalid content length")?);
448 }
449 _ => {}
450 }
451 }
452
453 let content_length = content_length.context("missing content length")?;
454
455 let mut content = vec![0; content_length];
456 reader
457 .read_exact(&mut content)
458 .await
459 .with_context(|| "reading after a loop")?;
460
461 let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
462
463 if let Some(log_handlers) = log_handlers {
464 for (kind, log_handler) in log_handlers.lock().iter_mut() {
465 if matches!(kind, LogKind::Rpc) {
466 log_handler(IoKind::StdOut, &message);
467 }
468 }
469 }
470
471 Ok(serde_json::from_str::<Message>(message)?)
472 }
473
474 pub async fn shutdown(&self) -> Result<()> {
475 log::debug!("Start shutdown client");
476
477 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
478 server_tx.close();
479 }
480
481 let mut current_requests = self.current_requests.lock().await;
482 let mut pending_requests = self.pending_requests.lock().await;
483
484 current_requests.clear();
485 pending_requests.clear();
486
487 let _ = self.transport.kill().await.log_err();
488
489 drop(current_requests);
490 drop(pending_requests);
491
492 log::debug!("Shutdown client completed");
493
494 anyhow::Ok(())
495 }
496
497 pub fn has_adapter_logs(&self) -> bool {
498 self.transport.has_adapter_logs()
499 }
500
501 pub fn transport(&self) -> &Transport {
502 &self.transport
503 }
504
505 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
506 where
507 F: 'static + Send + FnMut(IoKind, &str),
508 {
509 let mut log_handlers = self.log_handlers.lock();
510 log_handlers.push((kind, Box::new(f)));
511 }
512}
513
514pub struct TcpTransport {
515 pub port: u16,
516 pub host: Ipv4Addr,
517 pub timeout: u64,
518 process: Mutex<Child>,
519}
520
521impl TcpTransport {
522 /// Get an open port to use with the tcp client when not supplied by debug config
523 pub async fn port(host: &TCPHost) -> Result<u16> {
524 if let Some(port) = host.port {
525 Ok(port)
526 } else {
527 Ok(TcpListener::bind(SocketAddrV4::new(host.host(), 0))
528 .await?
529 .local_addr()?
530 .port())
531 }
532 }
533
534 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
535 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
536 let Some(connection_args) = binary.connection.as_ref() else {
537 return Err(anyhow!("No connection arguments provided"));
538 };
539
540 let host = connection_args.host;
541 let port = connection_args.port;
542
543 let mut command = util::command::new_smol_command(&binary.command);
544
545 if let Some(cwd) = &binary.cwd {
546 command.current_dir(cwd);
547 }
548
549 if let Some(args) = &binary.arguments {
550 command.args(args);
551 }
552
553 if let Some(envs) = &binary.envs {
554 command.envs(envs);
555 }
556
557 command
558 .stdin(Stdio::null())
559 .stdout(Stdio::piped())
560 .stderr(Stdio::piped())
561 .kill_on_drop(true);
562
563 let mut process = command
564 .spawn()
565 .with_context(|| "failed to start debug adapter.")?;
566
567 let address = SocketAddrV4::new(host, port);
568
569 let timeout = connection_args.timeout.unwrap_or_else(|| {
570 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
571 .unwrap_or(2000u64)
572 });
573
574 let (rx, tx) = select! {
575 _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
576 return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", host, port)))
577 },
578 result = cx.spawn(async move |cx| {
579 loop {
580 match TcpStream::connect(address).await {
581 Ok(stream) => return stream.split(),
582 Err(_) => {
583 cx.background_executor().timer(Duration::from_millis(100)).await;
584 }
585 }
586 }
587 }).fuse() => result
588 };
589 log::info!(
590 "Debug adapter has connected to TCP server {}:{}",
591 host,
592 port
593 );
594 let stdout = process.stdout.take();
595 let stderr = process.stderr.take();
596
597 let this = Self {
598 port,
599 host,
600 process: Mutex::new(process),
601 timeout,
602 };
603
604 let pipe = TransportPipe::new(
605 Box::new(tx),
606 Box::new(BufReader::new(rx)),
607 stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
608 stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
609 );
610
611 Ok((pipe, this))
612 }
613
614 fn has_adapter_logs(&self) -> bool {
615 true
616 }
617
618 async fn kill(&self) -> Result<()> {
619 self.process.lock().await.kill()?;
620
621 Ok(())
622 }
623}
624
625pub struct StdioTransport {
626 process: Mutex<Child>,
627}
628
629impl StdioTransport {
630 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
631 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
632 let mut command = util::command::new_smol_command(&binary.command);
633
634 if let Some(cwd) = &binary.cwd {
635 command.current_dir(cwd);
636 }
637
638 if let Some(args) = &binary.arguments {
639 command.args(args);
640 }
641
642 if let Some(envs) = &binary.envs {
643 command.envs(envs);
644 }
645
646 command
647 .stdin(Stdio::piped())
648 .stdout(Stdio::piped())
649 .stderr(Stdio::piped())
650 .kill_on_drop(true);
651
652 let mut process = command
653 .spawn()
654 .with_context(|| "failed to spawn command.")?;
655
656 let stdin = process
657 .stdin
658 .take()
659 .ok_or_else(|| anyhow!("Failed to open stdin"))?;
660 let stdout = process
661 .stdout
662 .take()
663 .ok_or_else(|| anyhow!("Failed to open stdout"))?;
664 let stderr = process
665 .stderr
666 .take()
667 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
668
669 if stderr.is_none() {
670 bail!(
671 "Failed to connect to stderr for debug adapter command {}",
672 &binary.command
673 );
674 }
675
676 log::info!("Debug adapter has connected to stdio adapter");
677
678 let process = Mutex::new(process);
679
680 Ok((
681 TransportPipe::new(
682 Box::new(stdin),
683 Box::new(BufReader::new(stdout)),
684 None,
685 stderr,
686 ),
687 Self { process },
688 ))
689 }
690
691 fn has_adapter_logs(&self) -> bool {
692 false
693 }
694
695 async fn kill(&self) -> Result<()> {
696 self.process.lock().await.kill()?;
697 Ok(())
698 }
699}
700
701#[cfg(any(test, feature = "test-support"))]
702type RequestHandler =
703 Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
704
705#[cfg(any(test, feature = "test-support"))]
706type ResponseHandler = Box<dyn Send + Fn(Response)>;
707
708#[cfg(any(test, feature = "test-support"))]
709pub struct FakeTransport {
710 // for sending fake response back from adapter side
711 request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
712 // for reverse request responses
713 response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
714}
715
716#[cfg(any(test, feature = "test-support"))]
717impl FakeTransport {
718 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
719 where
720 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
721 {
722 self.request_handlers.lock().insert(
723 R::COMMAND,
724 Box::new(move |seq, args| {
725 let result = handler(seq, serde_json::from_value(args).unwrap());
726 let response = match result {
727 Ok(response) => Response {
728 seq: seq + 1,
729 request_seq: seq,
730 success: true,
731 command: R::COMMAND.into(),
732 body: Some(serde_json::to_value(response).unwrap()),
733 message: None,
734 },
735 Err(response) => Response {
736 seq: seq + 1,
737 request_seq: seq,
738 success: false,
739 command: R::COMMAND.into(),
740 body: Some(serde_json::to_value(response).unwrap()),
741 message: None,
742 },
743 };
744 response
745 }),
746 );
747 }
748
749 pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
750 where
751 F: 'static + Send + Fn(Response),
752 {
753 self.response_handlers
754 .lock()
755 .insert(R::COMMAND, Box::new(handler));
756 }
757
758 async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
759 let this = Self {
760 request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
761 response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
762 };
763 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
764 use serde_json::json;
765
766 let (stdin_writer, stdin_reader) = async_pipe::pipe();
767 let (stdout_writer, stdout_reader) = async_pipe::pipe();
768
769 let request_handlers = this.request_handlers.clone();
770 let response_handlers = this.response_handlers.clone();
771 let stdout_writer = Arc::new(Mutex::new(stdout_writer));
772
773 cx.background_executor()
774 .spawn(async move {
775 let mut reader = BufReader::new(stdin_reader);
776 let mut buffer = String::new();
777
778 loop {
779 let message =
780 TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
781 .await;
782
783 match message {
784 Err(error) => {
785 break anyhow!(error);
786 }
787 Ok(message) => {
788 match message {
789 Message::Request(request) => {
790 // redirect reverse requests to stdout writer/reader
791 if request.command == RunInTerminal::COMMAND
792 || request.command == StartDebugging::COMMAND
793 {
794 let message =
795 serde_json::to_string(&Message::Request(request))
796 .unwrap();
797
798 let mut writer = stdout_writer.lock().await;
799 writer
800 .write_all(
801 TransportDelegate::build_rpc_message(message)
802 .as_bytes(),
803 )
804 .await
805 .unwrap();
806 writer.flush().await.unwrap();
807 } else {
808 let response = if let Some(handle) = request_handlers
809 .lock()
810 .get_mut(request.command.as_str())
811 {
812 handle(
813 request.seq,
814 request.arguments.unwrap_or(json!({})),
815 )
816 } else {
817 panic!("No request handler for {}", request.command);
818 };
819 let message =
820 serde_json::to_string(&Message::Response(response))
821 .unwrap();
822
823 let mut writer = stdout_writer.lock().await;
824
825 writer
826 .write_all(
827 TransportDelegate::build_rpc_message(message)
828 .as_bytes(),
829 )
830 .await
831 .unwrap();
832 writer.flush().await.unwrap();
833 }
834 }
835 Message::Event(event) => {
836 let message =
837 serde_json::to_string(&Message::Event(event)).unwrap();
838
839 let mut writer = stdout_writer.lock().await;
840 writer
841 .write_all(
842 TransportDelegate::build_rpc_message(message)
843 .as_bytes(),
844 )
845 .await
846 .unwrap();
847 writer.flush().await.unwrap();
848 }
849 Message::Response(response) => {
850 if let Some(handle) =
851 response_handlers.lock().get(response.command.as_str())
852 {
853 handle(response);
854 } else {
855 log::error!("No response handler for {}", response.command);
856 }
857 }
858 }
859 }
860 }
861 }
862 })
863 .detach();
864
865 Ok((
866 TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
867 this,
868 ))
869 }
870
871 fn has_adapter_logs(&self) -> bool {
872 false
873 }
874
875 async fn kill(&self) -> Result<()> {
876 Ok(())
877 }
878}