1use anyhow::{anyhow, bail, Context, Result};
2use dap_types::{
3 messages::{Message, Response},
4 ErrorResponse,
5};
6use futures::{channel::oneshot, select, AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _};
7use gpui::AsyncApp;
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{unbounded, Receiver, Sender},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15 process::Child,
16};
17use std::{
18 collections::HashMap,
19 net::{Ipv4Addr, SocketAddrV4},
20 process::Stdio,
21 sync::Arc,
22 time::Duration,
23};
24use task::TCPHost;
25use util::ResultExt as _;
26
27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
28
/// Boxed callback used for logging: it is invoked with the stream the text
/// belongs to ([`IoKind`]) and the text itself (one log line or one serialized
/// protocol message).
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
30
/// Selects which category of traffic a registered log handler receives.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Raw lines read from the adapter's stdout/stderr streams.
    Adapter,
    /// Serialized protocol messages written to or read from the adapter.
    Rpc,
}
36
/// Identifies the stream a logged piece of text is associated with.
pub enum IoKind {
    /// Text written to the adapter (outgoing messages).
    StdIn,
    /// Text read from the adapter's output (incoming messages or log lines).
    StdOut,
    /// Text read from the adapter's stderr stream.
    StdErr,
}
42
/// The set of byte streams connecting this process to a debug adapter.
pub struct TransportPipe {
    /// Sink that outgoing protocol messages are written to.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Source that incoming protocol messages are read from.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional adapter stdout, read line by line as adapter log output.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional adapter stderr, read line by line as adapter log output.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
49
50impl TransportPipe {
51 pub fn new(
52 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
53 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
54 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
55 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
56 ) -> Self {
57 TransportPipe {
58 input,
59 output,
60 stdout,
61 stderr,
62 }
63 }
64}
65
/// Shared map from a request's sequence number to the one-shot sender that
/// will receive the outcome of that request.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log callbacks, each tagged with the [`LogKind`]
/// it wants to receive.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
68
/// The concrete mechanism used to talk to a debug adapter.
pub enum Transport {
    /// Adapter process spoken to over its stdin/stdout.
    Stdio(StdioTransport),
    /// Adapter process spoken to over a TCP connection.
    Tcp(TcpTransport),
    /// In-memory transport, only compiled for tests / `test-support`.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
75
76impl Transport {
77 #[cfg(any(test, feature = "test-support"))]
78 async fn start(_: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
79 #[cfg(any(test, feature = "test-support"))]
80 return FakeTransport::start(cx)
81 .await
82 .map(|(transports, fake)| (transports, Self::Fake(fake)));
83 }
84
85 #[cfg(not(any(test, feature = "test-support")))]
86 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
87 if binary.connection.is_some() {
88 TcpTransport::start(binary, cx)
89 .await
90 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
91 } else {
92 StdioTransport::start(binary, cx)
93 .await
94 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
95 }
96 }
97
98 fn has_adapter_logs(&self) -> bool {
99 match self {
100 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
101 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
102 #[cfg(any(test, feature = "test-support"))]
103 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
104 }
105 }
106
107 async fn kill(&self) -> Result<()> {
108 match self {
109 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
110 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
111 #[cfg(any(test, feature = "test-support"))]
112 Transport::Fake(fake_transport) => fake_transport.kill().await,
113 }
114 }
115
116 #[cfg(any(test, feature = "test-support"))]
117 pub(crate) fn as_fake(&self) -> &FakeTransport {
118 match self {
119 Transport::Fake(fake_transport) => fake_transport,
120 _ => panic!("Not a fake transport layer"),
121 }
122 }
123}
124
/// Owns a [`Transport`] together with the bookkeeping needed to route
/// messages to and from the debug adapter.
pub(crate) struct TransportDelegate {
    /// Registered log callbacks, shared with the background I/O tasks.
    log_handlers: LogHandlers,
    /// Senders that `handle_input` moves into `pending_requests` when the
    /// request with the matching sequence number is written to the adapter.
    current_requests: Requests,
    /// Senders waiting for a response with the matching `request_seq`.
    pending_requests: Requests,
    /// The underlying transport (stdio, TCP, or fake).
    transport: Transport,
    /// Sending half of the outgoing message channel; `None` after shutdown.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
}
132
133impl TransportDelegate {
134 pub(crate) async fn start(
135 binary: &DebugAdapterBinary,
136 cx: AsyncApp,
137 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
138 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
139 let mut this = Self {
140 transport,
141 server_tx: Default::default(),
142 log_handlers: Default::default(),
143 current_requests: Default::default(),
144 pending_requests: Default::default(),
145 };
146 let messages = this.start_handlers(transport_pipes, cx).await?;
147 Ok((messages, this))
148 }
149
150 async fn start_handlers(
151 &mut self,
152 mut params: TransportPipe,
153 cx: AsyncApp,
154 ) -> Result<(Receiver<Message>, Sender<Message>)> {
155 let (client_tx, server_rx) = unbounded::<Message>();
156 let (server_tx, client_rx) = unbounded::<Message>();
157
158 let log_dap_communications =
159 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161 .unwrap_or(false);
162
163 let log_handler = if log_dap_communications {
164 Some(self.log_handlers.clone())
165 } else {
166 None
167 };
168
169 cx.update(|cx| {
170 if let Some(stdout) = params.stdout.take() {
171 cx.background_executor()
172 .spawn(Self::handle_adapter_log(stdout, log_handler.clone()))
173 .detach_and_log_err(cx);
174 }
175
176 cx.background_executor()
177 .spawn(Self::handle_output(
178 params.output,
179 client_tx,
180 self.pending_requests.clone(),
181 log_handler.clone(),
182 ))
183 .detach_and_log_err(cx);
184
185 if let Some(stderr) = params.stderr.take() {
186 cx.background_executor()
187 .spawn(Self::handle_error(stderr, self.log_handlers.clone()))
188 .detach_and_log_err(cx);
189 }
190
191 cx.background_executor()
192 .spawn(Self::handle_input(
193 params.input,
194 client_rx,
195 self.current_requests.clone(),
196 self.pending_requests.clone(),
197 log_handler.clone(),
198 ))
199 .detach_and_log_err(cx);
200 })?;
201
202 {
203 let mut lock = self.server_tx.lock().await;
204 *lock = Some(server_tx.clone());
205 }
206
207 Ok((server_rx, server_tx))
208 }
209
210 pub(crate) async fn add_pending_request(
211 &self,
212 sequence_id: u64,
213 request: oneshot::Sender<Result<Response>>,
214 ) {
215 let mut pending_requests = self.pending_requests.lock().await;
216 pending_requests.insert(sequence_id, request);
217 }
218
219 pub(crate) async fn cancel_pending_request(&self, sequence_id: &u64) {
220 let mut pending_requests = self.pending_requests.lock().await;
221 pending_requests.remove(sequence_id);
222 }
223
224 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
225 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
226 server_tx
227 .send(message)
228 .await
229 .map_err(|e| anyhow!("Failed to send message: {}", e))
230 } else {
231 Err(anyhow!("Server tx already dropped"))
232 }
233 }
234
235 async fn handle_adapter_log<Stdout>(
236 stdout: Stdout,
237 log_handlers: Option<LogHandlers>,
238 ) -> Result<()>
239 where
240 Stdout: AsyncRead + Unpin + Send + 'static,
241 {
242 let mut reader = BufReader::new(stdout);
243 let mut line = String::new();
244
245 let result = loop {
246 line.truncate(0);
247
248 let bytes_read = match reader.read_line(&mut line).await {
249 Ok(bytes_read) => bytes_read,
250 Err(e) => break Err(e.into()),
251 };
252
253 if bytes_read == 0 {
254 break Err(anyhow!("Debugger log stream closed"));
255 }
256
257 if let Some(log_handlers) = log_handlers.as_ref() {
258 for (kind, handler) in log_handlers.lock().iter_mut() {
259 if matches!(kind, LogKind::Adapter) {
260 handler(IoKind::StdOut, line.as_str());
261 }
262 }
263 }
264
265 smol::future::yield_now().await;
266 };
267
268 log::debug!("Handle adapter log dropped");
269
270 result
271 }
272
273 fn build_rpc_message(message: String) -> String {
274 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
275 }
276
277 async fn handle_input<Stdin>(
278 mut server_stdin: Stdin,
279 client_rx: Receiver<Message>,
280 current_requests: Requests,
281 pending_requests: Requests,
282 log_handlers: Option<LogHandlers>,
283 ) -> Result<()>
284 where
285 Stdin: AsyncWrite + Unpin + Send + 'static,
286 {
287 let result = loop {
288 match client_rx.recv().await {
289 Ok(message) => {
290 if let Message::Request(request) = &message {
291 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
292 pending_requests.lock().await.insert(request.seq, sender);
293 }
294 }
295
296 let message = match serde_json::to_string(&message) {
297 Ok(message) => message,
298 Err(e) => break Err(e.into()),
299 };
300
301 if let Some(log_handlers) = log_handlers.as_ref() {
302 for (kind, log_handler) in log_handlers.lock().iter_mut() {
303 if matches!(kind, LogKind::Rpc) {
304 log_handler(IoKind::StdIn, &message);
305 }
306 }
307 }
308
309 if let Err(e) = server_stdin
310 .write_all(Self::build_rpc_message(message).as_bytes())
311 .await
312 {
313 break Err(e.into());
314 }
315
316 if let Err(e) = server_stdin.flush().await {
317 break Err(e.into());
318 }
319 }
320 Err(error) => break Err(error.into()),
321 }
322
323 smol::future::yield_now().await;
324 };
325
326 log::debug!("Handle adapter input dropped");
327
328 result
329 }
330
331 async fn handle_output<Stdout>(
332 server_stdout: Stdout,
333 client_tx: Sender<Message>,
334 pending_requests: Requests,
335 log_handlers: Option<LogHandlers>,
336 ) -> Result<()>
337 where
338 Stdout: AsyncRead + Unpin + Send + 'static,
339 {
340 let mut recv_buffer = String::new();
341 let mut reader = BufReader::new(server_stdout);
342
343 let result = loop {
344 let message =
345 Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
346 .await;
347
348 match message {
349 Ok(Message::Response(res)) => {
350 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
351 if let Err(e) = tx.send(Self::process_response(res)) {
352 log::trace!("Did not send response `{:?}` for a cancelled", e);
353 }
354 } else {
355 client_tx.send(Message::Response(res)).await?;
356 };
357 }
358 Ok(message) => {
359 client_tx.send(message).await?;
360 }
361 Err(e) => break Err(e),
362 }
363
364 smol::future::yield_now().await;
365 };
366
367 drop(client_tx);
368
369 log::debug!("Handle adapter output dropped");
370
371 result
372 }
373
374 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> Result<()>
375 where
376 Stderr: AsyncRead + Unpin + Send + 'static,
377 {
378 let mut buffer = String::new();
379
380 let mut reader = BufReader::new(stderr);
381
382 let result = loop {
383 match reader.read_line(&mut buffer).await {
384 Ok(0) => break Err(anyhow!("debugger error stream closed")),
385 Ok(_) => {
386 for (kind, log_handler) in log_handlers.lock().iter_mut() {
387 if matches!(kind, LogKind::Adapter) {
388 log_handler(IoKind::StdErr, buffer.as_str());
389 }
390 }
391
392 buffer.truncate(0);
393 }
394 Err(error) => break Err(error.into()),
395 }
396
397 smol::future::yield_now().await;
398 };
399
400 log::debug!("Handle adapter error dropped");
401
402 result
403 }
404
405 fn process_response(response: Response) -> Result<Response> {
406 if response.success {
407 Ok(response)
408 } else {
409 if let Some(error_message) = response
410 .body
411 .clone()
412 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
413 .and_then(|response| response.error.map(|msg| msg.format))
414 .or_else(|| response.message.clone())
415 {
416 return Err(anyhow!(error_message));
417 };
418
419 Err(anyhow!(
420 "Received error response from adapter. Response: {:?}",
421 response.clone()
422 ))
423 }
424 }
425
426 async fn receive_server_message<Stdout>(
427 reader: &mut BufReader<Stdout>,
428 buffer: &mut String,
429 log_handlers: Option<&LogHandlers>,
430 ) -> Result<Message>
431 where
432 Stdout: AsyncRead + Unpin + Send + 'static,
433 {
434 let mut content_length = None;
435 loop {
436 buffer.truncate(0);
437
438 if reader
439 .read_line(buffer)
440 .await
441 .with_context(|| "reading a message from server")?
442 == 0
443 {
444 return Err(anyhow!("debugger reader stream closed"));
445 };
446
447 if buffer == "\r\n" {
448 break;
449 }
450
451 let parts = buffer.trim().split_once(": ");
452
453 match parts {
454 Some(("Content-Length", value)) => {
455 content_length = Some(value.parse().context("invalid content length")?);
456 }
457 _ => {}
458 }
459 }
460
461 let content_length = content_length.context("missing content length")?;
462
463 let mut content = vec![0; content_length];
464 reader
465 .read_exact(&mut content)
466 .await
467 .with_context(|| "reading after a loop")?;
468
469 let message = std::str::from_utf8(&content).context("invalid utf8 from server")?;
470
471 if let Some(log_handlers) = log_handlers {
472 for (kind, log_handler) in log_handlers.lock().iter_mut() {
473 if matches!(kind, LogKind::Rpc) {
474 log_handler(IoKind::StdOut, &message);
475 }
476 }
477 }
478
479 Ok(serde_json::from_str::<Message>(message)?)
480 }
481
482 pub async fn shutdown(&self) -> Result<()> {
483 log::debug!("Start shutdown client");
484
485 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
486 server_tx.close();
487 }
488
489 let mut current_requests = self.current_requests.lock().await;
490 let mut pending_requests = self.pending_requests.lock().await;
491
492 current_requests.clear();
493 pending_requests.clear();
494
495 let _ = self.transport.kill().await.log_err();
496
497 drop(current_requests);
498 drop(pending_requests);
499
500 log::debug!("Shutdown client completed");
501
502 anyhow::Ok(())
503 }
504
505 pub fn has_adapter_logs(&self) -> bool {
506 self.transport.has_adapter_logs()
507 }
508
509 pub fn transport(&self) -> &Transport {
510 &self.transport
511 }
512
513 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
514 where
515 F: 'static + Send + FnMut(IoKind, &str),
516 {
517 let mut log_handlers = self.log_handlers.lock();
518 log_handlers.push((kind, Box::new(f)));
519 }
520}
521
/// Transport that spawns the adapter process and talks to it over TCP.
pub struct TcpTransport {
    /// Port the adapter connection was made to.
    pub port: u16,
    /// Host address the adapter connection was made to.
    pub host: Ipv4Addr,
    /// Connection timeout, in milliseconds, that was used when connecting.
    pub timeout: u64,
    /// The spawned adapter process.
    process: Mutex<Child>,
}
528
529impl TcpTransport {
530 /// Get an open port to use with the tcp client when not supplied by debug config
531 pub async fn port(host: &TCPHost) -> Result<u16> {
532 if let Some(port) = host.port {
533 Ok(port)
534 } else {
535 Ok(TcpListener::bind(SocketAddrV4::new(host.host(), 0))
536 .await?
537 .local_addr()?
538 .port())
539 }
540 }
541
542 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
543 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
544 let Some(connection_args) = binary.connection.as_ref() else {
545 return Err(anyhow!("No connection arguments provided"));
546 };
547
548 let host = connection_args.host;
549 let port = connection_args.port;
550
551 let mut command = util::command::new_smol_command(&binary.command);
552
553 if let Some(cwd) = &binary.cwd {
554 command.current_dir(cwd);
555 }
556
557 if let Some(args) = &binary.arguments {
558 command.args(args);
559 }
560
561 if let Some(envs) = &binary.envs {
562 command.envs(envs);
563 }
564
565 command
566 .stdin(Stdio::null())
567 .stdout(Stdio::piped())
568 .stderr(Stdio::piped())
569 .kill_on_drop(true);
570
571 let mut process = command
572 .spawn()
573 .with_context(|| "failed to start debug adapter.")?;
574
575 let address = SocketAddrV4::new(host, port);
576
577 let timeout = connection_args.timeout.unwrap_or_else(|| {
578 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
579 .unwrap_or(2000u64)
580 });
581
582 let (rx, tx) = select! {
583 _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
584 return Err(anyhow!(format!("Connection to TCP DAP timeout {}:{}", host, port)))
585 },
586 result = cx.spawn(async move |cx| {
587 loop {
588 match TcpStream::connect(address).await {
589 Ok(stream) => return stream.split(),
590 Err(_) => {
591 cx.background_executor().timer(Duration::from_millis(100)).await;
592 }
593 }
594 }
595 }).fuse() => result
596 };
597 log::info!(
598 "Debug adapter has connected to TCP server {}:{}",
599 host,
600 port
601 );
602 let stdout = process.stdout.take();
603 let stderr = process.stderr.take();
604
605 let this = Self {
606 port,
607 host,
608 process: Mutex::new(process),
609 timeout,
610 };
611
612 let pipe = TransportPipe::new(
613 Box::new(tx),
614 Box::new(BufReader::new(rx)),
615 stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
616 stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
617 );
618
619 Ok((pipe, this))
620 }
621
622 fn has_adapter_logs(&self) -> bool {
623 true
624 }
625
626 async fn kill(&self) -> Result<()> {
627 self.process.lock().await.kill()?;
628
629 Ok(())
630 }
631}
632
/// Transport that spawns the adapter process and talks to it over the
/// process's stdin/stdout.
pub struct StdioTransport {
    /// The spawned adapter process.
    process: Mutex<Child>,
}
636
637impl StdioTransport {
638 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
639 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
640 let mut command = util::command::new_smol_command(&binary.command);
641
642 if let Some(cwd) = &binary.cwd {
643 command.current_dir(cwd);
644 }
645
646 if let Some(args) = &binary.arguments {
647 command.args(args);
648 }
649
650 if let Some(envs) = &binary.envs {
651 command.envs(envs);
652 }
653
654 command
655 .stdin(Stdio::piped())
656 .stdout(Stdio::piped())
657 .stderr(Stdio::piped())
658 .kill_on_drop(true);
659
660 let mut process = command
661 .spawn()
662 .with_context(|| "failed to spawn command.")?;
663
664 let stdin = process
665 .stdin
666 .take()
667 .ok_or_else(|| anyhow!("Failed to open stdin"))?;
668 let stdout = process
669 .stdout
670 .take()
671 .ok_or_else(|| anyhow!("Failed to open stdout"))?;
672 let stderr = process
673 .stderr
674 .take()
675 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
676
677 if stderr.is_none() {
678 bail!(
679 "Failed to connect to stderr for debug adapter command {}",
680 &binary.command
681 );
682 }
683
684 log::info!("Debug adapter has connected to stdio adapter");
685
686 let process = Mutex::new(process);
687
688 Ok((
689 TransportPipe::new(
690 Box::new(stdin),
691 Box::new(BufReader::new(stdout)),
692 None,
693 stderr,
694 ),
695 Self { process },
696 ))
697 }
698
699 fn has_adapter_logs(&self) -> bool {
700 false
701 }
702
703 async fn kill(&self) -> Result<()> {
704 self.process.lock().await.kill()?;
705 Ok(())
706 }
707}
708
/// Fake-adapter request callback: receives the request's sequence number,
/// its JSON arguments, and a shared writer for the fake adapter's output
/// stream, and returns a future to be awaited by the fake adapter loop.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler = Box<
    dyn Send
        + FnMut(
            u64,
            serde_json::Value,
            Arc<Mutex<async_pipe::PipeWriter>>,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>,
>;
718
/// Fake-adapter callback invoked with a response the client sent back.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
721
/// In-memory stand-in for a debug adapter, available in tests and with the
/// `test-support` feature. Handlers are keyed by command name.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // for sending fake response back from adapter side
    request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
    // for reverse request responses
    response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
}
729
730#[cfg(any(test, feature = "test-support"))]
731impl FakeTransport {
732 pub async fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
733 where
734 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
735 {
736 self.request_handlers.lock().await.insert(
737 R::COMMAND,
738 Box::new(
739 move |seq, args, writer: Arc<Mutex<async_pipe::PipeWriter>>| {
740 let response = handler(seq, serde_json::from_value(args).unwrap());
741
742 let message = serde_json::to_string(&Message::Response(Response {
743 seq: seq + 1,
744 request_seq: seq,
745 success: response.as_ref().is_ok(),
746 command: R::COMMAND.into(),
747 body: util::maybe!({ serde_json::to_value(response.ok()?).ok() }),
748 message: None,
749 }))
750 .unwrap();
751
752 let writer = writer.clone();
753
754 Box::pin(async move {
755 let mut writer = writer.lock().await;
756 writer
757 .write_all(TransportDelegate::build_rpc_message(message).as_bytes())
758 .await
759 .unwrap();
760 writer.flush().await.unwrap();
761 })
762 },
763 ),
764 );
765 }
766
767 pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
768 where
769 F: 'static + Send + Fn(Response),
770 {
771 self.response_handlers
772 .lock()
773 .await
774 .insert(R::COMMAND, Box::new(handler));
775 }
776
777 async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
778 let this = Self {
779 request_handlers: Arc::new(Mutex::new(HashMap::default())),
780 response_handlers: Arc::new(Mutex::new(HashMap::default())),
781 };
782 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
783 use serde_json::json;
784
785 let (stdin_writer, stdin_reader) = async_pipe::pipe();
786 let (stdout_writer, stdout_reader) = async_pipe::pipe();
787
788 let request_handlers = this.request_handlers.clone();
789 let response_handlers = this.response_handlers.clone();
790 let stdout_writer = Arc::new(Mutex::new(stdout_writer));
791
792 cx.background_executor()
793 .spawn(async move {
794 let mut reader = BufReader::new(stdin_reader);
795 let mut buffer = String::new();
796
797 loop {
798 let message =
799 TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
800 .await;
801
802 match message {
803 Err(error) => {
804 break anyhow!(error);
805 }
806 Ok(message) => {
807 match message {
808 Message::Request(request) => {
809 // redirect reverse requests to stdout writer/reader
810 if request.command == RunInTerminal::COMMAND
811 || request.command == StartDebugging::COMMAND
812 {
813 let message =
814 serde_json::to_string(&Message::Request(request))
815 .unwrap();
816
817 let mut writer = stdout_writer.lock().await;
818 writer
819 .write_all(
820 TransportDelegate::build_rpc_message(message)
821 .as_bytes(),
822 )
823 .await
824 .unwrap();
825 writer.flush().await.unwrap();
826 } else {
827 if let Some(handle) = request_handlers
828 .lock()
829 .await
830 .get_mut(request.command.as_str())
831 {
832 handle(
833 request.seq,
834 request.arguments.unwrap_or(json!({})),
835 stdout_writer.clone(),
836 )
837 .await;
838 } else {
839 log::error!(
840 "No request handler for {}",
841 request.command
842 );
843 }
844 }
845 }
846 Message::Event(event) => {
847 let message =
848 serde_json::to_string(&Message::Event(event)).unwrap();
849
850 let mut writer = stdout_writer.lock().await;
851 writer
852 .write_all(
853 TransportDelegate::build_rpc_message(message)
854 .as_bytes(),
855 )
856 .await
857 .unwrap();
858 writer.flush().await.unwrap();
859 }
860 Message::Response(response) => {
861 if let Some(handle) = response_handlers
862 .lock()
863 .await
864 .get(response.command.as_str())
865 {
866 handle(response);
867 } else {
868 log::error!("No response handler for {}", response.command);
869 }
870 }
871 }
872 }
873 }
874 }
875 })
876 .detach();
877
878 Ok((
879 TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
880 this,
881 ))
882 }
883
884 fn has_adapter_logs(&self) -> bool {
885 false
886 }
887
888 async fn kill(&self) -> Result<()> {
889 Ok(())
890 }
891}