transport.rs

  1use anyhow::{Context as _, Result, anyhow, bail};
  2#[cfg(any(test, feature = "test-support"))]
  3use async_pipe::{PipeReader, PipeWriter};
  4use dap_types::{
  5    ErrorResponse,
  6    messages::{Message, Response},
  7};
  8use futures::{AsyncRead, AsyncReadExt as _, AsyncWrite, FutureExt as _, channel::oneshot, select};
  9use gpui::{AppContext as _, AsyncApp, BackgroundExecutor, Task};
 10use parking_lot::Mutex;
 11use proto::ErrorExt;
 12use settings::Settings as _;
 13use smallvec::SmallVec;
 14use smol::{
 15    channel::{Receiver, Sender, unbounded},
 16    io::{AsyncBufReadExt as _, AsyncWriteExt, BufReader},
 17    net::{TcpListener, TcpStream},
 18};
 19use std::{
 20    collections::HashMap,
 21    net::{Ipv4Addr, SocketAddrV4},
 22    process::Stdio,
 23    sync::Arc,
 24    time::Duration,
 25};
 26use task::TcpArgumentsTemplate;
 27use util::ConnectionResult;
 28
 29use crate::{
 30    adapters::{DebugAdapterBinary, TcpArguments},
 31    client::DapMessageHandler,
 32    debugger_settings::DebuggerSettings,
 33};
 34
/// Text of a single logged payload handed to an [`IoHandler`].
pub(crate) type IoMessage = str;
/// Name of the DAP command a logged message belongs to, when it has one.
pub(crate) type Command = str;
/// Logging callback invoked with the stream kind, the optional command name and the message text.
pub type IoHandler = Box<dyn Send + FnMut(IoKind, Option<&Command>, &IoMessage)>;
 38
 39#[derive(PartialEq, Eq, Clone, Copy)]
 40pub enum LogKind {
 41    Adapter,
 42    Rpc,
 43}
 44
 45#[derive(Clone, Copy)]
 46pub enum IoKind {
 47    StdIn,
 48    StdOut,
 49    StdErr,
 50}
 51
/// In-flight requests keyed by sequence id; each sender is resolved with the matching response.
type Requests = Arc<Mutex<HashMap<u64, oneshot::Sender<Result<Response>>>>>;
/// Shared list of registered log callbacks together with the kind of output each one wants.
type LogHandlers = Arc<Mutex<SmallVec<[(LogKind, IoHandler); 2]>>>;
 54
/// Abstraction over the channel used to talk to a debug adapter.
///
/// Implemented in this file by `TcpTransport`, `StdioTransport` and, in test builds,
/// `FakeTransport`.
pub trait Transport: Send + Sync {
    /// Whether this transport forwards adapter process output to `LogKind::Adapter` handlers.
    fn has_adapter_logs(&self) -> bool;
    /// The TCP host/port/timeout in use, or `None` for transports that are not TCP based.
    fn tcp_arguments(&self) -> Option<TcpArguments>;
    /// Establishes the connection.
    ///
    /// The returned task resolves to a `(writer, reader)` pair: the writer carries messages to
    /// the adapter and the reader yields messages coming from it.
    fn connect(
        &mut self,
    ) -> Task<
        Result<(
            Box<dyn AsyncWrite + Unpin + Send + 'static>,
            Box<dyn AsyncRead + Unpin + Send + 'static>,
        )>,
    >;
    /// Terminates the underlying adapter process, if the transport owns one.
    fn kill(&self);
    /// Downcast helper for tests; the default implementation panics via `unreachable!()`.
    #[cfg(any(test, feature = "test-support"))]
    fn as_fake(&self) -> &FakeTransport {
        unreachable!()
    }
}
 72
 73async fn start(
 74    binary: &DebugAdapterBinary,
 75    log_handlers: LogHandlers,
 76    cx: &mut AsyncApp,
 77) -> Result<Box<dyn Transport>> {
 78    #[cfg(any(test, feature = "test-support"))]
 79    if cfg!(any(test, feature = "test-support")) {
 80        return Ok(Box::new(FakeTransport::start(cx).await?));
 81    }
 82
 83    if binary.connection.is_some() {
 84        Ok(Box::new(
 85            TcpTransport::start(binary, log_handlers, cx).await?,
 86        ))
 87    } else {
 88        Ok(Box::new(
 89            StdioTransport::start(binary, log_handlers, cx).await?,
 90        ))
 91    }
 92}
 93
