1use anyhow::{Context, Result, anyhow, bail};
2use dap_types::{
3 ErrorResponse,
4 messages::{Message, Response},
5};
6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
7use gpui::AsyncApp;
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{Receiver, Sender, unbounded},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15 process::Child,
16};
17use std::{
18 collections::HashMap,
19 net::{Ipv4Addr, SocketAddrV4},
20 process::Stdio,
21 sync::Arc,
22 time::Duration,
23};
24use task::TcpArgumentsTemplate;
25use util::{ResultExt as _, TryFutureExt};
26
27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
28
/// Boxed callback that receives a piece of adapter I/O text together with the
/// [`IoKind`] tag saying which stream the text was seen on.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
30
/// Category of traffic a registered log handler is interested in.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Raw text lines read from the adapter's log and error streams.
    Adapter,
    /// Serialized protocol messages written to or read from the adapter.
    Rpc,
}
36
/// Stream tag handed to an [`IoHandler`] alongside the logged text.
pub enum IoKind {
    /// Reported for messages written to the adapter.
    StdIn,
    /// Reported for protocol messages and log lines read from the adapter.
    StdOut,
    /// Reported for lines read from the adapter's error stream.
    StdErr,
}
42
/// Bundle of I/O endpoints a transport hands over to [`TransportDelegate`].
pub struct TransportPipe {
    /// Sink that framed protocol messages are written to.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Source that framed protocol messages are read from.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional stream that is read line by line as adapter log output.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional stream that is read line by line as adapter error output.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
49
50impl TransportPipe {
51 pub fn new(
52 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
53 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
54 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
55 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
56 ) -> Self {
57 TransportPipe {
58 input,
59 output,
60 stdout,
61 stderr,
62 }
63 }
64}
65
/// Shared map from a request sequence number to the one-shot sender that
/// delivers the outcome of that request.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log callbacks, each tagged with the [`LogKind`]
/// it wants to receive.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
68
/// The concrete channel used to talk to a debug adapter.
pub enum Transport {
    /// Adapter process spoken to over its standard streams.
    Stdio(StdioTransport),
    /// Adapter process spoken to over a TCP connection.
    Tcp(TcpTransport),
    /// In-memory stand-in, only compiled for tests / `test-support`.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
75
76impl Transport {
77 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
78 #[cfg(any(test, feature = "test-support"))]
79 if cfg!(any(test, feature = "test-support")) {
80 return FakeTransport::start(cx)
81 .await
82 .map(|(transports, fake)| (transports, Self::Fake(fake)));
83 }
84
85 if binary.connection.is_some() {
86 TcpTransport::start(binary, cx)
87 .await
88 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
89 } else {
90 StdioTransport::start(binary, cx)
91 .await
92 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
93 }
94 }
95
96 fn has_adapter_logs(&self) -> bool {
97 match self {
98 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
99 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100 #[cfg(any(test, feature = "test-support"))]
101 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102 }
103 }
104
105 async fn kill(&self) -> Result<()> {
106 match self {
107 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109 #[cfg(any(test, feature = "test-support"))]
110 Transport::Fake(fake_transport) => fake_transport.kill().await,
111 }
112 }
113
114 #[cfg(any(test, feature = "test-support"))]
115 pub(crate) fn as_fake(&self) -> &FakeTransport {
116 match self {
117 Transport::Fake(fake_transport) => fake_transport,
118 _ => panic!("Not a fake transport layer"),
119 }
120 }
121}
122
/// Owns a [`Transport`] plus the background tasks and bookkeeping that move
/// protocol messages between the client and the debug adapter.
pub(crate) struct TransportDelegate {
    /// Callbacks registered through `add_log_handler`, invoked by the I/O tasks.
    log_handlers: LogHandlers,
    /// Response senders keyed by request sequence number; `handle_input` moves
    /// an entry into `pending_requests` just before writing the matching
    /// request. Nothing in this file inserts into this map.
    current_requests: Requests,
    /// Response senders keyed by request sequence number, resolved by
    /// `handle_output` when a response with a matching `request_seq` arrives.
    pending_requests: Requests,
    /// The underlying adapter transport.
    transport: Transport,
    /// Sender used by `send_message`; taken and closed by `shutdown`.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
    /// Background tasks spawned by `start_handlers`.
    // NOTE(review): presumably kept here so the tasks are not dropped early — confirm.
    _tasks: Vec<gpui::Task<Option<()>>>,
}
131
132impl TransportDelegate {
133 pub(crate) async fn start(
134 binary: &DebugAdapterBinary,
135 cx: AsyncApp,
136 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
137 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
138 let mut this = Self {
139 transport,
140 server_tx: Default::default(),
141 log_handlers: Default::default(),
142 current_requests: Default::default(),
143 pending_requests: Default::default(),
144 _tasks: Default::default(),
145 };
146 let messages = this.start_handlers(transport_pipes, cx).await?;
147 Ok((messages, this))
148 }
149
150 async fn start_handlers(
151 &mut self,
152 mut params: TransportPipe,
153 cx: AsyncApp,
154 ) -> Result<(Receiver<Message>, Sender<Message>)> {
155 let (client_tx, server_rx) = unbounded::<Message>();
156 let (server_tx, client_rx) = unbounded::<Message>();
157
158 let log_dap_communications =
159 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161 .unwrap_or(false);
162
163 let log_handler = if log_dap_communications {
164 Some(self.log_handlers.clone())
165 } else {
166 None
167 };
168
169 cx.update(|cx| {
170 if let Some(stdout) = params.stdout.take() {
171 self._tasks.push(
172 cx.background_executor()
173 .spawn(Self::handle_adapter_log(stdout, log_handler.clone()).log_err()),
174 );
175 }
176
177 self._tasks.push(
178 cx.background_executor().spawn(
179 Self::handle_output(
180 params.output,
181 client_tx,
182 self.pending_requests.clone(),
183 log_handler.clone(),
184 )
185 .log_err(),
186 ),
187 );
188
189 if let Some(stderr) = params.stderr.take() {
190 self._tasks.push(
191 cx.background_executor()
192 .spawn(Self::handle_error(stderr, self.log_handlers.clone()).log_err()),
193 );
194 }
195
196 self._tasks.push(
197 cx.background_executor().spawn(
198 Self::handle_input(
199 params.input,
200 client_rx,
201 self.current_requests.clone(),
202 self.pending_requests.clone(),
203 log_handler.clone(),
204 )
205 .log_err(),
206 ),
207 );
208 })?;
209
210 {
211 let mut lock = self.server_tx.lock().await;
212 *lock = Some(server_tx.clone());
213 }
214
215 Ok((server_rx, server_tx))
216 }
217
218 pub(crate) async fn add_pending_request(
219 &self,
220 sequence_id: u64,
221 request: oneshot::Sender<Result<Response>>,
222 ) {
223 let mut pending_requests = self.pending_requests.lock().await;
224 pending_requests.insert(sequence_id, request);
225 }
226
227 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
228 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
229 server_tx
230 .send(message)
231 .await
232 .map_err(|e| anyhow!("Failed to send message: {}", e))
233 } else {
234 Err(anyhow!("Server tx already dropped"))
235 }
236 }
237
238 async fn handle_adapter_log<Stdout>(
239 stdout: Stdout,
240 log_handlers: Option<LogHandlers>,
241 ) -> Result<()>
242 where
243 Stdout: AsyncRead + Unpin + Send + 'static,
244 {
245 let mut reader = BufReader::new(stdout);
246 let mut line = String::new();
247
248 let result = loop {
249 line.truncate(0);
250
251 let bytes_read = match reader.read_line(&mut line).await {
252 Ok(bytes_read) => bytes_read,
253 Err(e) => break Err(e.into()),
254 };
255
256 if bytes_read == 0 {
257 break Err(anyhow!("Debugger log stream closed"));
258 }
259
260 if let Some(log_handlers) = log_handlers.as_ref() {
261 for (kind, handler) in log_handlers.lock().iter_mut() {
262 if matches!(kind, LogKind::Adapter) {
263 handler(IoKind::StdOut, line.as_str());
264 }
265 }
266 }
267 };
268
269 log::debug!("Handle adapter log dropped");
270
271 result
272 }
273
274 fn build_rpc_message(message: String) -> String {
275 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
276 }
277
278 async fn handle_input<Stdin>(
279 mut server_stdin: Stdin,
280 client_rx: Receiver<Message>,
281 current_requests: Requests,
282 pending_requests: Requests,
283 log_handlers: Option<LogHandlers>,
284 ) -> Result<()>
285 where
286 Stdin: AsyncWrite + Unpin + Send + 'static,
287 {
288 let result = loop {
289 match client_rx.recv().await {
290 Ok(message) => {
291 if let Message::Request(request) = &message {
292 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
293 pending_requests.lock().await.insert(request.seq, sender);
294 }
295 }
296
297 let message = match serde_json::to_string(&message) {
298 Ok(message) => message,
299 Err(e) => break Err(e.into()),
300 };
301
302 if let Some(log_handlers) = log_handlers.as_ref() {
303 for (kind, log_handler) in log_handlers.lock().iter_mut() {
304 if matches!(kind, LogKind::Rpc) {
305 log_handler(IoKind::StdIn, &message);
306 }
307 }
308 }
309
310 if let Err(e) = server_stdin
311 .write_all(Self::build_rpc_message(message).as_bytes())
312 .await
313 {
314 break Err(e.into());
315 }
316
317 if let Err(e) = server_stdin.flush().await {
318 break Err(e.into());
319 }
320 }
321 Err(error) => break Err(error.into()),
322 }
323 };
324
325 log::debug!("Handle adapter input dropped");
326
327 result
328 }
329
330 async fn handle_output<Stdout>(
331 server_stdout: Stdout,
332 client_tx: Sender<Message>,
333 pending_requests: Requests,
334 log_handlers: Option<LogHandlers>,
335 ) -> Result<()>
336 where
337 Stdout: AsyncRead + Unpin + Send + 'static,
338 {
339 let mut recv_buffer = String::new();
340 let mut reader = BufReader::new(server_stdout);
341
342 let result = loop {
343 let message =
344 Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
345 .await;
346
347 match message {
348 Ok(Message::Response(res)) => {
349 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
350 if let Err(e) = tx.send(Self::process_response(res)) {
351 log::trace!("Did not send response `{:?}` for a cancelled", e);
352 }
353 } else {
354 client_tx.send(Message::Response(res)).await?;
355 };
356 }
357 Ok(message) => {
358 client_tx.send(message).await?;
359 }
360 Err(e) => break Err(e),
361 }
362 };
363
364 drop(client_tx);
365
366 log::debug!("Handle adapter output dropped");
367
368 result
369 }
370
371 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
372 where
373 Stderr: AsyncRead + Unpin + Send + 'static,
374 {
375 log::debug!("Handle error started");
376 let mut buffer = String::new();
377
378 let mut reader = BufReader::new(stderr);
379
380 let result = loop {
381 match reader.read_line(&mut buffer).await {
382 Ok(0) => break Err(anyhow!("debugger error stream closed")),
383 Ok(_) => {
384 for (kind, log_handler) in log_handlers.lock().iter_mut() {
385 if matches!(kind, LogKind::Adapter) {
386 log_handler(IoKind::StdErr, buffer.as_str());
387 }
388 }
389
390 buffer.truncate(0);
391 }
392 Err(error) => break Err(error.into()),
393 }
394 };
395
396 log::debug!("Handle adapter error dropped");
397
398 result
399 }
400
401 fn process_response(response: Response) -> Result<Response> {
402 if response.success {
403 Ok(response)
404 } else {
405 if let Some(error_message) = response
406 .body
407 .clone()
408 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
409 .and_then(|response| response.error.map(|msg| msg.format))
410 .or_else(|| response.message.clone())
411 {
412 return Err(anyhow!(error_message));
413 };
414
415 Err(anyhow!(
416 "Received error response from adapter. Response: {:?}",
417 response.clone()
418 ))
419 }
420 }
421
422 async fn receive_server_message<Stdout>(
423 reader: &mut BufReader<Stdout>,
424 buffer: &mut String,
425 log_handlers: Option<&LogHandlers>,
426 ) -> Result<Message>
427 where
428 Stdout: AsyncRead + Unpin + Send + 'static,
429 {
430 let mut content_length = None;
431 loop {
432 buffer.truncate(0);
433
434 if reader
435 .read_line(buffer)
436 .await
437 .with_context(|| "reading a message from server")?
438 == 0
439 {
440 return Err(anyhow!("debugger reader stream closed"));
441 };
442
443 if buffer == "\r\n" {
444 break;
445 }
446
447 let parts = buffer.trim().split_once(": ");
448
449 match parts {
450 Some(("Content-Length", value)) => {
451 content_length = Some(value.parse().context("invalid content length")?);
452 }
453 _ => {}
454 }
455 }
456
457 let content_length = content_length.context("missing content length")?;
458
459 let mut content = vec![0; content_length];
460 reader
461 .read_exact(&mut content)
462 .await
463 .with_context(|| "reading after a loop")?;
464
465 let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
466
467 if let Some(log_handlers) = log_handlers {
468 for (kind, log_handler) in log_handlers.lock().iter_mut() {
469 if matches!(kind, LogKind::Rpc) {
470 log_handler(IoKind::StdOut, &message);
471 }
472 }
473 }
474
475 Ok(serde_json::from_str::<Message>(message)?)
476 }
477
478 pub async fn shutdown(&self) -> Result<()> {
479 log::debug!("Start shutdown client");
480
481 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
482 server_tx.close();
483 }
484
485 let mut current_requests = self.current_requests.lock().await;
486 let mut pending_requests = self.pending_requests.lock().await;
487
488 current_requests.clear();
489 pending_requests.clear();
490
491 let _ = self.transport.kill().await.log_err();
492
493 drop(current_requests);
494 drop(pending_requests);
495
496 log::debug!("Shutdown client completed");
497
498 anyhow::Ok(())
499 }
500
    /// Reports whether the underlying transport says it has adapter logs.
    pub fn has_adapter_logs(&self) -> bool {
        self.transport.has_adapter_logs()
    }
