1use anyhow::{Context as _, Result, bail};
2use dap_types::{
3 ErrorResponse,
4 messages::{Message, Response},
5};
6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
7use gpui::AsyncApp;
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{Receiver, Sender, unbounded},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15 process::Child,
16};
17use std::{
18 collections::HashMap,
19 net::{Ipv4Addr, SocketAddrV4},
20 process::Stdio,
21 sync::Arc,
22 time::Duration,
23};
24use task::TcpArgumentsTemplate;
25use util::{ResultExt as _, TryFutureExt};
26
27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
28
/// Boxed callback that receives the kind of I/O stream together with the text
/// that was observed on it.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
30
/// Category of output a registered log handler wants to receive.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Lines read from the adapter's log streams (stdout / stderr).
    Adapter,
    /// Serialized protocol messages sent to or received from the adapter.
    Rpc,
}
36
/// Identifies which stream a piece of logged text is associated with.
pub enum IoKind {
    /// Text written towards the adapter.
    StdIn,
    /// Text read from the adapter's output.
    StdOut,
    /// Text read from the adapter's error stream.
    StdErr,
}
42
/// The set of byte streams used to talk to a debug adapter.
pub struct TransportPipe {
    /// Stream that protocol messages for the adapter are written to.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Stream that protocol messages from the adapter are read from.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional stream read line-by-line as adapter log output.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional stream read line-by-line as adapter error output.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
49
50impl TransportPipe {
51 pub fn new(
52 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
53 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
54 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
55 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
56 ) -> Self {
57 TransportPipe {
58 input,
59 output,
60 stdout,
61 stderr,
62 }
63 }
64}
65
/// Shared map from a request sequence number to the channel on which the
/// matching response (or error) is delivered.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log callbacks, each tagged with the kind of
/// output it wants to receive.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
68
/// The concrete mechanism used to communicate with a debug adapter.
pub enum Transport {
    /// Adapter process spoken to over its stdin/stdout.
    Stdio(StdioTransport),
    /// Adapter process spoken to over a TCP connection.
    Tcp(TcpTransport),
    /// In-process fake adapter, only available in test builds.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
75
76impl Transport {
77 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
78 #[cfg(any(test, feature = "test-support"))]
79 if cfg!(any(test, feature = "test-support")) {
80 return FakeTransport::start(cx)
81 .await
82 .map(|(transports, fake)| (transports, Self::Fake(fake)));
83 }
84
85 if binary.connection.is_some() {
86 TcpTransport::start(binary, cx)
87 .await
88 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
89 } else {
90 StdioTransport::start(binary, cx)
91 .await
92 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
93 }
94 }
95
96 fn has_adapter_logs(&self) -> bool {
97 match self {
98 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
99 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100 #[cfg(any(test, feature = "test-support"))]
101 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102 }
103 }
104
105 async fn kill(&self) -> Result<()> {
106 match self {
107 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109 #[cfg(any(test, feature = "test-support"))]
110 Transport::Fake(fake_transport) => fake_transport.kill().await,
111 }
112 }
113
114 #[cfg(any(test, feature = "test-support"))]
115 pub(crate) fn as_fake(&self) -> &FakeTransport {
116 match self {
117 Transport::Fake(fake_transport) => fake_transport,
118 _ => panic!("Not a fake transport layer"),
119 }
120 }
121}
122
/// Owns a [`Transport`] and the background tasks that move messages between
/// the client-facing channels and the adapter's streams.
pub(crate) struct TransportDelegate {
    /// Callbacks notified about adapter log lines and RPC traffic.
    log_handlers: LogHandlers,
    /// Response senders that `handle_input` moves into `pending_requests`
    /// once the request with the same sequence number is written out.
    // NOTE(review): nothing in this file inserts into this map — confirm the producer.
    current_requests: Requests,
    /// Response senders waiting for the adapter's reply; resolved by
    /// `handle_output` when a response with a matching `request_seq` arrives.
    pending_requests: Requests,
    /// The underlying transport, used for `kill` and `has_adapter_logs`.
    transport: Transport,
    /// Sender for outgoing messages; set by `start_handlers`, taken by `shutdown`.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
    /// Handles of the background I/O tasks spawned in `start_handlers`.
    // presumably retained so the tasks keep running for the delegate's lifetime — TODO confirm
    _tasks: Vec<gpui::Task<Option<()>>>,
}
131
132impl TransportDelegate {
133 pub(crate) async fn start(
134 binary: &DebugAdapterBinary,
135 cx: AsyncApp,
136 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
137 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
138 let mut this = Self {
139 transport,
140 server_tx: Default::default(),
141 log_handlers: Default::default(),
142 current_requests: Default::default(),
143 pending_requests: Default::default(),
144 _tasks: Default::default(),
145 };
146 let messages = this.start_handlers(transport_pipes, cx).await?;
147 Ok((messages, this))
148 }
149
150 async fn start_handlers(
151 &mut self,
152 mut params: TransportPipe,
153 cx: AsyncApp,
154 ) -> Result<(Receiver<Message>, Sender<Message>)> {
155 let (client_tx, server_rx) = unbounded::<Message>();
156 let (server_tx, client_rx) = unbounded::<Message>();
157
158 let log_dap_communications =
159 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161 .unwrap_or(false);
162
163 let log_handler = if log_dap_communications {
164 Some(self.log_handlers.clone())
165 } else {
166 None
167 };
168
169 cx.update(|cx| {
170 if let Some(stdout) = params.stdout.take() {
171 self._tasks.push(
172 cx.background_executor()
173 .spawn(Self::handle_adapter_log(stdout, log_handler.clone()).log_err()),
174 );
175 }
176
177 self._tasks.push(
178 cx.background_executor().spawn(
179 Self::handle_output(
180 params.output,
181 client_tx,
182 self.pending_requests.clone(),
183 log_handler.clone(),
184 )
185 .log_err(),
186 ),
187 );
188
189 if let Some(stderr) = params.stderr.take() {
190 self._tasks.push(
191 cx.background_executor()
192 .spawn(Self::handle_error(stderr, self.log_handlers.clone()).log_err()),
193 );
194 }
195
196 self._tasks.push(
197 cx.background_executor().spawn(
198 Self::handle_input(
199 params.input,
200 client_rx,
201 self.current_requests.clone(),
202 self.pending_requests.clone(),
203 log_handler.clone(),
204 )
205 .log_err(),
206 ),
207 );
208 })?;
209
210 {
211 let mut lock = self.server_tx.lock().await;
212 *lock = Some(server_tx.clone());
213 }
214
215 Ok((server_rx, server_tx))
216 }
217
218 pub(crate) async fn add_pending_request(
219 &self,
220 sequence_id: u64,
221 request: oneshot::Sender<Result<Response>>,
222 ) {
223 let mut pending_requests = self.pending_requests.lock().await;
224 pending_requests.insert(sequence_id, request);
225 }
226
227 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
228 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
229 server_tx.send(message).await.context("sending message")
230 } else {
231 anyhow::bail!("Server tx already dropped")
232 }
233 }
234
235 async fn handle_adapter_log<Stdout>(
236 stdout: Stdout,
237 log_handlers: Option<LogHandlers>,
238 ) -> Result<()>
239 where
240 Stdout: AsyncRead + Unpin + Send + 'static,
241 {
242 let mut reader = BufReader::new(stdout);
243 let mut line = String::new();
244
245 let result = loop {
246 line.truncate(0);
247
248 let bytes_read = match reader.read_line(&mut line).await {
249 Ok(bytes_read) => bytes_read,
250 Err(e) => break Err(e.into()),
251 };
252
253 if bytes_read == 0 {
254 anyhow::bail!("Debugger log stream closed");
255 }
256
257 if let Some(log_handlers) = log_handlers.as_ref() {
258 for (kind, handler) in log_handlers.lock().iter_mut() {
259 if matches!(kind, LogKind::Adapter) {
260 handler(IoKind::StdOut, line.as_str());
261 }
262 }
263 }
264 };
265
266 log::debug!("Handle adapter log dropped");
267
268 result
269 }
270
271 fn build_rpc_message(message: String) -> String {
272 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
273 }
274
275 async fn handle_input<Stdin>(
276 mut server_stdin: Stdin,
277 client_rx: Receiver<Message>,
278 current_requests: Requests,
279 pending_requests: Requests,
280 log_handlers: Option<LogHandlers>,
281 ) -> Result<()>
282 where
283 Stdin: AsyncWrite + Unpin + Send + 'static,
284 {
285 let result = loop {
286 match client_rx.recv().await {
287 Ok(message) => {
288 if let Message::Request(request) = &message {
289 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
290 pending_requests.lock().await.insert(request.seq, sender);
291 }
292 }
293
294 let message = match serde_json::to_string(&message) {
295 Ok(message) => message,
296 Err(e) => break Err(e.into()),
297 };
298
299 if let Some(log_handlers) = log_handlers.as_ref() {
300 for (kind, log_handler) in log_handlers.lock().iter_mut() {
301 if matches!(kind, LogKind::Rpc) {
302 log_handler(IoKind::StdIn, &message);
303 }
304 }
305 }
306
307 if let Err(e) = server_stdin
308 .write_all(Self::build_rpc_message(message).as_bytes())
309 .await
310 {
311 break Err(e.into());
312 }
313
314 if let Err(e) = server_stdin.flush().await {
315 break Err(e.into());
316 }
317 }
318 Err(error) => break Err(error.into()),
319 }
320 };
321
322 log::debug!("Handle adapter input dropped");
323
324 result
325 }
326
327 async fn handle_output<Stdout>(
328 server_stdout: Stdout,
329 client_tx: Sender<Message>,
330 pending_requests: Requests,
331 log_handlers: Option<LogHandlers>,
332 ) -> Result<()>
333 where
334 Stdout: AsyncRead + Unpin + Send + 'static,
335 {
336 let mut recv_buffer = String::new();
337 let mut reader = BufReader::new(server_stdout);
338
339 let result = loop {
340 let message =
341 Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
342 .await;
343
344 match message {
345 Ok(Message::Response(res)) => {
346 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
347 if let Err(e) = tx.send(Self::process_response(res)) {
348 log::trace!("Did not send response `{:?}` for a cancelled", e);
349 }
350 } else {
351 client_tx.send(Message::Response(res)).await?;
352 };
353 }
354 Ok(message) => {
355 client_tx.send(message).await?;
356 }
357 Err(e) => break Err(e),
358 }
359 };
360
361 drop(client_tx);
362
363 log::debug!("Handle adapter output dropped");
364
365 result
366 }
367
368 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
369 where
370 Stderr: AsyncRead + Unpin + Send + 'static,
371 {
372 log::debug!("Handle error started");
373 let mut buffer = String::new();
374
375 let mut reader = BufReader::new(stderr);
376
377 let result = loop {
378 match reader.read_line(&mut buffer).await {
379 Ok(0) => anyhow::bail!("debugger error stream closed"),
380 Ok(_) => {
381 for (kind, log_handler) in log_handlers.lock().iter_mut() {
382 if matches!(kind, LogKind::Adapter) {
383 log_handler(IoKind::StdErr, buffer.as_str());
384 }
385 }
386
387 buffer.truncate(0);
388 }
389 Err(error) => break Err(error.into()),
390 }
391 };
392
393 log::debug!("Handle adapter error dropped");
394
395 result
396 }
397
398 fn process_response(response: Response) -> Result<Response> {
399 if response.success {
400 Ok(response)
401 } else {
402 if let Some(error_message) = response
403 .body
404 .clone()
405 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
406 .and_then(|response| response.error.map(|msg| msg.format))
407 .or_else(|| response.message.clone())
408 {
409 anyhow::bail!(error_message);
410 };
411
412 anyhow::bail!(
413 "Received error response from adapter. Response: {:?}",
414 response
415 );
416 }
417 }
418
419 async fn receive_server_message<Stdout>(
420 reader: &mut BufReader<Stdout>,
421 buffer: &mut String,
422 log_handlers: Option<&LogHandlers>,
423 ) -> Result<Message>
424 where
425 Stdout: AsyncRead + Unpin + Send + 'static,
426 {
427 let mut content_length = None;
428 loop {
429 buffer.truncate(0);
430
431 if reader
432 .read_line(buffer)
433 .await
434 .with_context(|| "reading a message from server")?
435 == 0
436 {
437 anyhow::bail!("debugger reader stream closed, last string output: '{buffer}'");
438 };
439
440 if buffer == "\r\n" {
441 break;
442 }
443
444 let parts = buffer.trim().split_once(": ");
445
446 match parts {
447 Some(("Content-Length", value)) => {
448 content_length = Some(value.parse().context("invalid content length")?);
449 }
450 _ => {}
451 }
452 }
453
454 let content_length = content_length.context("missing content length")?;
455
456 let mut content = vec![0; content_length];
457 reader
458 .read_exact(&mut content)
459 .await
460 .with_context(|| "reading after a loop")?;
461
462 let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
463
464 if let Some(log_handlers) = log_handlers {
465 for (kind, log_handler) in log_handlers.lock().iter_mut() {
466 if matches!(kind, LogKind::Rpc) {
467 log_handler(IoKind::StdOut, &message);
468 }
469 }
470 }
471
472 Ok(serde_json::from_str::<Message>(message)?)
473 }
474
475 pub async fn shutdown(&self) -> Result<()> {
476 log::debug!("Start shutdown client");
477
478 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
479 server_tx.close();
480 }
481
482 let mut current_requests = self.current_requests.lock().await;
483 let mut pending_requests = self.pending_requests.lock().await;
484
485 current_requests.clear();
486 pending_requests.clear();
487
488 let _ = self.transport.kill().await.log_err();
489
490 drop(current_requests);
491 drop(pending_requests);
492
493 log::debug!("Shutdown client completed");
494
495 anyhow::Ok(())
496 }
497
    /// Returns the underlying transport's `has_adapter_logs` value.
    pub fn has_adapter_logs(&self) -> bool {
        self.transport.has_adapter_logs()
    }