/// Owns a [`Transport`] and the background tasks that move DAP messages over it.
pub(crate) struct TransportDelegate {
    /// Callbacks that receive adapter output and/or RPC traffic for logging.
    log_handlers: LogHandlers,
    /// Requests awaiting a response, keyed by sequence id.
    pending_requests: Requests,
    /// The underlying transport.
    pub(crate) transport: Mutex<Box<dyn Transport>>,
    /// Sending half of the outgoing message queue; `None` until `connect` succeeds and again
    /// after `shutdown`.
    server_tx: smol::lock::Mutex<Option<Sender<Message>>>,
    /// Background send/receive tasks spawned by `connect`.
    tasks: Mutex<Vec<Task<()>>>,
}
101
102impl TransportDelegate {
103    pub(crate) async fn start(binary: &DebugAdapterBinary, cx: &mut AsyncApp) -> Result<Self> {
104        let log_handlers: LogHandlers = Default::default();
105        let transport = start(binary, log_handlers.clone(), cx).await?;
106        Ok(Self {
107            transport: Mutex::new(transport),
108            log_handlers,
109            server_tx: Default::default(),
110            pending_requests: Default::default(),
111            tasks: Default::default(),
112        })
113    }
114
115    pub async fn connect(
116        &self,
117        message_handler: DapMessageHandler,
118        cx: &mut AsyncApp,
119    ) -> Result<()> {
120        let (server_tx, client_rx) = unbounded::<Message>();
121        self.tasks.lock().clear();
122
123        let log_dap_communications =
124            cx.update(|cx| DebuggerSettings::get_global(cx).log_dap_communications)
125                .with_context(|| "Failed to get Debugger Setting log dap communications error in transport::start_handlers. Defaulting to false")
126                .unwrap_or(false);
127
128        let connect = self.transport.lock().connect();
129        let (input, output) = connect.await?;
130
131        let log_handler = if log_dap_communications {
132            Some(self.log_handlers.clone())
133        } else {
134            None
135        };
136
137        let pending_requests = self.pending_requests.clone();
138        let output_log_handler = log_handler.clone();
139        {
140            let mut tasks = self.tasks.lock();
141            tasks.push(cx.background_spawn(async move {
142                match Self::recv_from_server(
143                    output,
144                    message_handler,
145                    pending_requests.clone(),
146                    output_log_handler,
147                )
148                .await
149                {
150                    Ok(()) => {
151                        pending_requests.lock().drain().for_each(|(_, request)| {
152                            request
153                                .send(Err(anyhow!("debugger shutdown unexpectedly")))
154                                .ok();
155                        });
156                    }
157                    Err(e) => {
158                        pending_requests.lock().drain().for_each(|(_, request)| {
159                            request.send(Err(e.cloned())).ok();
160                        });
161                    }
162                }
163            }));
164
165            tasks.push(cx.background_spawn(async move {
166                match Self::send_to_server(input, client_rx, log_handler).await {
167                    Ok(()) => {}
168                    Err(e) => log::error!("Error handling debugger input: {e}"),
169                }
170            }));
171        }
172
173        {
174            let mut lock = self.server_tx.lock().await;
175            *lock = Some(server_tx.clone());
176        }
177
178        Ok(())
179    }
180
181    pub(crate) fn tcp_arguments(&self) -> Option<TcpArguments> {
182        self.transport.lock().tcp_arguments()
183    }
184
185    pub(crate) fn add_pending_request(
186        &self,
187        sequence_id: u64,
188        request: oneshot::Sender<Result<Response>>,
189    ) {
190        let mut pending_requests = self.pending_requests.lock();
191        pending_requests.insert(sequence_id, request);
192    }
193
194    pub(crate) async fn send_message(&self, message: Message) -> Result<()> {
195        if let Some(server_tx) = self.server_tx.lock().await.as_ref() {
196            server_tx.send(message).await.context("sending message")
197        } else {
198            anyhow::bail!("Server tx already dropped")
199        }
200    }
201
202    async fn handle_adapter_log(
203        stdout: impl AsyncRead + Unpin + Send + 'static,
204        iokind: IoKind,
205        log_handlers: LogHandlers,
206    ) {
207        let mut reader = BufReader::new(stdout);
208        let mut line = String::new();
209
210        loop {
211            line.truncate(0);
212
213            match reader.read_line(&mut line).await {
214                Ok(0) => break,
215                Ok(_) => {}
216                Err(e) => {
217                    log::debug!("handle_adapter_log: {}", e);
218                    break;
219                }
220            }
221
222            for (kind, handler) in log_handlers.lock().iter_mut() {
223                if matches!(kind, LogKind::Adapter) {
224                    handler(iokind, None, line.as_str());
225                }
226            }
227        }
228    }
229
230    fn build_rpc_message(message: String) -> String {
231        format!("Content-Length: {}\r\n\r\n{}", message.len(), message)
232    }
233
234    async fn send_to_server<Stdin>(
235        mut server_stdin: Stdin,
236        client_rx: Receiver<Message>,
237        log_handlers: Option<LogHandlers>,
238    ) -> Result<()>
239    where
240        Stdin: AsyncWrite + Unpin + Send + 'static,
241    {
242        let result = loop {
243            match client_rx.recv().await {
244                Ok(message) => {
245                    let command = match &message {
246                        Message::Request(request) => Some(request.command.as_str()),
247                        Message::Response(response) => Some(response.command.as_str()),
248                        _ => None,
249                    };
250
251                    let message = match serde_json::to_string(&message) {
252                        Ok(message) => message,
253                        Err(e) => break Err(e.into()),
254                    };
255
256                    if let Some(log_handlers) = log_handlers.as_ref() {
257                        for (kind, log_handler) in log_handlers.lock().iter_mut() {
258                            if matches!(kind, LogKind::Rpc) {
259                                log_handler(IoKind::StdIn, command, &message);
260                            }
261                        }
262                    }
263
264                    if let Err(e) = server_stdin
265                        .write_all(Self::build_rpc_message(message).as_bytes())
266                        .await
267                    {
268                        break Err(e.into());
269                    }
270
271                    if let Err(e) = server_stdin.flush().await {
272                        break Err(e.into());
273                    }
274                }
275                Err(error) => break Err(error.into()),
276            }
277        };
278
279        log::debug!("Handle adapter input dropped");
280
281        result
282    }
283
284    async fn recv_from_server<Stdout>(
285        server_stdout: Stdout,
286        mut message_handler: DapMessageHandler,
287        pending_requests: Requests,
288        log_handlers: Option<LogHandlers>,
289    ) -> Result<()>
290    where
291        Stdout: AsyncRead + Unpin + Send + 'static,
292    {
293        let mut recv_buffer = String::new();
294        let mut reader = BufReader::new(server_stdout);
295
296        let result = loop {
297            match Self::receive_server_message(&mut reader, &mut recv_buffer, log_handlers.as_ref())
298                .await
299            {
300                ConnectionResult::Timeout => anyhow::bail!("Timed out when connecting to debugger"),
301                ConnectionResult::ConnectionReset => {
302                    log::info!("Debugger closed the connection");
303                    return Ok(());
304                }
305                ConnectionResult::Result(Ok(Message::Response(res))) => {
306                    let tx = pending_requests.lock().remove(&res.request_seq);
307                    if let Some(tx) = tx {
308                        if let Err(e) = tx.send(Self::process_response(res)) {
309                            log::trace!("Did not send response `{:?}` for a cancelled", e);
310                        }
311                    } else {
312                        message_handler(Message::Response(res))
313                    }
314                }
315                ConnectionResult::Result(Ok(message)) => message_handler(message),
316                ConnectionResult::Result(Err(e)) => break Err(e),
317            }
318        };
319
320        log::debug!("Handle adapter output dropped");
321
322        result
323    }
324
325    fn process_response(response: Response) -> Result<Response> {
326        if response.success {
327            Ok(response)
328        } else {
329            if let Some(error_message) = response
330                .body
331                .clone()
332                .and_then(|body| serde_json::from_value::<ErrorResponse>(body).ok())
333                .and_then(|response| response.error.map(|msg| msg.format))
334                .or_else(|| response.message.clone())
335            {
336                anyhow::bail!(error_message);
337            };
338
339            anyhow::bail!(
340                "Received error response from adapter. Response: {:?}",
341                response
342            );
343        }
344    }
345
    /// Reads one framed message from `reader`.
    ///
    /// Header lines are read until a blank `\r\n` line; the `Content-Length` header gives the
    /// number of body bytes to read, which are then decoded as UTF-8 and deserialized.
    /// `buffer` is scratch space for header lines and is overwritten.
    ///
    /// Returns `ConnectionReset` when the stream ends while reading headers, and
    /// `Result(Err(..))` for I/O errors, a missing or invalid content length, invalid UTF-8 or
    /// a body that does not deserialize. When `log_handlers` is given, the raw body is passed
    /// to the `LogKind::Rpc` handlers even if deserialization failed.
    async fn receive_server_message<Stdout>(
        reader: &mut BufReader<Stdout>,
        buffer: &mut String,
        log_handlers: Option<&LogHandlers>,
    ) -> ConnectionResult<Message>
    where
        Stdout: AsyncRead + Unpin + Send + 'static,
    {
        let mut content_length = None;
        loop {
            buffer.truncate(0);

            match reader.read_line(buffer).await {
                // Zero bytes read means the stream is closed.
                Ok(0) => return ConnectionResult::ConnectionReset,
                Ok(_) => {}
                Err(e) => return ConnectionResult::Result(Err(e.into())),
            };

            // A blank line terminates the header section.
            if buffer == "\r\n" {
                break;
            }

            // Header lines other than `Content-Length` are ignored.
            if let Some(("Content-Length", value)) = buffer.trim().split_once(": ") {
                match value.parse().context("invalid content length") {
                    Ok(length) => content_length = Some(length),
                    Err(e) => return ConnectionResult::Result(Err(e)),
                }
            }
        }

        let content_length = match content_length.context("missing content length") {
            Ok(length) => length,
            Err(e) => return ConnectionResult::Result(Err(e)),
        };

        // Read exactly the number of body bytes announced by the header.
        let mut content = vec![0; content_length];
        if let Err(e) = reader
            .read_exact(&mut content)
            .await
            .with_context(|| "reading after a loop")
        {
            return ConnectionResult::Result(Err(e));
        }

        let message_str = match std::str::from_utf8(&content).context("invalid utf8 from server") {
            Ok(str) => str,
            Err(e) => return ConnectionResult::Result(Err(e)),
        };

        // Deserialization errors are kept so the raw text can still be logged below.
        let message =
            serde_json::from_str::<Message>(message_str).context("deserializing server message");

        if let Some(log_handlers) = log_handlers {
            let command = match &message {
                Ok(Message::Request(request)) => Some(request.command.as_str()),
                Ok(Message::Response(response)) => Some(response.command.as_str()),
                _ => None,
            };

            for (kind, log_handler) in log_handlers.lock().iter_mut() {
                if matches!(kind, LogKind::Rpc) {
                    log_handler(IoKind::StdOut, command, message_str);
                }
            }
        }

        ConnectionResult::Result(message)
    }
414
415    pub async fn shutdown(&self) -> Result<()> {
416        log::debug!("Start shutdown client");
417
418        if let Some(server_tx) = self.server_tx.lock().await.take().as_ref() {
419            server_tx.close();
420        }
421
422        self.pending_requests.lock().clear();
423        self.transport.lock().kill();
424
425        log::debug!("Shutdown client completed");
426
427        anyhow::Ok(())
428    }
429
430    pub fn has_adapter_logs(&self) -> bool {
431        self.transport.lock().has_adapter_logs()
432    }
433
434    pub fn add_log_handler<F>(&self, f: F, kind: LogKind)
435    where
436        F: 'static + Send + FnMut(IoKind, Option<&Command>, &IoMessage),
437    {
438        let mut log_handlers = self.log_handlers.lock();
439        log_handlers.push((kind, Box::new(f)));
440    }
441}
442
/// Transport that talks to a debug adapter over a TCP socket, optionally spawning the
/// adapter process itself.
pub struct TcpTransport {
    /// Executor used for the connection task and its timers.
    executor: BackgroundExecutor,
    /// Port the adapter is expected to listen on.
    pub port: u16,
    /// Host the adapter is expected to listen on.
    pub host: Ipv4Addr,
    /// Connection timeout in milliseconds.
    pub timeout: u64,
    /// The spawned adapter process, if this transport started one.
    process: Arc<Mutex<Option<Child>>>,
    /// Task forwarding the adapter's stderr to the log handlers.
    _stderr_task: Option<Task<()>>,
    /// Task forwarding the adapter's stdout to the log handlers.
    _stdout_task: Option<Task<()>>,
}
452
453impl TcpTransport {
454    /// Get an open port to use with the tcp client when not supplied by debug config
455    pub async fn port(host: &TcpArgumentsTemplate) -> Result<u16> {
456        if let Some(port) = host.port {
457            Ok(port)
458        } else {
459            Self::unused_port(host.host()).await
460        }
461    }
462
463    pub async fn unused_port(host: Ipv4Addr) -> Result<u16> {
464        Ok(TcpListener::bind(SocketAddrV4::new(host, 0))
465            .await?
466            .local_addr()?
467            .port())
468    }
469
470    async fn start(
471        binary: &DebugAdapterBinary,
472        log_handlers: LogHandlers,
473        cx: &mut AsyncApp,
474    ) -> Result<Self> {
475        let connection_args = binary
476            .connection
477            .as_ref()
478            .context("No connection arguments provided")?;
479
480        let host = connection_args.host;
481        let port = connection_args.port;
482
483        let mut process = None;
484        let mut stdout_task = None;
485        let mut stderr_task = None;
486
487        if let Some(command) = &binary.command {
488            let mut command = util::command::new_std_command(&command);
489
490            if let Some(cwd) = &binary.cwd {
491                command.current_dir(cwd);
492            }
493
494            command.args(&binary.arguments);
495            command.envs(&binary.envs);
496
497            let mut p = Child::spawn(command, Stdio::null())
498                .with_context(|| "failed to start debug adapter.")?;
499
500            stdout_task = p.stdout.take().map(|stdout| {
501                cx.background_executor()
502                    .spawn(TransportDelegate::handle_adapter_log(
503                        stdout,
504                        IoKind::StdOut,
505                        log_handlers.clone(),
506                    ))
507            });
508            stderr_task = p.stderr.take().map(|stderr| {
509                cx.background_executor()
510                    .spawn(TransportDelegate::handle_adapter_log(
511                        stderr,
512                        IoKind::StdErr,
513                        log_handlers,
514                    ))
515            });
516            process = Some(p);
517        };
518
519        let timeout = connection_args.timeout.unwrap_or_else(|| {
520            cx.update(|cx| DebuggerSettings::get_global(cx).timeout)
521                .unwrap_or(20000u64)
522        });
523
524        log::info!(
525            "Debug adapter has connected to TCP server {}:{}",
526            host,
527            port
528        );
529
530        let this = Self {
531            executor: cx.background_executor().clone(),
532            port,
533            host,
534            process: Arc::new(Mutex::new(process)),
535            timeout,
536            _stdout_task: stdout_task,
537            _stderr_task: stderr_task,
538        };
539
540        Ok(this)
541    }
542}
543
544impl Transport for TcpTransport {
545    fn has_adapter_logs(&self) -> bool {
546        true
547    }
548
549    fn kill(&self) {
550        if let Some(process) = &mut *self.process.lock() {
551            process.kill();
552        }
553    }
554
555    fn tcp_arguments(&self) -> Option<TcpArguments> {
556        Some(TcpArguments {
557            host: self.host,
558            port: self.port,
559            timeout: Some(self.timeout),
560        })
561    }
562
563    fn connect(
564        &mut self,
565    ) -> Task<
566        Result<(
567            Box<dyn AsyncWrite + Unpin + Send + 'static>,
568            Box<dyn AsyncRead + Unpin + Send + 'static>,
569        )>,
570    > {
571        let executor = self.executor.clone();
572        let timeout = self.timeout;
573        let address = SocketAddrV4::new(self.host, self.port);
574        let process = self.process.clone();
575        executor.clone().spawn(async move {
576            select! {
577                _ = executor.timer(Duration::from_millis(timeout)).fuse() => {
578                    anyhow::bail!("Connection to TCP DAP timeout {address}");
579                },
580                result = executor.clone().spawn(async move {
581                    loop {
582                        match TcpStream::connect(address).await {
583                            Ok(stream) => {
584                                let (read, write) = stream.split();
585                                return Ok((Box::new(write) as _, Box::new(read) as _))
586                            },
587                            Err(_) => {
588                                let has_process = process.lock().is_some();
589                                if has_process {
590                                    let status = process.lock().as_mut().unwrap().try_status();
591                                    if let Ok(Some(_)) = status {
592                                        let process = process.lock().take().unwrap().into_inner();
593                                        let output = process.output().await?;
594                                        let output = if output.stderr.is_empty() {
595                                            String::from_utf8_lossy(&output.stdout).to_string()
596                                        } else {
597                                            String::from_utf8_lossy(&output.stderr).to_string()
598                                        };
599                                        anyhow::bail!("{output}\nerror: process exited before debugger attached.");
600                                    }
601                                }
602
603                                executor.timer(Duration::from_millis(100)).await;
604                            }
605                        }
606                    }
607                }).fuse() => result
608            }
609        })
610    }
611}
612
613impl Drop for TcpTransport {
614    fn drop(&mut self) {
615        if let Some(mut p) = self.process.lock().take() {
616            p.kill();
617        }
618    }
619}
620
/// Transport that talks to a spawned debug adapter over its stdin/stdout.
pub struct StdioTransport {
    /// The spawned adapter process.
    process: Mutex<Child>,
    /// Task forwarding the adapter's stderr to the log handlers.
    _stderr_task: Option<Task<()>>,
}
625
626impl StdioTransport {
627    // #[allow(dead_code, reason = "This is used in non test builds of Zed")]
628    async fn start(
629        binary: &DebugAdapterBinary,
630        log_handlers: LogHandlers,
631        cx: &mut AsyncApp,
632    ) -> Result<Self> {
633        let Some(binary_command) = &binary.command else {
634            bail!(
635                "When using the `stdio` transport, the path to a debug adapter binary must be set by Zed."
636            );
637        };
638        let mut command = util::command::new_std_command(&binary_command);
639
640        if let Some(cwd) = &binary.cwd {
641            command.current_dir(cwd);
642        }
643
644        command.args(&binary.arguments);
645        command.envs(&binary.envs);
646
647        let mut process = Child::spawn(command, Stdio::piped()).with_context(|| {
648            format!(
649                "failed to spawn command `{} {}`.",
650                binary_command,
651                binary.arguments.join(" ")
652            )
653        })?;
654
655        let err_task = process.stderr.take().map(|stderr| {
656            cx.background_spawn(TransportDelegate::handle_adapter_log(
657                stderr,
658                IoKind::StdErr,
659                log_handlers,
660            ))
661        });
662
663        let process = Mutex::new(process);
664
665        Ok(Self {
666            process,
667            _stderr_task: err_task,
668        })
669    }
670}
671
672impl Transport for StdioTransport {
673    fn has_adapter_logs(&self) -> bool {
674        false
675    }
676
677    fn kill(&self) {
678        self.process.lock().kill()
679    }
680
681    fn connect(
682        &mut self,
683    ) -> Task<
684        Result<(
685            Box<dyn AsyncWrite + Unpin + Send + 'static>,
686            Box<dyn AsyncRead + Unpin + Send + 'static>,
687        )>,
688    > {
689        let mut process = self.process.lock();
690        let result = util::maybe!({
691            Ok((
692                Box::new(process.stdin.take().context("Cannot reconnect")?) as _,
693                Box::new(process.stdout.take().context("Cannot reconnect")?) as _,
694            ))
695        });
696        Task::ready(result)
697    }
698
699    fn tcp_arguments(&self) -> Option<TcpArguments> {
700        None
701    }
702}
703
704impl Drop for StdioTransport {
705    fn drop(&mut self) {
706        self.process.get_mut().kill();
707    }
708}
709
/// Fake adapter-side handler: receives a request's sequence number and raw arguments and
/// produces the response to send back.
#[cfg(any(test, feature = "test-support"))]
type RequestHandler =
    Box<dyn Send + FnMut(u64, serde_json::Value) -> dap_types::messages::Response>;

