transport.rs

  1use anyhow::{Context as _, Result, anyhow, bail};
  2#[cfg(any(test, feature = "test-support"))]
  3use async_pipe::{PipeReader, PipeWriter};
  4use dap_types::{
  5    ErrorResponse,
  6    messages::{Message, Response},
  7};
  8use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
  9use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
 10use parking_lot::Mutex;
 11use proto::ErrorExt;
 12use settings::Settings as _;
 13use smallvec::SmallVec;
 14use smol::{
 15    channel::{Receiver, Sender, unbounded},
 16    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 17    net::{TcpListener, TcpStream},
 18};
 19use std::{
 20    collections::HashMap,
 21    net::{Ipv4Addr, SocketAddrV4},
 22    process::Stdio,
 23    sync::Arc,
 24    time::Duration,
 25};
 26use task::TcpArgumentsTemplate;
 27use util::ConnectionResult;
 28
 29use crate::{
 30    adapters::{DebugAdapterBinary, TcpArguments},
 31    client::DapMessageHandler,
 32    debugger_settings::DebuggerSettings,
 33};
 34
 35pub(crate) type IoMessage = str;
 36pub(crate) type Command = str;
 37pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
 38
 39#[derive(PartialEq, Eq, Clone, Copy)]
 40pub enum LogKind {
 41    Adapter,
 42    Rpc,
 43}
 44
 45#[derive(Clone, Copy)]
 46pub enum IoKind {
 47    StdIn,
 48    StdOut,
 49    StdErr,
 50}
 51
 52type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 53
 54pub trait Transport: Send + Sync {
 55    fn has_adapter_logs(&self) -> bool;
 56    fn tcp_arguments(&self) -> Option<TcpArguments>;
 57    fn connect(
 58        &mut self,
 59    ) -> Task<
 60        Result<(
 61            Box<dyn AsyncWrite + Unpin + Send + 'static>,
 62            Box<dyn AsyncRead + Unpin + Send + 'static>,
 63        )>,
 64    >;
 65    fn kill(&mut self);
 66    #[cfg(any(test, feature = "test-support"))]
 67    fn as_fake(&self) -> &FakeTransport {
 68        unreachable!()
 69    }
 70}
 71
 72async fn start(
 73    binary: &DebugAdapterBinary,
 74    log_handlers: LogHandlers,
 75    cx: &mut AsyncApp,
 76) -> Result<Box<dyn Transport>> {
 77    #[cfg(any(test, feature = "test-support"))]
 78    if cfg!(any(test, feature = "test-support")) {
 79        return Ok(Box::new(FakeTransport::start(cx).await?));
 80    }
 81
 82    if binary.connection.is_some() {
 83        Ok(Box::new(
 84            TcpTransport::start(binary, log_handlers, cx).await?,
 85        ))
 86    } else {
 87        Ok(Box::new(
 88            StdioTransport::start(binary, log_handlers, cx).await?,
 89        ))
 90    }
 91}
 92
 93pub(crate) struct TransportDelegate {
 94    log_handlers: LogHandlers,
 95    // TODO this should really be some kind of associative channel
 96    pub(crate) pending_requests:
 97        Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
 98    pub(crate) transport: Mutex<Box<dyn Transport>>,
 99    pub(crate) server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
100    tasks: Mutex<Vec<Task<()>>>,
101}
102
103impl TransportDelegate {
104    pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
105        let log_handlers: LogHandlers = Default::default();
106        let transport = start(binary, log_handlers.clone(), cx).await?;
107        Ok(Self {
108            transport: Mutex::new(transport),
109            log_handlers,
110            server_tx: Default::default(),
111            pending_requests: Arc::new(Mutex::new(Some(HashMap::default()))),
112            tasks: Default::default(),
113        })
114    }
115
116    pub async fn connect(
117        &self,
118        message_handler: DapMessageHandler,
119        cx: &mut AsyncApp,
120    ) -> Result<()> {
121        let (server_tx, client_rx) = unbounded::<Message>();
122        self.tasks.lock().clear();
123
124        let log_dap_communications =
125            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
126                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
127                .unwrap_or(false);
128
129        let connect = self.transport.lock().connect();
130        let (input, output) = connect.await?;
131
132        let log_handler = if log_dap_communications {
133            Some(self.log_handlers.clone())
134        } else {
135            None
136        };
137
138        let pending_requests = self.pending_requests.clone();
139        let output_log_handler = log_handler.clone();
140        {
141            let mut tasks = self.tasks.lock();
142            tasks.push(cx.background_spawn(async move {
143                match Self::recv_from_server(
144                    output,
145                    message_handler,
146                    pending_requests.clone(),
147                    output_log_handler,
148                )
149                .await
150                {
151                    Ok(()) => {
152                        pending_requests
153                            .lock()
154                            .take()
155                            .into_iter()
156                            .flatten()
157                            .for_each(|(_, request)| {
158                                request
159                                    .send(Err(anyhow!("debugger shutdown unexpectedly")))
160                                    .ok();
161                            });
162                    }
163                    Err(e) => {
164                        pending_requests
165                            .lock()
166                            .take()
167                            .into_iter()
168                            .flatten()
169                            .for_each(|(_, request)| {
170                                request.send(Err(e.cloned())).ok();
171                            });
172                    }
173                }
174            }));
175
176            tasks.push(cx.background_spawn(async move {
177                match Self::send_to_server(input, client_rx, log_handler).await {
178                    Ok(()) => {}
179                    Err(e) => log::error!("Error handling debugger input: {e}"),
180                }
181            }));
182        }
183
184        {
185            let mut lock = self.server_tx.lock().await;
186            *lock = Some(server_tx.clone());
187        }
188
189        Ok(())
190    }
191
192    pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
193        self.transport.lock().tcp_arguments()
194    }
195
196    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
197        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
198            server_tx.send(message).await.context("sending message")
199        } else {
200            anyhow::bail!("Server tx already dropped")
201        }
202    }
203
204    async fn handle_adapter_log(
205        stdout: impl AsyncRead + Unpin + Send + 'static,
206        iokind: IoKind,
207        log_handlers: LogHandlers,
208    ) {
209        let mut reader = BufReader::new(stdout);
210        let mut line = String::new();
211
212        loop {
213            line.truncate(0);
214
215            match reader.read_line(&mut line).await {
216                Ok(0) => break,
217                Ok(_) => {}
218                Err(e) => {
219                    log::debug!("handle_adapter_log: {}", e);
220                    break;
221                }
222            }
223
224            for (kind, handler) in log_handlers.lock().iter_mut() {
225                if matches!(kind, LogKind::Adapter) {
226                    handler(iokind, None, line.as_str());
227                }
228            }
229        }
230    }
231
232    fn build_rpc_message(message: String) -> String {
233        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
234    }
235
236    async fn send_to_server<Stdin>(
237        mut server_stdin: Stdin,
238        client_rx: Receiver<Message>,
239        log_handlers: Option<LogHandlers>,
240    ) -> Result<()>
241    where
242        Stdin: AsyncWrite + Unpin + Send + 'static,
243    {
244        let result = loop {
245            match client_rx.recv().await {
246                Ok(message) => {
247                    let command = match &message {
248                        Message::Request(request) => Some(request.command.as_str()),
249                        Message::Response(response) => Some(response.command.as_str()),
250                        _ => None,
251                    };
252
253                    let message = match serde_json::to_string(&message) {
254                        Ok(message) => message,
255                        Err(e) => break Err(e.into()),
256                    };
257
258                    if let Some(log_handlers) = log_handlers.as_ref() {
259                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
260                            if matches!(kind, LogKind::Rpc) {
261                                log_handler(IoKind::StdIn, command, &message);
262                            }
263                        }
264                    }
265
266                    if let Err(e) = server_stdin
267                        .write_all(Self::build_rpc_message(message).as_bytes())
268                        .await
269                    {
270                        break Err(e.into());
271                    }
272
273                    if let Err(e) = server_stdin.flush().await {
274                        break Err(e.into());
275                    }
276                }
277                Err(error) => break Err(error.into()),
278            }
279        };
280
281        log::debug!("Handle adapter input dropped");
282
283        result
284    }
285
286    async fn recv_from_server<Stdout>(
287        server_stdout: Stdout,
288        mut message_handler: DapMessageHandler,
289        pending_requests: Arc<Mutex<Option<HashMap<u64, oneshot::Sender<Result<Response>>>>>>,
290        log_handlers: Option<LogHandlers>,
291    ) -> Result<()>
292    where
293        Stdout: AsyncRead + Unpin + Send + 'static,
294    {
295        let mut recv_buffer = String::new();
296        let mut reader = BufReader::new(server_stdout);
297
298        let result = loop {
299            let result =
300                Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
301                    .await;
302            match result {
303                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
304                ConnectionResult::ConnectionReset => {
305                    log::info!("Debugger closed the connection");
306                    break Ok(());
307                }
308                ConnectionResult::Result(Ok(Message::Response(res))) => {
309                    let tx = pending_requests
310                        .lock()
311                        .as_mut()
312                        .context("client is closed")?
313                        .remove(&res.request_seq);
314                    if let Some(tx) = tx {
315                        if let Err(e) = tx.send(Self::process_response(res)) {
316                            log::trace!("Did not send response `{:?}` for a cancelled", e);
317                        }
318                    } else {
319                        message_handler(Message::Response(res))
320                    }
321                }
322                ConnectionResult::Result(Ok(message)) => message_handler(message),
323                ConnectionResult::Result(Err(e)) => break Err(e),
324            }
325        };
326
327        log::debug!("Handle adapter output dropped");
328
329        result
330    }
331
332    fn process_response(response: Response) -> Result<Response> {
333        if response.success {
334            Ok(response)
335        } else {
336            if let Some(error_message) = response
337                .body
338                .clone()
339                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
340                .and_then(|response| response.error.map(|msg| msg.format))
341                .or_else(|| response.message.clone())
342            {
343                anyhow::bail!(error_message);
344            };
345
346            anyhow::bail!(
347                "Received error response from adapter. Response: {:?}",
348                response
349            );
350        }
351    }
352
353    async fn receive_server_message<Stdout>(
354        reader: &mut BufReader<Stdout>,
355        buffer: &mut String,
356        log_handlers: Option<&LogHandlers>,
357    ) -> ConnectionResult<Message>
358    where
359        Stdout: AsyncRead + Unpin + Send + 'static,
360    {
361        let mut content_length = None;
362        loop {
363            buffer.truncate(0);
364            match reader.read_line(buffer).await {
365                Ok(0) => return ConnectionResult::ConnectionReset,
366                Ok(_) => {}
367                Err(e) => return ConnectionResult::Result(Err(e.into())),
368            };
369
370            if buffer == "\r\n" {
371                break;
372            }
373
374            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
375                match value.parse().context("invalid content length") {
376                    Ok(length) => content_length = Some(length),
377                    Err(e) => return ConnectionResult::Result(Err(e)),
378                }
379            }
380        }
381
382        let content_length = match content_length.context("missing content length") {
383            Ok(length) => length,
384            Err(e) => return ConnectionResult::Result(Err(e)),
385        };
386
387        let mut content = vec![0; content_length];
388        if let Err(e) = reader
389            .read_exact(&mut content)
390            .await
391            .with_context(|| "reading after a loop")
392        {
393            return ConnectionResult::Result(Err(e));
394        }
395
396        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
397            Ok(str) => str,
398            Err(e) => return ConnectionResult::Result(Err(e)),
399        };
400
401        let message =
402            serde_json::from_str::<Message>(message_str).context("deserializing server message");
403
404        if let Some(log_handlers) = log_handlers {
405            let command = match &message {
406                Ok(Message::Request(request)) => Some(request.command.as_str()),
407                Ok(Message::Response(response)) => Some(response.command.as_str()),
408                _ => None,
409            };
410
411            for (kind, log_handler) in log_handlers.lock().iter_mut() {
412                if matches!(kind, LogKind::Rpc) {
413                    log_handler(IoKind::StdOut, command, message_str);
414                }
415            }
416        }
417
418        ConnectionResult::Result(message)
419    }
420
421    pub fn has_adapter_logs(&self) -> bool {
422        self.transport.lock().has_adapter_logs()
423    }
424
425    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
426    where
427        F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
428    {
429        let mut log_handlers = self.log_handlers.lock();
430        log_handlers.push((kind, Box::new(f)));
431    }
432}
433
434pub struct TcpTransport {
435    executor: BackgroundExecutor,
436    pub port: u16,
437    pub host: Ipv4Addr,
438    pub timeout: u64,
439    process: Arc<Mutex<Option<Child>>>,
440    _stderr_task: Option<Task<()>>,
441    _stdout_task: Option<Task<()>>,
442}
443
444impl TcpTransport {
445    /// Get an open port to use with the tcp client when not supplied by debug config
446    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
447        if let Some(port) = host.port {
448            Ok(port)
449        } else {
450            Self::unused_port(host.host()).await
451        }
452    }
453
454    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
455        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
456            .await?
457            .local_addr()?
458            .port())
459    }
460
461    async fn start(
462        binary: &DebugAdapterBinary,
463        log_handlers: LogHandlers,
464        cx: &mut AsyncApp,
465    ) -> Result<Self> {
466        let connection_args = binary
467            .connection
468            .as_ref()
469            .context("No connection arguments provided")?;
470
471        let host = connection_args.host;
472        let port = connection_args.port;
473
474        let mut process = None;
475        let mut stdout_task = None;
476        let mut stderr_task = None;
477
478        if let Some(command) = &binary.command {
479            let mut command = util::command::new_std_command(&command);
480
481            if let Some(cwd) = &binary.cwd {
482                command.current_dir(cwd);
483            }
484
485            command.args(&binary.arguments);
486            command.envs(&binary.envs);
487
488            let mut p = Child::spawn(command, Stdio::null())
489                .with_context(|| "failed to start debug adapter.")?;
490
491            stdout_task = p.stdout.take().map(|stdout| {
492                cx.background_executor()
493                    .spawn(TransportDelegate::handle_adapter_log(
494                        stdout,
495                        IoKind::StdOut,
496                        log_handlers.clone(),
497                    ))
498            });
499            stderr_task = p.stderr.take().map(|stderr| {
500                cx.background_executor()
501                    .spawn(TransportDelegate::handle_adapter_log(
502                        stderr,
503                        IoKind::StdErr,
504                        log_handlers,
505                    ))
506            });
507            process = Some(p);
508        };
509
510        let timeout = connection_args.timeout.unwrap_or_else(|| {
511            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
512                .unwrap_or(20000u64)
513        });
514
515        log::info!(
516            "Debug adapter has connected to TCP server {}:{}",
517            host,
518            port
519        );
520
521        let this = Self {
522            executor: cx.background_executor().clone(),
523            port,
524            host,
525            process: Arc::new(Mutex::new(process)),
526            timeout,
527            _stdout_task: stdout_task,
528            _stderr_task: stderr_task,
529        };
530
531        Ok(this)
532    }
533}
534
535impl Transport for TcpTransport {
536    fn has_adapter_logs(&self) -> bool {
537        true
538    }
539
540    fn kill(&mut self) {
541        if let Some(process) = &mut *self.process.lock() {
542            process.kill();
543        }
544    }
545
546    fn tcp_arguments(&self) -> Option<TcpArguments> {
547        Some(TcpArguments {
548            host: self.host,
549            port: self.port,
550            timeout: Some(self.timeout),
551        })
552    }
553
554    fn connect(
555        &mut self,
556    ) -> Task<
557        Result<(
558            Box<dyn AsyncWrite + Unpin + Send + 'static>,
559            Box<dyn AsyncRead + Unpin + Send + 'static>,
560        )>,
561    > {
562        let executor = self.executor.clone();
563        let timeout = self.timeout;
564        let address = SocketAddrV4::new(self.host, self.port);
565        let process = self.process.clone();
566        executor.clone().spawn(async move {
567            select! {
568                _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
569                    anyhow::bail!("Connection to TCP DAP timeout {address}");
570                },
571                result = executor.clone().spawn(async move {
572                    loop {
573                        match TcpStream::connect(address).await {
574                            Ok(stream) => {
575                                let (read, write) = stream.split();
576                                return Ok((Box::new(write) as _, Box::new(read) as _))
577                            },
578                            Err(_) => {
579                                let has_process = process.lock().is_some();
580                                if has_process {
581                                    let status = process.lock().as_mut().unwrap().try_status();
582                                    if let Ok(Some(_)) = status {
583                                        let process = process.lock().take().unwrap().into_inner();
584                                        let output = process.output().await?;
585                                        let output = if output.stderr.is_empty() {
586                                            String::from_utf8_lossy(&output.stdout).to_string()
587                                        } else {
588                                            String::from_utf8_lossy(&output.stderr).to_string()
589                                        };
590                                        anyhow::bail!("{output}\nerror: process exited before debugger attached.");
591                                    }
592                                }
593
594                                executor.timer(Duration::from_millis(100)).await;
595                            }
596                        }
597                    }
598                }).fuse() => result
599            }
600        })
601    }
602}
603
604impl Drop for TcpTransport {
605    fn drop(&mut self) {
606        if let Some(mut p) = self.process.lock().take() {
607            p.kill()
608        }
609    }
610}
611
612pub struct StdioTransport {
613    process: Mutex<Option<Child>>,
614    _stderr_task: Option<Task<()>>,
615}
616
617impl StdioTransport {
618    // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
619    async fn start(
620        binary: &DebugAdapterBinary,
621        log_handlers: LogHandlers,
622        cx: &mut AsyncApp,
623    ) -> Result<Self> {
624        let Some(binary_command) = &binary.command else {
625            bail!(
626                "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
627            );
628        };
629        let mut command = util::command::new_std_command(&binary_command);
630
631        if let Some(cwd) = &binary.cwd {
632            command.current_dir(cwd);
633        }
634
635        command.args(&binary.arguments);
636        command.envs(&binary.envs);
637
638        let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
639            format!(
640                "failed to spawn command `{} {}`.",
641                binary_command,
642                binary.arguments.join(" ")
643            )
644        })?;
645
646        let err_task = process.stderr.take().map(|stderr| {
647            cx.background_spawn(TransportDelegate::handle_adapter_log(
648                stderr,
649                IoKind::StdErr,
650                log_handlers,
651            ))
652        });
653
654        let process = Mutex::new(Some(process));
655
656        Ok(Self {
657            process,
658            _stderr_task: err_task,
659        })
660    }
661}
662
663impl Transport for StdioTransport {
664    fn has_adapter_logs(&self) -> bool {
665        false
666    }
667
668    fn kill(&mut self) {
669        if let Some(process) = &mut *self.process.lock() {
670            process.kill();
671        }
672    }
673
674    fn connect(
675        &mut self,
676    ) -> Task<
677        Result<(
678            Box<dyn AsyncWrite + Unpin + Send + 'static>,
679            Box<dyn AsyncRead + Unpin + Send + 'static>,
680        )>,
681    > {
682        let result = util::maybe!({
683            let mut guard = self.process.lock();
684            let process = guard.as_mut().context("oops")?;
685            Ok((
686                Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
687                Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
688            ))
689        });
690        Task::ready(result)
691    }
692
693    fn tcp_arguments(&self) -> Option<TcpArguments> {
694        None
695    }
696}
697
698impl Drop for StdioTransport {
699    fn drop(&mut self) {
700        if let Some(process) = &mut *self.process.lock() {
701            process.kill();
702        }
703    }
704}
705
706#[cfg(any(test, feature = "test-support"))]
707type RequestHandler =
708    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;
709
710#[cfg(any(test, feature = "test-support"))]
711type ResponseHandler = Box<dyn Send + Fn(Response)>;
712
713#[cfg(any(test, feature = "test-support"))]
714pub struct FakeTransport {
715    // for sending fake response back from adapter side
716    request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
717    // for reverse request responses
718    response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,
719
720    stdin_writer: Option<PipeWriter>,
721    stdout_reader: Option<PipeReader>,
722    message_handler: Option<Task<Result<()>>>,
723}
724
725#[cfg(any(test, feature = "test-support"))]
726impl FakeTransport {
727    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
728    where
729        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
730    {
731        self.request_handlers.lock().insert(
732            R::COMMAND,
733            Box::new(move |seq, args| {
734                let result = handler(seq, serde_json::from_value(args).unwrap());
735                let response = match result {
736                    Ok(response) => Response {
737                        seq: seq + 1,
738                        request_seq: seq,
739                        success: true,
740                        command: R::COMMAND.into(),
741                        body: Some(serde_json::to_value(response).unwrap()),
742                        message: None,
743                    },
744                    Err(response) => Response {
745                        seq: seq + 1,
746                        request_seq: seq,
747                        success: false,
748                        command: R::COMMAND.into(),
749                        body: Some(serde_json::to_value(response).unwrap()),
750                        message: None,
751                    },
752                };
753                response
754            }),
755        );
756    }
757
758    pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
759    where
760        F: 'static + Send + Fn(Response),
761    {
762        self.response_handlers
763            .lock()
764            .insert(R::COMMAND, Box::new(handler));
765    }
766
767    async fn start(cx: &mut AsyncApp) -> Result<Self> {
768        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
769        use serde_json::json;
770
771        let (stdin_writer, stdin_reader) = async_pipe::pipe();
772        let (stdout_writer, stdout_reader) = async_pipe::pipe();
773
774        let mut this = Self {
775            request_handlers: Arc::new(Mutex::new(HashMap::default())),
776            response_handlers: Arc::new(Mutex::new(HashMap::default())),
777            stdin_writer: Some(stdin_writer),
778            stdout_reader: Some(stdout_reader),
779            message_handler: None,
780        };
781
782        let request_handlers = this.request_handlers.clone();
783        let response_handlers = this.response_handlers.clone();
784        let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));
785
786        this.message_handler = Some(cx.background_spawn(async move {
787            let mut reader = BufReader::new(stdin_reader);
788            let mut buffer = String::new();
789
790            loop {
791                match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
792                    .await
793                {
794                    ConnectionResult::Timeout => {
795                        anyhow::bail!("Timed out when connecting to debugger");
796                    }
797                    ConnectionResult::ConnectionReset => {
798                        log::info!("Debugger closed the connection");
799                        break Ok(());
800                    }
801                    ConnectionResult::Result(Err(e)) => break Err(e),
802                    ConnectionResult::Result(Ok(message)) => {
803                        match message {
804                            Message::Request(request) => {
805                                // redirect reverse requests to stdout writer/reader
806                                if request.command == RunInTerminal::COMMAND
807                                    || request.command == StartDebugging::COMMAND
808                                {
809                                    let message =
810                                        serde_json::to_string(&Message::Request(request)).unwrap();
811
812                                    let mut writer = stdout_writer.lock().await;
813                                    writer
814                                        .write_all(
815                                            TransportDelegate::build_rpc_message(message)
816                                                .as_bytes(),
817                                        )
818                                        .await
819                                        .unwrap();
820                                    writer.flush().await.unwrap();
821                                } else {
822                                    let response = if let Some(handle) =
823                                        request_handlers.lock().get_mut(request.command.as_str())
824                                    {
825                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
826                                    } else {
827                                        panic!("No request handler for {}", request.command);
828                                    };
829                                    let message =
830                                        serde_json::to_string(&Message::Response(response))
831                                            .unwrap();
832
833                                    let mut writer = stdout_writer.lock().await;
834                                    writer
835                                        .write_all(
836                                            TransportDelegate::build_rpc_message(message)
837                                                .as_bytes(),
838                                        )
839                                        .await
840                                        .unwrap();
841                                    writer.flush().await.unwrap();
842                                }
843                            }
844                            Message::Event(event) => {
845                                let message =
846                                    serde_json::to_string(&Message::Event(event)).unwrap();
847
848                                let mut writer = stdout_writer.lock().await;
849                                writer
850                                    .write_all(
851                                        TransportDelegate::build_rpc_message(message).as_bytes(),
852                                    )
853                                    .await
854                                    .unwrap();
855                                writer.flush().await.unwrap();
856                            }
857                            Message::Response(response) => {
858                                if let Some(handle) =
859                                    response_handlers.lock().get(response.command.as_str())
860                                {
861                                    handle(response);
862                                } else {
863                                    log::error!("No response handler for {}", response.command);
864                                }
865                            }
866                        }
867                    }
868                }
869            }
870        }));
871
872        Ok(this)
873    }
874}
875
876#[cfg(any(test, feature = "test-support"))]
877impl Transport for FakeTransport {
878    fn tcp_arguments(&self) -> Option<TcpArguments> {
879        None
880    }
881
882    fn connect(
883        &mut self,
884    ) -> Task<
885        Result<(
886            Box<dyn AsyncWrite + Unpin + Send + 'static>,
887            Box<dyn AsyncRead + Unpin + Send + 'static>,
888        )>,
889    > {
890        let result = util::maybe!({
891            Ok((
892                Box::new(self.stdin_writer.take().context("Cannot reconnect")?) as _,
893                Box::new(self.stdout_reader.take().context("Cannot reconnect")?) as _,
894            ))
895        });
896        Task::ready(result)
897    }
898
899    fn has_adapter_logs(&self) -> bool {
900        false
901    }
902
903    fn kill(&mut self) {
904        self.message_handler.take();
905    }
906
907    #[cfg(any(test, feature = "test-support"))]
908    fn as_fake(&self) -> &FakeTransport {
909        self
910    }
911}
912
913struct Child {
914    process: smol::process::Child,
915}
916
917impl std::ops::Deref for Child {
918    type Target = smol::process::Child;
919
920    fn deref(&self) -> &Self::Target {
921        &self.process
922    }
923}
924
925impl std::ops::DerefMut for Child {
926    fn deref_mut(&mut self) -> &mut Self::Target {
927        &mut self.process
928    }
929}
930
931impl Child {
932    fn into_inner(self) -> smol::process::Child {
933        self.process
934    }
935
936    #[cfg(not(windows))]
937    fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
938        util::set_pre_exec_to_start_new_session(&mut command);
939        let process = smol::process::Command::from(command)
940            .stdin(stdin)
941            .stdout(Stdio::piped())
942            .stderr(Stdio::piped())
943            .spawn()?;
944        Ok(Self { process })
945    }
946
947    #[cfg(windows)]
948    fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
949        // TODO(windows): create a job object and add the child process handle to it,
950        // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
951        let process = smol::process::Command::from(command)
952            .stdin(stdin)
953            .stdout(Stdio::piped())
954            .stderr(Stdio::piped())
955            .spawn()?;
956        Ok(Self { process })
957    }
958
959    #[cfg(not(windows))]
960    fn kill(&mut self) {
961        let pid = self.process.id();
962        unsafe {
963            libc::killpg(pid as i32, libc::SIGKILL);
964        }
965    }
966
967    #[cfg(windows)]
968    fn kill(&mut self) {
969        // TODO(windows): terminate the job object in kill
970        let _ = self.process.kill();
971    }
972}