1use anyhow::{Context as _, Result, anyhow, bail};
2use dap_types::{
3 ErrorResponse,
4 messages::{Message, Response},
5};
6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
7use gpui::{AppContext as _, AsyncApp, Task};
8use settings::Settings as _;
9use smallvec::SmallVec;
10use smol::{
11 channel::{Receiver, Sender, unbounded},
12 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
13 lock::Mutex,
14 net::{TcpListener, TcpStream},
15 process::Child,
16};
17use std::{
18 collections::HashMap,
19 net::{Ipv4Addr, SocketAddrV4},
20 process::Stdio,
21 sync::Arc,
22 time::Duration,
23};
24use task::TcpArgumentsTemplate;
25use util::{ConnectionResult, ResultExt as _};
26
27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
28
/// Boxed callback used for logging: it is invoked with the [`IoKind`] of the stream a piece of
/// text belongs to, together with that text.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
30
/// Category a log handler is registered for; handlers are only invoked for text of their kind.
#[derive(PartialEq, Eq, Clone, Copy)]
pub enum LogKind {
    /// Lines read from the adapter's separate stdout/stderr log streams.
    Adapter,
    /// Serialized protocol messages sent to or received from the adapter.
    Rpc,
}
36
/// Identifies the stream a logged piece of text is associated with.
pub enum IoKind {
    /// Text written to the adapter (outgoing protocol messages).
    StdIn,
    /// Text read from the adapter (incoming protocol messages or stdout log lines).
    StdOut,
    /// Text read from the adapter's stderr stream.
    StdErr,
}
42
/// The set of byte streams a started transport hands over to [`TransportDelegate`].
pub struct TransportPipe {
    /// Sink that outgoing protocol messages are written to.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Source that incoming protocol messages are read from.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional extra stdout stream that is forwarded line by line to adapter log handlers.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional stderr stream that is forwarded line by line to adapter log handlers.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
49
50impl TransportPipe {
51 pub fn new(
52 input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
53 output: Box<dyn AsyncRead + Unpin + Send + 'static>,
54 stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
55 stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
56 ) -> Self {
57 TransportPipe {
58 input,
59 output,
60 stdout,
61 stderr,
62 }
63 }
64}
65
/// Shared map from a request's sequence number to the channel its response is delivered on.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log callbacks, each tagged with the [`LogKind`] it listens for.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
68
/// The concrete channel used to talk to a debug adapter.
pub enum Transport {
    /// Adapter process spoken to over its stdin/stdout.
    Stdio(StdioTransport),
    /// Adapter process spoken to over a TCP connection.
    Tcp(TcpTransport),
    /// In-memory stand-in, only compiled for tests / the `test-support` feature.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
75
76impl Transport {
77 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
78 #[cfg(any(test, feature = "test-support"))]
79 if cfg!(any(test, feature = "test-support")) {
80 return FakeTransport::start(cx)
81 .await
82 .map(|(transports, fake)| (transports, Self::Fake(fake)));
83 }
84
85 if binary.connection.is_some() {
86 TcpTransport::start(binary, cx)
87 .await
88 .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
89 } else {
90 StdioTransport::start(binary, cx)
91 .await
92 .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
93 }
94 }
95
96 fn has_adapter_logs(&self) -> bool {
97 match self {
98 Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
99 Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100 #[cfg(any(test, feature = "test-support"))]
101 Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102 }
103 }
104
105 async fn kill(&self) -> Result<()> {
106 match self {
107 Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108 Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109 #[cfg(any(test, feature = "test-support"))]
110 Transport::Fake(fake_transport) => fake_transport.kill().await,
111 }
112 }
113
114 #[cfg(any(test, feature = "test-support"))]
115 pub(crate) fn as_fake(&self) -> &FakeTransport {
116 match self {
117 Transport::Fake(fake_transport) => fake_transport,
118 _ => panic!("Not a fake transport layer"),
119 }
120 }
121}
122
/// Owns a [`Transport`] together with the background tasks and bookkeeping needed to exchange
/// protocol messages with the adapter.
pub(crate) struct TransportDelegate {
    /// Callbacks registered through `add_log_handler`.
    log_handlers: LogHandlers,
    /// Response channels that `handle_input` moves into `pending_requests` once the request
    /// with the matching sequence number is written out. Nothing visible here inserts into
    /// this map — presumably callers elsewhere do; verify.
    current_requests: Requests,
    /// Response channels waiting for the adapter's reply; resolved by `handle_output`.
    pending_requests: Requests,
    /// The transport that was started for this delegate.
    transport: Transport,
    /// Sender for outgoing messages; set by `start_handlers` and taken by `shutdown`.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
    /// Background tasks spawned by `start_handlers`, kept here so they are owned by the delegate.
    _tasks: Vec<Task<()>>,
}
131
132impl TransportDelegate {
133 pub(crate) async fn start(
134 binary: &DebugAdapterBinary,
135 cx: AsyncApp,
136 ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
137 let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
138 let mut this = Self {
139 transport,
140 server_tx: Default::default(),
141 log_handlers: Default::default(),
142 current_requests: Default::default(),
143 pending_requests: Default::default(),
144 _tasks: Vec::new(),
145 };
146 let messages = this.start_handlers(transport_pipes, cx).await?;
147 Ok((messages, this))
148 }
149
150 async fn start_handlers(
151 &mut self,
152 mut params: TransportPipe,
153 cx: AsyncApp,
154 ) -> Result<(Receiver<Message>, Sender<Message>)> {
155 let (client_tx, server_rx) = unbounded::<Message>();
156 let (server_tx, client_rx) = unbounded::<Message>();
157
158 let log_dap_communications =
159 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160 .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161 .unwrap_or(false);
162
163 let log_handler = if log_dap_communications {
164 Some(self.log_handlers.clone())
165 } else {
166 None
167 };
168
169 let adapter_log_handler = log_handler.clone();
170 cx.update(|cx| {
171 if let Some(stdout) = params.stdout.take() {
172 self._tasks.push(cx.background_spawn(async move {
173 match Self::handle_adapter_log(stdout, adapter_log_handler).await {
174 ConnectionResult::Timeout => {
175 log::error!("Timed out when handling debugger log");
176 }
177 ConnectionResult::ConnectionReset => {
178 log::info!("Debugger logs connection closed");
179 }
180 ConnectionResult::Result(Ok(())) => {}
181 ConnectionResult::Result(Err(e)) => {
182 log::error!("Error handling debugger log: {e}");
183 }
184 }
185 }));
186 }
187
188 let pending_requests = self.pending_requests.clone();
189 let output_log_handler = log_handler.clone();
190 self._tasks.push(cx.background_spawn(async move {
191 match Self::handle_output(
192 params.output,
193 client_tx,
194 pending_requests.clone(),
195 output_log_handler,
196 )
197 .await
198 {
199 Ok(()) => {}
200 Err(e) => log::error!("Error handling debugger output: {e}"),
201 }
202 let mut pending_requests = pending_requests.lock().await;
203 pending_requests.drain().for_each(|(_, request)| {
204 request
205 .send(Err(anyhow!("debugger shutdown unexpectedly")))
206 .ok();
207 });
208 }));
209
210 if let Some(stderr) = params.stderr.take() {
211 let log_handlers = self.log_handlers.clone();
212 self._tasks.push(cx.background_spawn(async move {
213 match Self::handle_error(stderr, log_handlers).await {
214 ConnectionResult::Timeout => {
215 log::error!("Timed out reading debugger error stream")
216 }
217 ConnectionResult::ConnectionReset => {
218 log::info!("Debugger closed its error stream")
219 }
220 ConnectionResult::Result(Ok(())) => {}
221 ConnectionResult::Result(Err(e)) => {
222 log::error!("Error handling debugger error: {e}")
223 }
224 }
225 }));
226 }
227
228 let current_requests = self.current_requests.clone();
229 let pending_requests = self.pending_requests.clone();
230 let log_handler = log_handler.clone();
231 self._tasks.push(cx.background_spawn(async move {
232 match Self::handle_input(
233 params.input,
234 client_rx,
235 current_requests,
236 pending_requests,
237 log_handler,
238 )
239 .await
240 {
241 Ok(()) => {}
242 Err(e) => log::error!("Error handling debugger input: {e}"),
243 }
244 }));
245 })?;
246
247 {
248 let mut lock = self.server_tx.lock().await;
249 *lock = Some(server_tx.clone());
250 }
251
252 Ok((server_rx, server_tx))
253 }
254
255 pub(crate) async fn add_pending_request(
256 &self,
257 sequence_id: u64,
258 request: oneshot::Sender<Result<Response>>,
259 ) {
260 let mut pending_requests = self.pending_requests.lock().await;
261 pending_requests.insert(sequence_id, request);
262 }
263
264 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
265 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
266 server_tx.send(message).await.context("sending message")
267 } else {
268 anyhow::bail!("Server tx already dropped")
269 }
270 }
271
272 async fn handle_adapter_log<Stdout>(
273 stdout: Stdout,
274 log_handlers: Option<LogHandlers>,
275 ) -> ConnectionResult<()>
276 where
277 Stdout: AsyncRead + Unpin + Send + 'static,
278 {
279 let mut reader = BufReader::new(stdout);
280 let mut line = String::new();
281
282 let result = loop {
283 line.truncate(0);
284
285 match reader
286 .read_line(&mut line)
287 .await
288 .context("reading adapter log line")
289 {
290 Ok(0) => break ConnectionResult::ConnectionReset,
291 Ok(_) => {}
292 Err(e) => break ConnectionResult::Result(Err(e)),
293 }
294
295 if let Some(log_handlers) = log_handlers.as_ref() {
296 for (kind, handler) in log_handlers.lock().iter_mut() {
297 if matches!(kind, LogKind::Adapter) {
298 handler(IoKind::StdOut, line.as_str());
299 }
300 }
301 }
302 };
303
304 log::debug!("Handle adapter log dropped");
305
306 result
307 }
308
309 fn build_rpc_message(message: String) -> String {
310 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
311 }
312
313 async fn handle_input<Stdin>(
314 mut server_stdin: Stdin,
315 client_rx: Receiver<Message>,
316 current_requests: Requests,
317 pending_requests: Requests,
318 log_handlers: Option<LogHandlers>,
319 ) -> Result<()>
320 where
321 Stdin: AsyncWrite + Unpin + Send + 'static,
322 {
323 let result = loop {
324 match client_rx.recv().await {
325 Ok(message) => {
326 if let Message::Request(request) = &message {
327 if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
328 pending_requests.lock().await.insert(request.seq, sender);
329 }
330 }
331
332 let message = match serde_json::to_string(&message) {
333 Ok(message) => message,
334 Err(e) => break Err(e.into()),
335 };
336
337 if let Some(log_handlers) = log_handlers.as_ref() {
338 for (kind, log_handler) in log_handlers.lock().iter_mut() {
339 if matches!(kind, LogKind::Rpc) {
340 log_handler(IoKind::StdIn, &message);
341 }
342 }
343 }
344
345 if let Err(e) = server_stdin
346 .write_all(Self::build_rpc_message(message).as_bytes())
347 .await
348 {
349 break Err(e.into());
350 }
351
352 if let Err(e) = server_stdin.flush().await {
353 break Err(e.into());
354 }
355 }
356 Err(error) => break Err(error.into()),
357 }
358 };
359
360 log::debug!("Handle adapter input dropped");
361
362 result
363 }
364
365 async fn handle_output<Stdout>(
366 server_stdout: Stdout,
367 client_tx: Sender<Message>,
368 pending_requests: Requests,
369 log_handlers: Option<LogHandlers>,
370 ) -> Result<()>
371 where
372 Stdout: AsyncRead + Unpin + Send + 'static,
373 {
374 let mut recv_buffer = String::new();
375 let mut reader = BufReader::new(server_stdout);
376
377 let result = loop {
378 match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
379 .await
380 {
381 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
382 ConnectionResult::ConnectionReset => {
383 log::info!("Debugger closed the connection");
384 return Ok(());
385 }
386 ConnectionResult::Result(Ok(Message::Response(res))) => {
387 if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
388 if let Err(e) = tx.send(Self::process_response(res)) {
389 log::trace!("Did not send response `{:?}` for a cancelled", e);
390 }
391 } else {
392 client_tx.send(Message::Response(res)).await?;
393 }
394 }
395 ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
396 ConnectionResult::Result(Err(e)) => break Err(e),
397 }
398 };
399
400 drop(client_tx);
401 log::debug!("Handle adapter output dropped");
402
403 result
404 }
405
406 async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
407 where
408 Stderr: AsyncRead + Unpin + Send + 'static,
409 {
410 log::debug!("Handle error started");
411 let mut buffer = String::new();
412
413 let mut reader = BufReader::new(stderr);
414
415 let result = loop {
416 match reader
417 .read_line(&mut buffer)
418 .await
419 .context("reading error log line")
420 {
421 Ok(0) => break ConnectionResult::ConnectionReset,
422 Ok(_) => {
423 for (kind, log_handler) in log_handlers.lock().iter_mut() {
424 if matches!(kind, LogKind::Adapter) {
425 log_handler(IoKind::StdErr, buffer.as_str());
426 }
427 }
428
429 buffer.truncate(0);
430 }
431 Err(error) => break ConnectionResult::Result(Err(error)),
432 }
433 };
434
435 log::debug!("Handle adapter error dropped");
436
437 result
438 }
439
440 fn process_response(response: Response) -> Result<Response> {
441 if response.success {
442 Ok(response)
443 } else {
444 if let Some(error_message) = response
445 .body
446 .clone()
447 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
448 .and_then(|response| response.error.map(|msg| msg.format))
449 .or_else(|| response.message.clone())
450 {
451 anyhow::bail!(error_message);
452 };
453
454 anyhow::bail!(
455 "Received error response from adapter. Response: {:?}",
456 response
457 );
458 }
459 }
460
461 async fn receive_server_message<Stdout>(
462 reader: &mut BufReader<Stdout>,
463 buffer: &mut String,
464 log_handlers: Option<&LogHandlers>,
465 ) -> ConnectionResult<Message>
466 where
467 Stdout: AsyncRead + Unpin + Send + 'static,
468 {
469 let mut content_length = None;
470 loop {
471 buffer.truncate(0);
472
473 match reader
474 .read_line(buffer)
475 .await
476 .with_context(|| "reading a message from server")
477 {
478 Ok(0) => return ConnectionResult::ConnectionReset,
479 Ok(_) => {}
480 Err(e) => return ConnectionResult::Result(Err(e)),
481 };
482
483 if buffer == "\r\n" {
484 break;
485 }
486
487 if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
488 match value.parse().context("invalid content length") {
489 Ok(length) => content_length = Some(length),
490 Err(e) => return ConnectionResult::Result(Err(e)),
491 }
492 }
493 }
494
495 let content_length = match content_length.context("missing content length") {
496 Ok(length) => length,
497 Err(e) => return ConnectionResult::Result(Err(e)),
498 };
499
500 let mut content = vec![0; content_length];
501 if let Err(e) = reader
502 .read_exact(&mut content)
503 .await
504 .with_context(|| "reading after a loop")
505 {
506 return ConnectionResult::Result(Err(e));
507 }
508
509 let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
510 Ok(str) => str,
511 Err(e) => return ConnectionResult::Result(Err(e)),
512 };
513
514 if let Some(log_handlers) = log_handlers {
515 for (kind, log_handler) in log_handlers.lock().iter_mut() {
516 if matches!(kind, LogKind::Rpc) {
517 log_handler(IoKind::StdOut, message_str);
518 }
519 }
520 }
521
522 ConnectionResult::Result(
523 serde_json::from_str::<Message>(message_str).context("deserializing server message"),
524 )
525 }
526
527 pub async fn shutdown(&self) -> Result<()> {
528 log::debug!("Start shutdown client");
529
530 if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
531 server_tx.close();
532 }
533
534 let mut current_requests = self.current_requests.lock().await;
535 let mut pending_requests = self.pending_requests.lock().await;
536
537 current_requests.clear();
538 pending_requests.clear();
539
540 let _ = self.transport.kill().await.log_err();
541
542 drop(current_requests);
543 drop(pending_requests);
544
545 log::debug!("Shutdown client completed");
546
547 anyhow::Ok(())
548 }
549
    /// Reports whether the underlying transport provides adapter logs.
    pub fn has_adapter_logs(&self) -> bool {
        self.transport.has_adapter_logs()
    }
