1use anyhow::{Context as _, Result, anyhow, bail};
2use dap_types::{
3 ErrorResponse,
4 messages::{Message, Response},
5};
6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
7use gpui::{AppContext as _, AsyncApp, Task};
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{Receiver, Sender, unbounded},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15};
16use std::{
17 collections::HashMap,
18 net::{Ipv4Addr, SocketAddrV4},
19 process::Stdio,
20 sync::Arc,
21 time::Duration,
22};
23use task::TcpArgumentsTemplate;
24use util::ConnectionResult;
25
26use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
27
28pub(crate) type IoMessage = str;
29pub(crate) type Command = str;
30pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
31
/// Category a log handler is registered for.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogKind {
    /// Raw lines read from the adapter's log streams.
    Adapter,
    /// Serialized DAP messages exchanged with the adapter.
    Rpc,
}
37
/// Identifies which stream a logged message or line belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IoKind {
    /// Data written to the adapter.
    StdIn,
    /// Data read from the adapter's output.
    StdOut,
    /// Data read from the adapter's error stream.
    StdErr,
}
43
44pub struct TransportPipe {
45 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
46 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
47 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
48 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
49}
50
51impl TransportPipe {
52 pub fn new(
53 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
54 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
55 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
56 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
57 ) -> Self {
58 TransportPipe {
59 input,
60 output,
61 stdout,
62 stderr,
63 }
64 }
65}
66
67type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
68type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
69
70pub enum Transport {
71 Stdio(StdioTransport),
72 Tcp(TcpTransport),
73 #[cfg(any(test, feature = "test-support"))]
74 Fake(FakeTransport),
75}
76
77impl Transport {
78 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
79 #[cfg(any(test, feature = "test-support"))]
80 if cfg!(any(test, feature = "test-support")) {
81 return FakeTransport::start(cx)
82 .await
83 .map(|(transports, fake)| (transports, Self::Fake(fake)));
84 }
85
86 if binary.connection.is_some() {
87 TcpTransport::start(binary, cx)
88 .await
89 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
90 .context("Tried to connect to a debug adapter via TCP transport layer")
91 } else {
92 StdioTransport::start(binary, cx)
93 .await
94 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
95 .context("Tried to connect to a debug adapter via stdin/stdout transport layer")
96 }
97 }
98
99 fn has_adapter_logs(&self) -> bool {
100 match self {
101 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
102 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
103 #[cfg(any(test, feature = "test-support"))]
104 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
105 }
106 }
107
108 async fn kill(&self) {
109 match self {
110 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
111 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
112 #[cfg(any(test, feature = "test-support"))]
113 Transport::Fake(fake_transport) => fake_transport.kill().await,
114 }
115 }
116
117 #[cfg(any(test, feature = "test-support"))]
118 pub(crate) fn as_fake(&self) -> &FakeTransport {
119 match self {
120 Transport::Fake(fake_transport) => fake_transport,
121 _ => panic!("Not a fake transport layer"),
122 }
123 }
124}
125
126pub(crate) struct TransportDelegate {
127 log_handlers: LogHandlers,
128 current_requests: Requests,
129 pending_requests: Requests,
130 transport: Transport,
131 server_tx: Arc<Mutex<Option<Sender<Message>>>>,
132 _tasks: Vec<Task<()>>,
133}
134
135impl TransportDelegate {
136 pub(crate) async fn start(
137 binary: &DebugAdapterBinary,
138 cx: AsyncApp,
139 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
140 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
141 let mut this = Self {
142 transport,
143 server_tx: Default::default(),
144 log_handlers: Default::default(),
145 current_requests: Default::default(),
146 pending_requests: Default::default(),
147 _tasks: Vec::new(),
148 };
149 let messages = this.start_handlers(transport_pipes, cx).await?;
150 Ok((messages, this))
151 }
152
153 async fn start_handlers(
154 &mut self,
155 mut params: TransportPipe,
156 cx: AsyncApp,
157 ) -> Result<(Receiver<Message>, Sender<Message>)> {
158 let (client_tx, server_rx) = unbounded::<Message>();
159 let (server_tx, client_rx) = unbounded::<Message>();
160
161 let log_dap_communications =
162 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
163 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
164 .unwrap_or(false);
165
166 let log_handler = if log_dap_communications {
167 Some(self.log_handlers.clone())
168 } else {
169 None
170 };
171
172 let adapter_log_handler = log_handler.clone();
173 cx.update(|cx| {
174 if let Some(stdout) = params.stdout.take() {
175 self._tasks.push(cx.background_spawn(async move {
176 match Self::handle_adapter_log(stdout, adapter_log_handler).await {
177 ConnectionResult::Timeout => {
178 log::error!("Timed out when handling debugger log");
179 }
180 ConnectionResult::ConnectionReset => {
181 log::info!("Debugger logs connection closed");
182 }
183 ConnectionResult::Result(Ok(())) => {}
184 ConnectionResult::Result(Err(e)) => {
185 log::error!("Error handling debugger log: {e}");
186 }
187 }
188 }));
189 }
190
191 let pending_requests = self.pending_requests.clone();
192 let output_log_handler = log_handler.clone();
193 self._tasks.push(cx.background_spawn(async move {
194 match Self::handle_output(
195 params.output,
196 client_tx,
197 pending_requests.clone(),
198 output_log_handler,
199 )
200 .await
201 {
202 Ok(()) => {}
203 Err(e) => log::error!("Error handling debugger output: {e}"),
204 }
205 let mut pending_requests = pending_requests.lock().await;
206 pending_requests.drain().for_each(|(_, request)| {
207 request
208 .send(Err(anyhow!("debugger shutdown unexpectedly")))
209 .ok();
210 });
211 }));
212
213 if let Some(stderr) = params.stderr.take() {
214 let log_handlers = self.log_handlers.clone();
215 self._tasks.push(cx.background_spawn(async move {
216 match Self::handle_error(stderr, log_handlers).await {
217 ConnectionResult::Timeout => {
218 log::error!("Timed out reading debugger error stream")
219 }
220 ConnectionResult::ConnectionReset => {
221 log::info!("Debugger closed its error stream")
222 }
223 ConnectionResult::Result(Ok(())) => {}
224 ConnectionResult::Result(Err(e)) => {
225 log::error!("Error handling debugger error: {e}")
226 }
227 }
228 }));
229 }
230
231 let current_requests = self.current_requests.clone();
232 let pending_requests = self.pending_requests.clone();
233 let log_handler = log_handler.clone();
234 self._tasks.push(cx.background_spawn(async move {
235 match Self::handle_input(
236 params.input,
237 client_rx,
238 current_requests,
239 pending_requests,
240 log_handler,
241 )
242 .await
243 {
244 Ok(()) => {}
245 Err(e) => log::error!("Error handling debugger input: {e}"),
246 }
247 }));
248 })?;
249
250 {
251 let mut lock = self.server_tx.lock().await;
252 *lock = Some(server_tx.clone());
253 }
254
255 Ok((server_rx, server_tx))
256 }
257
258 pub(crate) async fn add_pending_request(
259 &self,
260 sequence_id: u64,
261 request: oneshot::Sender<Result<Response>>,
262 ) {
263 let mut pending_requests = self.pending_requests.lock().await;
264 pending_requests.insert(sequence_id, request);
265 }
266
267 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
268 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
269 server_tx.send(message).await.context("sending message")
270 } else {
271 anyhow::bail!("Server tx already dropped")
272 }
273 }
274
275 async fn handle_adapter_log<Stdout>(
276 stdout: Stdout,
277 log_handlers: Option<LogHandlers>,
278 ) -> ConnectionResult<()>
279 where
280 Stdout: AsyncRead + Unpin + Send + 'static,
281 {
282 let mut reader = BufReader::new(stdout);
283 let mut line = String::new();
284
285 let result = loop {
286 line.truncate(0);
287
288 match reader
289 .read_line(&mut line)
290 .await
291 .context("reading adapter log line")
292 {
293 Ok(0) => break ConnectionResult::ConnectionReset,
294 Ok(_) => {}
295 Err(e) => break ConnectionResult::Result(Err(e)),
296 }
297
298 if let Some(log_handlers) = log_handlers.as_ref() {
299 for (kind, handler) in log_handlers.lock().iter_mut() {
300 if matches!(kind, LogKind::Adapter) {
301 handler(IoKind::StdOut, None, line.as_str());
302 }
303 }
304 }
305 };
306
307 log::debug!("Handle adapter log dropped");
308
309 result
310 }
311
312 fn build_rpc_message(message: String) -> String {
313 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
314 }
315
316 async fn handle_input<Stdin>(
317 mut server_stdin: Stdin,
318 client_rx: Receiver<Message>,
319 current_requests: Requests,
320 pending_requests: Requests,
321 log_handlers: Option<LogHandlers>,
322 ) -> Result<()>
323 where
324 Stdin: AsyncWrite + Unpin + Send + 'static,
325 {
326 let result = loop {
327 match client_rx.recv().await {
328 Ok(message) => {
329 if let Message::Request(request) = &message {
330 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
331 pending_requests.lock().await.insert(request.seq, sender);
332 }
333 }
334
335 let command = match &message {
336 Message::Request(request) => Some(request.command.as_str()),
337 Message::Response(response) => Some(response.command.as_str()),
338 _ => None,
339 };
340
341 let message = match serde_json::to_string(&message) {
342 Ok(message) => message,
343 Err(e) => break Err(e.into()),
344 };
345
346 if let Some(log_handlers) = log_handlers.as_ref() {
347 for (kind, log_handler) in log_handlers.lock().iter_mut() {
348 if matches!(kind, LogKind::Rpc) {
349 log_handler(IoKind::StdIn, command, &message);
350 }
351 }
352 }
353
354 if let Err(e) = server_stdin
355 .write_all(Self::build_rpc_message(message).as_bytes())
356 .await
357 {
358 break Err(e.into());
359 }
360
361 if let Err(e) = server_stdin.flush().await {
362 break Err(e.into());
363 }
364 }
365 Err(error) => break Err(error.into()),
366 }
367 };
368
369 log::debug!("Handle adapter input dropped");
370
371 result
372 }
373
374 async fn handle_output<Stdout>(
375 server_stdout: Stdout,
376 client_tx: Sender<Message>,
377 pending_requests: Requests,
378 log_handlers: Option<LogHandlers>,
379 ) -> Result<()>
380 where
381 Stdout: AsyncRead + Unpin + Send + 'static,
382 {
383 let mut recv_buffer = String::new();
384 let mut reader = BufReader::new(server_stdout);
385
386 let result = loop {
387 match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
388 .await
389 {
390 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
391 ConnectionResult::ConnectionReset => {
392 log::info!("Debugger closed the connection");
393 return Ok(());
394 }
395 ConnectionResult::Result(Ok(Message::Response(res))) => {
396 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
397 if let Err(e) = tx.send(Self::process_response(res)) {
398 log::trace!("Did not send response `{:?}` for a cancelled", e);
399 }
400 } else {
401 client_tx.send(Message::Response(res)).await?;
402 }
403 }
404 ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
405 ConnectionResult::Result(Err(e)) => break Err(e),
406 }
407 };
408
409 drop(client_tx);
410 log::debug!("Handle adapter output dropped");
411
412 result
413 }
414
415 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
416 where
417 Stderr: AsyncRead + Unpin + Send + 'static,
418 {
419 log::debug!("Handle error started");
420 let mut buffer = String::new();
421
422 let mut reader = BufReader::new(stderr);
423
424 let result = loop {
425 match reader
426 .read_line(&mut buffer)
427 .await
428 .context("reading error log line")
429 {
430 Ok(0) => break ConnectionResult::ConnectionReset,
431 Ok(_) => {
432 for (kind, log_handler) in log_handlers.lock().iter_mut() {
433 if matches!(kind, LogKind::Adapter) {
434 log_handler(IoKind::StdErr, None, buffer.as_str());
435 }
436 }
437
438 buffer.truncate(0);
439 }
440 Err(error) => break ConnectionResult::Result(Err(error)),
441 }
442 };
443
444 log::debug!("Handle adapter error dropped");
445
446 result
447 }
448
449 fn process_response(response: Response) -> Result<Response> {
450 if response.success {
451 Ok(response)
452 } else {
453 if let Some(error_message) = response
454 .body
455 .clone()
456 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
457 .and_then(|response| response.error.map(|msg| msg.format))
458 .or_else(|| response.message.clone())
459 {
460 anyhow::bail!(error_message);
461 };
462
463 anyhow::bail!(
464 "Received error response from adapter. Response: {:?}",
465 response
466 );
467 }
468 }
469
470 async fn receive_server_message<Stdout>(
471 reader: &mut BufReader<Stdout>,
472 buffer: &mut String,
473 log_handlers: Option<&LogHandlers>,
474 ) -> ConnectionResult<Message>
475 where
476 Stdout: AsyncRead + Unpin + Send + 'static,
477 {
478 let mut content_length = None;
479 loop {
480 buffer.truncate(0);
481
482 match reader
483 .read_line(buffer)
484 .await
485 .with_context(|| "reading a message from server")
486 {
487 Ok(0) => return ConnectionResult::ConnectionReset,
488 Ok(_) => {}
489 Err(e) => return ConnectionResult::Result(Err(e)),
490 };
491
492 if buffer == "\r\n" {
493 break;
494 }
495
496 if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
497 match value.parse().context("invalid content length") {
498 Ok(length) => content_length = Some(length),
499 Err(e) => return ConnectionResult::Result(Err(e)),
500 }
501 }
502 }
503
504 let content_length = match content_length.context("missing content length") {
505 Ok(length) => length,
506 Err(e) => return ConnectionResult::Result(Err(e)),
507 };
508
509 let mut content = vec![0; content_length];
510 if let Err(e) = reader
511 .read_exact(&mut content)
512 .await
513 .with_context(|| "reading after a loop")
514 {
515 return ConnectionResult::Result(Err(e));
516 }
517
518 let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
519 Ok(str) => str,
520 Err(e) => return ConnectionResult::Result(Err(e)),
521 };
522
523 let message =
524 serde_json::from_str::<Message>(message_str).context("deserializing server message");
525
526 if let Some(log_handlers) = log_handlers {
527 let command = match &message {
528 Ok(Message::Request(request)) => Some(request.command.as_str()),
529 Ok(Message::Response(response)) => Some(response.command.as_str()),
530 _ => None,
531 };
532
533 for (kind, log_handler) in log_handlers.lock().iter_mut() {
534 if matches!(kind, LogKind::Rpc) {
535 log_handler(IoKind::StdOut, command, message_str);
536 }
537 }
538 }
539
540 ConnectionResult::Result(message)
541 }
542
543 pub async fn shutdown(&self) -> Result<()> {
544 log::debug!("Start shutdown client");
545
546 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
547 server_tx.close();
548 }
549
550 let mut current_requests = self.current_requests.lock().await;
551 let mut pending_requests = self.pending_requests.lock().await;
552
553 current_requests.clear();
554 pending_requests.clear();
555
556 self.transport.kill().await;
557
558 drop(current_requests);
559 drop(pending_requests);
560
561 log::debug!("Shutdown client completed");
562
563 anyhow::Ok(())
564 }
565
566 pub fn has_adapter_logs(&self) -> bool {
567 self.transport.has_adapter_logs()
568 }
569
570 pub fn transport(&self) -> &Transport {
571 &self.transport
572 }
573
574 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
575 where
576 F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
577 {
578 let mut log_handlers = self.log_handlers.lock();
579 log_handlers.push((kind, Box::new(f)));
580 }
581}
582
583pub struct TcpTransport {
584 pub port: u16,
585 pub host: Ipv4Addr,
586 pub timeout: u64,
587 process: Option<Mutex<Child>>,
588}
589
590impl TcpTransport {
591 /// Get an open port to use with the tcp client when not supplied by debug config
592 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
593 if let Some(port) = host.port {
594 Ok(port)
595 } else {
596 Self::unused_port(host.host()).await
597 }
598 }
599
600 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
601 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
602 .await?
603 .local_addr()?
604 .port())
605 }
606
607 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
608 let connection_args = binary
609 .connection
610 .as_ref()
611 .context("No connection arguments provided")?;
612
613 let host = connection_args.host;
614 let port = connection_args.port;
615
616 let mut process = if let Some(command) = &binary.command {
617 let mut command = util::command::new_std_command(&command);
618
619 if let Some(cwd) = &binary.cwd {
620 command.current_dir(cwd);
621 }
622
623 command.args(&binary.arguments);
624 command.envs(&binary.envs);
625
626 Some(
627 Child::spawn(command, Stdio::null())
628 .with_context(|| "failed to start debug adapter.")?,
629 )
630 } else {
631 None
632 };
633
634 let address = SocketAddrV4::new(host, port);
635
636 let timeout = connection_args.timeout.unwrap_or_else(|| {
637 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
638 .unwrap_or(2000u64)
639 });
640
641 let (mut process, (rx, tx)) = select! {
642 _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
643 anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
644 },
645 result = cx.spawn(async move |cx| {
646 loop {
647 match TcpStream::connect(address).await {
648 Ok(stream) => return Ok((process, stream.split())),
649 Err(_) => {
650 if let Some(p) = &mut process {
651 if let Ok(Some(_)) = p.try_status() {
652 let output = process.take().unwrap().into_inner().output().await?;
653 let output = if output.stderr.is_empty() {
654 String::from_utf8_lossy(&output.stdout).to_string()
655 } else {
656 String::from_utf8_lossy(&output.stderr).to_string()
657 };
658 anyhow::bail!("{output}\nerror: process exited before debugger attached.");
659 }
660 }
661
662 cx.background_executor().timer(Duration::from_millis(100)).await;
663 }
664 }
665 }
666 }).fuse() => result?
667 };
668
669 log::info!(
670 "Debug adapter has connected to TCP server {}:{}",
671 host,
672 port
673 );
674 let stdout = process.as_mut().and_then(|p| p.stdout.take());
675 let stderr = process.as_mut().and_then(|p| p.stderr.take());
676
677 let this = Self {
678 port,
679 host,
680 process: process.map(Mutex::new),
681 timeout,
682 };
683
684 let pipe = TransportPipe::new(
685 Box::new(tx),
686 Box::new(BufReader::new(rx)),
687 stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
688 stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
689 );
690
691 Ok((pipe, this))
692 }
693
694 fn has_adapter_logs(&self) -> bool {
695 true
696 }
697
698 async fn kill(&self) {
699 if let Some(process) = &self.process {
700 let mut process = process.lock().await;
701 Child::kill(&mut process);
702 }
703 }
704}
705
706impl Drop for TcpTransport {
707 fn drop(&mut self) {
708 if let Some(mut p) = self.process.take() {
709 p.get_mut().kill();
710 }
711 }
712}
713
714pub struct StdioTransport {
715 process: Mutex<Child>,
716}
717
718impl StdioTransport {
719 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
720 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
721 let Some(binary_command) = &binary.command else {
722 bail!(
723 "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
724 );
725 };
726 let mut command = util::command::new_std_command(&binary_command);
727
728 if let Some(cwd) = &binary.cwd {
729 command.current_dir(cwd);
730 }
731
732 command.args(&binary.arguments);
733 command.envs(&binary.envs);
734
735 let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
736 format!(
737 "failed to spawn command `{} {}`.",
738 binary_command,
739 binary.arguments.join(" ")
740 )
741 })?;
742
743 let stdin = process.stdin.take().context("Failed to open stdin")?;
744 let stdout = process.stdout.take().context("Failed to open stdout")?;
745 let stderr = process
746 .stderr
747 .take()
748 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
749
750 if stderr.is_none() {
751 bail!(
752 "Failed to connect to stderr for debug adapter command {}",
753 &binary_command
754 );
755 }
756
757 log::info!("Debug adapter has connected to stdio adapter");
758
759 let process = Mutex::new(process);
760
761 Ok((
762 TransportPipe::new(
763 Box::new(stdin),
764 Box::new(BufReader::new(stdout)),
765 None,
766 stderr,
767 ),
768 Self { process },
769 ))
770 }
771
772 fn has_adapter_logs(&self) -> bool {
773 false
774 }
775
776 async fn kill(&self) {
777 let mut process = self.process.lock().await;
778 Child::kill(&mut process);
779 }
780}
781
782impl Drop for StdioTransport {
783 fn drop(&mut self) {
784 self.process.get_mut().kill();
785 }
786}
787
/// Test handler that turns a request's sequence number and arguments into a response.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler = Box<dyn FnMut(u64, serde_json::Value) -> Response + Send>;

