1use anyhow::{Context as _, Result, anyhow, bail};
2use dap_types::{
3 ErrorResponse,
4 messages::{Message, Response},
5};
6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
7use gpui::{AppContext as _, AsyncApp, Task};
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{Receiver, Sender, unbounded},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15};
16use std::{
17 collections::HashMap,
18 net::{Ipv4Addr, SocketAddrV4},
19 process::Stdio,
20 sync::Arc,
21 time::Duration,
22};
23use task::TcpArgumentsTemplate;
24use util::ConnectionResult;
25
26use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
27
/// Callback that receives a piece of adapter I/O: the stream it belongs to and its text.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;

/// Category of traffic a registered [`IoHandler`] subscribes to.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Lines read from the adapter's log (stdout) and error (stderr) streams.
    Adapter,
    /// Serialized protocol messages sent to or received from the adapter.
    Rpc,
}

/// Stream tag handed to an [`IoHandler`] alongside the text.
pub enum IoKind {
    StdIn,
    StdOut,
    StdErr,
}
41
42pub struct TransportPipe {
43 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
44 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
45 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
46 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
47}
48
49impl TransportPipe {
50 pub fn new(
51 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
52 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
53 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
54 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
55 ) -> Self {
56 TransportPipe {
57 input,
58 output,
59 stdout,
60 stderr,
61 }
62 }
63}
64
65type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
66type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
67
68pub enum Transport {
69 Stdio(StdioTransport),
70 Tcp(TcpTransport),
71 #[cfg(any(test, feature = "test-support"))]
72 Fake(FakeTransport),
73}
74
75impl Transport {
76 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
77 #[cfg(any(test, feature = "test-support"))]
78 if cfg!(any(test, feature = "test-support")) {
79 return FakeTransport::start(cx)
80 .await
81 .map(|(transports, fake)| (transports, Self::Fake(fake)));
82 }
83
84 if binary.connection.is_some() {
85 TcpTransport::start(binary, cx)
86 .await
87 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
88 } else {
89 StdioTransport::start(binary, cx)
90 .await
91 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
92 }
93 }
94
95 fn has_adapter_logs(&self) -> bool {
96 match self {
97 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
98 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
99 #[cfg(any(test, feature = "test-support"))]
100 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
101 }
102 }
103
104 async fn kill(&self) {
105 match self {
106 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
107 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
108 #[cfg(any(test, feature = "test-support"))]
109 Transport::Fake(fake_transport) => fake_transport.kill().await,
110 }
111 }
112
113 #[cfg(any(test, feature = "test-support"))]
114 pub(crate) fn as_fake(&self) -> &FakeTransport {
115 match self {
116 Transport::Fake(fake_transport) => fake_transport,
117 _ => panic!("Not a fake transport layer"),
118 }
119 }
120}
121
122pub(crate) struct TransportDelegate {
123 log_handlers: LogHandlers,
124 current_requests: Requests,
125 pending_requests: Requests,
126 transport: Transport,
127 server_tx: Arc<Mutex<Option<Sender<Message>>>>,
128 _tasks: Vec<Task<()>>,
129}
130
131impl TransportDelegate {
132 pub(crate) async fn start(
133 binary: &DebugAdapterBinary,
134 cx: AsyncApp,
135 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
136 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
137 let mut this = Self {
138 transport,
139 server_tx: Default::default(),
140 log_handlers: Default::default(),
141 current_requests: Default::default(),
142 pending_requests: Default::default(),
143 _tasks: Vec::new(),
144 };
145 let messages = this.start_handlers(transport_pipes, cx).await?;
146 Ok((messages, this))
147 }
148
149 async fn start_handlers(
150 &mut self,
151 mut params: TransportPipe,
152 cx: AsyncApp,
153 ) -> Result<(Receiver<Message>, Sender<Message>)> {
154 let (client_tx, server_rx) = unbounded::<Message>();
155 let (server_tx, client_rx) = unbounded::<Message>();
156
157 let log_dap_communications =
158 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
159 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
160 .unwrap_or(false);
161
162 let log_handler = if log_dap_communications {
163 Some(self.log_handlers.clone())
164 } else {
165 None
166 };
167
168 let adapter_log_handler = log_handler.clone();
169 cx.update(|cx| {
170 if let Some(stdout) = params.stdout.take() {
171 self._tasks.push(cx.background_spawn(async move {
172 match Self::handle_adapter_log(stdout, adapter_log_handler).await {
173 ConnectionResult::Timeout => {
174 log::error!("Timed out when handling debugger log");
175 }
176 ConnectionResult::ConnectionReset => {
177 log::info!("Debugger logs connection closed");
178 }
179 ConnectionResult::Result(Ok(())) => {}
180 ConnectionResult::Result(Err(e)) => {
181 log::error!("Error handling debugger log: {e}");
182 }
183 }
184 }));
185 }
186
187 let pending_requests = self.pending_requests.clone();
188 let output_log_handler = log_handler.clone();
189 self._tasks.push(cx.background_spawn(async move {
190 match Self::handle_output(
191 params.output,
192 client_tx,
193 pending_requests.clone(),
194 output_log_handler,
195 )
196 .await
197 {
198 Ok(()) => {}
199 Err(e) => log::error!("Error handling debugger output: {e}"),
200 }
201 let mut pending_requests = pending_requests.lock().await;
202 pending_requests.drain().for_each(|(_, request)| {
203 request
204 .send(Err(anyhow!("debugger shutdown unexpectedly")))
205 .ok();
206 });
207 }));
208
209 if let Some(stderr) = params.stderr.take() {
210 let log_handlers = self.log_handlers.clone();
211 self._tasks.push(cx.background_spawn(async move {
212 match Self::handle_error(stderr, log_handlers).await {
213 ConnectionResult::Timeout => {
214 log::error!("Timed out reading debugger error stream")
215 }
216 ConnectionResult::ConnectionReset => {
217 log::info!("Debugger closed its error stream")
218 }
219 ConnectionResult::Result(Ok(())) => {}
220 ConnectionResult::Result(Err(e)) => {
221 log::error!("Error handling debugger error: {e}")
222 }
223 }
224 }));
225 }
226
227 let current_requests = self.current_requests.clone();
228 let pending_requests = self.pending_requests.clone();
229 let log_handler = log_handler.clone();
230 self._tasks.push(cx.background_spawn(async move {
231 match Self::handle_input(
232 params.input,
233 client_rx,
234 current_requests,
235 pending_requests,
236 log_handler,
237 )
238 .await
239 {
240 Ok(()) => {}
241 Err(e) => log::error!("Error handling debugger input: {e}"),
242 }
243 }));
244 })?;
245
246 {
247 let mut lock = self.server_tx.lock().await;
248 *lock = Some(server_tx.clone());
249 }
250
251 Ok((server_rx, server_tx))
252 }
253
254 pub(crate) async fn add_pending_request(
255 &self,
256 sequence_id: u64,
257 request: oneshot::Sender<Result<Response>>,
258 ) {
259 let mut pending_requests = self.pending_requests.lock().await;
260 pending_requests.insert(sequence_id, request);
261 }
262
263 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
264 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
265 server_tx.send(message).await.context("sending message")
266 } else {
267 anyhow::bail!("Server tx already dropped")
268 }
269 }
270
271 async fn handle_adapter_log<Stdout>(
272 stdout: Stdout,
273 log_handlers: Option<LogHandlers>,
274 ) -> ConnectionResult<()>
275 where
276 Stdout: AsyncRead + Unpin + Send + 'static,
277 {
278 let mut reader = BufReader::new(stdout);
279 let mut line = String::new();
280
281 let result = loop {
282 line.truncate(0);
283
284 match reader
285 .read_line(&mut line)
286 .await
287 .context("reading adapter log line")
288 {
289 Ok(0) => break ConnectionResult::ConnectionReset,
290 Ok(_) => {}
291 Err(e) => break ConnectionResult::Result(Err(e)),
292 }
293
294 if let Some(log_handlers) = log_handlers.as_ref() {
295 for (kind, handler) in log_handlers.lock().iter_mut() {
296 if matches!(kind, LogKind::Adapter) {
297 handler(IoKind::StdOut, line.as_str());
298 }
299 }
300 }
301 };
302
303 log::debug!("Handle adapter log dropped");
304
305 result
306 }
307
308 fn build_rpc_message(message: String) -> String {
309 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
310 }
311
312 async fn handle_input<Stdin>(
313 mut server_stdin: Stdin,
314 client_rx: Receiver<Message>,
315 current_requests: Requests,
316 pending_requests: Requests,
317 log_handlers: Option<LogHandlers>,
318 ) -> Result<()>
319 where
320 Stdin: AsyncWrite + Unpin + Send + 'static,
321 {
322 let result = loop {
323 match client_rx.recv().await {
324 Ok(message) => {
325 if let Message::Request(request) = &message {
326 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
327 pending_requests.lock().await.insert(request.seq, sender);
328 }
329 }
330
331 let message = match serde_json::to_string(&message) {
332 Ok(message) => message,
333 Err(e) => break Err(e.into()),
334 };
335
336 if let Some(log_handlers) = log_handlers.as_ref() {
337 for (kind, log_handler) in log_handlers.lock().iter_mut() {
338 if matches!(kind, LogKind::Rpc) {
339 log_handler(IoKind::StdIn, &message);
340 }
341 }
342 }
343
344 if let Err(e) = server_stdin
345 .write_all(Self::build_rpc_message(message).as_bytes())
346 .await
347 {
348 break Err(e.into());
349 }
350
351 if let Err(e) = server_stdin.flush().await {
352 break Err(e.into());
353 }
354 }
355 Err(error) => break Err(error.into()),
356 }
357 };
358
359 log::debug!("Handle adapter input dropped");
360
361 result
362 }
363
364 async fn handle_output<Stdout>(
365 server_stdout: Stdout,
366 client_tx: Sender<Message>,
367 pending_requests: Requests,
368 log_handlers: Option<LogHandlers>,
369 ) -> Result<()>
370 where
371 Stdout: AsyncRead + Unpin + Send + 'static,
372 {
373 let mut recv_buffer = String::new();
374 let mut reader = BufReader::new(server_stdout);
375
376 let result = loop {
377 match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
378 .await
379 {
380 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
381 ConnectionResult::ConnectionReset => {
382 log::info!("Debugger closed the connection");
383 return Ok(());
384 }
385 ConnectionResult::Result(Ok(Message::Response(res))) => {
386 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
387 if let Err(e) = tx.send(Self::process_response(res)) {
388 log::trace!("Did not send response `{:?}` for a cancelled", e);
389 }
390 } else {
391 client_tx.send(Message::Response(res)).await?;
392 }
393 }
394 ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
395 ConnectionResult::Result(Err(e)) => break Err(e),
396 }
397 };
398
399 drop(client_tx);
400 log::debug!("Handle adapter output dropped");
401
402 result
403 }
404
405 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
406 where
407 Stderr: AsyncRead + Unpin + Send + 'static,
408 {
409 log::debug!("Handle error started");
410 let mut buffer = String::new();
411
412 let mut reader = BufReader::new(stderr);
413
414 let result = loop {
415 match reader
416 .read_line(&mut buffer)
417 .await
418 .context("reading error log line")
419 {
420 Ok(0) => break ConnectionResult::ConnectionReset,
421 Ok(_) => {
422 for (kind, log_handler) in log_handlers.lock().iter_mut() {
423 if matches!(kind, LogKind::Adapter) {
424 log_handler(IoKind::StdErr, buffer.as_str());
425 }
426 }
427
428 buffer.truncate(0);
429 }
430 Err(error) => break ConnectionResult::Result(Err(error)),
431 }
432 };
433
434 log::debug!("Handle adapter error dropped");
435
436 result
437 }
438
439 fn process_response(response: Response) -> Result<Response> {
440 if response.success {
441 Ok(response)
442 } else {
443 if let Some(error_message) = response
444 .body
445 .clone()
446 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
447 .and_then(|response| response.error.map(|msg| msg.format))
448 .or_else(|| response.message.clone())
449 {
450 anyhow::bail!(error_message);
451 };
452
453 anyhow::bail!(
454 "Received error response from adapter. Response: {:?}",
455 response
456 );
457 }
458 }
459
460 async fn receive_server_message<Stdout>(
461 reader: &mut BufReader<Stdout>,
462 buffer: &mut String,
463 log_handlers: Option<&LogHandlers>,
464 ) -> ConnectionResult<Message>
465 where
466 Stdout: AsyncRead + Unpin + Send + 'static,
467 {
468 let mut content_length = None;
469 loop {
470 buffer.truncate(0);
471
472 match reader
473 .read_line(buffer)
474 .await
475 .with_context(|| "reading a message from server")
476 {
477 Ok(0) => return ConnectionResult::ConnectionReset,
478 Ok(_) => {}
479 Err(e) => return ConnectionResult::Result(Err(e)),
480 };
481
482 if buffer == "\r\n" {
483 break;
484 }
485
486 if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
487 match value.parse().context("invalid content length") {
488 Ok(length) => content_length = Some(length),
489 Err(e) => return ConnectionResult::Result(Err(e)),
490 }
491 }
492 }
493
494 let content_length = match content_length.context("missing content length") {
495 Ok(length) => length,
496 Err(e) => return ConnectionResult::Result(Err(e)),
497 };
498
499 let mut content = vec![0; content_length];
500 if let Err(e) = reader
501 .read_exact(&mut content)
502 .await
503 .with_context(|| "reading after a loop")
504 {
505 return ConnectionResult::Result(Err(e));
506 }
507
508 let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
509 Ok(str) => str,
510 Err(e) => return ConnectionResult::Result(Err(e)),
511 };
512
513 if let Some(log_handlers) = log_handlers {
514 for (kind, log_handler) in log_handlers.lock().iter_mut() {
515 if matches!(kind, LogKind::Rpc) {
516 log_handler(IoKind::StdOut, message_str);
517 }
518 }
519 }
520
521 ConnectionResult::Result(
522 serde_json::from_str::<Message>(message_str).context("deserializing server message"),
523 )
524 }
525
526 pub async fn shutdown(&self) -> Result<()> {
527 log::debug!("Start shutdown client");
528
529 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
530 server_tx.close();
531 }
532
533 let mut current_requests = self.current_requests.lock().await;
534 let mut pending_requests = self.pending_requests.lock().await;
535
536 current_requests.clear();
537 pending_requests.clear();
538
539 self.transport.kill().await;
540
541 drop(current_requests);
542 drop(pending_requests);
543
544 log::debug!("Shutdown client completed");
545
546 anyhow::Ok(())
547 }
548
549 pub fn has_adapter_logs(&self) -> bool {
550 self.transport.has_adapter_logs()
551 }
552
553 pub fn transport(&self) -> &Transport {
554 &self.transport
555 }
556
557 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
558 where
559 F: 'static + Send + FnMut(IoKind, &str),
560 {
561 let mut log_handlers = self.log_handlers.lock();
562 log_handlers.push((kind, Box::new(f)));
563 }
564}
565
566pub struct TcpTransport {
567 pub port: u16,
568 pub host: Ipv4Addr,
569 pub timeout: u64,
570 process: Mutex<Child>,
571}
572
573impl TcpTransport {
574 /// Get an open port to use with the tcp client when not supplied by debug config
575 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
576 if let Some(port) = host.port {
577 Ok(port)
578 } else {
579 Self::unused_port(host.host()).await
580 }
581 }
582
583 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
584 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
585 .await?
586 .local_addr()?
587 .port())
588 }
589
590 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
591 let connection_args = binary
592 .connection
593 .as_ref()
594 .context("No connection arguments provided")?;
595
596 let host = connection_args.host;
597 let port = connection_args.port;
598
599 let mut command = util::command::new_std_command(&binary.command);
600
601 if let Some(cwd) = &binary.cwd {
602 command.current_dir(cwd);
603 }
604
605 command.args(&binary.arguments);
606 command.envs(&binary.envs);
607
608 let mut process = Child::spawn(command, Stdio::null())
609 .with_context(|| "failed to start debug adapter.")?;
610
611 let address = SocketAddrV4::new(host, port);
612
613 let timeout = connection_args.timeout.unwrap_or_else(|| {
614 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
615 .unwrap_or(2000u64)
616 });
617
618 let (mut process, (rx, tx)) = select! {
619 _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
620 anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
621 },
622 result = cx.spawn(async move |cx| {
623 loop {
624 match TcpStream::connect(address).await {
625 Ok(stream) => return Ok((process, stream.split())),
626 Err(_) => {
627 if let Ok(Some(_)) = process.try_status() {
628 let output = process.into_inner().output().await?;
629 let output = if output.stderr.is_empty() {
630 String::from_utf8_lossy(&output.stdout).to_string()
631 } else {
632 String::from_utf8_lossy(&output.stderr).to_string()
633 };
634 anyhow::bail!("{output}\nerror: process exited before debugger attached.");
635 }
636 cx.background_executor().timer(Duration::from_millis(100)).await;
637 }
638 }
639 }
640 }).fuse() => result?
641 };
642
643 log::info!(
644 "Debug adapter has connected to TCP server {}:{}",
645 host,
646 port
647 );
648 let stdout = process.stdout.take();
649 let stderr = process.stderr.take();
650
651 let this = Self {
652 port,
653 host,
654 process: Mutex::new(process),
655 timeout,
656 };
657
658 let pipe = TransportPipe::new(
659 Box::new(tx),
660 Box::new(BufReader::new(rx)),
661 stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
662 stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
663 );
664
665 Ok((pipe, this))
666 }
667
668 fn has_adapter_logs(&self) -> bool {
669 true
670 }
671
672 async fn kill(&self) {
673 let mut process = self.process.lock().await;
674 Child::kill(&mut process);
675 }
676}
677
678impl Drop for TcpTransport {
679 fn drop(&mut self) {
680 self.process.get_mut().kill();
681 }
682}
683
684pub struct StdioTransport {
685 process: Mutex<Child>,
686}
687
688impl StdioTransport {
689 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
690 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
691 let mut command = util::command::new_std_command(&binary.command);
692
693 if let Some(cwd) = &binary.cwd {
694 command.current_dir(cwd);
695 }
696
697 command.args(&binary.arguments);
698 command.envs(&binary.envs);
699
700 let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
701 format!(
702 "failed to spawn command `{} {}`.",
703 binary.command,
704 binary.arguments.join(" ")
705 )
706 })?;
707
708 let stdin = process.stdin.take().context("Failed to open stdin")?;
709 let stdout = process.stdout.take().context("Failed to open stdout")?;
710 let stderr = process
711 .stderr
712 .take()
713 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
714
715 if stderr.is_none() {
716 bail!(
717 "Failed to connect to stderr for debug adapter command {}",
718 &binary.command
719 );
720 }
721
722 log::info!("Debug adapter has connected to stdio adapter");
723
724 let process = Mutex::new(process);
725
726 Ok((
727 TransportPipe::new(
728 Box::new(stdin),
729 Box::new(BufReader::new(stdout)),
730 None,
731 stderr,
732 ),
733 Self { process },
734 ))
735 }
736
737 fn has_adapter_logs(&self) -> bool {
738 false
739 }
740
741 async fn kill(&self) {
742 let mut process = self.process.lock().await;
743 Child::kill(&mut process);
744 }
745}
746
747impl Drop for StdioTransport {
748 fn drop(&mut self) {
749 self.process.get_mut().kill();
750 }
751}
752
/// Test hook that turns a request (sequence id + raw arguments) into a response.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;