/// Fake adapter-side handler invoked with a response sent by the client.
#[cfg(any(test, feature = "test-support"))]
type ResponseHandler = Box<dyn Send + Fn(Response)>;
716
/// In-memory transport for tests that plays the role of the debug adapter.
#[cfg(any(test, feature = "test-support"))]
pub struct FakeTransport {
    // for sending fake response back from adapter side
    request_handlers: Arc<Mutex<HashMap<&'static str, RequestHandler>>>,
    // for reverse request responses
    response_handlers: Arc<Mutex<HashMap<&'static str, ResponseHandler>>>,

    // Pipe ends handed out by `connect`; `None` once they have been taken.
    stdin_writer: Option<PipeWriter>,
    stdout_reader: Option<PipeReader>,
}
727
728#[cfg(any(test, feature = "test-support"))]
729impl FakeTransport {
730    pub fn on_request<R: dap_types::requests::Request, F>(&self, mut handler: F)
731    where
732        F: 'static + Send + FnMut(u64, R::Arguments) -> Result<R::Response, ErrorResponse>,
733    {
734        self.request_handlers.lock().insert(
735            R::COMMAND,
736            Box::new(move |seq, args| {
737                let result = handler(seq, serde_json::from_value(args).unwrap());
738                let response = match result {
739                    Ok(response) => Response {
740                        seq: seq + 1,
741                        request_seq: seq,
742                        success: true,
743                        command: R::COMMAND.into(),
744                        body: Some(serde_json::to_value(response).unwrap()),
745                        message: None,
746                    },
747                    Err(response) => Response {
748                        seq: seq + 1,
749                        request_seq: seq,
750                        success: false,
751                        command: R::COMMAND.into(),
752                        body: Some(serde_json::to_value(response).unwrap()),
753                        message: None,
754                    },
755                };
756                response
757            }),
758        );
759    }
760
761    pub fn on_response<R: dap_types::requests::Request, F>(&self, handler: F)
762    where
763        F: 'static + Send + Fn(Response),
764    {
765        self.response_handlers
766            .lock()
767            .insert(R::COMMAND, Box::new(handler));
768    }
769
    /// Creates the fake transport and spawns a detached background task that acts as the
    /// adapter.
    ///
    /// The task reads framed messages from the "stdin" pipe and, per message kind:
    /// - `RunInTerminal`/`StartDebugging` requests are echoed to the "stdout" pipe;
    /// - other requests are answered by the handler registered via `on_request`
    ///   (panicking if none is registered);
    /// - events are echoed to the "stdout" pipe;
    /// - responses are passed to the handler registered via `on_response`.
    async fn start(cx: &mut AsyncApp) -> Result<Self> {
        use dap_types::requests::{Request, RunInTerminal, StartDebugging};
        use serde_json::json;

        let (stdin_writer, stdin_reader) = async_pipe::pipe();
        let (stdout_writer, stdout_reader) = async_pipe::pipe();

        // The writer of "stdin" and reader of "stdout" are handed to the client by `connect`.
        let this = Self {
            request_handlers: Arc::new(Mutex::new(HashMap::default())),
            response_handlers: Arc::new(Mutex::new(HashMap::default())),
            stdin_writer: Some(stdin_writer),
            stdout_reader: Some(stdout_reader),
        };

        let request_handlers = this.request_handlers.clone();
        let response_handlers = this.response_handlers.clone();
        let stdout_writer = Arc::new(smol::lock::Mutex::new(stdout_writer));

        cx.background_spawn(async move {
            let mut reader = BufReader::new(stdin_reader);
            let mut buffer = String::new();

            loop {
                match TransportDelegate::receive_server_message(&mut reader, &mut buffer, None)
                    .await
                {
                    ConnectionResult::Timeout => {
                        anyhow::bail!("Timed out when connecting to debugger");
                    }
                    ConnectionResult::ConnectionReset => {
                        log::info!("Debugger closed the connection");
                        break Ok(());
                    }
                    ConnectionResult::Result(Err(e)) => break Err(e),
                    ConnectionResult::Result(Ok(message)) => {
                        match message {
                            Message::Request(request) => {
                                // redirect reverse requests to stdout writer/reader
                                if request.command == RunInTerminal::COMMAND
                                    || request.command == StartDebugging::COMMAND
                                {
                                    let message =
                                        serde_json::to_string(&Message::Request(request)).unwrap();

                                    let mut writer = stdout_writer.lock().await;
                                    writer
                                        .write_all(
                                            TransportDelegate::build_rpc_message(message)
                                                .as_bytes(),
                                        )
                                        .await
                                        .unwrap();
                                    writer.flush().await.unwrap();
                                } else {
                                    // Missing arguments are treated as an empty JSON object.
                                    let response = if let Some(handle) =
                                        request_handlers.lock().get_mut(request.command.as_str())
                                    {
                                        handle(request.seq, request.arguments.unwrap_or(json!({})))
                                    } else {
                                        panic!("No request handler for {}", request.command);
                                    };
                                    let message =
                                        serde_json::to_string(&Message::Response(response))
                                            .unwrap();

                                    let mut writer = stdout_writer.lock().await;

                                    writer
                                        .write_all(
                                            TransportDelegate::build_rpc_message(message)
                                                .as_bytes(),
                                        )
                                        .await
                                        .unwrap();
                                    writer.flush().await.unwrap();
                                }
                            }
                            Message::Event(event) => {
                                // Events are passed straight through to the client side.
                                let message =
                                    serde_json::to_string(&Message::Event(event)).unwrap();

                                let mut writer = stdout_writer.lock().await;
                                writer
                                    .write_all(
                                        TransportDelegate::build_rpc_message(message).as_bytes(),
                                    )
                                    .await
                                    .unwrap();
                                writer.flush().await.unwrap();
                            }
                            Message::Response(response) => {
                                if let Some(handle) =
                                    response_handlers.lock().get(response.command.as_str())
                                {
                                    handle(response);
                                } else {
                                    log::error!("No response handler for {}", response.command);
                                }
                            }
                        }
                    }
                }
            }
        })
        .detach();

        Ok(this)
    }
878}
879
#[cfg(any(test, feature = "test-support"))]
impl Transport for FakeTransport {
    /// The fake transport is not TCP based.
    fn tcp_arguments(&self) -> Option<TcpArguments> {
        None
    }

