transport.rs

  1use anyhow::{Context as _, Result, anyhow, bail};
  2use dap_types::{
  3    ErrorResponse,
  4    messages::{Message, Response},
  5};
  6use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
  7use gpui::{AppContext as _, AsyncApp, Task};
  8use settings::Settings as _;
  9use smallvec::SmallVec;
 10use smol::{
 11    channel::{Receiver, Sender, unbounded},
 12    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 13    lock::Mutex,
 14    net::{TcpListener, TcpStream},
 15};
 16use std::{
 17    collections::HashMap,
 18    net::{Ipv4Addr, SocketAddrV4},
 19    process::Stdio,
 20    sync::Arc,
 21    time::Duration,
 22};
 23use task::TcpArgumentsTemplate;
 24use util::ConnectionResult;
 25
 26use crate::{adapters::DebugAdapterBinary, debugger_settings::DebuggerSettings};
 27
/// Boxed callback invoked with the stream a piece of text came from ([`IoKind`]) and the
/// text itself.
///
/// The `Send` bound is required because handlers are called from the reader/writer tasks
/// spawned in `TransportDelegate::start_handlers`.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
 29
 30#[derive(PartialEq, Eq, Clone, Copy)]
 31pub enum LogKind {
 32    Adapter,
 33    Rpc,
 34}
 35
 36pub enum IoKind {
 37    StdIn,
 38    StdOut,
 39    StdErr,
 40}
 41
/// The set of I/O endpoints a transport hands to `TransportDelegate::start_handlers`.
pub struct TransportPipe {
    /// Writer that protocol messages destined for the adapter are written to.
    input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
    /// Reader that protocol messages coming from the adapter are read from.
    output: Box<dyn AsyncRead + Unpin + Send + 'static>,
    /// Optional adapter log stream, read line by line and forwarded to `LogKind::Adapter` handlers.
    stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
    /// Optional adapter error stream, read line by line and forwarded to `LogKind::Adapter` handlers.
    stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
}
 48
 49impl TransportPipe {
 50    pub fn new(
 51        input: Box<dyn AsyncWrite + Unpin + Send + 'static>,
 52        output: Box<dyn AsyncRead + Unpin + Send + 'static>,
 53        stdout: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 54        stderr: Option<Box<dyn AsyncRead + Unpin + Send + 'static>>,
 55    ) -> Self {
 56        TransportPipe {
 57            input,
 58            output,
 59            stdout,
 60            stderr,
 61        }
 62    }
 63}
 64