553
    /// Borrows the transport owned by this delegate.
    pub fn transport(&self) -> &Transport {
        &self.transport
    }
557
558 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
559 where
560 F: 'static + Send + FnMut(IoKind, &str),
561 {
562 let mut log_handlers = self.log_handlers.lock();
563 log_handlers.push((kind, Box::new(f)));
564 }
565}
566
/// Transport that spawns the adapter process and talks to it over a TCP connection.
pub struct TcpTransport {
    /// Port the adapter was connected to.
    pub port: u16,
    /// Host the adapter was connected to.
    pub host: Ipv4Addr,
    /// Connection timeout that was applied, in milliseconds.
    pub timeout: u64,
    /// Handle to the spawned adapter process.
    process: Mutex<Child>,
}
573
574impl TcpTransport {
575 /// Get an open port to use with the tcp client when not supplied by debug config
576 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
577 if let Some(port) = host.port {
578 Ok(port)
579 } else {
580 Self::unused_port(host.host()).await
581 }
582 }
583
584 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
585 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
586 .await?
587 .local_addr()?
588 .port())
589 }
590
591 async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
592 let connection_args = binary
593 .connection
594 .as_ref()
595 .context("No connection arguments provided")?;
596
597 let host = connection_args.host;
598 let port = connection_args.port;
599
600 let mut command = util::command::new_std_command(&binary.command);
601 util::set_pre_exec_to_start_new_session(&mut command);
602 let mut command = smol::process::Command::from(command);
603
604 if let Some(cwd) = &binary.cwd {
605 command.current_dir(cwd);
606 }
607
608 command.args(&binary.arguments);
609 command.envs(&binary.envs);
610
611 command
612 .stdin(Stdio::null())
613 .stdout(Stdio::piped())
614 .stderr(Stdio::piped())
615 .kill_on_drop(true);
616
617 let mut process = command
618 .spawn()
619 .with_context(|| "failed to start debug adapter.")?;
620
621 let address = SocketAddrV4::new(host, port);
622
623 let timeout = connection_args.timeout.unwrap_or_else(|| {
624 cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
625 .unwrap_or(2000u64)
626 });
627
628 let (mut process, (rx, tx)) = select! {
629 _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
630 anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
631 },
632 result = cx.spawn(async move |cx| {
633 loop {
634 match TcpStream::connect(address).await {
635 Ok(stream) => return Ok((process, stream.split())),
636 Err(_) => {
637 if let Ok(Some(_)) = process.try_status() {
638 let output = process.output().await?;
639 let output = if output.stderr.is_empty() {
640 String::from_utf8_lossy(&output.stdout).to_string()
641 } else {
642 String::from_utf8_lossy(&output.stderr).to_string()
643 };
644 anyhow::bail!("{output}\nerror: process exited before debugger attached.");
645 }
646 cx.background_executor().timer(Duration::from_millis(100)).await;
647 }
648 }
649 }
650 }).fuse() => result?
651 };
652
653 log::info!(
654 "Debug adapter has connected to TCP server {}:{}",
655 host,
656 port
657 );
658 let stdout = process.stdout.take();
659 let stderr = process.stderr.take();
660
661 let this = Self {
662 port,
663 host,
664 process: Mutex::new(process),
665 timeout,
666 };
667
668 let pipe = TransportPipe::new(
669 Box::new(tx),
670 Box::new(BufReader::new(rx)),
671 stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
672 stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
673 );
674
675 Ok((pipe, this))
676 }
677
    /// Always `true`: `start` pipes the process's stdout and hands it over as the log stream.
    fn has_adapter_logs(&self) -> bool {
        true
    }