501
    /// Borrows the underlying transport.
    pub fn transport(&self) -> &Transport {
        &self.transport
    }
505
506 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
507 where
508 F: 'static + Send + FnMut(IoKind, &str),
509 {
510 let mut log_handlers = self.log_handlers.lock();
511 log_handlers.push((kind, Box::new(f)));
512 }
513}
514
/// Transport that spawns the adapter process and talks to it over TCP.
pub struct TcpTransport {
    /// Port the adapter was connected to.
    pub port: u16,
    /// Host the adapter was connected to.
    pub host: Ipv4Addr,
    /// Connection timeout in milliseconds that was applied in `start`.
    pub timeout: u64,
    /// The spawned adapter process.
    process: Mutex<Child>,
}
521
522impl TcpTransport {
523 /// Get an open port to use with the tcp client when not supplied by debug config
524 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
525 if let Some(port) = host.port {
526 Ok(port)
527 } else {
528 Self::unused_port(host.host()).await
529 }
530 }
531
532 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
533 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
534 .await?
535 .local_addr()?
536 .port())
537 }
538
    /// Spawns the adapter process described by `binary` and connects to it
    /// over TCP using `binary.connection`.
    ///
    /// Connection attempts are retried every 100ms until one succeeds, the
    /// process exits, or the timeout elapses. On success the returned pipe
    /// uses the TCP stream for protocol messages and the process's
    /// stdout/stderr as log streams.
    ///
    /// # Errors
    ///
    /// Fails when no connection arguments are present, the process cannot be
    /// spawned, the process exits before a connection is made, or the
    /// timeout elapses first.
    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
        let connection_args = binary
            .connection
            .as_ref()
            .context("No connection arguments provided")?;

        let host = connection_args.host;
        let port = connection_args.port;

        let mut command = util::command::new_std_command(&binary.command);
        util::set_pre_exec_to_start_new_session(&mut command);
        let mut command = smol::process::Command::from(command);

        if let Some(cwd) = &binary.cwd {
            command.current_dir(cwd);
        }

        command.args(&binary.arguments);
        command.envs(&binary.envs);

        // Protocol traffic goes over TCP, so stdin is unused; stdout and
        // stderr are captured so they can be surfaced as adapter logs.
        command
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true);

        let mut process = command
            .spawn()
            .with_context(|| "failed to start debug adapter.")?;

        let address = SocketAddrV4::new(host, port);

        // Timeout in milliseconds: explicit connection argument first, then
        // the debugger setting, then 2000 if the setting cannot be read.
        let timeout = connection_args.timeout.unwrap_or_else(|| {
            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
                .unwrap_or(2000u64)
        });

        // Race the connect loop against the timeout. The child process is
        // moved into the connect task and handed back on success.
        let (mut process, (rx, tx)) = select! {
            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
                anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
            },
            result = cx.spawn(async move |cx| {
                loop {
                    match TcpStream::connect(address).await {
                        Ok(stream) => return Ok((process, stream.split())),
                        Err(_) => {
                            // If the adapter already exited, report its output
                            // (stderr preferred, stdout as fallback).
                            if let Ok(Some(_)) = process.try_status() {
                                let output = process.output().await?;
                                let output = if output.stderr.is_empty() {
                                    String::from_utf8_lossy(&output.stdout).to_string()
                                } else {
                                    String::from_utf8_lossy(&output.stderr).to_string()
                                };
                                anyhow::bail!("{output}\nerror: process exited before debugger attached.");
                            }
                            // Still running: wait briefly before retrying.
                            cx.background_executor().timer(Duration::from_millis(100)).await;
                        }
                    }
                }
            }).fuse() => result?
        };

        log::info!(
            "Debug adapter has connected to TCP server {}:{}",
            host,
            port
        );
        let stdout = process.stdout.take();
        let stderr = process.stderr.take();

        let this = Self {
            port,
            host,
            process: Mutex::new(process),
            timeout,
        };

        let pipe = TransportPipe::new(
            Box::new(tx),
            Box::new(BufReader::new(rx)),
            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
        );

        Ok((pipe, this))
    }
