transport.rs

  1use anyhow::{Context as _, Result, anyhow, bail};
  2use dap_types::{
  3    ErrorResponse,
  4    messages::{Message, Response},
  5};
  6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
  7use gpui::{AppContext as _, AsyncApp, Task};
  8use settings::Settings as _;
  9use smallvec::SmallVec;
 10use smol::{
 11    channel::{Receiver, Sender, unbounded},
 12    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 13    lock::Mutex,
 14    net::{TcpListener, TcpStream},
 15    process::Child,
 16};
 17use std::{
 18    collections::HashMap,
 19    net::{Ipv4Addr, SocketAddrV4},
 20    process::Stdio,
 21    sync::Arc,
 22    time::Duration,
 23};
 24use task::TcpArgumentsTemplate;
 25use util::{ConnectionResult, ResultExt as _};
 26
 27use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
 28
 29pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
 30
 /// Category a registered log handler subscribes to.
 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
 pub enum LogKind {
     /// Handlers of this kind receive the lines read from the adapter's
     /// stdout/stderr log streams.
     Adapter,
     /// Handlers of this kind receive the serialized protocol messages that are
     /// written to or read from the adapter.
     Rpc,
 }
 36
 /// Tags a piece of logged text with the stream it is associated with.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum IoKind {
     /// Text written towards the adapter.
     StdIn,
     /// Text read from the adapter's output.
     StdOut,
     /// Text read from the adapter's error stream.
     StdErr,
 }
 42
 43pub struct TransportPipe {
 44    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 45    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 46    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 47    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 48}
 49
 50impl TransportPipe {
 51    pub fn new(
 52        input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 53        output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 54        stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 55        stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 56    ) -> Self {
 57        TransportPipe {
 58            input,
 59            output,
 60            stdout,
 61            stderr,
 62        }
 63    }
 64}
 65
 /// Shared map from a request sequence number to the one-shot sender that
 /// delivers that request's response (or an error) to its waiter.
 66type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
 /// Shared list of log callbacks, each paired with the `LogKind` it listens for.
 67type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 68
 69pub enum Transport {
 70    Stdio(StdioTransport),
 71    Tcp(TcpTransport),
 72    #[cfg(any(test, feature = "test-support"))]
 73    Fake(FakeTransport),
 74}
 75
 76impl Transport {
 77    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 78        #[cfg(any(test, feature = "test-support"))]
 79        if cfg!(any(test, feature = "test-support")) {
 80            return FakeTransport::start(cx)
 81                .await
 82                .map(|(transports, fake)| (transports, Self::Fake(fake)));
 83        }
 84
 85        if binary.connection.is_some() {
 86            TcpTransport::start(binary, cx)
 87                .await
 88                .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
 89        } else {
 90            StdioTransport::start(binary, cx)
 91                .await
 92                .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
 93        }
 94    }
 95
 96    fn has_adapter_logs(&self) -> bool {
 97        match self {
 98            Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
 99            Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
100            #[cfg(any(test, feature = "test-support"))]
101            Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
102        }
103    }
104
105    async fn kill(&self) -> Result<()> {
106        match self {
107            Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
108            Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
109            #[cfg(any(test, feature = "test-support"))]
110            Transport::Fake(fake_transport) => fake_transport.kill().await,
111        }
112    }
113
114    #[cfg(any(test, feature = "test-support"))]
115    pub(crate) fn as_fake(&self) -> &FakeTransport {
116        match self {
117            Transport::Fake(fake_transport) => fake_transport,
118            _ => panic!("Not a fake transport layer"),
119        }
120    }
121}
122
/// Owns a started `Transport` plus the state shared with the background tasks
/// that read from and write to it.
123pub(crate) struct TransportDelegate {
       // Callbacks registered through `add_log_handler`.
124    log_handlers: LogHandlers,
       // Response senders keyed by request seq; `handle_input` moves an entry
       // into `pending_requests` when it forwards the matching request.
125    current_requests: Requests,
       // Response senders keyed by request seq; `handle_output` resolves an
       // entry when a response with that `request_seq` arrives.
126    pending_requests: Requests,
127    transport: Transport,
       // Sending half of the outgoing message channel; `shutdown` takes it out
       // and closes it.
128    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
       // Tasks spawned by `start_handlers`, stored alongside the delegate.
129    _tasks: Vec<Task<()>>,
130}
131
132impl TransportDelegate {
133    pub(crate) async fn start(
134        binary: &DebugAdapterBinary,
135        cx: AsyncApp,
136    ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
137        let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
138        let mut this = Self {
139            transport,
140            server_tx: Default::default(),
141            log_handlers: Default::default(),
142            current_requests: Default::default(),
143            pending_requests: Default::default(),
144            _tasks: Vec::new(),
145        };
146        let messages = this.start_handlers(transport_pipes, cx).await?;
147        Ok((messages, this))
148    }
149
150    async fn start_handlers(
151        &mut self,
152        mut params: TransportPipe,
153        cx: AsyncApp,
154    ) -> Result<(Receiver<Message>, Sender<Message>)> {
155        let (client_tx, server_rx) = unbounded::<Message>();
156        let (server_tx, client_rx) = unbounded::<Message>();
157
158        let log_dap_communications =
159            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
160                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
161                .unwrap_or(false);
162
163        let log_handler = if log_dap_communications {
164            Some(self.log_handlers.clone())
165        } else {
166            None
167        };
168
169        let adapter_log_handler = log_handler.clone();
170        cx.update(|cx| {
171            if let Some(stdout) = params.stdout.take() {
172                self._tasks.push(cx.background_spawn(async move {
173                    match Self::handle_adapter_log(stdout, adapter_log_handler).await {
174                        ConnectionResult::Timeout => {
175                            log::error!("Timed out when handling debugger log");
176                        }
177                        ConnectionResult::ConnectionReset => {
178                            log::info!("Debugger logs connection closed");
179                        }
180                        ConnectionResult::Result(Ok(())) => {}
181                        ConnectionResult::Result(Err(e)) => {
182                            log::error!("Error handling debugger log: {e}");
183                        }
184                    }
185                }));
186            }
187
188            let pending_requests = self.pending_requests.clone();
189            let output_log_handler = log_handler.clone();
190            self._tasks.push(cx.background_spawn(async move {
191                match Self::handle_output(
192                    params.output,
193                    client_tx,
194                    pending_requests.clone(),
195                    output_log_handler,
196                )
197                .await
198                {
199                    Ok(()) => {}
200                    Err(e) => log::error!("Error handling debugger output: {e}"),
201                }
202                let mut pending_requests = pending_requests.lock().await;
203                pending_requests.drain().for_each(|(_, request)| {
204                    request
205                        .send(Err(anyhow!("debugger shutdown unexpectedly")))
206                        .ok();
207                });
208            }));
209
210            if let Some(stderr) = params.stderr.take() {
211                let log_handlers = self.log_handlers.clone();
212                self._tasks.push(cx.background_spawn(async move {
213                    match Self::handle_error(stderr, log_handlers).await {
214                        ConnectionResult::Timeout => {
215                            log::error!("Timed out reading debugger error stream")
216                        }
217                        ConnectionResult::ConnectionReset => {
218                            log::info!("Debugger closed its error stream")
219                        }
220                        ConnectionResult::Result(Ok(())) => {}
221                        ConnectionResult::Result(Err(e)) => {
222                            log::error!("Error handling debugger error: {e}")
223                        }
224                    }
225                }));
226            }
227
228            let current_requests = self.current_requests.clone();
229            let pending_requests = self.pending_requests.clone();
230            let log_handler = log_handler.clone();
231            self._tasks.push(cx.background_spawn(async move {
232                match Self::handle_input(
233                    params.input,
234                    client_rx,
235                    current_requests,
236                    pending_requests,
237                    log_handler,
238                )
239                .await
240                {
241                    Ok(()) => {}
242                    Err(e) => log::error!("Error handling debugger input: {e}"),
243                }
244            }));
245        })?;
246
247        {
248            let mut lock = self.server_tx.lock().await;
249            *lock = Some(server_tx.clone());
250        }
251
252        Ok((server_rx, server_tx))
253    }
254
255    pub(crate) async fn add_pending_request(
256        &self,
257        sequence_id: u64,
258        request: oneshot::Sender<Result<Response>>,
259    ) {
260        let mut pending_requests = self.pending_requests.lock().await;
261        pending_requests.insert(sequence_id, request);
262    }
263
264    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
265        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
266            server_tx.send(message).await.context("sending message")
267        } else {
268            anyhow::bail!("Server tx already dropped")
269        }
270    }
271
272    async fn handle_adapter_log<Stdout>(
273        stdout: Stdout,
274        log_handlers: Option<LogHandlers>,
275    ) -> ConnectionResult<()>
276    where
277        Stdout: AsyncRead + Unpin + Send + 'static,
278    {
279        let mut reader = BufReader::new(stdout);
280        let mut line = String::new();
281
282        let result = loop {
283            line.truncate(0);
284
285            match reader
286                .read_line(&mut line)
287                .await
288                .context("reading adapter log line")
289            {
290                Ok(0) => break ConnectionResult::ConnectionReset,
291                Ok(_) => {}
292                Err(e) => break ConnectionResult::Result(Err(e)),
293            }
294
295            if let Some(log_handlers) = log_handlers.as_ref() {
296                for (kind, handler) in log_handlers.lock().iter_mut() {
297                    if matches!(kind, LogKind::Adapter) {
298                        handler(IoKind::StdOut, line.as_str());
299                    }
300                }
301            }
302        };
303
304        log::debug!("Handle adapter log dropped");
305
306        result
307    }
308
/// Prefixes `message` with a `Content-Length` header carrying its length in
/// bytes, followed by a blank line.
fn build_rpc_message(message: String) -> String {
    let content_length = message.len();
    format!("Content-Length: {content_length}\r\n\r\n{message}")
}
312
313    async fn handle_input<Stdin>(
314        mut server_stdin: Stdin,
315        client_rx: Receiver<Message>,
316        current_requests: Requests,
317        pending_requests: Requests,
318        log_handlers: Option<LogHandlers>,
319    ) -> Result<()>
320    where
321        Stdin: AsyncWrite + Unpin + Send + 'static,
322    {
323        let result = loop {
324            match client_rx.recv().await {
325                Ok(message) => {
326                    if let Message::Request(request) = &message {
327                        if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
328                            pending_requests.lock().await.insert(request.seq, sender);
329                        }
330                    }
331
332                    let message = match serde_json::to_string(&message) {
333                        Ok(message) => message,
334                        Err(e) => break Err(e.into()),
335                    };
336
337                    if let Some(log_handlers) = log_handlers.as_ref() {
338                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
339                            if matches!(kind, LogKind::Rpc) {
340                                log_handler(IoKind::StdIn, &message);
341                            }
342                        }
343                    }
344
345                    if let Err(e) = server_stdin
346                        .write_all(Self::build_rpc_message(message).as_bytes())
347                        .await
348                    {
349                        break Err(e.into());
350                    }
351
352                    if let Err(e) = server_stdin.flush().await {
353                        break Err(e.into());
354                    }
355                }
356                Err(error) => break Err(error.into()),
357            }
358        };
359
360        log::debug!("Handle adapter input dropped");
361
362        result
363    }
364
365    async fn handle_output<Stdout>(
366        server_stdout: Stdout,
367        client_tx: Sender<Message>,
368        pending_requests: Requests,
369        log_handlers: Option<LogHandlers>,
370    ) -> Result<()>
371    where
372        Stdout: AsyncRead + Unpin + Send + 'static,
373    {
374        let mut recv_buffer = String::new();
375        let mut reader = BufReader::new(server_stdout);
376
377        let result = loop {
378            match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
379                .await
380            {
381                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
382                ConnectionResult::ConnectionReset => {
383                    log::info!("Debugger closed the connection");
384                    return Ok(());
385                }
386                ConnectionResult::Result(Ok(Message::Response(res))) => {
387                    if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
388                        if let Err(e) = tx.send(Self::process_response(res)) {
389                            log::trace!("Did not send response `{:?}` for a cancelled", e);
390                        }
391                    } else {
392                        client_tx.send(Message::Response(res)).await?;
393                    }
394                }
395                ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
396                ConnectionResult::Result(Err(e)) => break Err(e),
397            }
398        };
399
400        drop(client_tx);
401        log::debug!("Handle adapter output dropped");
402
403        result
404    }
405
406    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
407    where
408        Stderr: AsyncRead + Unpin + Send + 'static,
409    {
410        log::debug!("Handle error started");
411        let mut buffer = String::new();
412
413        let mut reader = BufReader::new(stderr);
414
415        let result = loop {
416            match reader
417                .read_line(&mut buffer)
418                .await
419                .context("reading error log line")
420            {
421                Ok(0) => break ConnectionResult::ConnectionReset,
422                Ok(_) => {
423                    for (kind, log_handler) in log_handlers.lock().iter_mut() {
424                        if matches!(kind, LogKind::Adapter) {
425                            log_handler(IoKind::StdErr, buffer.as_str());
426                        }
427                    }
428
429                    buffer.truncate(0);
430                }
431                Err(error) => break ConnectionResult::Result(Err(error)),
432            }
433        };
434
435        log::debug!("Handle adapter error dropped");
436
437        result
438    }
439
440    fn process_response(response: Response) -> Result<Response> {
441        if response.success {
442            Ok(response)
443        } else {
444            if let Some(error_message) = response
445                .body
446                .clone()
447                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
448                .and_then(|response| response.error.map(|msg| msg.format))
449                .or_else(|| response.message.clone())
450            {
451                anyhow::bail!(error_message);
452            };
453
454            anyhow::bail!(
455                "Received error response from adapter. Response: {:?}",
456                response
457            );
458        }
459    }
460
461    async fn receive_server_message<Stdout>(
462        reader: &mut BufReader<Stdout>,
463        buffer: &mut String,
464        log_handlers: Option<&LogHandlers>,
465    ) -> ConnectionResult<Message>
466    where
467        Stdout: AsyncRead + Unpin + Send + 'static,
468    {
469        let mut content_length = None;
470        loop {
471            buffer.truncate(0);
472
473            match reader
474                .read_line(buffer)
475                .await
476                .with_context(|| "reading a message from server")
477            {
478                Ok(0) => return ConnectionResult::ConnectionReset,
479                Ok(_) => {}
480                Err(e) => return ConnectionResult::Result(Err(e)),
481            };
482
483            if buffer == "\r\n" {
484                break;
485            }
486
487            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
488                match value.parse().context("invalid content length") {
489                    Ok(length) => content_length = Some(length),
490                    Err(e) => return ConnectionResult::Result(Err(e)),
491                }
492            }
493        }
494
495        let content_length = match content_length.context("missing content length") {
496            Ok(length) => length,
497            Err(e) => return ConnectionResult::Result(Err(e)),
498        };
499
500        let mut content = vec![0; content_length];
501        if let Err(e) = reader
502            .read_exact(&mut content)
503            .await
504            .with_context(|| "reading after a loop")
505        {
506            return ConnectionResult::Result(Err(e));
507        }
508
509        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
510            Ok(str) => str,
511            Err(e) => return ConnectionResult::Result(Err(e)),
512        };
513
514        if let Some(log_handlers) = log_handlers {
515            for (kind, log_handler) in log_handlers.lock().iter_mut() {
516                if matches!(kind, LogKind::Rpc) {
517                    log_handler(IoKind::StdOut, message_str);
518                }
519            }
520        }
521
522        ConnectionResult::Result(
523            serde_json::from_str::<Message>(message_str).context("deserializing server message"),
524        )
525    }
526
527    pub async fn shutdown(&self) -> Result<()> {
528        log::debug!("Start shutdown client");
529
530        if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
531            server_tx.close();
532        }
533
534        let mut current_requests = self.current_requests.lock().await;
535        let mut pending_requests = self.pending_requests.lock().await;
536
537        current_requests.clear();
538        pending_requests.clear();
539
540        let _ = self.transport.kill().await.log_err();
541
542        drop(current_requests);
543        drop(pending_requests);
544
545        log::debug!("Shutdown client completed");
546
547        anyhow::Ok(())
548    }
549
550    pub fn has_adapter_logs(&self) -> bool {
551        self.transport.has_adapter_logs()
552    }
553
554    pub fn transport(&self) -> &Transport {
555        &self.transport
556    }
557
558    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
559    where
560        F: 'static + Send + FnMut(IoKind, &str),
561    {
562        let mut log_handlers = self.log_handlers.lock();
563        log_handlers.push((kind, Box::new(f)));
564    }
565}
566
/// Transport for an adapter process that is reached over a TCP connection.
567pub struct TcpTransport {
       // Port the connection was made to.
568    pub port: u16,
       // IPv4 address the connection was made to.
569    pub host: Ipv4Addr,
       // Connection timeout in milliseconds (`start` passes it to
       // `Duration::from_millis`).
570    pub timeout: u64,
       // The spawned adapter process; `kill` locks it to terminate it.
571    process: Mutex<Child>,
572}
573
574impl TcpTransport {
575    /// Get an open port to use with the tcp client when not supplied by debug config
576    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
577        if let Some(port) = host.port {
578            Ok(port)
579        } else {
580            Self::unused_port(host.host()).await
581        }
582    }
583
584    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
585        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
586            .await?
587            .local_addr()?
588            .port())
589    }
590
591    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
592        let connection_args = binary
593            .connection
594            .as_ref()
595            .context("No connection arguments provided")?;
596
597        let host = connection_args.host;
598        let port = connection_args.port;
599
600        let mut command = util::command::new_std_command(&binary.command);
601        util::set_pre_exec_to_start_new_session(&mut command);
602        let mut command = smol::process::Command::from(command);
603
604        if let Some(cwd) = &binary.cwd {
605            command.current_dir(cwd);
606        }
607
608        command.args(&binary.arguments);
609        command.envs(&binary.envs);
610
611        command
612            .stdin(Stdio::null())
613            .stdout(Stdio::piped())
614            .stderr(Stdio::piped())
615            .kill_on_drop(true);
616
617        let mut process = command
618            .spawn()
619            .with_context(|| "failed to start debug adapter.")?;
620
621        let address = SocketAddrV4::new(host, port);
622
623        let timeout = connection_args.timeout.unwrap_or_else(|| {
624            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
625                .unwrap_or(2000u64)
626        });
627
628        let (mut process, (rx, tx)) = select! {
629            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
630                anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
631            },
632            result = cx.spawn(async move |cx| {
633                loop {
634                    match TcpStream::connect(address).await {
635                        Ok(stream) => return Ok((process, stream.split())),
636                        Err(_) => {
637                            if let Ok(Some(_)) = process.try_status() {
638                                let output = process.output().await?;
639                                let output = if output.stderr.is_empty() {
640                                    String::from_utf8_lossy(&output.stdout).to_string()
641                                } else {
642                                    String::from_utf8_lossy(&output.stderr).to_string()
643                                };
644                                anyhow::bail!("{output}\nerror: process exited before debugger attached.");
645                            }
646                            cx.background_executor().timer(Duration::from_millis(100)).await;
647                        }
648                    }
649                }
650            }).fuse() => result?
651        };
652
653        log::info!(
654            "Debug adapter has connected to TCP server {}:{}",
655            host,
656            port
657        );
658        let stdout = process.stdout.take();
659        let stderr = process.stderr.take();
660
661        let this = Self {
662            port,
663            host,
664            process: Mutex::new(process),
665            timeout,
666        };
667
668        let pipe = TransportPipe::new(
669            Box::new(tx),
670            Box::new(BufReader::new(rx)),
671            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
672            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
673        );
674
675        Ok((pipe, this))
676    }
677
/// Always `true` for the TCP transport.
// NOTE(review): presumably signals that adapter stdout is exposed as a separate
// log stream (`start` passes it to `TransportPipe`) — confirm with callers.
678    fn has_adapter_logs(&self) -> bool {
679        true
680    }
681
682    async fn kill(&self) -> Result<()> {
683        self.process.lock().await.kill()?;
684
685        Ok(())
686    }
687}
688
/// Transport for an adapter process spoken to over its stdin and stdout.
689pub struct StdioTransport {
       // The spawned adapter process; `kill` locks it to terminate it.
690    process: Mutex<Child>,
691}
692
693impl StdioTransport {
694    #[allow(dead_code, reason = "This is used in non test builds of Zed")]
695    async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
696        let mut command = util::command::new_std_command(&binary.command);
697        util::set_pre_exec_to_start_new_session(&mut command);
698        let mut command = smol::process::Command::from(command);
699
700        if let Some(cwd) = &binary.cwd {
701            command.current_dir(cwd);
702        }
703
704        command.args(&binary.arguments);
705        command.envs(&binary.envs);
706
707        command
708            .stdin(Stdio::piped())
709            .stdout(Stdio::piped())
710            .stderr(Stdio::piped())
711            .kill_on_drop(true);
712
713        let mut process = command.spawn().with_context(|| {
714            format!(
715                "failed to spawn command `{} {}`.",
716                binary.command,
717                binary.arguments.join(" ")
718            )
719        })?;
720
721        let stdin = process.stdin.take().context("Failed to open stdin")?;
722        let stdout = process.stdout.take().context("Failed to open stdout")?;
723        let stderr = process
724            .stderr
725            .take()
726            .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
727
728        if stderr.is_none() {
729            bail!(
730                "Failed to connect to stderr for debug adapter command {}",
731                &binary.command
732            );
733        }
734
735        log::info!("Debug adapter has connected to stdio adapter");
736
737        let process = Mutex::new(process);
738
739        Ok((
740            TransportPipe::new(
741                Box::new(stdin),
742                Box::new(BufReader::new(stdout)),
743                None,
744                stderr,
745            ),
746            Self { process },
747        ))
748    }
749
/// Always `false` for the stdio transport.
// NOTE(review): presumably because stdout carries the protocol itself, so
// `start` passes no separate stdout log stream — confirm with callers.
750    fn has_adapter_logs(&self) -> bool {
751        false
752    }
753
754    async fn kill(&self) -> Result<()> {
755        self.process.lock().await.kill()?;
756        Ok(())
757    }
758}
759
/// Test-only callback that builds the fake adapter's response from a request's
/// sequence number and raw JSON arguments.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler = Box<dyn FnMut(u64, serde_json::Value) -> Response + Send>;
763
/// Test-only callback invoked with a response received by the fake adapter.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Fn(Response) + Send>;
766
/// Test-only transport that answers requests from registered handlers instead
/// of talking to a real adapter process.
767#[cfg(any(test, feature = "test-support"))]
768pub struct FakeTransport {
769    // Keyed by request command: builds the fake response sent back from the adapter side.
770    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
771    // Keyed by command: invoked with the responses to reverse requests.
772    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
773}
774
775#[cfg(any(test, feature = "test-support"))]
776impl FakeTransport {
777    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
778    where
779        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
780    {
781        self.request_handlers.lock().insert(
782            R::COMMAND,
783            Box::new(move |seq, args| {
784                let result = handler(seq, serde_json::from_value(args).unwrap());
785                let response = match result {
786                    Ok(response) => Response {
787                        seq: seq + 1,
788                        request_seq: seq,
789                        success: true,
790                        command: R::COMMAND.into(),
791                        body: Some(serde_json::to_value(response).unwrap()),
792                        message: None,
793                    },
794                    Err(response) => Response {
795                        seq: seq + 1,
796                        request_seq: seq,
797                        success: false,
798                        command: R::COMMAND.into(),
799                        body: Some(serde_json::to_value(response).unwrap()),
800                        message: None,
801                    },
802                };
803                response
804            }),
805        );
806    }
807
808    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
809    where
810        F: 'static + Send + Fn(Response),
811    {
812        self.response_handlers
813            .lock()
814            .insert(R::COMMAND, Box::new(handler));
815    }
816
817    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
818        let this = Self {
819            request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
820            response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
821        };
822        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
823        use serde_json::json;
824
825        let (stdin_writer, stdin_reader) = async_pipe::pipe();
826        let (stdout_writer, stdout_reader) = async_pipe::pipe();
827
828        let request_handlers = this.request_handlers.clone();
829        let response_handlers = this.response_handlers.clone();
830        let stdout_writer = Arc::new(Mutex::new(stdout_writer));
831
832        cx.background_spawn(async move {
833            let mut reader = BufReader::new(stdin_reader);
834            let mut buffer = String::new();
835
836            loop {
837                match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
838                    .await
839                {
840                    ConnectionResult::Timeout => {
841                        anyhow::bail!("Timed out when connecting to debugger");
842                    }
843                    ConnectionResult::ConnectionReset => {
844                        log::info!("Debugger closed the connection");
845                        break Ok(());
846                    }
847                    ConnectionResult::Result(Err(e)) => break Err(e),
848                    ConnectionResult::Result(Ok(message)) => {
849                        match message {
850                            Message::Request(request) => {
851                                // redirect reverse requests to stdout writer/reader
852                                if request.command == RunInTerminal::COMMAND
853                                    || request.command == StartDebugging::COMMAND
854                                {
855                                    let message =
856                                        serde_json::to_string(&Message::Request(request)).unwrap();
857
858                                    let mut writer = stdout_writer.lock().await;
859                                    writer
860                                        .write_all(
861                                            TransportDelegate::build_rpc_message(message)
862                                                .as_bytes(),
863                                        )
864                                        .await
865                                        .unwrap();
866                                    writer.flush().await.unwrap();
867                                } else {
868                                    let response = if let Some(handle) =
869                                        request_handlers.lock().get_mut(request.command.as_str())
870                                    {
871                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
872                                    } else {
873                                        panic!("No request handler for {}", request.command);
874                                    };
875                                    let message =
876                                        serde_json::to_string(&Message::Response(response))
877                                            .unwrap();
878
879                                    let mut writer = stdout_writer.lock().await;
880
881                                    writer
882                                        .write_all(
883                                            TransportDelegate::build_rpc_message(message)
884                                                .as_bytes(),
885                                        )
886                                        .await
887                                        .unwrap();
888                                    writer.flush().await.unwrap();
889                                }
890                            }
891                            Message::Event(event) => {
892                                let message =
893                                    serde_json::to_string(&Message::Event(event)).unwrap();
894
895                                let mut writer = stdout_writer.lock().await;
896                                writer
897                                    .write_all(
898                                        TransportDelegate::build_rpc_message(message).as_bytes(),
899                                    )
900                                    .await
901                                    .unwrap();
902                                writer.flush().await.unwrap();
903                            }
904                            Message::Response(response) => {
905                                if let Some(handle) =
906                                    response_handlers.lock().get(response.command.as_str())
907                                {
908                                    handle(response);
909                                } else {
910                                    log::error!("No response handler for {}", response.command);
911                                }
912                            }
913                        }
914                    }
915                }
916            }
917        })
918        .detach();
919
920        Ok((
921            TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
922            this,
923        ))
924    }
925
/// Always `false`: the fake transport's pipe is created without stdout or
/// stderr log streams.
926    fn has_adapter_logs(&self) -> bool {
927        false
928    }
929
/// No-op that always succeeds.
930    async fn kill(&self) -> Result<()> {
931        Ok(())
932    }
933}