1use anyhow::{Context as _, Result, bail};
2use dap_types::{
3 ErrorResponse,
4 messages::{Message, Response},
5};
6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
7use gpui::{AppContext as _, AsyncApp, Task};
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{Receiver, Sender, unbounded},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15 process::Child,
16};
17use std::{
18 collections::HashMap,
19 net::{Ipv4Addr, SocketAddrV4},
20 process::Stdio,
21 sync::Arc,
22 time::Duration,
23};
24use task::TcpArgumentsTemplate;
25use util::{ConnectionResult, ResultExt as _};
26
27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
28
/// Boxed, sendable callback that receives an [`IoKind`] tag and a text slice.
///
/// In this file such callbacks are stored in the log-handler list and invoked with
/// adapter output lines and with the JSON text of DAP messages.
29pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
30
/// Category a log handler is registered under; the read/write loops in this file use it
/// to decide which handlers are invoked for a given piece of text.
31#[derive(PartialEq, Eq, Clone, Copy)]
32pub enum LogKind {
 /// Handlers of this kind are invoked with lines read from the adapter's
 /// stdout/stderr log streams.
33 Adapter,
 /// Handlers of this kind are invoked with the JSON text of messages written to or
 /// read from the adapter.
34 Rpc,
35}
36
/// Tag passed to an [`IoHandler`] describing which stream the text belongs to.
37pub enum IoKind {
 /// Used in this file for messages being written to the adapter.
38 StdIn,
 /// Used in this file for adapter log lines read from stdout and for messages read
 /// from the adapter.
39 StdOut,
 /// Used in this file for lines read from the adapter's stderr.
40 StdErr,
41}
42
/// The set of I/O streams a started transport hands to [`TransportDelegate`].
43pub struct TransportPipe {
 /// Writer that framed outgoing messages are written to.
44 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 /// Reader that framed incoming messages are read from.
45 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 /// Optional stream that is read line by line as adapter log output.
46 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 /// Optional stream that is read line by line as adapter error output.
47 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
48}
49
50impl TransportPipe {
51 pub fn new(
52 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
53 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
54 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
55 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
56 ) -> Self {
57 TransportPipe {
58 input,
59 output,
60 stdout,
61 stderr,
62 }
63 }
64}
65
/// Shared map from a request sequence number to the one-shot sender used to deliver
/// that request's response (or error).
66type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log handlers, each paired with the [`LogKind`] it was
/// registered under. Up to two entries are stored inline by the `SmallVec`.
67type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
68
/// The concrete transport used to talk to a debug adapter.
69pub enum Transport {
 /// Adapter process reached through its stdin/stdout pipes.
70 Stdio(StdioTransport),
 /// Adapter process reached through a TCP connection.
71 Tcp(TcpTransport),
 /// In-memory stand-in, only compiled for tests / the `test-support` feature.
72 #[cfg(any(test, feature = "test-support"))]
73 Fake(FakeTransport),
74}
75
76impl Transport {
77 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
78 #[cfg(any(test, feature = "test-support"))]
79 if cfg!(any(test, feature = "test-support")) {
80 return FakeTransport::start(cx)
81 .await
82 .map(|(transports, fake)| (transports, Self::Fake(fake)));
83 }
84
85 if binary.connection.is_some() {
86 TcpTransport::start(binary, cx)
87 .await
88 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
89 } else {
90 StdioTransport::start(binary, cx)
91 .await
92 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
93 }
94 }
95
96 fn has_adapter_logs(&self) -> bool {
97 match self {
98 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
99 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100 #[cfg(any(test, feature = "test-support"))]
101 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102 }
103 }
104
105 async fn kill(&self) -> Result<()> {
106 match self {
107 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109 #[cfg(any(test, feature = "test-support"))]
110 Transport::Fake(fake_transport) => fake_transport.kill().await,
111 }
112 }
113
114 #[cfg(any(test, feature = "test-support"))]
115 pub(crate) fn as_fake(&self) -> &FakeTransport {
116 match self {
117 Transport::Fake(fake_transport) => fake_transport,
118 _ => panic!("Not a fake transport layer"),
119 }
120 }
121}
122
/// Owns a [`Transport`] plus the background tasks and bookkeeping that move messages
/// between the client and the debug adapter.
123pub(crate) struct TransportDelegate {
 /// Handlers registered through `add_log_handler`.
124 log_handlers: LogHandlers,
 /// Response senders keyed by request sequence number; `handle_input` moves an entry
 /// into `pending_requests` when it writes the request with that sequence number.
125 current_requests: Requests,
 /// Response senders waiting for the adapter's response; `handle_output` removes and
 /// resolves the entry whose key equals the response's `request_seq`.
126 pending_requests: Requests,
 /// The started transport; used for `kill` and `has_adapter_logs`.
127 transport: Transport,
 /// Sender for outgoing messages; set by `start_handlers`, taken by `shutdown`.
128 server_tx: Arc<Mutex<Option<Sender<Message>>>>,
 /// Background tasks spawned by `start_handlers`, stored so they stay owned by this value.
129 _tasks: Vec<Task<()>>,
130}
131
132impl TransportDelegate {
133 pub(crate) async fn start(
134 binary: &DebugAdapterBinary,
135 cx: AsyncApp,
136 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
137 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
138 let mut this = Self {
139 transport,
140 server_tx: Default::default(),
141 log_handlers: Default::default(),
142 current_requests: Default::default(),
143 pending_requests: Default::default(),
144 _tasks: Vec::new(),
145 };
146 let messages = this.start_handlers(transport_pipes, cx).await?;
147 Ok((messages, this))
148 }
149
150 async fn start_handlers(
151 &mut self,
152 mut params: TransportPipe,
153 cx: AsyncApp,
154 ) -> Result<(Receiver<Message>, Sender<Message>)> {
155 let (client_tx, server_rx) = unbounded::<Message>();
156 let (server_tx, client_rx) = unbounded::<Message>();
157
158 let log_dap_communications =
159 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161 .unwrap_or(false);
162
163 let log_handler = if log_dap_communications {
164 Some(self.log_handlers.clone())
165 } else {
166 None
167 };
168
169 let adapter_log_handler = log_handler.clone();
170 cx.update(|cx| {
171 if let Some(stdout) = params.stdout.take() {
172 self._tasks.push(cx.background_spawn(async move {
173 match Self::handle_adapter_log(stdout, adapter_log_handler).await {
174 ConnectionResult::Timeout => {
175 log::error!("Timed out when handling debugger log");
176 }
177 ConnectionResult::ConnectionReset => {
178 log::info!("Debugger logs connection closed");
179 }
180 ConnectionResult::Result(Ok(())) => {}
181 ConnectionResult::Result(Err(e)) => {
182 log::error!("Error handling debugger log: {e}");
183 }
184 }
185 }));
186 }
187
188 let pending_requests = self.pending_requests.clone();
189 let output_log_handler = log_handler.clone();
190 self._tasks.push(cx.background_spawn(async move {
191 match Self::handle_output(
192 params.output,
193 client_tx,
194 pending_requests,
195 output_log_handler,
196 )
197 .await
198 {
199 Ok(()) => {}
200 Err(e) => log::error!("Error handling debugger output: {e}"),
201 }
202 }));
203
204 if let Some(stderr) = params.stderr.take() {
205 let log_handlers = self.log_handlers.clone();
206 self._tasks.push(cx.background_spawn(async move {
207 match Self::handle_error(stderr, log_handlers).await {
208 ConnectionResult::Timeout => {
209 log::error!("Timed out reading debugger error stream")
210 }
211 ConnectionResult::ConnectionReset => {
212 log::info!("Debugger closed its error stream")
213 }
214 ConnectionResult::Result(Ok(())) => {}
215 ConnectionResult::Result(Err(e)) => {
216 log::error!("Error handling debugger error: {e}")
217 }
218 }
219 }));
220 }
221
222 let current_requests = self.current_requests.clone();
223 let pending_requests = self.pending_requests.clone();
224 let log_handler = log_handler.clone();
225 self._tasks.push(cx.background_spawn(async move {
226 match Self::handle_input(
227 params.input,
228 client_rx,
229 current_requests,
230 pending_requests,
231 log_handler,
232 )
233 .await
234 {
235 Ok(()) => {}
236 Err(e) => log::error!("Error handling debugger input: {e}"),
237 }
238 }));
239 })?;
240
241 {
242 let mut lock = self.server_tx.lock().await;
243 *lock = Some(server_tx.clone());
244 }
245
246 Ok((server_rx, server_tx))
247 }
248
249 pub(crate) async fn add_pending_request(
250 &self,
251 sequence_id: u64,
252 request: oneshot::Sender<Result<Response>>,
253 ) {
254 let mut pending_requests = self.pending_requests.lock().await;
255 pending_requests.insert(sequence_id, request);
256 }
257
258 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
259 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
260 server_tx.send(message).await.context("sending message")
261 } else {
262 anyhow::bail!("Server tx already dropped")
263 }
264 }
265
266 async fn handle_adapter_log<Stdout>(
267 stdout: Stdout,
268 log_handlers: Option<LogHandlers>,
269 ) -> ConnectionResult<()>
270 where
271 Stdout: AsyncRead + Unpin + Send + 'static,
272 {
273 let mut reader = BufReader::new(stdout);
274 let mut line = String::new();
275
276 let result = loop {
277 line.truncate(0);
278
279 match reader
280 .read_line(&mut line)
281 .await
282 .context("reading adapter log line")
283 {
284 Ok(0) => break ConnectionResult::ConnectionReset,
285 Ok(_) => {}
286 Err(e) => break ConnectionResult::Result(Err(e)),
287 }
288
289 if let Some(log_handlers) = log_handlers.as_ref() {
290 for (kind, handler) in log_handlers.lock().iter_mut() {
291 if matches!(kind, LogKind::Adapter) {
292 handler(IoKind::StdOut, line.as_str());
293 }
294 }
295 }
296 };
297
298 log::debug!("Handle adapter log dropped");
299
300 result
301 }
302
/// Frames `message` with a `Content-Length` header followed by a blank line.
///
/// The length is the message's size in bytes, not characters.
fn build_rpc_message(message: String) -> String {
    let content_length = message.len();
    format!("Content-Length: {content_length}\r\n\r\n{message}")
}
306
307 async fn handle_input<Stdin>(
308 mut server_stdin: Stdin,
309 client_rx: Receiver<Message>,
310 current_requests: Requests,
311 pending_requests: Requests,
312 log_handlers: Option<LogHandlers>,
313 ) -> Result<()>
314 where
315 Stdin: AsyncWrite + Unpin + Send + 'static,
316 {
317 let result = loop {
318 match client_rx.recv().await {
319 Ok(message) => {
320 if let Message::Request(request) = &message {
321 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
322 pending_requests.lock().await.insert(request.seq, sender);
323 }
324 }
325
326 let message = match serde_json::to_string(&message) {
327 Ok(message) => message,
328 Err(e) => break Err(e.into()),
329 };
330
331 if let Some(log_handlers) = log_handlers.as_ref() {
332 for (kind, log_handler) in log_handlers.lock().iter_mut() {
333 if matches!(kind, LogKind::Rpc) {
334 log_handler(IoKind::StdIn, &message);
335 }
336 }
337 }
338
339 if let Err(e) = server_stdin
340 .write_all(Self::build_rpc_message(message).as_bytes())
341 .await
342 {
343 break Err(e.into());
344 }
345
346 if let Err(e) = server_stdin.flush().await {
347 break Err(e.into());
348 }
349 }
350 Err(error) => break Err(error.into()),
351 }
352 };
353
354 log::debug!("Handle adapter input dropped");
355
356 result
357 }
358
359 async fn handle_output<Stdout>(
360 server_stdout: Stdout,
361 client_tx: Sender<Message>,
362 pending_requests: Requests,
363 log_handlers: Option<LogHandlers>,
364 ) -> Result<()>
365 where
366 Stdout: AsyncRead + Unpin + Send + 'static,
367 {
368 let mut recv_buffer = String::new();
369 let mut reader = BufReader::new(server_stdout);
370
371 let result = loop {
372 match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
373 .await
374 {
375 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
376 ConnectionResult::ConnectionReset => {
377 log::info!("Debugger closed the connection");
378 return Ok(());
379 }
380 ConnectionResult::Result(Ok(Message::Response(res))) => {
381 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
382 if let Err(e) = tx.send(Self::process_response(res)) {
383 log::trace!("Did not send response `{:?}` for a cancelled", e);
384 }
385 } else {
386 client_tx.send(Message::Response(res)).await?;
387 }
388 }
389 ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
390 ConnectionResult::Result(Err(e)) => break Err(e),
391 }
392 };
393
394 drop(client_tx);
395 log::debug!("Handle adapter output dropped");
396
397 result
398 }
399
400 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
401 where
402 Stderr: AsyncRead + Unpin + Send + 'static,
403 {
404 log::debug!("Handle error started");
405 let mut buffer = String::new();
406
407 let mut reader = BufReader::new(stderr);
408
409 let result = loop {
410 match reader
411 .read_line(&mut buffer)
412 .await
413 .context("reading error log line")
414 {
415 Ok(0) => break ConnectionResult::ConnectionReset,
416 Ok(_) => {
417 for (kind, log_handler) in log_handlers.lock().iter_mut() {
418 if matches!(kind, LogKind::Adapter) {
419 log_handler(IoKind::StdErr, buffer.as_str());
420 }
421 }
422
423 buffer.truncate(0);
424 }
425 Err(error) => break ConnectionResult::Result(Err(error)),
426 }
427 };
428
429 log::debug!("Handle adapter error dropped");
430
431 result
432 }
433
434 fn process_response(response: Response) -> Result<Response> {
435 if response.success {
436 Ok(response)
437 } else {
438 if let Some(error_message) = response
439 .body
440 .clone()
441 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
442 .and_then(|response| response.error.map(|msg| msg.format))
443 .or_else(|| response.message.clone())
444 {
445 anyhow::bail!(error_message);
446 };
447
448 anyhow::bail!(
449 "Received error response from adapter. Response: {:?}",
450 response
451 );
452 }
453 }
454
455 async fn receive_server_message<Stdout>(
456 reader: &mut BufReader<Stdout>,
457 buffer: &mut String,
458 log_handlers: Option<&LogHandlers>,
459 ) -> ConnectionResult<Message>
460 where
461 Stdout: AsyncRead + Unpin + Send + 'static,
462 {
463 let mut content_length = None;
464 loop {
465 buffer.truncate(0);
466
467 match reader
468 .read_line(buffer)
469 .await
470 .with_context(|| "reading a message from server")
471 {
472 Ok(0) => return ConnectionResult::ConnectionReset,
473 Ok(_) => {}
474 Err(e) => return ConnectionResult::Result(Err(e)),
475 };
476
477 if buffer == "\r\n" {
478 break;
479 }
480
481 if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
482 match value.parse().context("invalid content length") {
483 Ok(length) => content_length = Some(length),
484 Err(e) => return ConnectionResult::Result(Err(e)),
485 }
486 }
487 }
488
489 let content_length = match content_length.context("missing content length") {
490 Ok(length) => length,
491 Err(e) => return ConnectionResult::Result(Err(e)),
492 };
493
494 let mut content = vec![0; content_length];
495 if let Err(e) = reader
496 .read_exact(&mut content)
497 .await
498 .with_context(|| "reading after a loop")
499 {
500 return ConnectionResult::Result(Err(e));
501 }
502
503 let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
504 Ok(str) => str,
505 Err(e) => return ConnectionResult::Result(Err(e)),
506 };
507
508 if let Some(log_handlers) = log_handlers {
509 for (kind, log_handler) in log_handlers.lock().iter_mut() {
510 if matches!(kind, LogKind::Rpc) {
511 log_handler(IoKind::StdOut, message_str);
512 }
513 }
514 }
515
516 ConnectionResult::Result(
517 serde_json::from_str::<Message>(message_str).context("deserializing server message"),
518 )
519 }
520
/// Shuts the delegate down: closes the outgoing message channel, discards every
/// tracked request sender, and kills the underlying transport.
///
/// A failure to kill the transport is logged via `log_err` and not propagated; as
/// written, this method always returns `Ok(())`.
521 pub async fn shutdown(&self) -> Result<()> {
522 log::debug!("Start shutdown client");
523
 // Take the sender out so later `send_message` calls fail, then close the channel.
524 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
525 server_tx.close();
526 }
527
 // Both request maps are locked here and stay locked until the explicit drops below,
 // i.e. for the whole duration of the kill.