/// Shared map from request sequence id to the one-shot sender that will receive the
/// request's outcome.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log callbacks, each tagged with the [`LogKind`] it listens to.
type LogHandlers = Arc<parking_lot::Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 67
/// The concrete channel used to talk to a debug adapter.
pub enum Transport {
    /// Adapter process spoken to over its standard streams.
    Stdio(StdioTransport),
    /// Adapter process spoken to over a TCP connection.
    Tcp(TcpTransport),
    /// In-memory stand-in, only compiled for tests / the `test-support` feature.
    #[cfg(any(test, feature = "test-support"))]
    Fake(FakeTransport),
}
 74
 75impl Transport {
 76    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
 77        #[cfg(any(test, feature = "test-support"))]
 78        if cfg!(any(test, feature = "test-support")) {
 79            return FakeTransport::start(cx)
 80                .await
 81                .map(|(transports, fake)| (transports, Self::Fake(fake)));
 82        }
 83
 84        if binary.connection.is_some() {
 85            TcpTransport::start(binary, cx)
 86                .await
 87                .map(|(transports, tcp)| (transports, Self::Tcp(tcp)))
 88        } else {
 89            StdioTransport::start(binary, cx)
 90                .await
 91                .map(|(transports, stdio)| (transports, Self::Stdio(stdio)))
 92        }
 93    }
 94
 95    fn has_adapter_logs(&self) -> bool {
 96        match self {
 97            Transport::Stdio(stdio_transport) => stdio_transport.has_adapter_logs(),
 98            Transport::Tcp(tcp_transport) => tcp_transport.has_adapter_logs(),
 99            #[cfg(any(test, feature = "test-support"))]
100            Transport::Fake(fake_transport) => fake_transport.has_adapter_logs(),
101        }
102    }
103
104    async fn kill(&self) {
105        match self {
106            Transport::Stdio(stdio_transport) => stdio_transport.kill().await,
107            Transport::Tcp(tcp_transport) => tcp_transport.kill().await,
108            #[cfg(any(test, feature = "test-support"))]
109            Transport::Fake(fake_transport) => fake_transport.kill().await,
110        }
111    }
112
113    #[cfg(any(test, feature = "test-support"))]
114    pub(crate) fn as_fake(&self) -> &FakeTransport {
115        match self {
116            Transport::Fake(fake_transport) => fake_transport,
117            _ => panic!("Not a fake transport layer"),
118        }
119    }
120}
121
/// Owns a [`Transport`] plus the background tasks and bookkeeping that move messages
/// between the client and the debug adapter.
pub(crate) struct TransportDelegate {
    /// Callbacks notified about adapter log lines and protocol messages.
    log_handlers: LogHandlers,
    /// Senders for requests that have not been written to the adapter yet; the input
    /// handler moves an entry to `pending_requests` when it sees the matching request.
    current_requests: Requests,
    /// Senders for requests awaiting a response from the adapter.
    pending_requests: Requests,
    /// The underlying adapter transport.
    transport: Transport,
    /// Sending half of the outgoing message channel; set by `start_handlers` and taken
    /// (set to `None`) by `shutdown`.
    server_tx: Arc<Mutex<Option<Sender<Message>>>>,
    /// Handles of the spawned background tasks.
    // NOTE(review): presumably retained so the tasks live as long as the delegate — confirm
    // against gpui's `Task` drop semantics.
    _tasks: Vec<Task<()>>,
}
130
131impl TransportDelegate {
132    pub(crate) async fn start(
133        binary: &DebugAdapterBinary,
134        cx: AsyncApp,
135    ) -> Result<((Receiver<Message>, Sender<Message>), Self)> {
136        let (transport_pipes, transport) = Transport::start(binary, cx.clone()).await?;
137        let mut this = Self {
138            transport,
139            server_tx: Default::default(),
140            log_handlers: Default::default(),
141            current_requests: Default::default(),
142            pending_requests: Default::default(),
143            _tasks: Vec::new(),
144        };
145        let messages = this.start_handlers(transport_pipes, cx).await?;
146        Ok((messages, this))
147    }
148
149    async fn start_handlers(
150        &mut self,
151        mut params: TransportPipe,
152        cx: AsyncApp,
153    ) -> Result<(Receiver<Message>, Sender<Message>)> {
154        let (client_tx, server_rx) = unbounded::<Message>();
155        let (server_tx, client_rx) = unbounded::<Message>();
156
157        let log_dap_communications =
158            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
159                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
160                .unwrap_or(false);
161
162        let log_handler = if log_dap_communications {
163            Some(self.log_handlers.clone())
164        } else {
165            None
166        };
167
168        let adapter_log_handler = log_handler.clone();
169        cx.update(|cx| {
170            if let Some(stdout) = params.stdout.take() {
171                self._tasks.push(cx.background_spawn(async move {
172                    match Self::handle_adapter_log(stdout, adapter_log_handler).await {
173                        ConnectionResult::Timeout => {
174                            log::error!("Timed out when handling debugger log");
175                        }
176                        ConnectionResult::ConnectionReset => {
177                            log::info!("Debugger logs connection closed");
178                        }
179                        ConnectionResult::Result(Ok(())) => {}
180                        ConnectionResult::Result(Err(e)) => {
181                            log::error!("Error handling debugger log: {e}");
182                        }
183                    }
184                }));
185            }
186
187            let pending_requests = self.pending_requests.clone();
188            let output_log_handler = log_handler.clone();
189            self._tasks.push(cx.background_spawn(async move {
190                match Self::handle_output(
191                    params.output,
192                    client_tx,
193                    pending_requests.clone(),
194                    output_log_handler,
195                )
196                .await
197                {
198                    Ok(()) => {}
199                    Err(e) => log::error!("Error handling debugger output: {e}"),
200                }
201                let mut pending_requests = pending_requests.lock().await;
202                pending_requests.drain().for_each(|(_, request)| {
203                    request
204                        .send(Err(anyhow!("debugger shutdown unexpectedly")))
205                        .ok();
206                });
207            }));
208
209            if let Some(stderr) = params.stderr.take() {
210                let log_handlers = self.log_handlers.clone();
211                self._tasks.push(cx.background_spawn(async move {
212                    match Self::handle_error(stderr, log_handlers).await {
213                        ConnectionResult::Timeout => {
214                            log::error!("Timed out reading debugger error stream")
215                        }
216                        ConnectionResult::ConnectionReset => {
217                            log::info!("Debugger closed its error stream")
218                        }
219                        ConnectionResult::Result(Ok(())) => {}
220                        ConnectionResult::Result(Err(e)) => {
221                            log::error!("Error handling debugger error: {e}")
222                        }
223                    }
224                }));
225            }
226
227            let current_requests = self.current_requests.clone();
228            let pending_requests = self.pending_requests.clone();
229            let log_handler = log_handler.clone();
230            self._tasks.push(cx.background_spawn(async move {
231                match Self::handle_input(
232                    params.input,
233                    client_rx,
234                    current_requests,
235                    pending_requests,
236                    log_handler,
237                )
238                .await
239                {
240                    Ok(()) => {}
241                    Err(e) => log::error!("Error handling debugger input: {e}"),
242                }
243            }));
244        })?;
245
246        {
247            let mut lock = self.server_tx.lock().await;
248            *lock = Some(server_tx.clone());
249        }
250
251        Ok((server_rx, server_tx))
252    }
253
254    pub(crate) async fn add_pending_request(
255        &self,
256        sequence_id: u64,
257        request: oneshot::Sender<Result<Response>>,
258    ) {
259        let mut pending_requests = self.pending_requests.lock().await;
260        pending_requests.insert(sequence_id, request);
261    }
262
263    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
264        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
265            server_tx.send(message).await.context("sending message")
266        } else {
267            anyhow::bail!("Server tx already dropped")
268        }
269    }
270
271    async fn handle_adapter_log<Stdout>(
272        stdout: Stdout,
273        log_handlers: Option<LogHandlers>,
274    ) -> ConnectionResult<()>
275    where
276        Stdout: AsyncRead + Unpin + Send + 'static,
277    {
278        let mut reader = BufReader::new(stdout);
279        let mut line = String::new();
280
281        let result = loop {
282            line.truncate(0);
283
284            match reader
285                .read_line(&mut line)
286                .await
287                .context("reading adapter log line")
288            {
289                Ok(0) => break ConnectionResult::ConnectionReset,
290                Ok(_) => {}
291                Err(e) => break ConnectionResult::Result(Err(e)),
292            }
293
294            if let Some(log_handlers) = log_handlers.as_ref() {
295                for (kind, handler) in log_handlers.lock().iter_mut() {
296                    if matches!(kind, LogKind::Adapter) {
297                        handler(IoKind::StdOut, line.as_str());
298                    }
299                }
300            }
301        };
302
303        log::debug!("Handle adapter log dropped");
304
305        result
306    }
307
308    fn build_rpc_message(message: String) -> String {
309        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
310    }
311
312    async fn handle_input<Stdin>(
313        mut server_stdin: Stdin,
314        client_rx: Receiver<Message>,
315        current_requests: Requests,
316        pending_requests: Requests,
317        log_handlers: Option<LogHandlers>,
318    ) -> Result<()>
319    where
320        Stdin: AsyncWrite + Unpin + Send + 'static,
321    {
322        let result = loop {
323            match client_rx.recv().await {
324                Ok(message) => {
325                    if let Message::Request(request) = &message {
326                        if let Some(sender) = current_requests.lock().await.remove(&request.seq) {
327                            pending_requests.lock().await.insert(request.seq, sender);
328                        }
329                    }
330
331                    let message = match serde_json::to_string(&message) {
332                        Ok(message) => message,
333                        Err(e) => break Err(e.into()),
334                    };
335
336                    if let Some(log_handlers) = log_handlers.as_ref() {
337                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
338                            if matches!(kind, LogKind::Rpc) {
339                                log_handler(IoKind::StdIn, &message);
340                            }
341                        }
342                    }
343
344                    if let Err(e) = server_stdin
345                        .write_all(Self::build_rpc_message(message).as_bytes())
346                        .await
347                    {
348                        break Err(e.into());
349                    }
350
351                    if let Err(e) = server_stdin.flush().await {
352                        break Err(e.into());
353                    }
354                }
355                Err(error) => break Err(error.into()),
356            }
357        };
358
359        log::debug!("Handle adapter input dropped");
360
361        result
362    }
363
364    async fn handle_output<Stdout>(
365        server_stdout: Stdout,
366        client_tx: Sender<Message>,
367        pending_requests: Requests,
368        log_handlers: Option<LogHandlers>,
369    ) -> Result<()>
370    where
371        Stdout: AsyncRead + Unpin + Send + 'static,
372    {
373        let mut recv_buffer = String::new();
374        let mut reader = BufReader::new(server_stdout);
375
376        let result = loop {
377            match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
378                .await
379            {
380                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
381                ConnectionResult::ConnectionReset => {
382                    log::info!("Debugger closed the connection");
383                    return Ok(());
384                }
385                ConnectionResult::Result(Ok(Message::Response(res))) => {
386                    if let Some(tx) = pending_requests.lock().await.remove(&res.request_seq) {
387                        if let Err(e) = tx.send(Self::process_response(res)) {
388                            log::trace!("Did not send response `{:?}` for a cancelled", e);
389                        }
390                    } else {
391                        client_tx.send(Message::Response(res)).await?;
392                    }
393                }
394                ConnectionResult::Result(Ok(message)) => client_tx.send(message).await?,
395                ConnectionResult::Result(Err(e)) => break Err(e),
396            }
397        };
398
399        drop(client_tx);
400        log::debug!("Handle adapter output dropped");
401
402        result
403    }
404
405    async fn handle_error<Stderr>(stderr: Stderr, log_handlers: LogHandlers) -> ConnectionResult<()>
406    where
407        Stderr: AsyncRead + Unpin + Send + 'static,
408    {
409        log::debug!("Handle error started");
410        let mut buffer = String::new();
411
412        let mut reader = BufReader::new(stderr);
413
414        let result = loop {
415            match reader
416                .read_line(&mut buffer)
417                .await
418                .context("reading error log line")
419            {
420                Ok(0) => break ConnectionResult::ConnectionReset,
421                Ok(_) => {
422                    for (kind, log_handler) in log_handlers.lock().iter_mut() {
423                        if matches!(kind, LogKind::Adapter) {
424                            log_handler(IoKind::StdErr, buffer.as_str());
425                        }
426                    }
427
428                    buffer.truncate(0);
429                }
430                Err(error) => break ConnectionResult::Result(Err(error)),
431            }
432        };
433
434        log::debug!("Handle adapter error dropped");
435
436        result
437    }
438
439    fn process_response(response: Response) -> Result<Response> {
440        if response.success {
441            Ok(response)
442        } else {
443            if let Some(error_message) = response
444                .body
445                .clone()
446                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
447                .and_then(|response| response.error.map(|msg| msg.format))
448                .or_else(|| response.message.clone())
449            {
450                anyhow::bail!(error_message);
451            };
452
453            anyhow::bail!(
454                "Received error response from adapter. Response: {:?}",
455                response
456            );
457        }
458    }
459
460    async fn receive_server_message<Stdout>(
461        reader: &mut BufReader<Stdout>,
462        buffer: &mut String,
463        log_handlers: Option<&LogHandlers>,
464    ) -> ConnectionResult<Message>
465    where
466        Stdout: AsyncRead + Unpin + Send + 'static,
467    {
468        let mut content_length = None;
469        loop {
470            buffer.truncate(0);
471
472            match reader
473                .read_line(buffer)
474                .await
475                .with_context(|| "reading a message from server")
476            {
477                Ok(0) => return ConnectionResult::ConnectionReset,
478                Ok(_) => {}
479                Err(e) => return ConnectionResult::Result(Err(e)),
480            };
481
482            if buffer == "\r\n" {
483                break;
484            }
485
486            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
487                match value.parse().context("invalid content length") {
488                    Ok(length) => content_length = Some(length),
489                    Err(e) => return ConnectionResult::Result(Err(e)),
490                }
491            }
492        }
493
494        let content_length = match content_length.context("missing content length") {
495            Ok(length) => length,
496            Err(e) => return ConnectionResult::Result(Err(e)),
497        };
498
499        let mut content = vec![0; content_length];
500        if let Err(e) = reader
501            .read_exact(&mut content)
502            .await
503            .with_context(|| "reading after a loop")
504        {
505            return ConnectionResult::Result(Err(e));
506        }
507
508        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
509            Ok(str) => str,
510            Err(e) => return ConnectionResult::Result(Err(e)),
511        };
512
513        if let Some(log_handlers) = log_handlers {
514            for (kind, log_handler) in log_handlers.lock().iter_mut() {
515                if matches!(kind, LogKind::Rpc) {
516                    log_handler(IoKind::StdOut, message_str);
517                }
518            }
519        }
520
521        ConnectionResult::Result(
522            serde_json::from_str::<Message>(message_str).context("deserializing server message"),
523        )
524    }
525
526    pub async fn shutdown(&self) -> Result<()> {
527        log::debug!("Start shutdown client");
528
529        if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
530            server_tx.close();
531        }
532
533        let mut current_requests = self.current_requests.lock().await;
534        let mut pending_requests = self.pending_requests.lock().await;
535
536        current_requests.clear();
537        pending_requests.clear();
538
539        self.transport.kill().await;
540
541        drop(current_requests);
542        drop(pending_requests);
543
544        log::debug!("Shutdown client completed");
545
546        anyhow::Ok(())
547    }
548
    /// Returns whatever the underlying transport's `has_adapter_logs` reports.
    pub fn has_adapter_logs(&self) -> bool {
        self.transport.has_adapter_logs()
    }