504
    /// Borrows the underlying [`Transport`].
    pub fn transport(&self) -> &Transport {
        &self.transport
    }
508
509 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
510 where
511 F: 'static + Send + FnMut(IoKind, &str),
512 {
513 let mut log_handlers = self.log_handlers.lock();
514 log_handlers.push((kind, Box::new(f)));
515 }
516}
517
/// Transport that spawns the adapter process and talks to it over TCP.
pub struct TcpTransport {
    /// Port the connection was made to.
    pub port: u16,
    /// Host address the connection was made to.
    pub host: Ipv4Addr,
    /// Connection timeout that was applied, in milliseconds.
    pub timeout: u64,
    /// Handle to the spawned adapter process.
    process: Mutex<Child>,
}
524
525impl TcpTransport {
526 /// Get an open port to use with the tcp client when not supplied by debug config
527 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
528 if let Some(port) = host.port {
529 Ok(port)
530 } else {
531 Self::unused_port(host.host()).await
532 }
533 }
534
535 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
536 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
537 .await?
538 .local_addr()?
539 .port())
540 }
541
    /// Spawns the adapter described by `binary` and connects to it over TCP.
    ///
    /// The process is started with no stdin and piped stdout/stderr, then the
    /// connection is retried until it succeeds, the process exits, or the
    /// timeout elapses. On success the TCP stream halves become the pipe's
    /// input/output and the process's stdout/stderr become its log streams.
    ///
    /// # Errors
    ///
    /// Fails when `binary.connection` is unset, the process cannot be spawned,
    /// the timeout elapses first, or the process exits before a connection is
    /// made (its stderr, or stdout when stderr is empty, goes into the error).
    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
        let Some(connection_args) = binary.connection.as_ref() else {
            return Err(anyhow!("No connection arguments provided"));
        };

        let host = connection_args.host;
        let port = connection_args.port;

        let mut command = util::command::new_std_command(&binary.command);
        util::set_pre_exec_to_start_new_session(&mut command);
        let mut command = smol::process::Command::from(command);

        if let Some(cwd) = &binary.cwd {
            command.current_dir(cwd);
        }

        command.args(&binary.arguments);
        command.envs(&binary.envs);

        // The protocol runs over TCP, so stdin is unused; stdout and stderr are
        // captured and later handed out as log streams.
        command
            .stdin(Stdio::null())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(true);

        let mut process = command
            .spawn()
            .with_context(|| "failed to start debug adapter.")?;

        let address = SocketAddrV4::new(host, port);

        // Prefer the per-connection timeout, then the global debugger setting,
        // then 2000 ms if the setting cannot be read.
        let timeout = connection_args.timeout.unwrap_or_else(|| {
            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
                .unwrap_or(2000u64)
        });

        // Race the timeout against a connect-retry loop that owns `process`
        // and hands it back once a connection is established.
        let (mut process, (rx, tx)) = select! {
            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
                return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", host, port)))
            },
            result = cx.spawn(async move |cx| {
                loop {
                    match TcpStream::connect(address).await {
                        Ok(stream) => return Ok((process, stream.split())),
                        Err(_) => {
                            // If the adapter already exited there is nothing to
                            // connect to; surface whatever it printed.
                            if let Ok(Some(_)) = process.try_status() {
                                let output = process.output().await?;
                                let output = if output.stderr.is_empty() {
                                    String::from_utf8_lossy(&output.stdout).to_string()
                                } else {
                                    String::from_utf8_lossy(&output.stderr).to_string()
                                };
                                return Err(anyhow!("{}\nerror: process exited before debugger attached.", output));
                            }
                            // Wait 100 ms before the next connection attempt.
                            cx.background_executor().timer(Duration::from_millis(100)).await;
                        }
                    }
                }
            }).fuse() => result?
        };

        log::info!(
            "Debug adapter has connected to TCP server {}:{}",
            host,
            port
        );
        let stdout = process.stdout.take();
        let stderr = process.stderr.take();

        let this = Self {
            port,
            host,
            process: Mutex::new(process),
            timeout,
        };

        let pipe = TransportPipe::new(
            Box::new(tx),
            Box::new(BufReader::new(rx)),
            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
        );

        Ok((pipe, this))
    }