/// Test handler invoked with a response the client sent to a reverse request.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Fn(Response) + Send>;
794
/// In-process stand-in for a debug adapter, driven by handlers that tests register.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    /// Produces the fake adapter's response for each request command.
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    /// Receives the client's responses to reverse requests, keyed by command.
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
802
#[cfg(any(test, feature = "test-support"))]
impl FakeTransport {
    /// Registers `handler` as the fake adapter's implementation of request `R`.
    ///
    /// The handler's `Ok`/`Err` result becomes a successful/unsuccessful response whose `seq`
    /// is the request's sequence number plus one.
    ///
    /// # Panics
    ///
    /// The installed closure panics when the arguments do not deserialize into `R::Arguments`
    /// or the result cannot be serialized.
    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
    where
        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
    {
        self.request_handlers.lock().insert(
            R::COMMAND,
            Box::new(move |seq, args| {
                let outcome = handler(seq, serde_json::from_value(args).unwrap());
                let (success, body) = match outcome {
                    Ok(body) => (true, serde_json::to_value(body).unwrap()),
                    Err(error) => (false, serde_json::to_value(error).unwrap()),
                };
                Response {
                    seq: seq + 1,
                    request_seq: seq,
                    success,
                    command: R::COMMAND.into(),
                    body: Some(body),
                    message: None,
                }
            }),
        );
    }