681
682 async fn kill(&self) -> Result<()> {
683 self.process.lock().await.kill()?;
684
685 Ok(())
686 }
687}
688
/// Transport that spawns the adapter process and talks to it over its stdin/stdout.
pub struct StdioTransport {
    /// Handle to the spawned adapter process.
    process: Mutex<Child>,
}
692
693impl StdioTransport {
694 #[allow(dead_code, reason = "This is used in non test builds of Zed")]
695 async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
696 let mut command = util::command::new_std_command(&binary.command);
697 util::set_pre_exec_to_start_new_session(&mut command);
698 let mut command = smol::process::Command::from(command);
699
700 if let Some(cwd) = &binary.cwd {
701 command.current_dir(cwd);
702 }
703
704 command.args(&binary.arguments);
705 command.envs(&binary.envs);
706
707 command
708 .stdin(Stdio::piped())
709 .stdout(Stdio::piped())
710 .stderr(Stdio::piped())
711 .kill_on_drop(true);
712
713 let mut process = command.spawn().with_context(|| {
714 format!(
715 "failed to spawn command `{} {}`.",
716 binary.command,
717 binary.arguments.join(" ")
718 )
719 })?;
720
721 let stdin = process.stdin.take().context("Failed to open stdin")?;
722 let stdout = process.stdout.take().context("Failed to open stdout")?;
723 let stderr = process
724 .stderr
725 .take()
726 .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
727
728 if stderr.is_none() {
729 bail!(
730 "Failed to connect to stderr for debug adapter command {}",
731 &binary.command
732 );
733 }
734
735 log::info!("Debug adapter has connected to stdio adapter");
736
737 let process = Mutex::new(process);
738
739 Ok((
740 TransportPipe::new(
741 Box::new(stdin),
742 Box::new(BufReader::new(stdout)),
743 None,
744 stderr,
745 ),
746 Self { process },
747 ))
748 }
749
    /// Always `false`: stdout carries the protocol messages, so `start` provides no separate
    /// stdout log stream.
    fn has_adapter_logs(&self) -> bool {
        false
    }