    /// Hands out the client ends of the in-memory pipes.
    ///
    /// The pipe ends can only be taken once; later calls resolve to a "Cannot reconnect" error.
    fn connect(
        &mut self,
    ) -> Task<
        Result<(
            Box<dyn AsyncWrite + Unpin + Send + 'static>,
            Box<dyn AsyncRead + Unpin + Send + 'static>,
        )>,
    > {
        let streams = util::maybe!({
            let writer = self.stdin_writer.take().context("Cannot reconnect")?;
            let reader = self.stdout_reader.take().context("Cannot reconnect")?;
            Ok((Box::new(writer) as _, Box::new(reader) as _))
        });
        Task::ready(streams)
    }

    /// The fake transport produces no adapter logs.
    fn has_adapter_logs(&self) -> bool {
        false
    }

    /// Nothing to kill: there is no real process behind the fake transport.
    fn kill(&self) {}

    /// Returns `self`, overriding the panicking default.
    #[cfg(any(test, feature = "test-support"))]
    fn as_fake(&self) -> &FakeTransport {
        self
    }
}
914
/// Wrapper around a spawned adapter process that provides platform-specific `spawn` and
/// `kill` behavior.
struct Child {
    /// The wrapped process handle.
    process: smol::process::Child,
}
918
/// Gives read access to the wrapped `smol::process::Child`.
impl std::ops::Deref for Child {
    type Target = smol::process::Child;