    /// Registers `handler` to be called with the client's response to reverse request `R`.
    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
    where
        F: 'static + Send + Fn(Response),
    {
        let mut handlers = self.response_handlers.lock();
        handlers.insert(R::COMMAND, Box::new(handler));
    }

    /// Serializes `message`, frames it and writes it to the client-facing pipe.
    async fn write_to_client<W>(writer: &Mutex<W>, message: Message)
    where
        W: AsyncWrite + Unpin + Send,
    {
        let payload = serde_json::to_string(&message).unwrap();

        let mut writer = writer.lock().await;
        writer
            .write_all(TransportDelegate::build_rpc_message(payload).as_bytes())
            .await
            .unwrap();
        writer.flush().await.unwrap();
    }

    /// Creates the fake transport and spawns the task that plays the adapter's role.
    ///
    /// The task reads what the client writes. `runInTerminal`/`startDebugging` requests and all
    /// events are echoed back to the client, other requests are answered by the registered
    /// request handler (panicking when there is none), and responses go to the registered
    /// response handler.
    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
        use serde_json::json;

        let this = Self {
            request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
            response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
        };

        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        let request_handlers = this.request_handlers.clone();
        let response_handlers = this.response_handlers.clone();
        let stdout_writer = Arc::new(Mutex::new(stdout_writer));