753
754 async fn kill(&self) -> Result<()> {
755 self.process.lock().await.kill()?;
756 Ok(())
757 }
758}
759
/// Fake-adapter callback: given a request's sequence number and raw arguments, produces the
/// response to send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
763
/// Fake-adapter callback invoked with a response received from the client side.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
766
/// In-memory transport for tests: requests are answered by registered handlers instead of a
/// real adapter process.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // for sending fake response back from adapter side; keyed by request command name
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    // for reverse request responses; keyed by request command name
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
774
775#[cfg(any(test, feature = "test-support"))]
776impl FakeTransport {
777 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
778 where
779 F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
780 {
781 self.request_handlers.lock().insert(
782 R::COMMAND,
783 Box::new(move |seq, args| {
784 let result = handler(seq, serde_json::from_value(args).unwrap());
785 let response = match result {
786 Ok(response) => Response {
787 seq: seq + 1,
788 request_seq: seq,
789 success: true,
790 command: R::COMMAND.into(),
791 body: Some(serde_json::to_value(response).unwrap()),
792 message: None,
793 },
794 Err(response) => Response {
795 seq: seq + 1,
796 request_seq: seq,
797 success: false,
798 command: R::COMMAND.into(),
799 body: Some(serde_json::to_value(response).unwrap()),
800 message: None,
801 },
802 };
803 response
804 }),
805 );
806 }
807
808 pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
809 where
810 F: 'static + Send + Fn(Response),
811 {
812 self.response_handlers
813 .lock()
814 .insert(R::COMMAND, Box::new(handler));
815 }
816
817 async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
818 let this = Self {
819 request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
820 response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
821 };
822 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
823 use serde_json::json;
824
825 let (stdin_writer, stdin_reader) = async_pipe::pipe();
826 let (stdout_writer, stdout_reader) = async_pipe::pipe();
827
828 let request_handlers = this.request_handlers.clone();
829 let response_handlers = this.response_handlers.clone();
830 let stdout_writer = Arc::new(Mutex::new(stdout_writer));
831
832 cx.background_spawn(async move {
833 let mut reader = BufReader::new(stdin_reader);
834 let mut buffer = String::new();
835
836 loop {
837 match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
838 .await
839 {
840 ConnectionResult::Timeout => {
841 anyhow::bail!("Timed out when connecting to debugger");
842 }
843 ConnectionResult::ConnectionReset => {
844 log::info!("Debugger closed the connection");
845 break Ok(());
846 }
847 ConnectionResult::Result(Err(e)) => break Err(e),
848 ConnectionResult::Result(Ok(message)) => {
849 match message {
850 Message::Request(request) => {
851 // redirect reverse requests to stdout writer/reader
852 if request.command == RunInTerminal::COMMAND
853 || request.command == StartDebugging::COMMAND
854 {
855 let message =
856 serde_json::to_string(&Message::Request(request)).unwrap();
857
858 let mut writer = stdout_writer.lock().await;
859 writer
860 .write_all(
861 TransportDelegate::build_rpc_message(message)
862 .as_bytes(),
863 )
864 .await
865 .unwrap();
866 writer.flush().await.unwrap();
867 } else {
868 let response = if let Some(handle) =
869 request_handlers.lock().get_mut(request.command.as_str())
870 {
871 handle(request.seq, request.arguments.unwrap_or(json!({})))
872 } else {
873 panic!("No request handler for {}", request.command);
874 };
875 let message =
876 serde_json::to_string(&Message::Response(response))
877 .unwrap();
878
879 let mut writer = stdout_writer.lock().await;
880
881 writer
882 .write_all(
883 TransportDelegate::build_rpc_message(message)
884 .as_bytes(),
885 )
886 .await
887 .unwrap();
888 writer.flush().await.unwrap();
889 }
890 }
891 Message::Event(event) => {
892 let message =
893 serde_json::to_string(&Message::Event(event)).unwrap();
894
895 let mut writer = stdout_writer.lock().await;
896 writer
897 .write_all(
898 TransportDelegate::build_rpc_message(message).as_bytes(),
899 )
900 .await
901 .unwrap();
902 writer.flush().await.unwrap();
903 }
904 Message::Response(response) => {
905 if let Some(handle) =
906 response_handlers.lock().get(response.command.as_str())
907 {
908 handle(response);
909 } else {
910 log::error!("No response handler for {}", response.command);
911 }
912 }
913 }
914 }
915 }
916 }
917 })
918 .detach();
919
920 Ok((
921 TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
922 this,
923 ))
924 }
925
    /// Always `false`: the fake transport provides no stdout or stderr log streams.
    fn has_adapter_logs(&self) -> bool {
        false
    }
929
    /// No-op that always succeeds; there is no process to kill.
    async fn kill(&self) -> Result<()> {
        Ok(())
    }
933}