625
    /// Always `true` for the TCP transport.
    fn has_adapter_logs(&self) -> bool {
        true
    }
629
630 async fn kill(&self) -> Result<()> {
631 self.process.lock().await.kill()?;
632
633 Ok(())
634 }
635}
636
/// Transport that spawns the adapter process and talks to it over its
/// stdin/stdout.
pub struct StdioTransport {
    /// The spawned adapter process.
    process: Mutex<Child>,
}
640
641impl StdioTransport {
642 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
643 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
644 let mut command = util::command::new_std_command(&binary.command);
645 util::set_pre_exec_to_start_new_session(&mut command);
646 let mut command = smol::process::Command::from(command);
647
648 if let Some(cwd) = &binary.cwd {
649 command.current_dir(cwd);
650 }
651
652 command.args(&binary.arguments);
653 command.envs(&binary.envs);
654
655 command
656 .stdin(Stdio::piped())
657 .stdout(Stdio::piped())
658 .stderr(Stdio::piped())
659 .kill_on_drop(true);
660
661 let mut process = command.spawn().with_context(|| {
662 format!(
663 "failed to spawn command `{} {}`.",
664 binary.command,
665 binary.arguments.join(" ")
666 )
667 })?;
668
669 let stdin = process.stdin.take().context("Failed to open stdin")?;
670 let stdout = process.stdout.take().context("Failed to open stdout")?;
671 let stderr = process
672 .stderr
673 .take()
674 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
675
676 if stderr.is_none() {
677 bail!(
678 "Failed to connect to stderr for debug adapter command {}",
679 &binary.command
680 );
681 }
682
683 log::info!("Debug adapter has connected to stdio adapter");
684
685 let process = Mutex::new(process);
686
687 Ok((
688 TransportPipe::new(
689 Box::new(stdin),
690 Box::new(BufReader::new(stdout)),
691 None,
692 stderr,
693 ),
694 Self { process },
695 ))
696 }
697
    /// Always `false` for the stdio transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