528 let mut current_requests = self.current_requests.lock().await;
529 let mut pending_requests = self.pending_requests.lock().await;
530
 // Clearing the maps drops the stored one-shot senders.
531 current_requests.clear();
532 pending_requests.clear();
533
534 let _ = self.transport.kill().await.log_err();
535
536 drop(current_requests);
537 drop(pending_requests);
538
539 log::debug!("Shutdown client completed");
540
541 anyhow::Ok(())
542 }
543
/// Reports whether the underlying transport says it has adapter logs
/// (forwards to `Transport::has_adapter_logs`).
544 pub fn has_adapter_logs(&self) -> bool {
545 self.transport.has_adapter_logs()
546 }
547
/// Borrows the underlying [`Transport`].
548 pub fn transport(&self) -> &Transport {
549 &self.transport
550 }
551
552 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
553 where
554 F: 'static + Send + FnMut(IoKind, &str),
555 {
556 let mut log_handlers = self.log_handlers.lock();
557 log_handlers.push((kind, Box::new(f)));
558 }
559}
560
/// Transport that spawns a debug adapter process and talks to it over TCP.
561pub struct TcpTransport {
 /// Port the adapter connection was made to.
562 pub port: u16,
 /// Host the adapter connection was made to.
563 pub host: Ipv4Addr,
 /// Connection timeout that was applied in `start`, in milliseconds.
564 pub timeout: u64,
 /// The spawned adapter process; locked by `kill`.
565 process: Mutex<Child>,
566}
567
568impl TcpTransport {
569 /// Get an open port to use with the tcp client when not supplied by debug config
570 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
571 if let Some(port) = host.port {
572 Ok(port)
573 } else {
574 Self::unused_port(host.host()).await
575 }
576 }
577
578 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
579 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
580 .await?
581 .local_addr()?
582 .port())
583 }
584
/// Spawns the debug adapter process described by `binary` and connects to it over TCP.
///
/// Returns the transport pipes (TCP halves for messages, the process's stdout/stderr
/// as optional log streams) and the transport handle.
///
/// # Errors
///
/// Fails when `binary.connection` is absent, the process cannot be spawned, the
/// connection is not established within the timeout, or the process exits before a
/// connection could be made.
585 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
586 let connection_args = binary
587 .connection
588 .as_ref()
589 .context("No connection arguments provided")?;
590
591 let host = connection_args.host;
592 let port = connection_args.port;
593
594 let mut command = util::command::new_std_command(&binary.command);
 // NOTE(review): going by its name, this presumably makes the child start in a new
 // session — confirm against `util`.