552
    /// Borrows the underlying transport.
    pub fn transport(&self) -> &Transport {
        &self.transport
    }
556
557    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
558    where
559        F: 'static + Send + FnMut(IoKind, &str),
560    {
561        let mut log_handlers = self.log_handlers.lock();
562        log_handlers.push((kind, Box::new(f)));
563    }
564}
565
/// Transport that spawns the adapter process and talks to it over a TCP connection.
pub struct TcpTransport {
    /// Port the connection was made to.
    pub port: u16,
    /// Host the connection was made to.
    pub host: Ipv4Addr,
    /// Connection timeout in milliseconds that was applied while connecting.
    pub timeout: u64,
    /// The spawned adapter process, killed on `kill` and on drop.
    process: Mutex<Child>,
}
572
573impl TcpTransport {
574    /// Get an open port to use with the tcp client when not supplied by debug config
575    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
576        if let Some(port) = host.port {
577            Ok(port)
578        } else {
579            Self::unused_port(host.host()).await
580        }
581    }
582
583    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
584        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
585            .await?
586            .local_addr()?
587            .port())
588    }
589
    /// Spawns the adapter process described by `binary` and connects to it over TCP.
    ///
    /// The connection is retried every 100 ms until it succeeds, the process is observed to
    /// have exited, or the timeout elapses. The timeout is taken from the connection
    /// arguments, falling back to the debugger settings and finally to 2000 ms.
    ///
    /// On success returns the pipes (TCP halves for protocol traffic, plus whatever
    /// stdout/stderr handles the process exposes) and the transport itself.
    ///
    /// # Errors
    ///
    /// Fails if `binary.connection` is missing, the process cannot be spawned, the timeout
    /// elapses, or the process exits before a connection is made (the error then carries the
    /// process's stderr, or its stdout when stderr is empty).
    async fn start(binary: &DebugAdapterBinary, cx: AsyncApp) -> Result<(TransportPipe, Self)> {
        let connection_args = binary
            .connection
            .as_ref()
            .context("No connection arguments provided")?;

        let host = connection_args.host;
        let port = connection_args.port;

        let mut command = util::command::new_std_command(&binary.command);

        if let Some(cwd) = &binary.cwd {
            command.current_dir(cwd);
        }

        command.args(&binary.arguments);
        command.envs(&binary.envs);

        // NOTE(review): `Child` is defined outside this view; the second argument is presumably
        // the stdin configuration — confirm.
        let mut process = Child::spawn(command, Stdio::null())
            .with_context(|| "failed to start debug adapter.")?;

        let address = SocketAddrV4::new(host, port);

        // Explicit per-connection timeout wins; otherwise use the global setting, and 2000 ms
        // if the settings cannot be read.
        let timeout = connection_args.timeout.unwrap_or_else(|| {
            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
                .unwrap_or(2000u64)
        });

        // Race the connect-retry loop against the timeout timer. The process is moved into the
        // connect task and handed back alongside the split stream on success.
        let (mut process, (rx, tx)) = select! {
            _ = cx.background_executor().timer(Duration::from_millis(timeout)).fuse() => {
                anyhow::bail!("Connection to TCP DAP timeout {host}:{port}");
            },
            result = cx.spawn(async move |cx| {
                loop {
                    match TcpStream::connect(address).await {
                        Ok(stream) => return Ok((process, stream.split())),
                        Err(_) => {
                            // If the adapter already exited there is no point retrying:
                            // surface its output as the error instead.
                            if let Ok(Some(_)) = process.try_status() {
                                let output = process.into_inner().output().await?;
                                let output = if output.stderr.is_empty() {
                                    String::from_utf8_lossy(&output.stdout).to_string()
                                } else {
                                    String::from_utf8_lossy(&output.stderr).to_string()
                                };
                                anyhow::bail!("{output}\nerror: process exited before debugger attached.");
                            }
                            // Back off briefly before the next connection attempt.
                            cx.background_executor().timer(Duration::from_millis(100)).await;
                        }
                    }
                }
            }).fuse() => result?
        };

        log::info!(
            "Debug adapter has connected to TCP server {}:{}",
            host,
            port
        );
        // Protocol traffic goes over TCP, so the process's own streams are passed on as the
        // optional log/error streams.
        let stdout = process.stdout.take();
        let stderr = process.stderr.take();

        let this = Self {
            port,
            host,
            process: Mutex::new(process),
            timeout,
        };

        let pipe = TransportPipe::new(
            Box::new(tx),
            Box::new(BufReader::new(rx)),
            stdout.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
            stderr.map(|s| Box::new(s) as Box<dyn AsyncRead + Unpin + Send>),
        );

        Ok((pipe, this))
    }