/// Test hook invoked with a response received by the fake adapter.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;

/// In-memory adapter used by tests; handlers are looked up by command name.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // Produce the fake responses sent back from the adapter side.
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    // Receive the responses to reverse requests.
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
767
768#[cfg(any(test, feature = "test-support"))]
769impl FakeTransport {
770 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
771 where
772 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
773 {
774 self.request_handlers.lock().insert(
775 R::COMMAND,
776 Box::new(move |seq, args| {
777 let result = handler(seq, serde_json::from_value(args).unwrap());
778 let response = match result {
779 Ok(response) => Response {
780 seq: seq + 1,
781 request_seq: seq,
782 success: true,
783 command: R::COMMAND.into(),
784 body: Some(serde_json::to_value(response).unwrap()),
785 message: None,
786 },
787 Err(response) => Response {
788 seq: seq + 1,
789 request_seq: seq,
790 success: false,
791 command: R::COMMAND.into(),
792 body: Some(serde_json::to_value(response).unwrap()),
793 message: None,
794 },
795 };
796 response
797 }),
798 );
799 }
800
801 pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
802 where
803 F: 'static + Send + Fn(Response),
804 {
805 self.response_handlers
806 .lock()
807 .insert(R::COMMAND, Box::new(handler));
808 }
809
810 async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
811 let this = Self {
812 request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
813 response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
814 };
815 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
816 use serde_json::json;
817
818 let (stdin_writer, stdin_reader) = async_pipe::pipe();
819 let (stdout_writer, stdout_reader) = async_pipe::pipe();
820
821 let request_handlers = this.request_handlers.clone();
822 let response_handlers = this.response_handlers.clone();
823 let stdout_writer = Arc::new(Mutex::new(stdout_writer));
824
825 cx.background_spawn(async move {
826 let mut reader = BufReader::new(stdin_reader);
827 let mut buffer = String::new();
828
829 loop {
830 match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
831 .await
832 {
833 ConnectionResult::Timeout => {
834 anyhow::bail!("Timed out when connecting to debugger");
835 }
836 ConnectionResult::ConnectionReset => {
837 log::info!("Debugger closed the connection");
838 break Ok(());
839 }
840 ConnectionResult::Result(Err(e)) => break Err(e),
841 ConnectionResult::Result(Ok(message)) => {
842 match message {
843 Message::Request(request) => {
844 // redirect reverse requests to stdout writer/reader
845 if request.command == RunInTerminal::COMMAND
846 || request.command == StartDebugging::COMMAND
847 {
848 let message =
849 serde_json::to_string(&Message::Request(request)).unwrap();
850
851 let mut writer = stdout_writer.lock().await;
852 writer
853 .write_all(
854 TransportDelegate::build_rpc_message(message)
855 .as_bytes(),
856 )
857 .await
858 .unwrap();
859 writer.flush().await.unwrap();
860 } else {
861 let response = if let Some(handle) =
862 request_handlers.lock().get_mut(request.command.as_str())
863 {
864 handle(request.seq, request.arguments.unwrap_or(json!({})))
865 } else {
866 panic!("No request handler for {}", request.command);
867 };
868 let message =
869 serde_json::to_string(&Message::Response(response))
870 .unwrap();
871
872 let mut writer = stdout_writer.lock().await;
873
874 writer
875 .write_all(
876 TransportDelegate::build_rpc_message(message)
877 .as_bytes(),
878 )
879 .await
880 .unwrap();
881 writer.flush().await.unwrap();
882 }
883 }
884 Message::Event(event) => {
885 let message =
886 serde_json::to_string(&Message::Event(event)).unwrap();
887
888 let mut writer = stdout_writer.lock().await;
889 writer
890 .write_all(
891 TransportDelegate::build_rpc_message(message).as_bytes(),
892 )
893 .await
894 .unwrap();
895 writer.flush().await.unwrap();
896 }
897 Message::Response(response) => {
898 if let Some(handle) =
899 response_handlers.lock().get(response.command.as_str())
900 {
901 handle(response);
902 } else {
903 log::error!("No response handler for {}", response.command);
904 }
905 }
906 }
907 }
908 }
909 }
910 })
911 .detach();
912
913 Ok((
914 TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
915 this,
916 ))
917 }
918
919 fn has_adapter_logs(&self) -> bool {
920 false
921 }
922
923 async fn kill(&self) {}
924}
925
926struct Child {
927 process: smol::process::Child,
928}
929
930impl std::ops::Deref for Child {
931 type Target = smol::process::Child;
932
933 fn deref(&self) -> &Self::Target {
934 &self.process
935 }
936}
937
938impl std::ops::DerefMut for Child {
939 fn deref_mut(&mut self) -> &mut Self::Target {
940 &mut self.process
941 }
942}
943
944impl Child {
945 fn into_inner(self) -> smol::process::Child {
946 self.process
947 }
948
949 #[cfg(not(windows))]
950 fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
951 util::set_pre_exec_to_start_new_session(&mut command);
952 let process = smol::process::Command::from(command)
953 .stdin(stdin)
954 .stdout(Stdio::piped())
955 .stderr(Stdio::piped())
956 .spawn()?;
957 Ok(Self { process })
958 }
959
960 #[cfg(windows)]
961 fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
962 // TODO(windows): create a job object and add the child process handle to it,
963 // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
964 let process = smol::process::Command::from(command)
965 .stdin(stdin)
966 .stdout(Stdio::piped())
967 .stderr(Stdio::piped())
968 .spawn()?;
969 Ok(Self { process })
970 }
971
972 #[cfg(not(windows))]
973 fn kill(&mut self) {
974 let pid = self.process.id();
975 unsafe {
976 libc::killpg(pid as i32, libc::SIGKILL);
977 }
978 }
979
980 #[cfg(windows)]
981 fn kill(&mut self) {
982 // TODO(windows): terminate the job object in kill
983 let _ = self.process.kill();
984 }
985}