595 util::set_pre_exec_to_start_new_session(&mut command);
596 let mut command = smol::process::Command::from(command);
597
598 if let Some(cwd) = &binary.cwd {
599 command.current_dir(cwd);
600 }
601
602 command.args(&binary.arguments);
603 command.envs(&binary.envs);
604
 // Messages travel over TCP, so stdin is not connected; stdout/stderr are piped so
 // they can be read as log streams (and reported if the process exits early).
605 command
606 .stdin(Stdio::null())
607 .stdout(Stdio::piped())
608 .stderr(Stdio::piped())
609 .kill_on_drop(true);
610
611 let mut process = command
612 .spawn()
613 .with_context(|| "failed to start debug adapter.")?;
614
615 let address = SocketAddrV4::new(host, port);
616
 // Timeout precedence: connection arguments, then the debugger setting, then 2000.
 // The value is interpreted as milliseconds below.
617 let timeout = connection_args.timeout.unwrap_or_else(|| {
618 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
619 .unwrap_or(2000u64)
620 });
621
 // Race the timeout timer against a task that keeps trying to connect.
622 let (mut process, (rx, tx)) = select! {
623 _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
624 anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
625 },
626 result = cx.spawn(async move |cx| {
627 loop {
628 match TcpStream::connect(address).await {
 // Hand the process back out of the task along with the stream halves.
629 Ok(stream) => return Ok((process, stream.split())),
630 Err(_) => {
 // If the process has already exited, report its output (stderr when
 // non-empty, otherwise stdout) instead of retrying.
631 if let Ok(Some(_)) = process.try_status() {
632 let output = process.output().await?;
633 let output = if output.stderr.is_empty() {
634 String::from_utf8_lossy(&output.stdout).to_string()
635 } else {
636 String::from_utf8_lossy(&output.stderr).to_string()
637 };
638 anyhow::bail!("{output}\nerror: process exited before debugger attached.");
639 }
 // Wait 100 ms before the next connection attempt.
640 cx.background_executor().timer(Duration::from_millis(100)).await;
641 }
642 }
643 }
644 }).fuse() => result?
645 };
646
647 log::info!(
648 "Debug adapter has connected to TCP server {}:{}",
649 host,
650 port
651 );
 // Take the piped streams so they can be handed out as log streams.