667
    /// Always `true` for the TCP transport, whose `start` passes the process's stdout on as a
    /// separate stream in the pipe.
    fn has_adapter_logs(&self) -> bool {
        true
    }
671
672    async fn kill(&self) {
673        let mut process = self.process.lock().await;
674        Child::kill(&mut process);
675    }
676}
677
/// Kills the adapter process when the transport is dropped.
impl Drop for TcpTransport {
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so the mutex can be bypassed with `get_mut`.
        self.process.get_mut().kill();
    }
}
683
/// Transport that spawns the adapter process and talks to it over its standard streams.
pub struct StdioTransport {
    /// The spawned adapter process, killed on `kill` and on drop.
    process: Mutex<Child>,
}
687
688impl StdioTransport {
689    #[allow(dead_code, reason = "This is used in non test builds of Zed")]
690    async fn start(binary: &DebugAdapterBinary, _: AsyncApp) -> Result<(TransportPipe, Self)> {
691        let mut command = util::command::new_std_command(&binary.command);
692
693        if let Some(cwd) = &binary.cwd {
694            command.current_dir(cwd);
695        }
696
697        command.args(&binary.arguments);
698        command.envs(&binary.envs);
699
700        let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
701            format!(
702                "failed to spawn command `{} {}`.",
703                binary.command,
704                binary.arguments.join(" ")
705            )
706        })?;
707
708        let stdin = process.stdin.take().context("Failed to open stdin")?;
709        let stdout = process.stdout.take().context("Failed to open stdout")?;
710        let stderr = process
711            .stderr
712            .take()
713            .map(|io_err| Box::new(io_err) as Box<dyn AsyncRead + Unpin + Send>);
714
715        if stderr.is_none() {
716            bail!(
717                "Failed to connect to stderr for debug adapter command {}",
718                &binary.command
719            );
720        }
721
722        log::info!("Debug adapter has connected to stdio adapter");
723
724        let process = Mutex::new(process);
725
726        Ok((
727            TransportPipe::new(
728                Box::new(stdin),
729                Box::new(BufReader::new(stdout)),
730                None,
731                stderr,
732            ),
733            Self { process },
734        ))
735    }
736
    /// Always `false` for the stdio transport, whose `start` passes `None` as the pipe's
    /// separate stdout log stream.
    fn has_adapter_logs(&self) -> bool {
        false
    }