701
702 async fn kill(&self) -> Result<()> {
703 self.process.lock().await.kill()?;
704 Ok(())
705 }
706}
707
/// Test-only callback that turns a request's sequence number and raw JSON
/// arguments into the response the fake adapter sends back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;

/// Test-only callback invoked with a response that was sent to the fake
/// adapter.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
714
/// In-process stand-in for a debug adapter, available in test builds only.
/// Handlers are looked up by command name.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // for sending fake response back from adapter side
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    // for reverse request responses
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
722
723#[cfg(any(test, feature = "test-support"))]
724impl FakeTransport {
725 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
726 where
727 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
728 {
729 self.request_handlers.lock().insert(
730 R::COMMAND,
731 Box::new(move |seq, args| {
732 let result = handler(seq, serde_json::from_value(args).unwrap());
733 let response = match result {
734 Ok(response) => Response {
735 seq: seq + 1,
736 request_seq: seq,
737 success: true,
738 command: R::COMMAND.into(),
739 body: Some(serde_json::to_value(response).unwrap()),
740 message: None,
741 },
742 Err(response) => Response {
743 seq: seq + 1,
744 request_seq: seq,
745 success: false,
746 command: R::COMMAND.into(),
747 body: Some(serde_json::to_value(response).unwrap()),
748 message: None,
749 },
750 };
751 response
752 }),
753 );
754 }
755
756 pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
757 where
758 F: 'static + Send + Fn(Response),
759 {
760 self.response_handlers
761 .lock()
762 .insert(R::COMMAND, Box::new(handler));
763 }
764
765 async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
766 let this = Self {
767 request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
768 response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
769 };
770 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
771 use serde_json::json;
772
773 let (stdin_writer, stdin_reader) = async_pipe::pipe();
774 let (stdout_writer, stdout_reader) = async_pipe::pipe();
775
776 let request_handlers = this.request_handlers.clone();
777 let response_handlers = this.response_handlers.clone();
778 let stdout_writer = Arc::new(Mutex::new(stdout_writer));
779
780 cx.background_executor()
781 .spawn(async move {
782 let mut reader = BufReader::new(stdin_reader);
783 let mut buffer = String::new();
784
785 loop {
786 let message =
787 TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
788 .await;
789
790 match message {
791 Err(error) => {
792 break anyhow::anyhow!(error);
793 }
794 Ok(message) => {
795 match message {
796 Message::Request(request) => {
797 // redirect reverse requests to stdout writer/reader
798 if request.command == RunInTerminal::COMMAND
799 || request.command == StartDebugging::COMMAND
800 {
801 let message =
802 serde_json::to_string(&Message::Request(request))
803 .unwrap();
804
805 let mut writer = stdout_writer.lock().await;
806 writer
807 .write_all(
808 TransportDelegate::build_rpc_message(message)
809 .as_bytes(),
810 )
811 .await
812 .unwrap();
813 writer.flush().await.unwrap();
814 } else {
815 let response = if let Some(handle) = request_handlers
816 .lock()
817 .get_mut(request.command.as_str())
818 {
819 handle(
820 request.seq,
821 request.arguments.unwrap_or(json!({})),
822 )
823 } else {
824 panic!("No request handler for {}", request.command);
825 };
826 let message =
827 serde_json::to_string(&Message::Response(response))
828 .unwrap();
829
830 let mut writer = stdout_writer.lock().await;
831
832 writer
833 .write_all(
834 TransportDelegate::build_rpc_message(message)
835 .as_bytes(),
836 )
837 .await
838 .unwrap();
839 writer.flush().await.unwrap();
840 }
841 }
842 Message::Event(event) => {
843 let message =
844 serde_json::to_string(&Message::Event(event)).unwrap();
845
846 let mut writer = stdout_writer.lock().await;
847 writer
848 .write_all(
849 TransportDelegate::build_rpc_message(message)
850 .as_bytes(),
851 )
852 .await
853 .unwrap();
854 writer.flush().await.unwrap();
855 }
856 Message::Response(response) => {
857 if let Some(handle) =
858 response_handlers.lock().get(response.command.as_str())
859 {
860 handle(response);
861 } else {
862 log::error!("No response handler for {}", response.command);
863 }
864 }
865 }
866 }
867 }
868 }
869 })
870 .detach();
871
872 Ok((
873 TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
874 this,
875 ))
876 }
877
    /// Always `false` for the fake transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
881
    /// No-op: always returns `Ok(())`.
    async fn kill(&self) -> Result<()> {
        Ok(())
    }
885}