1use anyhow::{Context, Result, anyhow, bail};
2use dap_types::{
3 ErrorResponse,
4 messages::{Message, Response},
5};
6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
7use gpui::AsyncApp;
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{Receiver, Sender, unbounded},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15 process::Child,
16};
17use std::{
18 collections::HashMap,
19 net::{Ipv4Addr, SocketAddrV4},
20 process::Stdio,
21 sync::Arc,
22 time::Duration,
23};
24use task::TcpArgumentsTemplate;
25use util::{ResultExt as _, TryFutureExt};
26
27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
28
/// Callback invoked for each piece of adapter I/O that is logged: it receives the
/// stream the text belongs to ([`IoKind`]) and the text itself.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
30
/// Category of output a registered log handler wants to receive.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Raw lines read from the adapter's stdout/stderr log streams.
    Adapter,
    /// Serialized protocol messages written to or read from the adapter.
    Rpc,
}
36
/// Identifies the stream a logged piece of text is associated with.
pub enum IoKind {
    /// Text written to the adapter (outgoing messages).
    StdIn,
    /// Text read from the adapter's output (incoming messages or stdout log lines).
    StdOut,
    /// Lines read from the adapter's stderr.
    StdErr,
}
42
/// The set of byte streams that connect the client to a running debug adapter.
pub struct TransportPipe {
    /// Stream that protocol messages are written to.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Stream that protocol messages are read from.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional separate stdout stream, read line by line as adapter log output.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional stderr stream, read line by line as adapter log output.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
49
50impl TransportPipe {
51 pub fn new(
52 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
53 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
54 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
55 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
56 ) -> Self {
57 TransportPipe {
58 input,
59 output,
60 stdout,
61 stderr,
62 }
63 }
64}
65
/// Outstanding requests keyed by sequence number; each entry holds the channel on
/// which the matching response (or error) is delivered.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log handlers, each tagged with the [`LogKind`] it
/// listens for.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
68
/// The concrete mechanism used to talk to a debug adapter.
pub enum Transport {
    /// Adapter process spoken to over its stdin/stdout.
    Stdio(StdioTransport),
    /// Adapter process spoken to over a TCP connection.
    Tcp(TcpTransport),
    /// In-process fake adapter, only compiled for tests.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
75
76impl Transport {
77 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
78 #[cfg(any(test, feature = "test-support"))]
79 if cfg!(any(test, feature = "test-support")) {
80 return FakeTransport::start(cx)
81 .await
82 .map(|(transports, fake)| (transports, Self::Fake(fake)));
83 }
84
85 if binary.connection.is_some() {
86 TcpTransport::start(binary, cx)
87 .await
88 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
89 } else {
90 StdioTransport::start(binary, cx)
91 .await
92 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
93 }
94 }
95
96 fn has_adapter_logs(&self) -> bool {
97 match self {
98 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
99 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100 #[cfg(any(test, feature = "test-support"))]
101 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102 }
103 }
104
105 async fn kill(&self) -> Result<()> {
106 match self {
107 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109 #[cfg(any(test, feature = "test-support"))]
110 Transport::Fake(fake_transport) => fake_transport.kill().await,
111 }
112 }
113
114 #[cfg(any(test, feature = "test-support"))]
115 pub(crate) fn as_fake(&self) -> &FakeTransport {
116 match self {
117 Transport::Fake(fake_transport) => fake_transport,
118 _ => panic!("Not a fake transport layer"),
119 }
120 }
121}
122
/// Owns a [`Transport`] together with the background tasks and bookkeeping needed
/// to exchange protocol messages with the adapter.
pub(crate) struct TransportDelegate {
    /// Handlers notified about adapter log lines and protocol traffic.
    log_handlers: LogHandlers,
    /// Requests whose response channel is moved into `pending_requests` once the
    /// request is written to the adapter.
    current_requests: Requests,
    /// Requests awaiting a response from the adapter.
    pending_requests: Requests,
    /// The underlying transport; killed on shutdown.
    transport: Transport,
    /// Sender feeding the task that writes to the adapter; taken on shutdown.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
    /// Handles of the spawned I/O tasks, stored here for the delegate's lifetime.
    _tasks: Vec<gpui::Task<Option<()>>>,
}
131
132impl TransportDelegate {
133 pub(crate) async fn start(
134 binary: &DebugAdapterBinary,
135 cx: AsyncApp,
136 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
137 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
138 let mut this = Self {
139 transport,
140 server_tx: Default::default(),
141 log_handlers: Default::default(),
142 current_requests: Default::default(),
143 pending_requests: Default::default(),
144 _tasks: Default::default(),
145 };
146 let messages = this.start_handlers(transport_pipes, cx).await?;
147 Ok((messages, this))
148 }
149
150 async fn start_handlers(
151 &mut self,
152 mut params: TransportPipe,
153 cx: AsyncApp,
154 ) -> Result<(Receiver<Message>, Sender<Message>)> {
155 let (client_tx, server_rx) = unbounded::<Message>();
156 let (server_tx, client_rx) = unbounded::<Message>();
157
158 let log_dap_communications =
159 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161 .unwrap_or(false);
162
163 let log_handler = if log_dap_communications {
164 Some(self.log_handlers.clone())
165 } else {
166 None
167 };
168
169 cx.update(|cx| {
170 if let Some(stdout) = params.stdout.take() {
171 self._tasks.push(
172 cx.background_executor()
173 .spawn(Self::handle_adapter_log(stdout, log_handler.clone()).log_err()),
174 );
175 }
176
177 self._tasks.push(
178 cx.background_executor().spawn(
179 Self::handle_output(
180 params.output,
181 client_tx,
182 self.pending_requests.clone(),
183 log_handler.clone(),
184 )
185 .log_err(),
186 ),
187 );
188
189 if let Some(stderr) = params.stderr.take() {
190 self._tasks.push(
191 cx.background_executor()
192 .spawn(Self::handle_error(stderr, self.log_handlers.clone()).log_err()),
193 );
194 }
195
196 self._tasks.push(
197 cx.background_executor().spawn(
198 Self::handle_input(
199 params.input,
200 client_rx,
201 self.current_requests.clone(),
202 self.pending_requests.clone(),
203 log_handler.clone(),
204 )
205 .log_err(),
206 ),
207 );
208 })?;
209
210 {
211 let mut lock = self.server_tx.lock().await;
212 *lock = Some(server_tx.clone());
213 }
214
215 Ok((server_rx, server_tx))
216 }
217
218 pub(crate) async fn add_pending_request(
219 &self,
220 sequence_id: u64,
221 request: oneshot::Sender<Result<Response>>,
222 ) {
223 let mut pending_requests = self.pending_requests.lock().await;
224 pending_requests.insert(sequence_id, request);
225 }
226
227 pub(crate) async fn cancel_pending_request(&self, sequence_id: &u64) {
228 let mut pending_requests = self.pending_requests.lock().await;
229 pending_requests.remove(sequence_id);
230 }
231
232 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
233 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
234 server_tx
235 .send(message)
236 .await
237 .map_err(|e| anyhow!("Failed to send message: {}", e))
238 } else {
239 Err(anyhow!("Server tx already dropped"))
240 }
241 }
242
243 async fn handle_adapter_log<Stdout>(
244 stdout: Stdout,
245 log_handlers: Option<LogHandlers>,
246 ) -> Result<()>
247 where
248 Stdout: AsyncRead + Unpin + Send + 'static,
249 {
250 let mut reader = BufReader::new(stdout);
251 let mut line = String::new();
252
253 let result = loop {
254 line.truncate(0);
255
256 let bytes_read = match reader.read_line(&mut line).await {
257 Ok(bytes_read) => bytes_read,
258 Err(e) => break Err(e.into()),
259 };
260
261 if bytes_read == 0 {
262 break Err(anyhow!("Debugger log stream closed"));
263 }
264
265 if let Some(log_handlers) = log_handlers.as_ref() {
266 for (kind, handler) in log_handlers.lock().iter_mut() {
267 if matches!(kind, LogKind::Adapter) {
268 handler(IoKind::StdOut, line.as_str());
269 }
270 }
271 }
272 };
273
274 log::debug!("Handle adapter log dropped");
275
276 result
277 }
278
279 fn build_rpc_message(message: String) -> String {
280 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
281 }
282
283 async fn handle_input<Stdin>(
284 mut server_stdin: Stdin,
285 client_rx: Receiver<Message>,
286 current_requests: Requests,
287 pending_requests: Requests,
288 log_handlers: Option<LogHandlers>,
289 ) -> Result<()>
290 where
291 Stdin: AsyncWrite + Unpin + Send + 'static,
292 {
293 let result = loop {
294 match client_rx.recv().await {
295 Ok(message) => {
296 if let Message::Request(request) = &message {
297 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
298 pending_requests.lock().await.insert(request.seq, sender);
299 }
300 }
301
302 let message = match serde_json::to_string(&message) {
303 Ok(message) => message,
304 Err(e) => break Err(e.into()),
305 };
306
307 if let Some(log_handlers) = log_handlers.as_ref() {
308 for (kind, log_handler) in log_handlers.lock().iter_mut() {
309 if matches!(kind, LogKind::Rpc) {
310 log_handler(IoKind::StdIn, &message);
311 }
312 }
313 }
314
315 if let Err(e) = server_stdin
316 .write_all(Self::build_rpc_message(message).as_bytes())
317 .await
318 {
319 break Err(e.into());
320 }
321
322 if let Err(e) = server_stdin.flush().await {
323 break Err(e.into());
324 }
325 }
326 Err(error) => break Err(error.into()),
327 }
328 };
329
330 log::debug!("Handle adapter input dropped");
331
332 result
333 }
334
335 async fn handle_output<Stdout>(
336 server_stdout: Stdout,
337 client_tx: Sender<Message>,
338 pending_requests: Requests,
339 log_handlers: Option<LogHandlers>,
340 ) -> Result<()>
341 where
342 Stdout: AsyncRead + Unpin + Send + 'static,
343 {
344 let mut recv_buffer = String::new();
345 let mut reader = BufReader::new(server_stdout);
346
347 let result = loop {
348 let message =
349 Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
350 .await;
351
352 match message {
353 Ok(Message::Response(res)) => {
354 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
355 if let Err(e) = tx.send(Self::process_response(res)) {
356 log::trace!("Did not send response `{:?}` for a cancelled", e);
357 }
358 } else {
359 client_tx.send(Message::Response(res)).await?;
360 };
361 }
362 Ok(message) => {
363 client_tx.send(message).await?;
364 }
365 Err(e) => break Err(e),
366 }
367 };
368
369 drop(client_tx);
370
371 log::debug!("Handle adapter output dropped");
372
373 result
374 }
375
376 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
377 where
378 Stderr: AsyncRead + Unpin + Send + 'static,
379 {
380 log::debug!("Handle error started");
381 let mut buffer = String::new();
382
383 let mut reader = BufReader::new(stderr);
384
385 let result = loop {
386 match reader.read_line(&mut buffer).await {
387 Ok(0) => break Err(anyhow!("debugger error stream closed")),
388 Ok(_) => {
389 for (kind, log_handler) in log_handlers.lock().iter_mut() {
390 if matches!(kind, LogKind::Adapter) {
391 log_handler(IoKind::StdErr, buffer.as_str());
392 }
393 }
394
395 buffer.truncate(0);
396 }
397 Err(error) => break Err(error.into()),
398 }
399 };
400
401 log::debug!("Handle adapter error dropped");
402
403 result
404 }
405
406 fn process_response(response: Response) -> Result<Response> {
407 if response.success {
408 Ok(response)
409 } else {
410 if let Some(error_message) = response
411 .body
412 .clone()
413 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
414 .and_then(|response| response.error.map(|msg| msg.format))
415 .or_else(|| response.message.clone())
416 {
417 return Err(anyhow!(error_message));
418 };
419
420 Err(anyhow!(
421 "Received error response from adapter. Response: {:?}",
422 response.clone()
423 ))
424 }
425 }
426
427 async fn receive_server_message<Stdout>(
428 reader: &mut BufReader<Stdout>,
429 buffer: &mut String,
430 log_handlers: Option<&LogHandlers>,
431 ) -> Result<Message>
432 where
433 Stdout: AsyncRead + Unpin + Send + 'static,
434 {
435 let mut content_length = None;
436 loop {
437 buffer.truncate(0);
438
439 if reader
440 .read_line(buffer)
441 .await
442 .with_context(|| "reading a message from server")?
443 == 0
444 {
445 return Err(anyhow!("debugger reader stream closed"));
446 };
447
448 if buffer == "\r\n" {
449 break;
450 }
451
452 let parts = buffer.trim().split_once(": ");
453
454 match parts {
455 Some(("Content-Length", value)) => {
456 content_length = Some(value.parse().context("invalid content length")?);
457 }
458 _ => {}
459 }
460 }
461
462 let content_length = content_length.context("missing content length")?;
463
464 let mut content = vec![0; content_length];
465 reader
466 .read_exact(&mut content)
467 .await
468 .with_context(|| "reading after a loop")?;
469
470 let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
471
472 if let Some(log_handlers) = log_handlers {
473 for (kind, log_handler) in log_handlers.lock().iter_mut() {
474 if matches!(kind, LogKind::Rpc) {
475 log_handler(IoKind::StdOut, &message);
476 }
477 }
478 }
479
480 Ok(serde_json::from_str::<Message>(message)?)
481 }
482
483 pub async fn shutdown(&self) -> Result<()> {
484 log::debug!("Start shutdown client");
485
486 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
487 server_tx.close();
488 }
489
490 let mut current_requests = self.current_requests.lock().await;
491 let mut pending_requests = self.pending_requests.lock().await;
492
493 current_requests.clear();
494 pending_requests.clear();
495
496 let _ = self.transport.kill().await.log_err();
497
498 drop(current_requests);
499 drop(pending_requests);
500
501 log::debug!("Shutdown client completed");
502
503 anyhow::Ok(())
504 }
505
506 pub fn has_adapter_logs(&self) -> bool {
507 self.transport.has_adapter_logs()
508 }
509
510 pub fn transport(&self) -> &Transport {
511 &self.transport
512 }
513
514 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
515 where
516 F: 'static + Send + FnMut(IoKind, &str),
517 {
518 let mut log_handlers = self.log_handlers.lock();
519 log_handlers.push((kind, Box::new(f)));
520 }
521}
522
/// Transport that spawns the adapter as a child process and talks to it over TCP.
pub struct TcpTransport {
    /// Port the adapter was connected to.
    pub port: u16,
    /// Host the adapter was connected to.
    pub host: Ipv4Addr,
    /// Connection timeout, in milliseconds, that was used when connecting.
    pub timeout: u64,
    /// The spawned adapter process.
    process: Mutex<Child>,
}
529
530impl TcpTransport {
531 /// Get an open port to use with the tcp client when not supplied by debug config
532 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
533 if let Some(port) = host.port {
534 Ok(port)
535 } else {
536 Self::unused_port(host.host()).await
537 }
538 }
539
540 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
541 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
542 .await?
543 .local_addr()?
544 .port())
545 }
546
547 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
548 let Some(connection_args) = binary.connection.as_ref() else {
549 return Err(anyhow!("No connection arguments provided"));
550 };
551
552 let host = connection_args.host;
553 let port = connection_args.port;
554
555 let mut command = util::command::new_std_command(&binary.command);
556 util::set_pre_exec_to_start_new_session(&mut command);
557 let mut command = smol::process::Command::from(command);
558
559 if let Some(cwd) = &binary.cwd {
560 command.current_dir(cwd);
561 }
562
563 command.args(&binary.arguments);
564 command.envs(&binary.envs);
565
566 command
567 .stdin(Stdio::null())
568 .stdout(Stdio::piped())
569 .stderr(Stdio::piped())
570 .kill_on_drop(true);
571
572 let mut process = command
573 .spawn()
574 .with_context(|| "failed to start debug adapter.")?;
575
576 let address = SocketAddrV4::new(host, port);
577
578 let timeout = connection_args.timeout.unwrap_or_else(|| {
579 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
580 .unwrap_or(2000u64)
581 });
582
583 let (mut process, (rx, tx)) = select! {
584 _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
585 return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", host, port)))
586 },
587 result = cx.spawn(async move |cx| {
588 loop {
589 match TcpStream::connect(address).await {
590 Ok(stream) => return Ok((process, stream.split())),
591 Err(_) => {
592 if let Ok(Some(_)) = process.try_status() {
593 let output = process.output().await?;
594 let output = if output.stderr.is_empty() {
595 String::from_utf8_lossy(&output.stdout).to_string()
596 } else {
597 String::from_utf8_lossy(&output.stderr).to_string()
598 };
599 return Err(anyhow!("{}\nerror: process exited before debugger attached.", output));
600 }
601 cx.background_executor().timer(Duration::from_millis(100)).await;
602 }
603 }
604 }
605 }).fuse() => result?
606 };
607
608 log::info!(
609 "Debug adapter has connected to TCP server {}:{}",
610 host,
611 port
612 );
613 let stdout = process.stdout.take();
614 let stderr = process.stderr.take();
615
616 let this = Self {
617 port,
618 host,
619 process: Mutex::new(process),
620 timeout,
621 };
622
623 let pipe = TransportPipe::new(
624 Box::new(tx),
625 Box::new(BufReader::new(rx)),
626 stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
627 stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
628 );
629
630 Ok((pipe, this))
631 }
632
633 fn has_adapter_logs(&self) -> bool {
634 true
635 }
636
637 async fn kill(&self) -> Result<()> {
638 self.process.lock().await.kill()?;
639
640 Ok(())
641 }
642}
643
/// Transport that spawns the adapter as a child process and talks to it over the
/// process's stdin and stdout.
pub struct StdioTransport {
    /// The spawned adapter process.
    process: Mutex<Child>,
}
647
648impl StdioTransport {
649 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
650 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
651 let mut command = util::command::new_std_command(&binary.command);
652 util::set_pre_exec_to_start_new_session(&mut command);
653 let mut command = smol::process::Command::from(command);
654
655 if let Some(cwd) = &binary.cwd {
656 command.current_dir(cwd);
657 }
658
659 command.args(&binary.arguments);
660 command.envs(&binary.envs);
661
662 command
663 .stdin(Stdio::piped())
664 .stdout(Stdio::piped())
665 .stderr(Stdio::piped())
666 .kill_on_drop(true);
667
668 let mut process = command
669 .spawn()
670 .with_context(|| "failed to spawn command.")?;
671
672 let stdin = process
673 .stdin
674 .take()
675 .ok_or_else(|| anyhow!("Failed to open stdin"))?;
676 let stdout = process
677 .stdout
678 .take()
679 .ok_or_else(|| anyhow!("Failed to open stdout"))?;
680 let stderr = process
681 .stderr
682 .take()
683 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
684
685 if stderr.is_none() {
686 bail!(
687 "Failed to connect to stderr for debug adapter command {}",
688 &binary.command
689 );
690 }
691
692 log::info!("Debug adapter has connected to stdio adapter");
693
694 let process = Mutex::new(process);
695
696 Ok((
697 TransportPipe::new(
698 Box::new(stdin),
699 Box::new(BufReader::new(stdout)),
700 None,
701 stderr,
702 ),
703 Self { process },
704 ))
705 }
706
707 fn has_adapter_logs(&self) -> bool {
708 false
709 }
710
711 async fn kill(&self) -> Result<()> {
712 self.process.lock().await.kill()?;
713 Ok(())
714 }
715}
716
/// Type-erased handler for a request sent to the fake adapter: it receives the
/// request's sequence number and raw JSON arguments and produces the response.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
720
/// Handler invoked with a response that the client sent to the fake adapter.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
723
/// In-process stand-in for a debug adapter, available in test builds only.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // Handlers, keyed by command name, that produce the fake adapter's response
    // to a request.
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    // Handlers, keyed by command name, that receive the client's responses to
    // reverse requests.
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
731
#[cfg(any(test, feature = "test-support"))]
impl FakeTransport {
    /// Registers `handler` as the responder for requests of type `R`.
    ///
    /// The handler's `Ok` value becomes the body of a successful response and its
    /// `Err` value the body of a failed one. The response's `seq` is the request's
    /// sequence number plus one.
    ///
    /// # Panics
    /// The installed handler panics if the request arguments cannot be
    /// deserialized or the result cannot be serialized.
    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
    where
        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
    {
        let respond = move |seq: u64, args: serde_json::Value| {
            let (success, body) = match handler(seq, serde_json::from_value(args).unwrap()) {
                Ok(response) => (true, serde_json::to_value(response).unwrap()),
                Err(error) => (false, serde_json::to_value(error).unwrap()),
            };

            Response {
                seq: seq + 1,
                request_seq: seq,
                success,
                command: R::COMMAND.into(),
                body: Some(body),
                message: None,
            }
        };

        self.request_handlers
            .lock()
            .insert(R::COMMAND, Box::new(respond));
    }