        cx.background_spawn(async move {
            let mut reader = BufReader::new(stdin_reader);
            let mut buffer = String::new();

            loop {
                let received =
                    TransportDelegate::receive_server_message(&mut reader, &mut buffer, None).await;
                let message = match received {
                    ConnectionResult::Timeout => {
                        anyhow::bail!("Timed out when connecting to debugger");
                    }
                    ConnectionResult::ConnectionReset => {
                        log::info!("Debugger closed the connection");
                        break Ok(());
                    }
                    ConnectionResult::Result(Err(e)) => break Err(e),
                    ConnectionResult::Result(Ok(message)) => message,
                };

                match message {
                    Message::Request(request) => {
                        // redirect reverse requests to stdout writer/reader
                        let is_reverse_request = request.command == RunInTerminal::COMMAND
                            || request.command == StartDebugging::COMMAND;
                        if is_reverse_request {
                            Self::write_to_client(&stdout_writer, Message::Request(request)).await;
                        } else {
                            let response = match request_handlers
                                .lock()
                                .get_mut(request.command.as_str())
                            {
                                Some(handle) => {
                                    handle(request.seq, request.arguments.unwrap_or(json!({})))
                                }
                                None => panic!("No request handler for {}", request.command),
                            };
                            Self::write_to_client(&stdout_writer, Message::Response(response))
                                .await;
                        }
                    }
                    Message::Event(event) => {
                        Self::write_to_client(&stdout_writer, Message::Event(event)).await;
                    }
                    Message::Response(response) => {
                        match response_handlers.lock().get(response.command.as_str()) {
                            Some(handle) => handle(response),
                            None => log::error!("No response handler for {}", response.command),
                        }
                    }
                }
            }
        })
        .detach();