740
741    async fn kill(&self) {
742        let mut process = self.process.lock().await;
743        Child::kill(&mut process);
744    }
745}
746
/// Kills the adapter process when the transport is dropped.
impl Drop for StdioTransport {
    fn drop(&mut self) {
        // `&mut self` gives exclusive access, so the mutex can be bypassed with `get_mut`.
        self.process.get_mut().kill();
    }
}
752
/// Test-only callback that turns a request (sequence id plus raw JSON arguments) into the
/// response the fake adapter should send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
756
/// Test-only callback invoked with a `Response`.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
759
/// Test-only transport that answers requests from handlers registered per command name.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // for sending fake response back from adapter side
    // (keyed by the request's command name)
    request_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, RequestHandler>>>,
    // for reverse request responses
    // (keyed by the request's command name)
    response_handlers: Arc<parking_lot::Mutex<HashMap<&'static str, ResponseHandler>>>,
}
767
768#[cfg(any(test, feature = "test-support"))]
769impl FakeTransport {
770    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
771    where
772        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
773    {
774        self.request_handlers.lock().insert(
775            R::COMMAND,
776            Box::new(move |seq, args| {
777                let result = handler(seq, serde_json::from_value(args).unwrap());
778                let response = match result {
779                    Ok(response) => Response {
780                        seq: seq + 1,
781                        request_seq: seq,
782                        success: true,
783                        command: R::COMMAND.into(),
784                        body: Some(serde_json::to_value(response).unwrap()),
785                        message: None,
786                    },
787                    Err(response) => Response {
788                        seq: seq + 1,
789                        request_seq: seq,
790                        success: false,
791                        command: R::COMMAND.into(),
792                        body: Some(serde_json::to_value(response).unwrap()),
793                        message: None,
794                    },
795                };
796                response
797            }),
798        );
799    }
800
801    pub async fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
802    where
803        F: 'static + Send + Fn(Response),
804    {
805        self.response_handlers
806            .lock()
807            .insert(R::COMMAND, Box::new(handler));
808    }
809
810    async fn start(cx: AsyncApp) -> Result<(TransportPipe, Self)> {
811        let this = Self {
812            request_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
813            response_handlers: Arc::new(parking_lot::Mutex::new(HashMap::default())),
814        };
815        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
816        use serde_json::json;
817
818        let (stdin_writer, stdin_reader) = async_pipe::pipe();
819        let (stdout_writer, stdout_reader) = async_pipe::pipe();
820
821        let request_handlers = this.request_handlers.clone();
822        let response_handlers = this.response_handlers.clone();
823        let stdout_writer = Arc::new(Mutex::new(stdout_writer));
824
825        cx.background_spawn(async move {
826            let mut reader = BufReader::new(stdin_reader);
827            let mut buffer = String::new();
828
829            loop {
830                match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
831                    .await
832                {
833                    ConnectionResult::Timeout => {
834                        anyhow::bail!("Timed out when connecting to debugger");
835                    }
836                    ConnectionResult::ConnectionReset => {
837                        log::info!("Debugger closed the connection");
838                        break Ok(());
839                    }
840                    ConnectionResult::Result(Err(e)) => break Err(e),
841                    ConnectionResult::Result(Ok(message)) => {
842                        match message {
843                            Message::Request(request) => {
844                                // redirect reverse requests to stdout writer/reader
845                                if request.command == RunInTerminal::COMMAND
846                                    || request.command == StartDebugging::COMMAND
847                                {
848                                    let message =
849                                        serde_json::to_string(&Message::Request(request)).unwrap();
850
851                                    let mut writer = stdout_writer.lock().await;
852                                    writer
853                                        .write_all(
854                                            TransportDelegate::build_rpc_message(message)
855                                                .as_bytes(),
856                                        )
857                                        .await
858                                        .unwrap();
859                                    writer.flush().await.unwrap();
860                                } else {
861                                    let response = if let Some(handle) =
862                                        request_handlers.lock().get_mut(request.command.as_str())
863                                    {
864                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
865                                    } else {
866                                        panic!("No request handler for {}", request.command);
867                                    };
868                                    let message =
869                                        serde_json::to_string(&Message::Response(response))
870                                            .unwrap();
871
872                                    let mut writer = stdout_writer.lock().await;
873
874                                    writer
875                                        .write_all(
876                                            TransportDelegate::build_rpc_message(message)
877                                                .as_bytes(),
878                                        )
879                                        .await
880                                        .unwrap();
881                                    writer.flush().await.unwrap();
882                                }
883                            }
884                            Message::Event(event) => {
885                                let message =
886                                    serde_json::to_string(&Message::Event(event)).unwrap();
887
888                                let mut writer = stdout_writer.lock().await;
889                                writer
890                                    .write_all(
891                                        TransportDelegate::build_rpc_message(message).as_bytes(),
892                                    )
893                                    .await
894                                    .unwrap();
895                                writer.flush().await.unwrap();
896                            }
897                            Message::Response(response) => {
898                                if let Some(handle) =
899                                    response_handlers.lock().get(response.command.as_str())
900                                {
901                                    handle(response);
902                                } else {
903                                    log::error!("No response handler for {}", response.command);
904                                }
905                            }
906                        }
907                    }
908                }
909            }
910        })
911        .detach();
912
913        Ok((
914            TransportPipe::new(Box::new(stdin_writer), Box::new(stdout_reader), None, None),
915            this,
916        ))
917    }
918
919    fn has_adapter_logs(&self) -> bool {
920        false
921    }
922
923    async fn kill(&self) {}
924}
925
926struct Child {
927    process: smol::process::Child,
928}
929
930impl std::ops::Deref for Child {
931    type Target = smol::process::Child;
932
933    fn deref(&self) -> &Self::Target {
934        &self.process
935    }
936}
937
938impl std::ops::DerefMut for Child {
939    fn deref_mut(&mut self) -> &mut Self::Target {
940        &mut self.process
941    }
942}
943
944impl Child {
945    fn into_inner(self) -> smol::process::Child {
946        self.process
947    }
948
949    #[cfg(not(windows))]
950    fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
951        util::set_pre_exec_to_start_new_session(&mut command);
952        let process = smol::process::Command::from(command)
953            .stdin(stdin)
954            .stdout(Stdio::piped())
955            .stderr(Stdio::piped())
956            .spawn()?;
957        Ok(Self { process })
958    }
959
960    #[cfg(windows)]
961    fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
962        // TODO(windows): create a job object and add the child process handle to it,
963        // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
964        let process = smol::process::Command::from(command)
965            .stdin(stdin)
966            .stdout(Stdio::piped())
967            .stderr(Stdio::piped())
968            .spawn()?;
969        Ok(Self { process })
970    }
971
972    #[cfg(not(windows))]
973    fn kill(&mut self) {
974        let pid = self.process.id();
975        unsafe {
976            libc::killpg(pid as i32, libc::SIGKILL);
977        }
978    }
979
980    #[cfg(windows)]
981    fn kill(&mut self) {
982        // TODO(windows): terminate the job object in kill
983        let _ = self.process.kill();
984    }
985}