652 let stdout = process.stdout.take();
653 let stderr = process.stderr.take();
654
655 let this = Self {
656 port,
657 host,
658 process: Mutex::new(process),
659 timeout,
660 };
661
 // `tx` is the write half (messages to the adapter), `rx` the read half.
662 let pipe = TransportPipe::new(
663 Box::new(tx),
664 Box::new(BufReader::new(rx)),
665 stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
666 stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
667 );
668
669 Ok((pipe, this))
670 }
671
/// Always `true` for the TCP transport, whose `start` hands out the process's stdout
/// as a separate log stream.
672 fn has_adapter_logs(&self) -> bool {
673 true
674 }
675
676 async fn kill(&self) -> Result<()> {
677 self.process.lock().await.kill()?;
678
679 Ok(())
680 }
681}
682
/// Transport that spawns a debug adapter process and talks to it over its
/// stdin/stdout pipes.
683pub struct StdioTransport {
 /// The spawned adapter process; locked by `kill`.
684 process: Mutex<Child>,
685}
686
687impl StdioTransport {
688 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
689 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
690 let mut command = util::command::new_std_command(&binary.command);
691 util::set_pre_exec_to_start_new_session(&mut command);
692 let mut command = smol::process::Command::from(command);
693
694 if let Some(cwd) = &binary.cwd {
695 command.current_dir(cwd);
696 }
697
698 command.args(&binary.arguments);
699 command.envs(&binary.envs);
700
701 command
702 .stdin(Stdio::piped())
703 .stdout(Stdio::piped())
704 .stderr(Stdio::piped())
705 .kill_on_drop(true);
706
707 let mut process = command.spawn().with_context(|| {
708 format!(
709 "failed to spawn command `{} {}`.",
710 binary.command,
711 binary.arguments.join(" ")
712 )
713 })?;
714
715 let stdin = process.stdin.take().context("Failed to open stdin")?;
716 let stdout = process.stdout.take().context("Failed to open stdout")?;
717 let stderr = process
718 .stderr
719 .take()
720 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
721
722 if stderr.is_none() {
723 bail!(
724 "Failed to connect to stderr for debug adapter command {}",
725 &binary.command
726 );
727 }
728
729 log::info!("Debug adapter has connected to stdio adapter");
730
731 let process = Mutex::new(process);
732
733 Ok((
734 TransportPipe::new(
735 Box::new(stdin),
736 Box::new(BufReader::new(stdout)),
737 None,
738 stderr,
739 ),
740 Self { process },
741 ))
742 }
743
/// Always `false` for the stdio transport, whose `start` provides no separate
/// adapter log stream.
744 fn has_adapter_logs(&self) -> bool {
745 false
746 }
747
748 async fn kill(&self) -> Result<()> {
749 self.process.lock().await.kill()?;
750 Ok(())
751 }
752}
753
/// Test-only callback that turns a request (sequence number and raw JSON arguments)
/// into the response the fake adapter sends back.
754#[cfg(any(test, feature = "test-support"))]
755type RequestHandler =
756 Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
757
/// Test-only callback invoked with a response that was written to the fake adapter.
758#[cfg(any(test, feature = "test-support"))]
759type ResponseHandler = Box<dyn Send + Fn(Response)>;
760
/// In-memory stand-in for a debug adapter, compiled only for tests / `test-support`.
///
/// Handlers are keyed by DAP command name.
761#[cfg(any(test, feature = "test-support"))]
762pub struct FakeTransport {
763 // Produce the fake adapter's response to a request of the given command.
764 request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
765 // Receive the responses sent back for reverse requests of the given command.
766 response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
767}
768
769#[cfg(any(test, feature = "test-support"))]
770impl FakeTransport {
771 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
772 where
773 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
774 {
775 self.request_handlers.lock().insert(
776 R::COMMAND,
777 Box::new(move |seq, args| {
778 let result = handler(seq, serde_json::from_value(args).unwrap());
779 let response = match result {
780 Ok(response) => Response {
781 seq: seq + 1,
782 request_seq: seq,
783 success: true,
784 command: R::COMMAND.into(),
785 body: Some(serde_json::to_value(response).unwrap()),
786 message: None,
787 },
788 Err(response) => Response {
789 seq: seq + 1,
790 request_seq: seq,
791 success: false,
792 command: R::COMMAND.into(),
793 body: Some(serde_json::to_value(response).unwrap()),
794 message: None,
795 },
796 };
797 response
798 }),
799 );
800 }
801
802 pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
803 where
804 F: 'static + Send + Fn(Response),
805 {
806 self.response_handlers
807 .lock()
808 .insert(R::COMMAND, Box::new(handler));
809 }
810
811 async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
812 let this = Self {
813 request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
814 response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
815 };
816 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
817 use serde_json::json;
818
819 let (stdin_writer, stdin_reader) = async_pipe::pipe();
820 let (stdout_writer, stdout_reader) = async_pipe::pipe();
821
822 let request_handlers = this.request_handlers.clone();
823 let response_handlers = this.response_handlers.clone();
824 let stdout_writer = Arc::new(Mutex::new(stdout_writer));
825
826 cx.background_spawn(async move {
827 let mut reader = BufReader::new(stdin_reader);
828 let mut buffer = String::new();
829
830 loop {
831 match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
832 .await
833 {
834 ConnectionResult::Timeout => {
835 anyhow::bail!("Timed out when connecting to debugger");
836 }
837 ConnectionResult::ConnectionReset => {
838 log::info!("Debugger closed the connection");
839 break Ok(());
840 }
841 ConnectionResult::Result(Err(e)) => break Err(e),
842 ConnectionResult::Result(Ok(message)) => {
843 match message {
844 Message::Request(request) => {
845 // redirect reverse requests to stdout writer/reader
846 if request.command == RunInTerminal::COMMAND
847 || request.command == StartDebugging::COMMAND
848 {
849 let message =
850 serde_json::to_string(&Message::Request(request)).unwrap();
851
852 let mut writer = stdout_writer.lock().await;
853 writer
854 .write_all(
855 TransportDelegate::build_rpc_message(message)
856 .as_bytes(),
857 )
858 .await
859 .unwrap();
860 writer.flush().await.unwrap();
861 } else {
862 let response = if let Some(handle) =
863 request_handlers.lock().get_mut(request.command.as_str())
864 {
865 handle(request.seq, request.arguments.unwrap_or(json!({})))
866 } else {
867 panic!("No request handler for {}", request.command);
868 };
869 let message =
870 serde_json::to_string(&Message::Response(response))
871 .unwrap();
872
873 let mut writer = stdout_writer.lock().await;
874
875 writer
876 .write_all(
877 TransportDelegate::build_rpc_message(message)
878 .as_bytes(),
879 )
880 .await
881 .unwrap();
882 writer.flush().await.unwrap();
883 }
884 }
885 Message::Event(event) => {
886 let message =
887 serde_json::to_string(&Message::Event(event)).unwrap();
888
889 let mut writer = stdout_writer.lock().await;
890 writer
891 .write_all(
892 TransportDelegate::build_rpc_message(message).as_bytes(),
893 )
894 .await
895 .unwrap();
896 writer.flush().await.unwrap();
897 }
898 Message::Response(response) => {
899 if let Some(handle) =
900 response_handlers.lock().get(response.command.as_str())
901 {
902 handle(response);
903 } else {
904 log::error!("No response handler for {}", response.command);
905 }
906 }
907 }
908 }
909 }
910 }
911 })
912 .detach();
913
914 Ok((
915 TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
916 this,
917 ))
918 }
919
/// Always `false`: the fake transport's `start` provides no adapter log stream.
920 fn has_adapter_logs(&self) -> bool {
921 false
922 }
923
/// No-op for the fake transport; always returns `Ok(())`.
924 async fn kill(&self) -> Result<()> {
925 Ok(())
926 }
927}