        let pipe = TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None);
        Ok((pipe, this))
    }

    /// Always `false` for the fake transport.
    fn has_adapter_logs(&self) -> bool {
        false
    }

    /// Nothing to kill: the fake adapter is not a process.
    async fn kill(&self) {}
}
960
961struct Child {
962 process: smol::process::Child,
963}
964
965impl std::ops::Deref for Child {
966 type Target = smol::process::Child;
967
968 fn deref(&self) -> &Self::Target {
969 &self.process
970 }
971}
972
973impl std::ops::DerefMut for Child {
974 fn deref_mut(&mut self) -> &mut Self::Target {
975 &mut self.process
976 }
977}
978
979impl Child {
980 fn into_inner(self) -> smol::process::Child {
981 self.process
982 }
983
984 #[cfg(not(windows))]
985 fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
986 util::set_pre_exec_to_start_new_session(&mut command);
987 let process = smol::process::Command::from(command)
988 .stdin(stdin)
989 .stdout(Stdio::piped())
990 .stderr(Stdio::piped())
991 .spawn()?;
992 Ok(Self { process })
993 }
994
995 #[cfg(windows)]
996 fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
997 // TODO(windows): create a job object and add the child process handle to it,
998 // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
999 let process = smol::process::Command::from(command)
1000 .stdin(stdin)
1001 .stdout(Stdio::piped())
1002 .stderr(Stdio::piped())
1003 .spawn()?;
1004 Ok(Self { process })
1005 }
1006
1007 #[cfg(not(windows))]
1008 fn kill(&mut self) {
1009 let pid = self.process.id();
1010 unsafe {
1011 libc::killpg(pid as i32, libc::SIGKILL);
1012 }
1013 }
1014
1015 #[cfg(windows)]
1016 fn kill(&mut self) {
1017 // TODO(windows): terminate the job object in kill
1018 let _ = self.process.kill();
1019 }
1020}