627
    /// Always `true` for the TCP transport.
    fn has_adapter_logs(&self) -> bool {
        true
    }
631
632 async fn kill(&self) -> Result<()> {
633 self.process.lock().await.kill()?;
634
635 Ok(())
636 }
637}
638
/// Transport that spawns the adapter process and talks to it over its
/// standard input and output.
pub struct StdioTransport {
    /// Handle to the spawned adapter process.
    process: Mutex<Child>,
}
642
643impl StdioTransport {
644 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
645 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
646 let mut command = util::command::new_std_command(&binary.command);
647 util::set_pre_exec_to_start_new_session(&mut command);
648 let mut command = smol::process::Command::from(command);
649
650 if let Some(cwd) = &binary.cwd {
651 command.current_dir(cwd);
652 }
653
654 command.args(&binary.arguments);
655 command.envs(&binary.envs);
656
657 command
658 .stdin(Stdio::piped())
659 .stdout(Stdio::piped())
660 .stderr(Stdio::piped())
661 .kill_on_drop(true);
662
663 let mut process = command
664 .spawn()
665 .with_context(|| "failed to spawn command.")?;
666
667 let stdin = process
668 .stdin
669 .take()
670 .ok_or_else(|| anyhow!("Failed to open stdin"))?;
671 let stdout = process
672 .stdout
673 .take()
674 .ok_or_else(|| anyhow!("Failed to open stdout"))?;
675 let stderr = process
676 .stderr
677 .take()
678 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
679
680 if stderr.is_none() {
681 bail!(
682 "Failed to connect to stderr for debug adapter command {}",
683 &binary.command
684 );
685 }
686
687 log::info!("Debug adapter has connected to stdio adapter");
688
689 let process = Mutex::new(process);
690
691 Ok((
692 TransportPipe::new(
693 Box::new(stdin),
694 Box::new(BufReader::new(stdout)),
695 None,
696 stderr,
697 ),
698 Self { process },
699 ))
700 }
701
    /// Always `false` for the stdio transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