    /// Registers `handler` to receive responses the client sends for command `R`.
    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
    where
        F: 'static + Send + Fn(Response),
    {
        let mut handlers = self.response_handlers.lock();
        handlers.insert(R::COMMAND, Box::new(handler));
    }

    /// Creates the fake transport and spawns the background task that plays the
    /// adapter's role.
    ///
    /// The task reads framed messages from the client and, per message:
    /// - `runInTerminal` / `startDebugging` requests and all events are written
    ///   back to the client unchanged;
    /// - any other request is answered by its registered request handler
    ///   (panicking if there is none);
    /// - responses are passed to the matching response handler, or logged as an
    ///   error if there is none.
    ///
    /// The task ends when reading a message fails.
    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
        use serde_json::json;

        let this = Self {
            request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
            response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
        };

        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        let request_handlers = this.request_handlers.clone();
        let response_handlers = this.response_handlers.clone();
        let stdout_writer = Arc::new(Mutex::new(stdout_writer));

        cx.background_executor()
            .spawn(async move {
                let mut reader = BufReader::new(stdin_reader);
                let mut buffer = String::new();

                loop {
                    let incoming = match TransportDelegate::receive_server_message(
                        &mut reader,
                        &mut buffer,
                        None,
                    )
                    .await
                    {
                        Ok(incoming) => incoming,
                        Err(error) => break anyhow!(error),
                    };

                    // Decide what, if anything, is sent back to the client. All
                    // handler locks are released before the write below awaits.
                    let outgoing = match incoming {
                        Message::Request(request) => {
                            // Reverse requests are redirected to the stdout writer/reader.
                            let is_reverse_request =
                                [RunInTerminal::COMMAND, StartDebugging::COMMAND]
                                    .contains(&request.command.as_str());

                            if is_reverse_request {
                                Some(Message::Request(request))
                            } else {
                                let response = match request_handlers
                                    .lock()
                                    .get_mut(request.command.as_str())
                                {
                                    Some(handle) => handle(
                                        request.seq,
                                        request.arguments.unwrap_or(json!({})),
                                    ),
                                    None => panic!("No request handler for {}", request.command),
                                };
                                Some(Message::Response(response))
                            }
                        }
                        Message::Event(event) => Some(Message::Event(event)),
                        Message::Response(response) => {
                            match response_handlers.lock().get(response.command.as_str()) {
                                Some(handle) => handle(response),
                                None => {
                                    log::error!("No response handler for {}", response.command)
                                }
                            }
                            None
                        }
                    };

                    let payload = match outgoing {
                        Some(message) => serde_json::to_string(&message).unwrap(),
                        None => continue,
                    };

                    let mut writer = stdout_writer.lock().await;
                    writer
                        .write_all(TransportDelegate::build_rpc_message(payload).as_bytes())
                        .await
                        .unwrap();
                    writer.flush().await.unwrap();
                }
            })
            .detach();

        let pipe = TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None);
        Ok((pipe, this))
    }

    /// Always `false` for the fake transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }

    /// Does nothing; there is no process to kill.
    async fn kill(&self) -> Result<()> {
        Ok(())
    }
}