    fn deref(&self) -> &Self::Target {
        &self.process
    }
}
926
/// Gives mutable access to the wrapped `smol::process::Child` (e.g. to take its stdio handles).
impl std::ops::DerefMut for Child {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.process
    }
}
932
impl Child {
    /// Unwraps the underlying process handle.
    fn into_inner(self) -> smol::process::Child {
        self.process
    }

    /// Spawns `command` with the given `stdin` and with piped stdout and stderr.
    ///
    /// NOTE(review): `set_pre_exec_to_start_new_session` presumably makes the child the leader
    /// of a new session/process group, which `kill` relies on — confirm against `util`.
    #[cfg(not(windows))]
    fn spawn(mut command: std::process::Command, stdin: Stdio) -> Result<Self> {
        util::set_pre_exec_to_start_new_session(&mut command);
        let process = smol::process::Command::from(command)
            .stdin(stdin)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;
        Ok(Self { process })
    }

    /// Spawns `command` with the given `stdin` and with piped stdout and stderr.
    #[cfg(windows)]
    fn spawn(command: std::process::Command, stdin: Stdio) -> Result<Self> {
        // TODO(windows): create a job object and add the child process handle to it,
        // see https://learn.microsoft.com/en-us/windows/win32/procthread/job-objects
        let process = smol::process::Command::from(command)
            .stdin(stdin)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()?;
        Ok(Self { process })
    }

    /// Sends `SIGKILL` to the process group whose id equals the child's pid.
    ///
    /// The return value of `killpg` is ignored, so failures are silent.
    #[cfg(not(windows))]
    fn kill(&mut self) {
        let pid = self.process.id();
        // SAFETY: `killpg` takes plain integer arguments and does not access memory owned by
        // this program.
        // NOTE(review): if the child has already exited and been reaped, the id could refer to
        // an unrelated process group — confirm this cannot happen for callers.
        unsafe {
            libc::killpg(pid as i32, libc::SIGKILL);
        }
    }

    /// Kills the child process only; any error is ignored.
    #[cfg(windows)]
    fn kill(&mut self) {
        // TODO(windows): terminate the job object in kill
        let _ = self.process.kill();
    }
}