705
706 async fn kill(&self) -> Result<()> {
707 self.process.lock().await.kill()?;
708 Ok(())
709 }
710}
711
/// Test-only callback that maps a request's sequence number and raw JSON
/// arguments to the response the fake adapter should send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
715
/// Test-only callback invoked with a response received by the fake adapter.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
718
/// In-memory adapter stand-in used by tests; handlers are looked up by the
/// command name of the incoming message.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // for sending fake response back from adapter side
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    // for reverse request responses
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
726
727#[cfg(any(test, feature = "test-support"))]
728impl FakeTransport {
729 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
730 where
731 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
732 {
733 self.request_handlers.lock().insert(
734 R::COMMAND,
735 Box::new(move |seq, args| {
736 let result = handler(seq, serde_json::from_value(args).unwrap());
737 let response = match result {
738 Ok(response) => Response {
739 seq: seq + 1,
740 request_seq: seq,
741 success: true,
742 command: R::COMMAND.into(),
743 body: Some(serde_json::to_value(response).unwrap()),
744 message: None,
745 },
746 Err(response) => Response {
747 seq: seq + 1,
748 request_seq: seq,
749 success: false,
750 command: R::COMMAND.into(),
751 body: Some(serde_json::to_value(response).unwrap()),
752 message: None,
753 },
754 };
755 response
756 }),
757 );
758 }
759
760 pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
761 where
762 F: 'static + Send + Fn(Response),
763 {
764 self.response_handlers
765 .lock()
766 .insert(R::COMMAND, Box::new(handler));
767 }
768
769 async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
770 let this = Self {
771 request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
772 response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
773 };
774 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
775 use serde_json::json;
776
777 let (stdin_writer, stdin_reader) = async_pipe::pipe();
778 let (stdout_writer, stdout_reader) = async_pipe::pipe();
779
780 let request_handlers = this.request_handlers.clone();
781 let response_handlers = this.response_handlers.clone();
782 let stdout_writer = Arc::new(Mutex::new(stdout_writer));
783
784 cx.background_executor()
785 .spawn(async move {
786 let mut reader = BufReader::new(stdin_reader);
787 let mut buffer = String::new();
788
789 loop {
790 let message =
791 TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
792 .await;
793
794 match message {
795 Err(error) => {
796 break anyhow!(error);
797 }
798 Ok(message) => {
799 match message {
800 Message::Request(request) => {
801 // redirect reverse requests to stdout writer/reader
802 if request.command == RunInTerminal::COMMAND
803 || request.command == StartDebugging::COMMAND
804 {
805 let message =
806 serde_json::to_string(&Message::Request(request))
807 .unwrap();
808
809 let mut writer = stdout_writer.lock().await;
810 writer
811 .write_all(
812 TransportDelegate::build_rpc_message(message)
813 .as_bytes(),
814 )
815 .await
816 .unwrap();
817 writer.flush().await.unwrap();
818 } else {
819 let response = if let Some(handle) = request_handlers
820 .lock()
821 .get_mut(request.command.as_str())
822 {
823 handle(
824 request.seq,
825 request.arguments.unwrap_or(json!({})),
826 )
827 } else {
828 panic!("No request handler for {}", request.command);
829 };
830 let message =
831 serde_json::to_string(&Message::Response(response))
832 .unwrap();
833
834 let mut writer = stdout_writer.lock().await;
835
836 writer
837 .write_all(
838 TransportDelegate::build_rpc_message(message)
839 .as_bytes(),
840 )
841 .await
842 .unwrap();
843 writer.flush().await.unwrap();
844 }
845 }
846 Message::Event(event) => {
847 let message =
848 serde_json::to_string(&Message::Event(event)).unwrap();
849
850 let mut writer = stdout_writer.lock().await;
851 writer
852 .write_all(
853 TransportDelegate::build_rpc_message(message)
854 .as_bytes(),
855 )
856 .await
857 .unwrap();
858 writer.flush().await.unwrap();
859 }
860 Message::Response(response) => {
861 if let Some(handle) =
862 response_handlers.lock().get(response.command.as_str())
863 {
864 handle(response);
865 } else {
866 log::error!("No response handler for {}", response.command);
867 }
868 }
869 }
870 }
871 }
872 }
873 })
874 .detach();
875
876 Ok((
877 TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
878 this,
879 ))
880 }
881
    /// Always `false` for the fake transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }
885
    /// No-op: the fake transport holds no process, so this always succeeds.
    async fn kill(&self) -> Result<()> {
        Ok(())
    }
889}