1use anyhow::{Context as _, Result, anyhow, bail};
2#[cfg(any(test, feature = "test-support"))]
3use async_pipe::{PipeReader, PipeWriter};
4use dap_types::{
5 ErrorResponse,
6 messages::{Message, Response},
7};
8use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
9use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
10use parking_lot::Mutex;
11use proto::ErrorExt;
12use settings::Settings as _;
13use smallvec::SmallVec;
14use smol::{
15 channel::{Receiver, Sender, unbounded},
16 io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
17 net::{TcpListener, TcpStream},
18};
19use std::{
20 collections::HashMap,
21 net::{Ipv4Addr, SocketAddrV4},
22 process::Stdio,
23 sync::Arc,
24 time::Duration,
25};
26use task::TcpArgumentsTemplate;
27use util::{ConnectionResult, ResultExt, process::Child};
28
29use crate::{
30 adapters::{DebugAdapterBinary, TcpArguments},
31 client::DapMessageHandler,
32 debugger_settings::DebuggerSettings,
33};
34
35pub(crate) type IoMessage = str;
36pub(crate) type Command = str;
37pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
38
39#[derive(PartialEq, Eq, Clone, Copy)]
40pub enum LogKind {
41 Adapter,
42 Rpc,
43}
44
45#[derive(Clone, Copy)]
46pub enum IoKind {
47 StdIn,
48 StdOut,
49 StdErr,
50}
51
52#[cfg(any(test, feature = "test-support"))]
53pub enum RequestHandling<T> {
54 Respond(T),
55 Exit,
56}
57
58type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
59
60pub trait Transport: Send + Sync {
61 fn has_adapter_logs(&self) -> bool;
62 fn tcp_arguments(&self) -> Option<TcpArguments>;
63 fn connect(
64 &mut self,
65 ) -> Task<
66 Result<(
67 Box<dyn AsyncWrite + Unpin + Send + 'static>,
68 Box<dyn AsyncRead + Unpin + Send + 'static>,
69 )>,
70 >;
71 fn kill(&mut self);
72 #[cfg(any(test, feature = "test-support"))]
73 fn as_fake(&self) -> &FakeTransport {
74 unreachable!()
75 }
76}
77
78async fn start(
79 binary: &DebugAdapterBinary,
80 log_handlers: LogHandlers,
81 cx: &mut AsyncApp,
82) -> Result<Box<dyn Transport>> {
83 #[cfg(any(test, feature = "test-support"))]
84 if cfg!(any(test, feature = "test-support")) {
85 if let Some(connection) = binary.connection.clone() {
86 return Ok(Box::new(FakeTransport::start_tcp(connection, cx).await?));
87 } else {
88 return Ok(Box::new(FakeTransport::start_stdio(cx).await?));
89 }
90 }
91
92 if binary.connection.is_some() {
93 Ok(Box::new(
94 TcpTransport::start(binary, log_handlers, cx).await?,
95 ))
96 } else {
97 Ok(Box::new(
98 StdioTransport::start(binary, log_handlers, cx).await?,
99 ))
100 }
101}
102
103pub(crate) struct PendingRequests {
104 inner: Option<HashMap<u64, oneshot::Sender<Result<Response>>>>,
105}
106
107impl PendingRequests {
108 fn new() -> Self {
109 Self {
110 inner: Some(HashMap::default()),
111 }
112 }
113
114 fn flush(&mut self, e: anyhow::Error) {
115 let Some(inner) = self.inner.as_mut() else {
116 return;
117 };
118 for (_, sender) in inner.drain() {
119 sender.send(Err(e.cloned())).ok();
120 }
121 }
122
123 pub(crate) fn insert(
124 &mut self,
125 sequence_id: u64,
126 callback_tx: oneshot::Sender<Result<Response>>,
127 ) -> anyhow::Result<()> {
128 let Some(inner) = self.inner.as_mut() else {
129 bail!("client is closed")
130 };
131 inner.insert(sequence_id, callback_tx);
132 Ok(())
133 }
134
135 pub(crate) fn remove(
136 &mut self,
137 sequence_id: u64,
138 ) -> anyhow::Result<Option<oneshot::Sender<Result<Response>>>> {
139 let Some(inner) = self.inner.as_mut() else {
140 bail!("client is closed");
141 };
142 Ok(inner.remove(&sequence_id))
143 }
144
145 pub(crate) fn shutdown(&mut self) {
146 self.flush(anyhow!("transport shutdown"));
147 self.inner = None;
148 }
149}
150
151pub(crate) struct TransportDelegate {
152 log_handlers: LogHandlers,
153 pub(crate) pending_requests: Arc<Mutex<PendingRequests>>,
154 pub(crate) transport: Mutex<Box<dyn Transport>>,
155 pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
156 tasks: Mutex<Vec<Task<()>>>,
157}
158
159impl TransportDelegate {
160 pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
161 let log_handlers: LogHandlers = Default::default();
162 let transport = start(binary, log_handlers.clone(), cx).await?;
163 Ok(Self {
164 transport: Mutex::new(transport),
165 log_handlers,
166 server_tx: Default::default(),
167 pending_requests: Arc::new(Mutex::new(PendingRequests::new())),
168 tasks: Default::default(),
169 })
170 }
171
172 pub async fn connect(
173 &self,
174 message_handler: DapMessageHandler,
175 cx: &mut AsyncApp,
176 ) -> Result<()> {
177 let (server_tx, client_rx) = unbounded::<Message>();
178 self.tasks.lock().clear();
179
180 let log_dap_communications =
181 cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications);
182
183 let connect = self.transport.lock().connect();
184 let (input, output) = connect.await?;
185
186 let log_handler = if log_dap_communications {
187 Some(self.log_handlers.clone())
188 } else {
189 None
190 };
191
192 let pending_requests = self.pending_requests.clone();
193 let output_log_handler = log_handler.clone();
194 {
195 let mut tasks = self.tasks.lock();
196 tasks.push(cx.background_spawn(async move {
197 match Self::recv_from_server(
198 output,
199 message_handler,
200 pending_requests.clone(),
201 output_log_handler,
202 )
203 .await
204 {
205 Ok(()) => {
206 pending_requests
207 .lock()
208 .flush(anyhow!("debugger shutdown unexpectedly"));
209 }
210 Err(e) => {
211 pending_requests.lock().flush(e);
212 }
213 }
214 }));
215
216 tasks.push(cx.background_spawn(async move {
217 match Self::send_to_server(input, client_rx, log_handler).await {
218 Ok(()) => {}
219 Err(e) => log::error!("Error handling debugger input: {e}"),
220 }
221 }));
222 }
223
224 *self.server_tx.lock().await = Some(server_tx.clone());
225
226 Ok(())
227 }
228
229 pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
230 self.transport.lock().tcp_arguments()
231 }
232
233 pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
234 if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
235 server_tx.send(message).await.context("sending message")
236 } else {
237 anyhow::bail!("Server tx already dropped")
238 }
239 }
240
241 async fn handle_adapter_log(
242 stdout: impl AsyncRead + Unpin + Send + 'static,
243 iokind: IoKind,
244 log_handlers: LogHandlers,
245 ) {
246 let mut reader = BufReader::new(stdout);
247 let mut line = String::new();
248
249 loop {
250 line.truncate(0);
251
252 match reader.read_line(&mut line).await {
253 Ok(0) => break,
254 Ok(_) => {}
255 Err(e) => {
256 log::debug!("handle_adapter_log: {}", e);
257 break;
258 }
259 }
260
261 // Clean up logs by trimming unnecessary whitespace/newlines before inserting into log.
262 let line = line.trim();
263
264 log::debug!("stderr: {line}");
265
266 for (kind, handler) in log_handlers.lock().iter_mut() {
267 if matches!(kind, LogKind::Adapter) {
268 handler(iokind, None, line);
269 }
270 }
271 }
272 }
273
274 fn build_rpc_message(message: String) -> String {
275 format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
276 }
277
278 async fn send_to_server<Stdin>(
279 mut server_stdin: Stdin,
280 client_rx: Receiver<Message>,
281 log_handlers: Option<LogHandlers>,
282 ) -> Result<()>
283 where
284 Stdin: AsyncWrite + Unpin + Send + 'static,
285 {
286 let result = loop {
287 match client_rx.recv().await {
288 Ok(message) => {
289 let command = match &message {
290 Message::Request(request) => Some(request.command.as_str()),
291 Message::Response(response) => Some(response.command.as_str()),
292 _ => None,
293 };
294
295 let message = match serde_json::to_string(&message) {
296 Ok(message) => message,
297 Err(e) => break Err(e.into()),
298 };
299
300 if let Some(log_handlers) = log_handlers.as_ref() {
301 for (kind, log_handler) in log_handlers.lock().iter_mut() {
302 if matches!(kind, LogKind::Rpc) {
303 log_handler(IoKind::StdIn, command, &message);
304 }
305 }
306 }
307
308 if let Err(e) = server_stdin
309 .write_all(Self::build_rpc_message(message).as_bytes())
310 .await
311 {
312 break Err(e.into());
313 }
314
315 if let Err(e) = server_stdin.flush().await {
316 break Err(e.into());
317 }
318 }
319 Err(error) => break Err(error.into()),
320 }
321 };
322
323 log::debug!("Handle adapter input dropped");
324
325 result
326 }
327
328 async fn recv_from_server<Stdout>(
329 server_stdout: Stdout,
330 mut message_handler: DapMessageHandler,
331 pending_requests: Arc<Mutex<PendingRequests>>,
332 log_handlers: Option<LogHandlers>,
333 ) -> Result<()>
334 where
335 Stdout: AsyncRead + Unpin + Send + 'static,
336 {
337 let mut recv_buffer = String::new();
338 let mut reader = BufReader::new(server_stdout);
339
340 let result = loop {
341 let result =
342 Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
343 .await;
344 match result {
345 ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
346 ConnectionResult::ConnectionReset => {
347 log::info!("Debugger closed the connection");
348 return Ok(());
349 }
350 ConnectionResult::Result(Ok(Message::Response(res))) => {
351 let tx = pending_requests.lock().remove(res.request_seq)?;
352 if let Some(tx) = tx {
353 if let Err(e) = tx.send(Self::process_response(res)) {
354 log::trace!("Did not send response `{:?}` for a cancelled", e);
355 }
356 } else {
357 message_handler(Message::Response(res))
358 }
359 }
360 ConnectionResult::Result(Ok(message)) => message_handler(message),
361 ConnectionResult::Result(Err(e)) => break Err(e),
362 }
363 };
364
365 log::debug!("Handle adapter output dropped");
366
367 result
368 }
369
370 fn process_response(response: Response) -> Result<Response> {
371 if response.success {
372 Ok(response)
373 } else {
374 if let Some(error_message) = response
375 .body
376 .clone()
377 .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
378 .and_then(|response| response.error.map(|msg| msg.format))
379 .or_else(|| response.message.clone())
380 {
381 anyhow::bail!(error_message);
382 };
383
384 anyhow::bail!(
385 "Received error response from adapter. Response: {:?}",
386 response
387 );
388 }
389 }
390
391 async fn receive_server_message<Stdout>(
392 reader: &mut BufReader<Stdout>,
393 buffer: &mut String,
394 log_handlers: Option<&LogHandlers>,
395 ) -> ConnectionResult<Message>
396 where
397 Stdout: AsyncRead + Unpin + Send + 'static,
398 {
399 let mut content_length = None;
400 loop {
401 buffer.truncate(0);
402 match reader.read_line(buffer).await {
403 Ok(0) => return ConnectionResult::ConnectionReset,
404 Ok(_) => {}
405 Err(e) => return ConnectionResult::Result(Err(e.into())),
406 };
407
408 if buffer == "\r\n" {
409 break;
410 }
411
412 if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
413 match value.parse().context("invalid content length") {
414 Ok(length) => content_length = Some(length),
415 Err(e) => return ConnectionResult::Result(Err(e)),
416 }
417 }
418 }
419
420 let content_length = match content_length.context("missing content length") {
421 Ok(length) => length,
422 Err(e) => return ConnectionResult::Result(Err(e)),
423 };
424
425 let mut content = vec![0; content_length];
426 if let Err(e) = reader
427 .read_exact(&mut content)
428 .await
429 .with_context(|| "reading after a loop")
430 {
431 return ConnectionResult::Result(Err(e));
432 }
433
434 let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
435 Ok(str) => str,
436 Err(e) => return ConnectionResult::Result(Err(e)),
437 };
438
439 let message =
440 serde_json::from_str::<Message>(message_str).context("deserializing server message");
441
442 if let Some(log_handlers) = log_handlers {
443 let command = match &message {
444 Ok(Message::Request(request)) => Some(request.command.as_str()),
445 Ok(Message::Response(response)) => Some(response.command.as_str()),
446 _ => None,
447 };
448
449 for (kind, log_handler) in log_handlers.lock().iter_mut() {
450 if matches!(kind, LogKind::Rpc) {
451 log_handler(IoKind::StdOut, command, message_str);
452 }
453 }
454 }
455
456 ConnectionResult::Result(message)
457 }
458
459 pub fn has_adapter_logs(&self) -> bool {
460 self.transport.lock().has_adapter_logs()
461 }
462
463 pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
464 where
465 F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
466 {
467 let mut log_handlers = self.log_handlers.lock();
468 log_handlers.push((kind, Box::new(f)));
469 }
470}
471
472pub struct TcpTransport {
473 executor: BackgroundExecutor,
474 pub port: u16,
475 pub host: Ipv4Addr,
476 pub timeout: u64,
477 process: Arc<Mutex<Option<Child>>>,
478 _stderr_task: Option<Task<()>>,
479 _stdout_task: Option<Task<()>>,
480}
481
482impl TcpTransport {
483 /// Get an open port to use with the tcp client when not supplied by debug config
484 pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
485 if let Some(port) = host.port {
486 Ok(port)
487 } else {
488 Self::unused_port(host.host()).await
489 }
490 }
491
492 pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
493 Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
494 .await?
495 .local_addr()?
496 .port())
497 }
498
499 async fn start(
500 binary: &DebugAdapterBinary,
501 log_handlers: LogHandlers,
502 cx: &mut AsyncApp,
503 ) -> Result<Self> {
504 let connection_args = binary
505 .connection
506 .as_ref()
507 .context("No connection arguments provided")?;
508
509 let host = connection_args.host;
510 let port = connection_args.port;
511
512 let mut process = None;
513 let mut stdout_task = None;
514 let mut stderr_task = None;
515
516 if let Some(command) = &binary.command {
517 let mut command = util::command::new_std_command(&command);
518
519 if let Some(cwd) = &binary.cwd {
520 command.current_dir(cwd);
521 }
522
523 command.args(&binary.arguments);
524 command.envs(&binary.envs);
525
526 let mut p = Child::spawn(command, Stdio::null(), Stdio::piped(), Stdio::piped())
527 .with_context(|| "failed to start debug adapter.")?;
528
529 stdout_task = p.stdout.take().map(|stdout| {
530 cx.background_executor()
531 .spawn(TransportDelegate::handle_adapter_log(
532 stdout,
533 IoKind::StdOut,
534 log_handlers.clone(),
535 ))
536 });
537 stderr_task = p.stderr.take().map(|stderr| {
538 cx.background_executor()
539 .spawn(TransportDelegate::handle_adapter_log(
540 stderr,
541 IoKind::StdErr,
542 log_handlers,
543 ))
544 });
545 process = Some(p);
546 };
547
548 let timeout = connection_args
549 .timeout
550 .unwrap_or_else(|| cx.update(|cx| DebuggerSettings::get_global(cx).timeout));
551
552 log::info!(
553 "Debug adapter has connected to TCP server {}:{}",
554 host,
555 port
556 );
557
558 let this = Self {
559 executor: cx.background_executor().clone(),
560 port,
561 host,
562 process: Arc::new(Mutex::new(process)),
563 timeout,
564 _stdout_task: stdout_task,
565 _stderr_task: stderr_task,
566 };
567
568 Ok(this)
569 }
570}
571
572impl Transport for TcpTransport {
573 fn has_adapter_logs(&self) -> bool {
574 true
575 }
576
577 fn kill(&mut self) {
578 if let Some(process) = &mut *self.process.lock() {
579 process.kill().log_err();
580 }
581 }
582
583 fn tcp_arguments(&self) -> Option<TcpArguments> {
584 Some(TcpArguments {
585 host: self.host,
586 port: self.port,
587 timeout: Some(self.timeout),
588 })
589 }
590
591 fn connect(
592 &mut self,
593 ) -> Task<
594 Result<(
595 Box<dyn AsyncWrite + Unpin + Send + 'static>,
596 Box<dyn AsyncRead + Unpin + Send + 'static>,
597 )>,
598 > {
599 let executor = self.executor.clone();
600 let timeout = self.timeout;
601 let address = SocketAddrV4::new(self.host, self.port);
602 let process = self.process.clone();
603 executor.clone().spawn(async move {
604 select! {
605 _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
606 anyhow::bail!("Connection to TCP DAP timeout {address}");
607 },
608 result = executor.clone().spawn(async move {
609 loop {
610 match TcpStream::connect(address).await {
611 Ok(stream) => {
612 let (read, write) = stream.split();
613 return Ok((Box::new(write) as _, Box::new(read) as _))
614 },
615 Err(_) => {
616 let has_process = process.lock().is_some();
617 if has_process {
618 let status = process.lock().as_mut().unwrap().try_status();
619 if let Ok(Some(_)) = status {
620 let process = process.lock().take().unwrap().into_inner();
621 let output = process.output().await?;
622 let output = if output.stderr.is_empty() {
623 String::from_utf8_lossy(&output.stdout).to_string()
624 } else {
625 String::from_utf8_lossy(&output.stderr).to_string()
626 };
627 anyhow::bail!("{output}\nerror: process exited before debugger attached.");
628 }
629 }
630
631 executor.timer(Duration::from_millis(100)).await;
632 }
633 }
634 }
635 }).fuse() => result
636 }
637 })
638 }
639}
640
641impl Drop for TcpTransport {
642 fn drop(&mut self) {
643 if let Some(mut p) = self.process.lock().take() {
644 p.kill().log_err();
645 }
646 }
647}
648
649pub struct StdioTransport {
650 process: Mutex<Child>,
651 _stderr_task: Option<Task<()>>,
652}
653
654impl StdioTransport {
655 // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
656 async fn start(
657 binary: &DebugAdapterBinary,
658 log_handlers: LogHandlers,
659 cx: &mut AsyncApp,
660 ) -> Result<Self> {
661 let Some(binary_command) = &binary.command else {
662 bail!(
663 "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
664 );
665 };
666 let mut command = util::command::new_std_command(&binary_command);
667
668 if let Some(cwd) = &binary.cwd {
669 command.current_dir(cwd);
670 }
671
672 command.args(&binary.arguments);
673 command.envs(&binary.envs);
674
675 let mut process = Child::spawn(command, Stdio::piped(), Stdio::piped(), Stdio::piped())?;
676
677 let _stderr_task = process.stderr.take().map(|stderr| {
678 cx.background_spawn(TransportDelegate::handle_adapter_log(
679 stderr,
680 IoKind::StdErr,
681 log_handlers,
682 ))
683 });
684
685 let process = Mutex::new(process);
686
687 Ok(Self {
688 process,
689 _stderr_task,
690 })
691 }
692}
693
694impl Transport for StdioTransport {
695 fn has_adapter_logs(&self) -> bool {
696 true
697 }
698
699 fn kill(&mut self) {
700 self.process.lock().kill().log_err();
701 }
702
703 fn connect(
704 &mut self,
705 ) -> Task<
706 Result<(
707 Box<dyn AsyncWrite + Unpin + Send + 'static>,
708 Box<dyn AsyncRead + Unpin + Send + 'static>,
709 )>,
710 > {
711 let result = util::maybe!({
712 let mut process = self.process.lock();
713 Ok((
714 Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
715 Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
716 ))
717 });
718 Task::ready(result)
719 }
720
721 fn tcp_arguments(&self) -> Option<TcpArguments> {
722 None
723 }
724}
725
726impl Drop for StdioTransport {
727 fn drop(&mut self) {
728 self.process.lock().kill().log_err();
729 }
730}
731
732#[cfg(any(test, feature = "test-support"))]
733type RequestHandler = Box<dyn Send + FnMut(u64, serde_json::Value) -> RequestHandling<Response>>;
734
735#[cfg(any(test, feature = "test-support"))]
736type ResponseHandler = Box<dyn Send + Fn(Response)>;
737
738#[cfg(any(test, feature = "test-support"))]
739pub struct FakeTransport {
740 // for sending fake response back from adapter side
741 request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
742 // for reverse request responses
743 response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
744 message_handler: Option<Task<Result<()>>>,
745 kind: FakeTransportKind,
746}
747
748#[cfg(any(test, feature = "test-support"))]
749pub enum FakeTransportKind {
750 Stdio {
751 stdin_writer: Option<PipeWriter>,
752 stdout_reader: Option<PipeReader>,
753 },
754 Tcp {
755 connection: TcpArguments,
756 executor: BackgroundExecutor,
757 },
758}
759
760#[cfg(any(test, feature = "test-support"))]
761impl FakeTransport {
762 pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
763 where
764 F: 'static
765 + Send
766 + FnMut(u64, R::Arguments) -> RequestHandling<Result<R::Response, ErrorResponse>>,
767 {
768 self.request_handlers.lock().insert(
769 R::COMMAND,
770 Box::new(move |seq, args| {
771 let result = handler(seq, serde_json::from_value(args).unwrap());
772 let RequestHandling::Respond(response) = result else {
773 return RequestHandling::Exit;
774 };
775 let response = match response {
776 Ok(response) => Response {
777 seq: seq + 1,
778 request_seq: seq,
779 success: true,
780 command: R::COMMAND.into(),
781 body: Some(serde_json::to_value(response).unwrap()),
782 message: None,
783 },
784 Err(response) => Response {
785 seq: seq + 1,
786 request_seq: seq,
787 success: false,
788 command: R::COMMAND.into(),
789 body: Some(serde_json::to_value(response).unwrap()),
790 message: None,
791 },
792 };
793 RequestHandling::Respond(response)
794 }),
795 );
796 }
797
798 pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
799 where
800 F: 'static + Send + Fn(Response),
801 {
802 self.response_handlers
803 .lock()
804 .insert(R::COMMAND, Box::new(handler));
805 }
806
807 async fn start_tcp(connection: TcpArguments, cx: &mut AsyncApp) -> Result<Self> {
808 Ok(Self {
809 request_handlers: Arc::new(Mutex::new(HashMap::default())),
810 response_handlers: Arc::new(Mutex::new(HashMap::default())),
811 message_handler: None,
812 kind: FakeTransportKind::Tcp {
813 connection,
814 executor: cx.background_executor().clone(),
815 },
816 })
817 }
818
819 async fn handle_messages(
820 request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
821 response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
822 stdin_reader: PipeReader,
823 stdout_writer: PipeWriter,
824 ) -> Result<()> {
825 use dap_types::requests::{Request, RunInTerminal, StartDebugging};
826 use serde_json::json;
827
828 let mut reader = BufReader::new(stdin_reader);
829 let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
830 let mut buffer = String::new();
831
832 loop {
833 match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None).await {
834 ConnectionResult::Timeout => {
835 anyhow::bail!("Timed out when connecting to debugger");
836 }
837 ConnectionResult::ConnectionReset => {
838 log::info!("Debugger closed the connection");
839 break Ok(());
840 }
841 ConnectionResult::Result(Err(e)) => break Err(e),
842 ConnectionResult::Result(Ok(message)) => {
843 match message {
844 Message::Request(request) => {
845 // redirect reverse requests to stdout writer/reader
846 if request.command == RunInTerminal::COMMAND
847 || request.command == StartDebugging::COMMAND
848 {
849 let message =
850 serde_json::to_string(&Message::Request(request)).unwrap();
851
852 let mut writer = stdout_writer.lock().await;
853 writer
854 .write_all(
855 TransportDelegate::build_rpc_message(message).as_bytes(),
856 )
857 .await
858 .unwrap();
859 writer.flush().await.unwrap();
860 } else {
861 let response = if let Some(handle) =
862 request_handlers.lock().get_mut(request.command.as_str())
863 {
864 handle(request.seq, request.arguments.unwrap_or(json!({})))
865 } else {
866 panic!("No request handler for {}", request.command);
867 };
868 let response = match response {
869 RequestHandling::Respond(response) => response,
870 RequestHandling::Exit => {
871 break Err(anyhow!("exit in response to request"));
872 }
873 };
874 let success = response.success;
875 let message =
876 serde_json::to_string(&Message::Response(response)).unwrap();
877
878 let mut writer = stdout_writer.lock().await;
879 writer
880 .write_all(
881 TransportDelegate::build_rpc_message(message).as_bytes(),
882 )
883 .await
884 .unwrap();
885
886 if request.command == dap_types::requests::Initialize::COMMAND
887 && success
888 {
889 let message = serde_json::to_string(&Message::Event(Box::new(
890 dap_types::messages::Events::Initialized(Some(
891 Default::default(),
892 )),
893 )))
894 .unwrap();
895 writer
896 .write_all(
897 TransportDelegate::build_rpc_message(message)
898 .as_bytes(),
899 )
900 .await
901 .unwrap();
902 }
903
904 writer.flush().await.unwrap();
905 }
906 }
907 Message::Event(event) => {
908 let message = serde_json::to_string(&Message::Event(event)).unwrap();
909
910 let mut writer = stdout_writer.lock().await;
911 writer
912 .write_all(TransportDelegate::build_rpc_message(message).as_bytes())
913 .await
914 .unwrap();
915 writer.flush().await.unwrap();
916 }
917 Message::Response(response) => {
918 if let Some(handle) =
919 response_handlers.lock().get(response.command.as_str())
920 {
921 handle(response);
922 } else {
923 log::error!("No response handler for {}", response.command);
924 }
925 }
926 }
927 }
928 }
929 }
930 }
931
932 async fn start_stdio(cx: &mut AsyncApp) -> Result<Self> {
933 let (stdin_writer, stdin_reader) = async_pipe::pipe();
934 let (stdout_writer, stdout_reader) = async_pipe::pipe();
935 let kind = FakeTransportKind::Stdio {
936 stdin_writer: Some(stdin_writer),
937 stdout_reader: Some(stdout_reader),
938 };
939
940 let mut this = Self {
941 request_handlers: Arc::new(Mutex::new(HashMap::default())),
942 response_handlers: Arc::new(Mutex::new(HashMap::default())),
943 message_handler: None,
944 kind,
945 };
946
947 let request_handlers = this.request_handlers.clone();
948 let response_handlers = this.response_handlers.clone();
949
950 this.message_handler = Some(cx.background_spawn(Self::handle_messages(
951 request_handlers,
952 response_handlers,
953 stdin_reader,
954 stdout_writer,
955 )));
956
957 Ok(this)
958 }
959}
960
961#[cfg(any(test, feature = "test-support"))]
962impl Transport for FakeTransport {
963 fn tcp_arguments(&self) -> Option<TcpArguments> {
964 match &self.kind {
965 FakeTransportKind::Stdio { .. } => None,
966 FakeTransportKind::Tcp { connection, .. } => Some(connection.clone()),
967 }
968 }
969
970 fn connect(
971 &mut self,
972 ) -> Task<
973 Result<(
974 Box<dyn AsyncWrite + Unpin + Send + 'static>,
975 Box<dyn AsyncRead + Unpin + Send + 'static>,
976 )>,
977 > {
978 let result = match &mut self.kind {
979 FakeTransportKind::Stdio {
980 stdin_writer,
981 stdout_reader,
982 } => util::maybe!({
983 Ok((
984 Box::new(stdin_writer.take().context("Cannot reconnect")?) as _,
985 Box::new(stdout_reader.take().context("Cannot reconnect")?) as _,
986 ))
987 }),
988 FakeTransportKind::Tcp { executor, .. } => {
989 let (stdin_writer, stdin_reader) = async_pipe::pipe();
990 let (stdout_writer, stdout_reader) = async_pipe::pipe();
991
992 let request_handlers = self.request_handlers.clone();
993 let response_handlers = self.response_handlers.clone();
994
995 self.message_handler = Some(executor.spawn(Self::handle_messages(
996 request_handlers,
997 response_handlers,
998 stdin_reader,
999 stdout_writer,
1000 )));
1001
1002 Ok((Box::new(stdin_writer) as _, Box::new(stdout_reader) as _))
1003 }
1004 };
1005 Task::ready(result)
1006 }
1007
1008 fn has_adapter_logs(&self) -> bool {
1009 false
1010 }
1011
1012 fn kill(&mut self) {
1013 self.message_handler.take();
1014 }
1015
1016 #[cfg(any(test, feature = "test-support"))]
1017 fn as_fake(&self) -> &FakeTransport {
1018 self
1019 }
1020}